Logfile to ASM Policy Entities
Problem this snippet solves:
Script that ingests a log file (CLF, W3C Extended or URL-per-line text file) and creates URL (and optionally File Type and Parameter) entities in a BIG-IP ASM Security Policy!
How to use this snippet:
Run ./logfile_to_asm_entities.py with the arguments shown in its usage message:
usage: logfile_to_asm_entities.py [-h] --bigip BIGIP --user USER --logfile LOGFILE --policy POLICY [--protocol {https,http}] [--format {clf,w3ce,urls}] [--addfiletypes] [--addparameters] [--add200] [--add301] [--add302] [--add304] [--noprompt] (--updatepolicy | --report)
Code :
#!/usr/bin/python
# logfile_to_asm_entities.py
# Author: Chad Jenison (c.jenison at f5.com)
# Version 1.0
# Version 1.1 - Minor fixes (re: regex for finding status code in w3ce format) and output tweaks (newUrl -> Parsed URL)
# Version 1.2 - Added Support for Parsing Parameters in URL for CLF format and adding them to policy; made File Type and Parameter adding optional via a command line argument
#
# Script that parses web log files and determines URLs that can be added to an F5 BIG-IP ASM Security Policy as Allowed URL entities
# Todo: Determine how URL Parameters appear in W3CE log format; parse them out and support adding to policy

import argparse
import sys
import json
import getpass
import re

try:
    import requests
except ImportError:
    # Reported in main() so the pure parsing helpers stay importable
    # on a machine that does not have requests installed.
    requests = None

try:
    _read_input = raw_input  # Python 2
except NameError:
    _read_input = input  # Python 3

CONTENT_TYPE_JSON_HEADER = {'Content-Type': 'application/json'}

# Status codes that have a matching --addNNN command line switch.
ADDABLE_STATUS_CODES = ('200', '301', '302', '304')

KNOWN_LOG_FORMATS = ('clf', 'w3ce', 'urls')

# The request method surrounded by spaces marks where the URL starts in a
# W3C Extended log line.
W3CE_METHOD_RE = re.compile(' (GET|POST|HEAD|PUT|PATCH) ')


# Taken from http://code.activestate.com/recipes/577058/
def query_yes_no(question, default="no"):
    """Ask a yes/no question on stdout and return the answer as a bool.

    question -- text shown to the user.
    default  -- answer used when the user just presses Enter: "yes", "no",
                or None to insist on an explicit answer.

    Raises ValueError when default is none of those three values.
    """
    valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default)
    while True:
        sys.stdout.write(question + prompt)
        sys.stdout.flush()
        choice = _read_input().strip().lower()
        if default is not None and choice == '':
            return valid[default]
        if choice in valid:
            return valid[choice]
        sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")


def build_arg_parser():
    """Return the argparse parser describing the command line interface."""
    parser = argparse.ArgumentParser(description='A tool to parse web log files and add them to an ASM security policy')
    parser.add_argument('--bigip', '-b', help='IP or hostname of BIG-IP Management or Self IP', required=True)
    parser.add_argument('--user', '-u', help='username to use for authentication', required=True)
    parser.add_argument('--logfile', '-l', help='log filename in cwd', required=True)
    parser.add_argument('--policy', '-p', help='ASM security policy name', required=True)
    parser.add_argument('--protocol', default='https', choices=['https', 'http'], help='protocol for url entities (https or http)')

    # Input File Type
    parser.add_argument('--format', '-f', choices=['clf', 'w3ce', 'urls'], help='Log File Format: Common Log Format, W3C Extended, URL per Line')

    # options for adding entities
    parser.add_argument('--addfiletypes', '-af', action='store_true')
    parser.add_argument('--addparameters', '-ap', action='store_true')

    # responses_to_include
    statuscodes = parser.add_argument_group(title='Log file status codes to include')
    for code in ADDABLE_STATUS_CODES:
        statuscodes.add_argument('--add%s' % code, help='add URLs if log shows %s response status' % code, action='store_true')

    # Safety Checks
    safety = parser.add_argument_group(title='Options for disabling Safety Checks')
    safety.add_argument('--noprompt', '-n', action='store_true', help='do not prompt to confirm adding each entity')

    # Mode
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument('--updatepolicy', help='add URLs to BIG-IP ASM security Policy', action='store_true')
    mode.add_argument('--report', action='store_true', help='Summarize URLs found in log file that would be added; do not touch BIG-IP')
    return parser


