For more information regarding the security incident at F5, the actions we are taking to address it, and our ongoing efforts to protect our customers, click here.

Logfile to ASM Policy Entities

Problem this snippet solves:

Script that ingests a log file (CLF, W3C Extended or URL-per-line text file) and creates URL (and optionally File Type and Parameter) entities in a BIG-IP ASM Security Policy!

How to use this snippet:

./logfile_to_asm_entities.py usage: logfile_to_asm_entities.py [-h] --bigip BIGIP --user USER --logfile LOGFILE --policy POLICY [--protocol {https,http}] [--format {clf,w3ce,urls}] [--addfiletypes] [--addparameters] [--add200] [--add301] [--add302] [--add304] [--noprompt] (--updatepolicy | --report)

Code :

#!/usr/bin/python

# logfile_to_asm_entities.py
# Author: Chad Jenison (c.jenison at f5.com)
# Version 1.0
# Version 1.1 - Minor fixes (re: regex for finding status code in w3ce format) and output tweaks (newUrl -> Parsed URL)
# Version 1.2 - Added Support for Parsing Parameters in URL for CLF format and adding them to policy; made File Type and Parameter adding optional via a command line argument
#
# Script that parses web log files and determines URLs that can be added to an F5 BIG-IP ASM Security Policy as Allowed URL entities
# Todo: Determine how URL Parameters appear in W3CE log format; parse them out and support adding to policy

import argparse
import sys
import requests
import json
import getpass
import re

# Taken from http://code.activestate.com/recipes/577058/
def query_yes_no(question, default="no"):
    """Ask a yes/no question on stdout and return the answer read from stdin.

    question -- text written to stdout before the [y/n] hint
    default  -- answer assumed when the user just presses Enter: "yes", "no",
                or None to force an explicit answer

    Returns True for yes/y/ye and False for no/n (case-insensitive). Keeps
    asking until a recognised answer is given.
    Raises ValueError if default is not "yes", "no" or None.
    """
    valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default)

    # raw_input only exists on Python 2; on Python 3 the same function is
    # called input. Resolve it once so the prompt works on both.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    while True:
        sys.stdout.write(question + prompt)
        # Make sure the question is visible before blocking on stdin.
        sys.stdout.flush()
        choice = read_line().strip().lower()
        if default is not None and choice == '':
            return valid[default]
        elif choice in valid:
            return valid[choice]
        else:
            sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")

# Set up command line arguments using Python argparse.
parser = argparse.ArgumentParser(description='A tool to parse web log files and add them to an ASM security policy')
parser.add_argument('--bigip', '-b', help='IP or hostname of BIG-IP Management or Self IP', required=True)
parser.add_argument('--user', '-u', help='username to use for authentication', required=True)
parser.add_argument('--logfile', '-l', help='log filename in cwd', required=True)
parser.add_argument('--policy', '-p', help='ASM security policy name', required=True)
parser.add_argument('--protocol', default='https', choices=['https', 'http'], help='protocol for url entities (https or http)')
# Input file type. No default is set: when --format is omitted args.format is None.
parser.add_argument('--format', '-f', choices=['clf', 'w3ce', 'urls'], help='Log File Format: Common Log Format, W3C Extended, URL per Line')

# Options for adding entities other than URLs
parser.add_argument('--addfiletypes', '-af', action='store_true', help='also add File Type entities derived from the URLs found')
parser.add_argument('--addparameters', '-ap', action='store_true', help='also add Parameter entities parsed from URL query strings')

# Response status codes whose URLs should be included
statuscodes = parser.add_argument_group(title='Log file status codes to include')
statuscodes.add_argument('--add200', help='add URLs if log shows 200 response status', action='store_true')
statuscodes.add_argument('--add301', help='add URLs if log shows 301 response status', action='store_true')
statuscodes.add_argument('--add302', help='add URLs if log shows 302 response status', action='store_true')
statuscodes.add_argument('--add304', help='add URLs if log shows 304 response status', action='store_true')

