Office 365 Data Group builder using F5 Python SDK
Problem this snippet solves:
Python script which pulls O365 URLs/IPs from the Microsoft-published XML-formatted list, parses them into Python dictionaries formatted for F5 data groups, and uses the F5 Python SDK to create or update a data group on BIG-IP for each MS product. The script checks whether each data group already exists and, if so, updates it based on the changes. Once the data groups are installed, they can be used/referenced by any F5 feature/configuration that can reference a data group.
How to use this snippet:
Python version: 3.6
Python Modules Required: f5-sdk, requests, f5-icontrol-rest, xmltodict
Update script variables for your environment: BIGIP_ADDRESS = ' ' BIGIP_USER = ' ' BIGIP_PASS = ' '
to execute: python3 o365_dg_builder.py
Code :
# Build/refresh BIG-IP internal data groups from the Microsoft Office 365
# address list (one data group per product and address type).
import argparse
import sys
from collections import OrderedDict

import requests
import xmltodict
from f5.bigip import ManagementRoot
from icontrol.exceptions import iControlUnexpectedHTTPError

# NOTE(review): Microsoft has been moving consumers of this XML feed to the
# "Office 365 IP Address and URL web service" -- confirm this URL is still
# served before relying on it.
MS_URL = 'https://support.content.office.net/en-us/static/O365IPAddresses.xml'
BIGIP_ADDRESS = '192.168.15.165'
BIGIP_USER = 'admin'
BIGIP_PASS = 'admin'

# Seconds to wait for the Microsoft endpoint; without a timeout a stalled
# connection would hang the script forever.
HTTP_TIMEOUT = 30

# Maps the XML address-list "type" attribute to the BIG-IP data group type.
_DG_TYPE_BY_ADDR_TYPE = {'URL': 'string', 'IPv4': 'ip', 'IPv6': 'ip'}


def load_bigip(f5_host, f5_user, f5_pass):
    """Open a management session to the BIG-IP and return the SDK root object.

    Args:
        f5_host: Hostname or IP address of the BIG-IP management interface.
        f5_user: Username used to authenticate.
        f5_pass: Password used to authenticate.

    Returns:
        The ``ManagementRoot`` instance created for the given host.
    """
    return ManagementRoot(f5_host, f5_user, f5_pass)


def _as_list(value):
    """Return *value* wrapped as a list.

    xmltodict yields a bare item (not a one-element list) when an element has
    a single child, and ``None`` for an empty element; normalising here lets
    callers iterate uniformly.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _build_record(prod_name, addrlist):
    """Turn one parsed ``<addresslist>`` into a data group record dict.

    Returns ``None`` when the address list has no ``type`` attribute or no
    addresses, so the caller can skip it.
    """
    # ``dict`` also matches OrderedDict, which older xmltodict versions return.
    if not isinstance(addrlist, dict):
        return None
    addr_type = addrlist.get('@type')
    addresses = _as_list(addrlist.get('address'))
    if addr_type is None or not addresses:
        return None

    record = {'name': "{}_{}_{}".format('MS', prod_name, addr_type)}
    dg_type = _DG_TYPE_BY_ADDR_TYPE.get(addr_type)
    if dg_type is not None:
        record['type'] = dg_type
    # MS XML has duplicate entries; de-duplicate while keeping the original
    # order so repeated runs produce a stable payload.
    record['records'] = list(OrderedDict.fromkeys(addresses))
    return record


def get_o365_data():
    """Download the Office 365 address list and convert it to record dicts.

    Returns:
        A list of dicts, one per product/address-type pair, each with the
        keys ``name`` (``MS_<product>_<type>``), ``records`` (de-duplicated
        list of address strings) and, for URL/IPv4/IPv6 lists, ``type``
        (``'string'`` or ``'ip'``).

    Raises:
        requests.exceptions.RequestException: The download failed, timed out
            or returned an HTTP error status.
    """
    response = requests.get(MS_URL, timeout=HTTP_TIMEOUT)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page as XML.
    response.raise_for_status()
    pdata = xmltodict.parse(response.content)

    records = []
    for product in _as_list(pdata['products']['product']):
        prod_name = product['@name']
        # A product may carry one address list (dict) or several (list).
        for addrlist in _as_list(product.get('addresslist')):
            record = _build_record(prod_name, addrlist)
            if record is not None:
                records.append(record)
    return records


def compare_dg_entries(record, old_dg_l):
    """Compute which entries must be removed from / added to a data group.

    Args:
        record: Iterable of the desired entry names.
        old_dg_l: Iterable of the entry names currently on the BIG-IP.

    Returns:
        A tuple ``(addrs_to_rem, addrs_to_add)`` of lists: names present only
        in *old_dg_l*, and names present only in *record*, each in the order
        of the list it came from.
    """
    # Sets give O(1) membership tests; the address lists can be long.
    wanted = set(record)
    current = set(old_dg_l)
    addrs_to_rem = [x for x in old_dg_l if x not in wanted]
    addrs_to_add = [x for x in record if x not in current]
    return addrs_to_rem, addrs_to_add


def create_dg(bigip, record):
    """Create the data group described by *record*, or update it in place.

    Args:
        bigip: SDK root object as returned by ``load_bigip``.
        record: Dict as produced by ``get_o365_data``.

    Returns:
        ``'SUCCESS'`` if the data group was created, ``'SKIPPED'`` if it
        already existed with identical entries, ``'UPDATED'`` if entries were
        added or removed.
    """
    internals = bigip.tm.ltm.data_group.internals
    # ``internals.internal`` is looked up afresh for every call, exactly as
    # the chained attribute access in the original did.
    if not internals.internal.exists(name=record['name']):
        internals.internal.create(**record)
        return 'SUCCESS'

    data_group = internals.internal.load(name=record['name'])
    # Guard against a loaded data group that has no ``records`` attribute
    # (presumably the case when it is empty -- verify against the SDK).
    existing = list(getattr(data_group, 'records', None) or [])
    old_dg_l = [entry['name'] for entry in existing]

    addrs_to_rem, addrs_to_add = compare_dg_entries(record['records'], old_dg_l)
    if not addrs_to_add and not addrs_to_rem:
        return 'SKIPPED'

    stale = set(addrs_to_rem)
    # Keep surviving entries untouched so any extra fields they carry persist.
    updated = [entry for entry in existing if entry['name'] not in stale]
    updated.extend({'name': addr} for addr in addrs_to_add)
    data_group.records = updated
    data_group.update()
    # if you get time create log of updated entries
    return 'UPDATED'


def main():
    """Connect to the BIG-IP and synchronise every Office 365 data group."""
    try:
        bigip = load_bigip(BIGIP_ADDRESS, BIGIP_USER, BIGIP_PASS)
    except iControlUnexpectedHTTPError as e:
        print(e)
        sys.exit(1)

    if bigip:
        try:
            records = get_o365_data()
        except requests.exceptions.RequestException as e:
            print(e)
            sys.exit(1)

        for record in records:
            try:
                status = create_dg(bigip, record)
                print("BIGIP: {}, Data Group: {}, Create Status {}".format(BIGIP_ADDRESS, record['name'], status))
            except iControlUnexpectedHTTPError as e:
                status = 'FAILED'
                print("BIGIP: {}, Data Group: {}, Create Status {}, Error {}".format(BIGIP_ADDRESS, record['name'], status, e))


# -----
if __name__ == '__main__':
    main()
Tested this on version:
13.0