def parse_log_line(line, log_format):
    """Extract (url, status_code, parameter_string) from one log line.

    status_code is None for the 'urls' format, which carries no status.
    parameter_string is the text after the first '?' of a CLF request, or
    None when there is none; the W3C Extended format is not parsed for
    parameters (see Todo in the file header).

    Returns None when the line is blank or does not match log_format, so the
    caller can skip it instead of aborting the whole run.
    """
    if log_format == 'urls':
        url = line.strip()
        if not url:
            return None
        return (url, None, None)
    try:
        if log_format == 'clf':
            # CLF quotes the request: ... "METHOD /path?query PROTO" status size
            quoted = line.split('"')
            target = quoted[1].split()[1]
            status_code = quoted[2].split()[0]
            url, _, parameter_string = target.partition('?')
            return (url, status_code, parameter_string or None)
        if log_format == 'w3ce':
            # maxsplit=1 keeps the whole remainder of the line even if a
            # method name shows up again later in the line.
            remainder = W3CE_METHOD_RE.split(line, 1)[2].split()
            # Like the original script, this takes the status code to be the
            # third field from the end -- depends on the logged field list.
            return (remainder[0], remainder[-3], None)
    except IndexError:
        # Directive lines ('#Fields: ...') and malformed entries land here.
        return None
    return None


def process_log_lines(lines, log_format, enabled_statuses):
    """Collect candidate URLs and raw parameter strings from log lines.

    lines            -- iterable of log lines (an open file works).
    log_format       -- 'clf', 'w3ce' or 'urls'; anything else yields nothing.
    enabled_statuses -- status code strings whose URLs should be collected.

    Prints one progress message per line and returns a tuple
    (set_of_urls, set_of_parameter_strings).
    """
    new_urls = set()
    new_parameter_strings = set()
    if log_format not in KNOWN_LOG_FORMATS:
        return new_urls, new_parameter_strings

    for line in lines:
        parsed = parse_log_line(line, log_format)
        if parsed is None:
            if line.strip():
                print('Skipping unparseable line: %s' % (line.rstrip('\r\n')))
            continue
        url, status_code, parameter_string = parsed

        if log_format == 'urls':
            # No status code available: every listed URL is a candidate.
            new_urls.add(url)
            print('Parsed Url: %s' % (url))
            continue

        print('Parsed Url: %s - statusCode: %s' % (url, status_code))
        if parameter_string is not None:
            print('Parsed Parameters: %s' % (parameter_string))

        if status_code in enabled_statuses:
            new_urls.add(url)
            if parameter_string is not None:
                new_parameter_strings.add(parameter_string)
        elif status_code == '404':
            print('URL: %s not eligible for adding to policy because status code 404 is ignored' % (url))
        else:
            print('URL: %s not added - status code %s (possibly not enabled due to missing arguments)' % (url, status_code))
    return new_urls, new_parameter_strings


def extract_filetypes(urls):
    """Return the set of file extensions found on the last path segment of urls.

    URLs whose last segment has no '.' contribute nothing, so a path such as
    '/foo/bar' is not mistaken for a file type named 'bar'.
    """
    filetypes = set()
    for url in urls:
        last_segment = url.split('/')[-1]
        if '.' not in last_segment:
            continue
        extension = last_segment.split('.')[-1]
        if extension:
            filetypes.add(extension)
    return filetypes


def extract_parameter_names(parameter_strings):
    """Return the set of parameter names found in 'a=1&b=2' style strings."""
    names = set()
    for parameter_string in parameter_strings:
        for pair in parameter_string.split('&'):
            name = pair.split('=')[0]
            if name:  # 'a=1&&b=2' or a trailing '&' produce empty pairs
                names.add(name)
    return names


# adapted from https://devcentral.f5.com/s/articles/demystifying-icontrol-rest-6-token-based-authentication
def get_auth_token(session, bigip, user, passwd):
    """Log in to the BIG-IP and return the authentication token string.

    Exits the script with a message when the response carries no token.
    """
    payload = {'username': user, 'password': passwd, 'loginProviderName': 'tmos'}
    authurl = 'https://%s/mgmt/shared/authn/login' % bigip
    response = session.post(authurl, headers=CONTENT_TYPE_JSON_HEADER, auth=(user, passwd), data=json.dumps(payload))
    try:
        return response.json()['token']['token']
    except (ValueError, KeyError, TypeError):
        sys.exit('Authentication to %s failed - Status Code: %s' % (bigip, response.status_code))


def get_asm_policy_id_from_name(session, url_base, name):
    """Return the id of the ASM policy called name, or None if absent."""
    policies = session.get('%s/asm/policies/' % (url_base)).json()
    for policy in policies['items']:
        if policy['name'] == name:
            print('Found policy: %s' % (policy['id']))
            return policy['id']
    return None