# Safety checks
safety = parser.add_argument_group(title='Options for disabling Safety Checks')
safety.add_argument('--noprompt', '-n', action='store_true', help='do not prompt to confirm adding each entity to the policy')

# Mode: exactly one of --updatepolicy / --report must be given
mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument('--updatepolicy', help='add URLs to BIG-IP ASM security Policy', action='store_true')
mode.add_argument('--report', action='store_true', help='Summarize URLs found in log file that would be added; do not touch BIG-IP')


args = parser.parse_args()
# Header sent with every JSON request body (login and entity POSTs).
contentTypeJsonHeader = {'Content-Type': 'application/json'}

#adapted from https://devcentral.f5.com/s/articles/demystifying-icontrol-rest-6-token-based-authentication 
def get_auth_token():
    """Log in to the BIG-IP and return the token string from the response.

    Posts the username/password (from the module-level args and passwd) to
    the /mgmt/shared/authn/login endpoint using the shared bip session, with
    the 'tmos' login provider, and returns response['token']['token'].
    """
    login_url = 'https://%s/mgmt/shared/authn/login' % args.bigip
    credentials = {
        'username': args.user,
        'password': passwd,
        'loginProviderName': 'tmos',
    }
    response = bip.post(
        login_url,
        headers=contentTypeJsonHeader,
        auth=(args.user, passwd),
        data=json.dumps(credentials),
    )
    return response.json()['token']['token']

def get_asm_policy_id_from_name(name):
    """Return the ID of the ASM security policy whose name equals *name*.

    Fetches the policy list from <url_base>/asm/policies/ through the shared
    bip session and compares each item's 'name'. If several policies share
    the name, the last one listed is returned.

    Raises LookupError when no policy has that name, so the caller does not
    go on to build request URLs from a missing ID.
    """
    policies = bip.get('%s/asm/policies/' % (url_base)).json()
    foundId = None
    for policy in policies['items']:
        if policy['name'] == name:
            foundId = policy['id']
            print ('Found policy: %s' % (foundId))
    if foundId is None:
        raise LookupError('ASM security policy not found: %s' % (name))
    return foundId

def add_url_to_policy(url, policyId, protocol):
    """POST a new URL entity (staging enabled) to the given policy.

    url      -- URL name to create
    policyId -- ID of the ASM policy to add it to
    protocol -- protocol value stored on the entity

    Prints a success line when the response status is 201, otherwise a
    failure line that includes the status code. Returns nothing.
    """
    endpoint = '%s/asm/policies/%s/urls' % (url_base, policyId)
    body = json.dumps({'name':url, 'performStaging':'true', 'protocol':protocol})
    response = bip.post(endpoint, headers=postHeaders, data=body)
    if response.status_code != 201:
        print('Unsuccessful attempt to create URL: %s - Status Code: %s' % (url, response.status_code))
        return
    print('Successfully Created URL: %s' % (url))

def add_filetype_to_policy(filetype, policyId):
    """POST a new File Type entity (staging enabled) to the given policy.

    filetype -- file type name to create
    policyId -- ID of the ASM policy to add it to

    Prints a success line when the response status is 201, otherwise a
    failure line that includes the status code. Returns nothing.
    """
    endpoint = '%s/asm/policies/%s/filetypes' % (url_base, policyId)
    body = json.dumps({'name':filetype, 'performStaging':'true'})
    response = bip.post(endpoint, headers=postHeaders, data=body)
    if response.status_code != 201:
        print('Unsuccessful attempt to create File Type: %s - Status Code: %s' % (filetype, response.status_code))
        return
    print('Successfully Created File Type: %s' % (filetype))

