Logfile to ASM Policy Entities

Problem this snippet solves:

Script that ingests a log file (CLF, W3C Extended or URL-per-line text file) and creates URL (and optionally File Type and Parameter) entities in a BIG-IP ASM Security Policy!

How to use this snippet:

./logfile_to_asm_entities.py usage: logfile_to_asm_entities.py [-h] --bigip BIGIP --user USER --logfile LOGFILE --policy POLICY [--protocol {https,http}] [--format {clf,w3ce,urls}] [--addfiletypes] [--addparameters] [--add200] [--add301] [--add302] [--add304] [--noprompt] (--updatepolicy | --report)

Code :


# logfile_to_asm_entities.py.py
# Author: Chad Jenison (c.jenison at f5.com)
# Version 1.0
# Version 1.1 - Minor fixes (re: regex for finding status code in w3ce format) and output tweaks (newUrl -> Parsed URL)
# Version 1.2 - Added Support for Parsing Parameters in URL for CLF format and adding them to policy; made File Type and Parameter adding optional via a command line argument
# Script that parses web log files and determines URLs that can be added to an F5 BIG-IP ASM Security Policy as Allowed URL entities
# Todo: Determine how URL Parameters appear in W3CE log format; parse them out and support adding to policy

import argparse
import sys
import requests
import json
import getpass
import re

# Taken from http://code.activestate.com/recipes/577058/
def query_yes_no(question, default="no"):
    valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    if default == None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
        raise ValueError("invalid default answer: '%s'" % default)
    while 1:
        sys.stdout.write(question + prompt)
        choice = raw_input().lower() 
        if default is not None and choice == '':
            return valid[default]
        elif choice in valid.keys():
            return valid[choice]
            sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")

#Setup command line arguments using Python argparse
parser = argparse.ArgumentParser(description='A tool to parse web log files and add them to an ASM security policy')
parser.add_argument('--bigip', '-b', help='IP or hostname of BIG-IP Management or Self IP', required=True)
parser.add_argument('--user', '-u', help='username to use for authentication', required=True)
parser.add_argument('--logfile', '-l', help='log filename in cwd', required=True)
parser.add_argument('--policy', '-p', help='ASM security policy name', required=True)
parser.add_argument('--protocol', default='https', choices=['https', 'http'], help='protocol for url entities (https or http)')
#Input File Type
parser.add_argument('--format', '-f', choices=['clf', 'w3ce', 'urls'], help='Log File Format: Common Log Format, W3C Extended, URL per Line')

#options for adding entities
parser.add_argument('--addfiletypes', '-af', action='store_true')
parser.add_argument('--addparameters', '-ap', action='store_true')

statuscodes = parser.add_argument_group(title='Log file status codes to include')
statuscodes.add_argument('--add200', help='add URLs if log shows 200 response status', action='store_true')
statuscodes.add_argument('--add301', help='add URLs if log shows 301 response status', action='store_true')
statuscodes.add_argument('--add302', help='add URLs if log shows 302 response status', action='store_true')
statuscodes.add_argument('--add304', help='add URLs if log shows 302 response status', action='store_true')

#Safety Checks
safety = parser.add_argument_group(title='Options for disabling Safety Checks')
safety.add_argument('--noprompt', '-n', action='store_true', help='do not prompt to confirm removal of each node')

mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument('--updatepolicy', help='add URLs to BIG-IP ASM security Policy', action='store_true')
mode.add_argument('--report', action='store_true', help='Summarize URLs found in log file that would be added; do not touch BIG-IP')

args = parser.parse_args()
contentTypeJsonHeader = {'Content-Type': 'application/json'}

#adapted from https://devcentral.f5.com/s/articles/demystifying-icontrol-rest-6-token-based-authentication 
def get_auth_token():
    payload = {}
    payload['username'] = args.user
    payload['password'] = passwd
    payload['loginProviderName'] = 'tmos'
    authurl = 'https://%s/mgmt/shared/authn/login' % args.bigip
    token = bip.post(authurl, headers=contentTypeJsonHeader, auth=(args.user, passwd), data=json.dumps(payload)).json()['token']['token']
    return token

def get_asm_policy_id_from_name(name):
    policies = bip.get('%s/asm/policies/' % (url_base)).json()
    for policy in policies['items']:
        if policy['name'] == name:
    id = policy['id']
            print ('Found policy: %s' % (id))
    return id

def add_url_to_policy(url, policyId, protocol):
    urlEntityPayload = json.dumps({'name':url, 'performStaging':'true', 'protocol':protocol})
    addUrl = bip.post('%s/asm/policies/%s/urls' % (url_base, policyId), headers=postHeaders, data = urlEntityPayload)
    if addUrl.status_code == 201:
        print('Successfully Created URL: %s' % (url))
        print('Unsuccessful attempt to create URL: %s - Status Code: %s' % (url, addUrl.status_code))

def add_filetype_to_policy(filetype, policyId):
    filetypePayload = json.dumps({'name':filetype, 'performStaging':'true'})
    addFiletype = bip.post('%s/asm/policies/%s/filetypes' % (url_base, policyId), headers=postHeaders, data = filetypePayload)
    if addFiletype.status_code == 201:
        print('Successfully Created File Type: %s' % (filetype))
        print('Unsuccessful attempt to create File Type: %s - Status Code: %s' % (filetype, addFiletype.status_code))