def get_existing_entity_names(session, url_base, policy_id, collection):
    """Return the names already present in a policy sub-collection.

    collection is the path component below the policy, e.g. 'urls'.
    """
    listing = session.get('%s/asm/policies/%s/%s' % (url_base, policy_id, collection)).json()
    return set(item['name'] for item in listing['items'])


def create_policy_entity(session, url_base, policy_id, post_headers, collection, label, name, extra_fields=None):
    """POST one new staged entity to the policy and print the outcome.

    collection   -- sub-collection path component ('urls', 'filetypes', ...).
    label        -- human readable entity kind used in the printed message.
    extra_fields -- optional additional payload fields (e.g. the URL protocol).
    """
    payload = {'name': name, 'performStaging': 'true'}
    if extra_fields:
        payload.update(extra_fields)
    response = session.post('%s/asm/policies/%s/%s' % (url_base, policy_id, collection), headers=post_headers, data=json.dumps(payload))
    if response.status_code == 201:
        print('Successfully Created %s: %s' % (label, name))
    else:
        print('Unsuccessful attempt to create %s: %s - Status Code: %s' % (label, name, response.status_code))


def review_candidates(candidates, existing, short_label, long_label, update_policy, no_prompt, create):
    """Report on, or create, each candidate entity that the policy lacks.

    candidates    -- names found in the log file.
    existing      -- names already defined in the policy.
    short_label   -- entity kind used in prompts and skip messages.
    long_label    -- entity kind used in report-only messages.
    update_policy -- False means report only; nothing is created.
    no_prompt     -- True creates without asking for confirmation.
    create        -- callable taking a name; invoked for each accepted entity.
    """
    # Sorted so that prompts and output come in a predictable order.
    for name in sorted(candidates):
        if name in existing:
            print('%s: %s already defined in policy' % (short_label, name))
        elif not update_policy:
            print('Report Only - New %s: %s' % (long_label, name))
        elif no_prompt or query_yes_no('Add %s: %s to policy?' % (short_label, name), default="yes"):
            create(name)
        else:
            print('Skipping %s: %s' % (short_label, name))


def main():
    """Parse arguments, read the policy and the log file, then report/update."""
    args = build_arg_parser().parse_args()
    if requests is None:
        sys.exit("This script requires the 'requests' package")

    url_base = ('https://%s/mgmt/tm' % (args.bigip))
    passwd = getpass.getpass("Password for " + args.user + ":")

    bip = requests.session()
    # Certificate verification is disabled, as in the original script.
    bip.verify = False
    requests.packages.urllib3.disable_warnings()
    authheader = {'X-F5-Auth-Token': get_auth_token(bip, args.bigip, args.user, passwd)}
    bip.headers.update(authheader)

    policy_id = get_asm_policy_id_from_name(bip, url_base, args.policy)
    if policy_id is None:
        sys.exit('ASM policy %s not found on %s' % (args.policy, args.bigip))
    print('Policy Name: %s ; Policy ID: %s' % (args.policy, policy_id))

    existing_urls = get_existing_entity_names(bip, url_base, policy_id, 'urls')
    existing_filetypes = get_existing_entity_names(bip, url_base, policy_id, 'filetypes')
    existing_parameters = get_existing_entity_names(bip, url_base, policy_id, 'parameters')

    if args.format is None:
        print('No --format given; the log file will not be parsed')
    enabled_statuses = set(code for code in ADDABLE_STATUS_CODES if getattr(args, 'add' + code))

    print('**Processing Log File**')
    with open(args.logfile, "r") as logfile:
        new_urls, new_parameter_strings = process_log_lines(logfile, args.format, enabled_statuses)
    print('**Finished Processing Log File**')

    # combine the auth token and the Content-type json header into a fresh
    # dict in preparation for doing POSTs
    post_headers = dict(authheader)
    post_headers.update(CONTENT_TYPE_JSON_HEADER)

    def creator(collection, label, extra_fields=None):
        # Bind everything except the entity name for review_candidates().
        return lambda name: create_policy_entity(bip, url_base, policy_id, post_headers, collection, label, name, extra_fields)

    review_candidates(new_urls, existing_urls, 'URL', 'URL', args.updatepolicy, args.noprompt,
                      creator('urls', 'URL', {'protocol': args.protocol}))
    if args.addfiletypes:
        review_candidates(extract_filetypes(new_urls), existing_filetypes, 'Filetype', 'File Type', args.updatepolicy, args.noprompt,
                          creator('filetypes', 'File Type'))
    if args.addparameters:
        review_candidates(extract_parameter_names(new_parameter_strings), existing_parameters, 'Parameter', 'Parameter', args.updatepolicy, args.noprompt,
                          creator('parameters', 'Parameter'))


if __name__ == '__main__':
    main()
Tested this on version:
13.0