def add_parameter_to_policy(parameter, policyId):
    """POST a new Parameter entity (staging enabled) to the given policy.

    parameter -- parameter name to create
    policyId  -- ID of the ASM policy to add it to

    Prints a success line when the response status is 201, otherwise a
    failure line that includes the status code. Returns nothing.
    """
    endpoint = '%s/asm/policies/%s/parameters' % (url_base, policyId)
    body = json.dumps({'name':parameter, 'performStaging':'true'})
    response = bip.post(endpoint, headers=postHeaders, data=body)
    if response.status_code != 201:
        print('Unsuccessful attempt to create Parameter: %s - Status Code: %s' % (parameter, response.status_code))
        return
    print('Successfully Created Parameter: %s' % (parameter))

# Base of every iControl REST URL used below.
url_base = 'https://' + args.bigip + '/mgmt/tm'

# Ask for the password interactively so it never appears on the command line.
user = args.user
passwd = getpass.getpass("Password for %s:" % user)

# One shared session for all requests.
# NOTE(review): certificate verification is switched off and the resulting
# urllib3 warnings are silenced - confirm this is acceptable for the
# environment the script is run in.
bip = requests.session()
bip.verify = False
requests.packages.urllib3.disable_warnings()

# Log in once, then send the token on every later request from this session.
authtoken = get_auth_token()
authheader = {'X-F5-Auth-Token': authtoken}
bip.headers.update(authheader)

policyId = get_asm_policy_id_from_name(args.policy)
print ('Policy Name: %s ; Policy ID: %s' % (args.policy, policyId))

# Collect the names of entities the policy already contains so they are not
# offered for creation again later.
policyUrls = bip.get('%s/asm/policies/%s/urls' % (url_base, policyId)).json()
existingUrls = set(entry['name'] for entry in policyUrls['items'])

policyFiletypes = bip.get('%s/asm/policies/%s/filetypes' % (url_base, policyId)).json()
existingFiletypes = set(entry['name'] for entry in policyFiletypes['items'])

policyParameters = bip.get('%s/asm/policies/%s/parameters' % (url_base, policyId)).json()
existingParameters = set(entry['name'] for entry in policyParameters['items'])

print ('**Processing Log File**')
newUrls = set()
newUrlParameterStrings = set()

# Status codes the user asked to include via --add200/--add301/--add302/--add304.
enabledStatusCodes = set()
if args.add200:
    enabledStatusCodes.add('200')
if args.add301:
    enabledStatusCodes.add('301')
if args.add302:
    enabledStatusCodes.add('302')
if args.add304:
    enabledStatusCodes.add('304')

# Compiled once; splits a W3C Extended line around the request method.
w3ceMethodRegex = re.compile(' (GET|POST|HEAD|PUT|PATCH) ')

with open(args.logfile, "r") as logFile:
    if args.format == 'urls':
        for line in logFile:
            newUrl = line.rstrip('\r\n')
            # A blank line would otherwise become an empty URL entity.
            if newUrl == '':
                continue
            newUrls.add(newUrl)
            print ('Parsed Url: %s' % (newUrl))
    elif args.format == 'clf' or args.format == 'w3ce':
        for line in logFile:
            try:
                if args.format == 'clf':
                    # The request is the first double-quoted field
                    # ("METHOD /path?query PROTO"); the status code is the
                    # first token after the closing quote.
                    quotedFields = line.split("\"")
                    newUrlSplit = quotedFields[1].split()[1].split("?")
                    statusCode = quotedFields[2].split()[0]
                else:
                    # Everything after the method: the first token is taken
                    # as the URL and the third-from-last as the status code.
                    afterMethod = w3ceMethodRegex.split(line)[2].split()
                    # Parameters are not parsed for W3C Extended (see the
                    # Todo in the file header), so the split has one element.
                    newUrlSplit = [afterMethod[0]]
                    statusCode = afterMethod[-3]
            except IndexError:
                # Lines without a request (for example W3C '#' directive
                # lines, blank or truncated lines) cannot be parsed.
                print ('Skipping unparseable line: %s' % (line.rstrip('\r\n')))
                continue

            newUrl = newUrlSplit[0]
            print ('Parsed Url: %s - statusCode: %s' % (newUrl, statusCode))
            if len(newUrlSplit) != 1:
                print ('Parsed Parameters: %s' % (newUrlSplit[1]))

            if statusCode in enabledStatusCodes:
                newUrls.add(newUrl)
                if len(newUrlSplit) != 1:
                    newUrlParameterStrings.add(newUrlSplit[1])
            elif statusCode == '404':
                print ('URL: %s not eligible for adding to policy because status code 404 is ignored' % (newUrl))
            else:
                print ('URL: %s not added - status code %s (possibly not enabled due to missing arguments)' % (newUrl, statusCode))
    else:
        # --format has no default, so without it nothing can be parsed.
        print ('No log file format specified (--format); no lines were parsed')