def add_parameter_to_policy(parameter, policyId):
    parameterPayload = json.dumps({'name':parameter, 'performStaging':'true'})
    addParameter = bip.post('%s/asm/policies/%s/parameters' % (url_base, policyId), headers=postHeaders, data = parameterPayload)
    if addParameter.status_code == 201:
        print('Successfully Created Parameter: %s' % (parameter))
        print('Unsuccessful attempt to create Parameter: %s - Status Code: %s' % (parameter, addParameter.status_code))

url_base = ('https://%s/mgmt/tm' % (args.bigip))
user = args.user
passwd = getpass.getpass("Password for " + user + ":")
bip = requests.session()
bip.verify = False
authtoken = get_auth_token()
authheader = {'X-F5-Auth-Token': authtoken}

policyId = get_asm_policy_id_from_name(args.policy)
print ('Policy Name: %s ; Policy ID: %s' % (args.policy, policyId))

existingUrls = set()
policyUrls = bip.get('%s/asm/policies/%s/urls' % (url_base, policyId)).json()
for url in policyUrls['items']:

existingFiletypes = set()
policyFiletypes = bip.get('%s/asm/policies/%s/filetypes' % (url_base, policyId)).json()
for filetype in policyFiletypes['items']:

existingParameters = set()
policyParameters = bip.get('%s/asm/policies/%s/parameters' % (url_base, policyId)).json()
for parameter in policyParameters['items']:

print ('**Processing Log File**')
newUrls = set()
newUrlParameterStrings = set()
with open(args.logfile, "r") as file:
    if args.format == 'urls':
        for line in file:
            newUrl = line.rstrip('\r\n')
            print ('Parsed Url: %s' % (newUrl))
    elif args.format == 'clf' or args.format == 'w3ce':
        for line in file:
            if args.format == 'clf':
                newUrlSplit = line.split("\"")[1].split()[1].split("?")
                newUrl = newUrlSplit[0]
                statusCode = line.split("\"")[2].split()[0]
                print ('Parsed Url: %s - statusCode: %s' % (newUrl, statusCode))
                if len(newUrlSplit) != 1:
                    newUrlParameterString = newUrlSplit[1]
                    print ('Parsed Parameters: %s' % (newUrlParameterString))
            elif args.format == 'w3ce': 
newUrl = re.split(' (GET|POST|HEAD|PUT|PATCH) ', line)[2].split()[0]
                statusCode = re.split(' (GET|POST|HEAD|PUT|PATCH) ', line)[2].split()[-3]
                print ('Parsed Url: %s - statusCode: %s' % (newUrl, statusCode))
            if statusCode == '200'and args.add200:
                if len(newUrlSplit) != 1:
            elif statusCode == '301' and args.add301:
                if len(newUrlSplit) != 1:
            elif statusCode == '302' and args.add302:
                if len(newUrlSplit) != 1:
            elif statusCode == '304' and args.add304:
                if len(newUrlSplit) != 1:
                    print ('added params')
            elif statusCode == '404':
                print ('URL: %s not eligible for adding to policy because status code 404 is ignored' % (newUrl))
                print ('URL: %s not added - status code %s (possibly not enabled due to missing arguments)' % (newUrl, statusCode))
print ('**Finished Processing Log File**')

newFiletypes = set()
for url in set(newUrls):
    filetype = url.split("/")[-1].split(".")[-1]
    if filetype != '':

newParameters = set()
for urlParameterString in set(newUrlParameterStrings):
    urlParameterPairs = urlParameterString.split("&")
    for urlParameterPair in urlParameterPairs:

# combine two Python Dicts (our auth token and the Content-type json header) in preparation for doing POSTs
postHeaders = authheader

for url in set(newUrls):
    if url in existingUrls:
        print ('URL: %s already defined in policy' % (url))
        if args.updatepolicy:
            if args.noprompt:
                add_url_to_policy(url, policyId, args.protocol)
                queryString = ('Add URL: %s to policy?' % (url))
                if query_yes_no(queryString, default="yes"):
                    add_url_to_policy(url, policyId, args.protocol)
                    print('Skipping URL: %s' % (url))
            print('Report Only - New URL: %s' % (url))
if args.addfiletypes:
    for filetype in set(newFiletypes):
        if filetype in existingFiletypes:
            print ('Filetype: %s already defined in policy' % (filetype))
            if args.updatepolicy:
                if args.noprompt:
                    add_filetype_to_policy(filetype, policyId)
                    queryString = ('Add Filetype: %s to policy?' % (filetype))
                    if query_yes_no(queryString, default="yes"):
                        add_filetype_to_policy(filetype, policyId)
                        print('Skipping Filetype: %s' % (filetype))
                print('Report Only - New File Type: %s' % (filetype))

if args.addparameters:
    for parameter in set(newParameters):
        if parameter in existingParameters:
            print ('Parameter: %s already defined in policy' % (parameter))
            if args.updatepolicy:
                if args.noprompt:
                    add_parameter_to_policy(parameter, policyId)
                    queryString = ('Add Parameter: %s to policy?' % (parameter))
                    if query_yes_no(queryString, default="yes"):
                        add_parameter_to_policy(parameter, policyId)
                        print('Skipping Parameter: %s' % (parameter))
                print('Report Only - New Parameter: %s' % (parameter))

Tested this on version:

Published Feb 16, 2018
Version 1.0

Was this article helpful?

No CommentsBe the first to comment