print ('**Finished Processing Log File**')

# Derive candidate file types (extensions) from the URLs found in the log.
newFiletypes = set()
for url in newUrls:
    # Ignore any query string (present in 'urls' mode, where lines are taken
    # verbatim) and look only at the final path segment.
    lastSegment = url.split("?")[0].split("/")[-1]
    # Without a dot there is no extension; previously '/images/logo' was
    # reported as file type 'logo'.
    if "." not in lastSegment:
        continue
    filetype = lastSegment.split(".")[-1]
    if filetype != '':
        newFiletypes.add(filetype)

# Extract parameter names from the collected query strings ("a=1&b=2" -> a, b).
newParameters = set()
for urlParameterString in newUrlParameterStrings:
    for urlParameterPair in urlParameterString.split("&"):
        parameterName = urlParameterPair.split("=")[0]
        # An empty query string or a stray '&' yields an empty name; skip it
        # rather than offering a nameless parameter for creation.
        if parameterName != '':
            newParameters.add(parameterName)

# Combine two dicts (our auth token and the Content-Type JSON header) in
# preparation for doing POSTs. Copy first so authheader itself is not modified.
postHeaders = dict(authheader)
postHeaders.update(contentTypeJsonHeader)

# Offer every URL found in the log that the policy does not already have.
for url in set(newUrls):
    if url in existingUrls:
        print ('URL: %s already defined in policy' % (url))
        continue
    if not args.updatepolicy:
        # --report mode: list only, never touch the BIG-IP.
        print('Report Only - New URL: %s' % (url))
        continue
    # With --noprompt add straight away; otherwise ask first (default: yes).
    if args.noprompt or query_yes_no('Add URL: %s to policy?' % (url), default="yes"):
        add_url_to_policy(url, policyId, args.protocol)
    else:
        print('Skipping URL: %s' % (url))
                    
# Offer every new file type, but only when --addfiletypes was given.
for filetype in (set(newFiletypes) if args.addfiletypes else ()):
    if filetype in existingFiletypes:
        print ('Filetype: %s already defined in policy' % (filetype))
        continue
    if not args.updatepolicy:
        # --report mode: list only, never touch the BIG-IP.
        print('Report Only - New File Type: %s' % (filetype))
        continue
    # With --noprompt add straight away; otherwise ask first (default: yes).
    if args.noprompt or query_yes_no('Add Filetype: %s to policy?' % (filetype), default="yes"):
        add_filetype_to_policy(filetype, policyId)
    else:
        print('Skipping Filetype: %s' % (filetype))

# Offer every new parameter, but only when --addparameters was given.
for parameter in (set(newParameters) if args.addparameters else ()):
    if parameter in existingParameters:
        print ('Parameter: %s already defined in policy' % (parameter))
        continue
    if not args.updatepolicy:
        # --report mode: list only, never touch the BIG-IP.
        print('Report Only - New Parameter: %s' % (parameter))
        continue
    # With --noprompt add straight away; otherwise ask first (default: yes).
    if args.noprompt or query_yes_no('Add Parameter: %s to policy?' % (parameter), default="yes"):
        add_parameter_to_policy(parameter, policyId)
    else:
        print('Skipping Parameter: %s' % (parameter))

Tested this on version:

13.0
Published Feb 16, 2018
Version 1.0
No CommentsBe the first to comment