How Application Teams Can Add Nonprofit Verification to Secure Donation Workflows
This article explains how application teams can integrate nonprofit verification into secure donation, fundraising, and fintech workflows. It covers where verification fits in onboarding, API gateway flows, backend services, DevOps checks, fraud prevention, and audit logging, while keeping the process practical, neutral, and useful for technical teams.

Automatic Certificate Management with ACMEv2 in F5 BIG-IP
One of the most anticipated features of F5 BIG-IP is integration with ACMEv2. It arrived with the General Availability of BIG-IP 21.1.0 on May/26. In this tutorial, we are going to configure it using Let's Encrypt as the CA. The domain for which we are generating and renewing certificates is carlosf5lab.lat. The official docs for this feature are in SSL Certificate Management | BIG-IP Documentation.

Prerequisite 1: a DNS Resolver that can reach the internet (at least the CA endpoints). In this case, we are using the native DNS Resolver that comes with BIG-IP.

Prerequisite 2: the internal proxy that will make the connection to the CA.

Prerequisite 3: a self-signed SSL certificate that the ACMEv2 protocol uses as the identifier for a device account. You don't have to fill in the Subject Alternative Name. For the Common Name, an e-mail contact is advised.

Now we are going to create the ACME Provider object. Give it a name and select the internal proxy previously created. For the CA Certificate that secures the connection to the Directory URL, you can use the default ca-bundle.crt. The Directory URL is the endpoint for the ACMEv2 protocol. For Let's Encrypt, it is https://acme-v02.api.letsencrypt.org/directory. For the Account Key, choose the previously created self-signed certificate.

Now the trickiest part: the "Contacts" field is mandatory, and it must be a URL. That's why you must use the format mailto:email_address. Check the Terms and Conditions and the Create Account boxes. After a while, the Account Status should read "Valid".

To prove you own the domain whose certificate Let's Encrypt is going to create or renew, the domain must point to an IP (A record) where a Virtual Server listens on port 80 and is configured to respond to the ACMEv2 challenge. (In this specific lab, the domain carlosf5lab.lat points to a public IP mapped to an internal IP.)
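As a small illustration of the Contacts format described above, the sketch below builds and checks a mailto: URL in Python before it is pasted into the ACME Provider form. The helper names and the example.com address are hypothetical and are not part of BIG-IP, and the e-mail check is deliberately loose.

```python
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse      # Python 2


def acme_contact(email):
    """Return an ACME contact URL (mailto:...) for a plain e-mail address.

    Hypothetical helper: accepts input with or without the mailto: prefix.
    """
    email = email.strip()
    if email.lower().startswith("mailto:"):
        email = email[len("mailto:"):]
    local, sep, domain = email.partition("@")
    if not sep or not local or "." not in domain:
        raise ValueError("not an e-mail address: {!r}".format(email))
    return "mailto:" + email


def is_acme_contact_url(value):
    """True if value is a URL with the mailto scheme and an address part."""
    parsed = urlparse(value)
    return parsed.scheme == "mailto" and "@" in parsed.path


contact = acme_contact("admin@example.com")
print(contact)
print(is_acme_contact_url(contact))
print(is_acme_contact_url("admin@example.com"))
```

A bare address fails the check because it has no URL scheme, which is the situation the tutorial warns about.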
Now you can order your first certificate via ACMEv2 on BIG-IP. After a while, the Key tab should show that your certificate was generated. To track the ACME Provider, you can check its statistics.

That's it, my friend! If it helped you, give a thumbs up to this post!

Migrating FCP Licenses from F5 BIG-IQ to MyF5 Portal: A Python Tool for the Real World
If you've been managing F5 BIG-IP licenses through F5 BIG-IQ Centralized Management under a Flexible Consumption Program (FCP) contract and you're now moving to the My F5 portal model, you'll know the problem: BIG-IQ knows about your devices, the portal knows about your registration keys, and neither system knows about the other. There's no built-in migration path, no API bridge, and no automated way to match a pool license grant to a portal key and push a new license to a BIG-IP without doing it manually, one device at a time.

This article describes a Python tool, f5_license_tool.py, that we built to solve exactly this problem. The full source code is included below. It runs on BIG-IQ itself (Python 2.7.5, no external dependencies beyond requests) or on any Linux or macOS workstation with access to the BIG-IP management network.

## The Problem in More Detail

BIG-IQ pool licensing works by having BIG-IQ act as the license server. BIG-IP devices check in with BIG-IQ, which holds the pool key and grants licenses from it. When you move to the My F5 portal model, each BIG-IP gets its own individual registration key. To activate that key, BIG-IP has to generate a dossier (an encrypted blob that encodes the platform identity) and submit it to activate.f5.com. The portal validates the dossier against the registration key and returns a signed license file, which then gets written to /config/bigip.license and applied with reloadlic.

The challenge at scale is everything around that core process: figuring out which portal key belongs to which device, collecting dossiers from potentially hundreds of BIG-IPs, handling environments where the BIG-IP network has no internet access, pushing licenses back in a controlled way where the customer wants to approve each device, and then recording in the portal which device is consuming which key for asset management. The tool handles all of that.
## What the Tool Does

f5_license_tool.py talks to BIG-IP devices via iControl REST, the same API that Postman, Ansible, Terraform, and all modern F5 automation use. It generates dossiers by calling get_dossier on the BIG-IP through the REST bash utility endpoint, activates keys via the same SOAP interface that F5's own Ansible bigip_device_license module uses, and pushes licenses back via the same REST path. Nothing here is undocumented or unsupported at the API level; we're just combining steps that would otherwise require manual work or separate tools.

Authentication uses token-based auth first (POST /mgmt/shared/authn/login) with a fallback to HTTP Basic, handles self-signed certificates gracefully for lab environments, and supports shared credentials across a batch of devices with per-device fallback prompting if a credential fails.

The tool is fully compatible with Python 2.7.5 as shipped on BIG-IQ 8.x, and also works on Python 3.x. The only dependency is the requests library.

## The Six Modes

### summary

The starting point for any migration. Point it at one or more BIG-IQ pool usage JSON exports and it reads every device record, strips out anything with a revoked timestamp or a cancelled/expired status, and prints a count of active devices per SKU. This gives you an accurate inventory of what you actually need to license before you touch anything.

Records are excluded at the JSON level if they contain a non-empty revoked field. The BIG-IQ export format uses a timestamp value like "2026-06-01T14:03:04Z" for revoked grants, and any record with that field populated is ignored entirely. This matters because a single BIG-IQ pool can contain both active and revoked grants for the same SKU, and you don't want to waste portal keys on devices that have already had their license removed.

### map

This is the interactive matching step.
It reads the BIG-IQ JSON export alongside a CSV exported from the My F5 portal (My Products and Plans > Licenses > Export) and walks you through assigning a portal registration key to each device record. For each device it shows you any existing matches it found (keys where the portal's Chargeback field already contains the device's UID, hostname, or IP address) and a list of candidate keys that are eligible for new assignment based on their status. Keys with a Revoked, Cancelled, or Expired status in the portal are never shown as candidates regardless of other settings. You can also search across all keys by product name, capacity, or registration key substring, or enter a key manually if you need to.

The output is a mapping CSV with one row per device, containing the IP address, hostname, registration key, SKU, and pre-populated Chargeback fields. Every subsequent mode reads this CSV as its input.

### harvest

For environments where the BIG-IP network has no internet access. Connects to each BIG-IP in the mapping CSV via iControl REST, calls get_dossier on-device for the assigned registration key, and saves the result as hostname_ip.dossier in a local folder. No internet access is required at this stage; it's purely BIG-IP to tool.

The naming convention matters: files are named after the device (bigip-london.customer.com_10.0.0.1.dossier) rather than the registration key, so that when the dossier folder is handed off for activation and the license files come back, there's an unambiguous mapping between each license file and the BIG-IP it needs to go to.

Once harvest is complete, you take the dossiers folder to a machine that can reach activate.f5.com and run preflight.

### preflight

The activation step that requires internet access but no BIG-IP connection.
Reads the dossier files from the folder produced by harvest, calls activate.f5.com via SOAP for each registration key, handles the EULA acceptance automatically, and saves the returned license text as hostname_ip.license.

The SOAP interface is the same one F5's own tooling uses: the Ansible bigip_device_license module, the on-box SOAPLicenseClient binary, and BIG-IQ itself all use this same endpoint. The tool handles the SOAP response structure correctly, including the href/multiRef indirection that F5's RPC-encoded responses use, and maps fault codes to meaningful error messages. Error 51092 (key already activated on a different unit) stops processing for that key immediately and tells you to contact licensingsupport@f5.com. Error 51089 (internal development key) does the same with an explanation.

Preflight is idempotent: if a license file already exists for a device it is skipped, so you can re-run after a partial failure without re-activating keys that already succeeded. Use --force to override this.

It also produces portal_updates.csv, which contains the pre-formatted customer tag value for each registration key (UID=;HN=hostname;IP=address), ready to paste into the My F5 portal Chargeback field. Until the portal API is available for automated tag updates, this file is what you use to record device identity against each key in the portal.

Take the licenses folder back to the customer environment and run batch.

### batch

The final delivery step. Reads the mapping CSV, connects to each BIG-IP in sequence, and asks the operator to confirm each device before doing anything: "License this device? [Y/n]". The operator can work through two or three devices now and resume the rest tomorrow, or skip a device entirely if it's not ready.
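The per-device confirmation described above could be sketched roughly as follows. This is an illustration only, not the tool's actual implementation: the function names, the device dictionary keys, and the injectable `reader` argument (used so the prompt can be tested without a terminal) are all assumptions.

```python
from __future__ import print_function


def confirm(prompt, default=True, reader=None):
    """Ask a yes/no question; an empty answer selects the default."""
    if reader is None:
        try:
            reader = raw_input  # noqa: F821  (Python 2)
        except NameError:
            reader = input      # Python 3
    suffix = " [Y/n] " if default else " [y/N] "
    while True:
        answer = reader(prompt + suffix).strip().lower()
        if not answer:
            return default
        if answer in ("y", "yes"):
            return True
        if answer in ("n", "no"):
            return False
        print("Please answer y or n.")


def select_devices(devices, reader=None):
    """Split devices into (confirmed, skipped) based on operator answers."""
    confirmed, skipped = [], []
    for dev in devices:
        label = "{} ({})".format(dev["hostname"], dev["ip"])
        if confirm(label + ": License this device?", reader=reader):
            confirmed.append(dev)
        else:
            skipped.append(dev)
    return confirmed, skipped
```

Keeping the prompt in one small function means an operator's "n" only moves that device to the skipped list, and the loop carries on with the next one.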
For each confirmed device it checks whether the correct registration key is already active (in which case it skips the device entirely), then either uses a pre-generated license file from the licenses folder or generates the dossier on-device and calls SOAP itself if no pre-generated file exists. The license is written to /config/bigip.license via the REST bash utility with the content base64-encoded to avoid quoting issues, and reloadlic is called to apply it.

After every successful license push the remaining device list is written to mapping_remaining.csv. This means if you stop halfway through a batch of 200 devices, you can resume from mapping_remaining.csv tomorrow and only the unlicensed devices are in scope. The list shrinks to zero when the job is done.

The batch results CSV records the outcome for every device (success, already licensed, skipped by operator, or failed) along with the active registration key confirmed after reloadlic.

### activate

Single-device mode, equivalent to what you'd do manually. Connects to one BIG-IP, collects device identity (hostname, management IP, UUID, platform, TMOS version), prompts for a registration key, generates the dossier, activates it (online SOAP or offline paste depending on your environment), pushes the license, and saves a JSON asset record. This is the mode to use for one-off activations or for testing the tool against a lab BIG-IP before running a full batch.

## The Air-Gap Workflow

For customers whose BIG-IP management network has no internet access, the full workflow is as follows.

On the BIG-IQ or management workstation (no internet needed):

    python f5_license_tool.py --mode summary --json pool_export.json
    python f5_license_tool.py --mode map --json pool_export.json --keys portal_export.csv --out mapping.csv
    python f5_license_tool.py --mode harvest --csv mapping.csv --dossiers-dir ./dossiers

Transfer the dossiers folder to an internet-connected machine.
Run preflight there:

    python f5_license_tool.py --mode preflight --csv mapping.csv --dossiers-dir ./dossiers --licenses-dir ./licenses

Transfer the licenses folder back to the customer environment. Run batch:

    python f5_license_tool.py --mode batch --csv mapping.csv --licenses-dir ./licenses

## The Direct Workflow

If the machine running the tool has internet access and can also reach the BIG-IP management interfaces, the whole process collapses to two commands:

    python f5_license_tool.py --mode map --json pool_export.json --keys portal_export.csv --out mapping.csv
    python f5_license_tool.py --mode batch --csv mapping.csv

## A Few Practical Notes

The interactive mode menu appears when you run the script with no arguments. You pick a number or mode name and it prompts for whatever it needs. At the end of each mode it offers to chain directly into the next one and pre-fills the file paths it already knows about from the current session, so you don't have to retype paths between steps.

The BIG-IQ JSON export format uses a revoked field with a timestamp value to indicate revoked grants. Any record with a non-empty revoked field is excluded before any processing happens. This is checked at the JSON loader level so it affects both summary counts and map mode candidates.

For batch operations the tool asks for shared credentials once at the start. If a device rejects those credentials it drops into a per-device credential prompt for that device specifically before moving on, so a single misconfigured device doesn't block the rest of the batch. Failed devices are retried with exponential backoff (configurable with --retries, default 3) before being skipped.

All file outputs use a hostname_ip naming convention rather than registration key names, because hostnames and IPs are what operators and customers recognise.
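The retry behaviour mentioned above can be illustrated with a minimal backoff helper. This is a sketch under stated assumptions, not the tool's own code: the function name, the 2-second base delay, the 30-second cap, and the reading of "retries" as the number of re-attempts after the first failure are all hypothetical. It runs on both Python 2.7 and 3.x.

```python
import time


def retry_with_backoff(func, retries=3, base_delay=2, max_delay=30,
                       sleep=time.sleep):
    """Call func() until it succeeds or `retries` re-attempts are used up.

    The delay doubles after each failure (base_delay, 2x, 4x, ...) and is
    capped at max_delay. The last exception is re-raised on final failure.
    """
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt >= retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            sleep(delay)
            attempt += 1
```

Passing `sleep` as a parameter keeps the helper easy to test, and a caller that catches the final exception can mark the device as failed and move on to the next one.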
The mapping CSV keeps all the original columns from both source files plus the outcome columns added by each stage, so you always have a complete audit trail in a single spreadsheet.

## Getting Started

Install the dependency:

    pip install requests urllib3

Copy f5_license_tool.py to the BIG-IQ or your workstation. If running on BIG-IQ, ensure the admin account has bash shell access:

    tmsh modify auth user admin shell bash
    tmsh save sys config

Run with no arguments to see the mode menu:

    python f5_license_tool.py

The full source code follows below.

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    """
    f5_license_tool.py  v3
    ======================
    Runs on ANY machine with Python 2.7 or 3.x + requests.
    Talks to a BIG-IP remotely via iControl REST (HTTPS, port 443).

    Modes  (--mode flag, default: activate)
    -------
      activate   Connect to a BIG-IP, generate a dossier, activate at
                 activate.f5.com (online SOAP or offline paste), push the
                 license back, save an asset record JSON.

      map        File-based only — no BIG-IP connection required.
                 Read BIG-IQ pool-usage JSON export(s) and a My-F5 portal
                 CSV export, then interactively assign portal registration
                 keys to each device record. Writes a mapping CSV.

      summary    File-based only. Read BIG-IQ JSON export(s) and print a
                 count of devices per SKU. No CSV / portal file needed.
Requirements ------------ pip install requests urllib3 Usage ----- # Activate a BIG-IP license (default mode) python f5_license_tool.py python f5_license_tool.py --host 10.0.1.1 --user admin python f5_license_tool.py --config mylab.json python f5_license_tool.py --offline # manual dossier paste python f5_license_tool.py --no-install # get license, don't push # Map BIG-IQ JSON records to portal CSV keys python f5_license_tool.py --mode map \\ --json bigiq_export.json --keys portal_keys.csv --out mapping.csv # Summarise SKU counts from BIG-IQ JSON python f5_license_tool.py --mode summary --json bigiq_export.json # Environment variables for activate mode F5_HOST=10.0.1.1 F5_USER=admin F5_PASS=secret python f5_license_tool.py """ from __future__ import print_function # Python 2/3 input() compatibility — MUST be before any input() call. # In Python 2, input() evaluates the entered text as Python code (dangerous). # raw_input() reads it as a plain string — which is what we want. # In Python 3 raw_input() doesn't exist; input() already reads plain strings. try: input = raw_input # noqa: F821 (raw_input only exists in Python 2) except NameError: pass # Python 3 — input() is already correct import argparse import csv import getpass import io import json import os import re import sys import textwrap import time import subprocess import xml.etree.ElementTree as ET try: import requests from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) except ImportError: print("ERROR: 'requests' library not found. 
Run: pip install requests urllib3") sys.exit(1) # ── Colour helpers ──────────────────────────────────────────────────────────── def _c(code, text): if sys.stdout.isatty(): return "\033[{}m{}\033[0m".format(code, text) return text bold = lambda t: _c("1", t) green = lambda t: _c("1;32", t) yellow = lambda t: _c("1;33", t) red = lambda t: _c("1;31", t) cyan = lambda t: _c("1;36", t) dim = lambda t: _c("2", t) # ── Constants ───────────────────────────────────────────────────────────────── ACTIVATE_ENDPOINT = ( "https://activate.f5.com/license/services/" "urn:com.f5.license.v5b.ActivationService" ) SOAP_ENVELOPE = """\ <?xml version="1.0" encoding="UTF-8"?> <SOAP-ENV:Envelope xmlns:ns3="http://www.w3.org/2001/XMLSchema" xmlns:ns1="https://activate.f5.com/license/services/urn:com.f5.license.v5b.ActivationService" xmlns:ns2="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsi="http://www.w3.org/1999/XMLSchema-instance" xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"> <ns2:Body> <ns1:getLicense> <dossier xsi:type="ns3:string">{dossier}</dossier> <eula xsi:type="ns3:string">{eula}</eula> <email xsi:type="ns3:string"></email> <firstName xsi:type="ns3:string"></firstName> <lastName xsi:type="ns3:string"></lastName> <companyName xsi:type="ns3:string"></companyName> <phone xsi:type="ns3:string"></phone> <address xsi:type="ns3:string"></address> <city xsi:type="ns3:string"></city> <stateProvince xsi:type="ns3:string"></stateProvince> <postalCode xsi:type="ns3:string"></postalCode> <country xsi:type="ns3:string"></country> </ns1:getLicense> </ns2:Body> </SOAP-ENV:Envelope>""" BANNER = """ ╔══════════════════════════════════════════════════════════════╗ ║ F5 BIG-IP Remote License Activation & Asset Capture ║ ║ Runs from any workstation — talks to BIG-IP via REST ║ ╚══════════════════════════════════════════════════════════════╝ """ # ── REST session factory + token auth 
──────────────────────────────────────── def make_session(host, user, password): """ Build a requests.Session and authenticate against the BIG-IP. Strategy (tries each in order, stops at first success): 1. Token auth — POST /mgmt/shared/authn/login → X-F5-Auth-Token header This is the modern, preferred method and works on all TMOS >= 11.6. 2. Basic auth — standard HTTP Basic (Authorization: Basic …) Older TMOS or when token auth is disabled. The session stores .base (https://<host>) and .bigip_host for logging. """ s = requests.Session() s.verify = False # lab BIG-IPs always have self-signed certs s.base = "https://{}".format(host) s.bigip_host= host s.headers.update({"Content-Type": "application/json"}) # ── Attempt 1: token auth ───────────────────────────────────────── token_url = "{}/mgmt/shared/authn/login".format(s.base) try: r = requests.post( token_url, json={"username": user, "password": password, "loginProviderName": "tmos"}, verify=False, timeout=15, ) if r.status_code == 200: token = r.json().get("token", {}).get("token", "") if token: s.headers.update({"X-F5-Auth-Token": token}) # Remove Basic auth — token takes precedence s.auth = None print(green(" ✓ Authenticated via token (X-F5-Auth-Token)")) return s # 400 can mean loginProviderName is wrong — try without it r2 = requests.post( token_url, json={"username": user, "password": password}, verify=False, timeout=15, ) if r2.status_code == 200: token = r2.json().get("token", {}).get("token", "") if token: s.headers.update({"X-F5-Auth-Token": token}) s.auth = None print(green(" ✓ Authenticated via token (X-F5-Auth-Token)")) return s except requests.exceptions.RequestException: pass # network error — fall through to Basic # ── Attempt 2: Basic auth ───────────────────────────────────────── s.auth = (user, password) test_url = "{}/mgmt/tm/sys/clock".format(s.base) try: r = s.get(test_url, timeout=15) if r.status_code == 200: print(green(" ✓ Authenticated via HTTP Basic auth")) return s elif r.status_code == 
401: print(red(" ✗ 401 Unauthorized — both token and Basic auth failed.")) print(yellow(" Things to check:")) print(yellow(" • Username / password correct?")) print(yellow(" • On the BIG-IP run: tmsh modify auth user admin shell bash")) print(yellow(" • REST enabled? tmsh modify sys httpd allow replace-all-with { ALL }")) print(yellow(" • Try: curl -sk -u admin:pass https://{}/mgmt/tm/sys/clock".format(host))) sys.exit(1) else: print(red(" Unexpected HTTP {} during auth test.".format(r.status_code))) sys.exit(1) except requests.exceptions.ConnectionError: print(red(" Cannot reach https://{}".format(host))) print(yellow(" Is the IP correct? Is port 443 open? Try:")) print(yellow(" curl -sk https://{}/mgmt/tm/sys/clock".format(host))) sys.exit(1) def rest_get(session, path, fatal=True): url = session.base + path try: r = session.get(url, timeout=15) r.raise_for_status() return r.json() except requests.exceptions.ConnectionError: print(red(" Cannot reach {} — check IP and that port 443 is open.".format(session.base))) if fatal: sys.exit(1) return {} except requests.exceptions.HTTPError: if r.status_code == 401: print(red(" 401 on {} — token may have expired. Re-run the script.".format(path))) else: print(red(" HTTP {} on {}: {}".format(r.status_code, path, r.text[:200]))) if fatal: sys.exit(1) return {} except ValueError: # Non-JSON response print(red(" Non-JSON response from {}: {}".format(path, r.text[:200]))) if fatal: sys.exit(1) return {} def rest_post(session, path, body, fatal=True): url = session.base + path try: r = session.post(url, json=body, timeout=60) r.raise_for_status() return r.json() except requests.exceptions.HTTPError: if r.status_code == 401: print(red(" 401 on {} — token may have expired. 
Re-run the script.".format(path))) else: print(red(" HTTP {} on {}: {}".format(r.status_code, path, r.text[:300]))) if fatal: sys.exit(1) return {} except ValueError: print(red(" Non-JSON response from {}: {}".format(path, r.text[:200]))) if fatal: sys.exit(1) return {} # ── Step 1 — BIG-IP connection details ─────────────────────────────────────── def prompt_connection(args_ns): print(cyan("\n[1/6] BIG-IP connection details")) host = (os.environ.get("F5_HOST") or (args_ns.host if args_ns.host else None) or input(" BIG-IP management IP or hostname: ").strip()) user = (os.environ.get("F5_USER") or (args_ns.user if args_ns.user else None) or input(" Username [admin]: ").strip() or "admin") password = (os.environ.get("F5_PASS") or (args_ns.password if args_ns.password else None) or getpass.getpass(" Password: ")) return host, user, password # ── Step 2 — Test connection & collect device identity ─────────────────────── def collect_device_info(session): print(cyan("\n[2/6] Connecting to BIG-IP and collecting device identity ...")) # Hostname d = rest_get(session, "/mgmt/tm/sys/global-settings?$select=hostname") hostname = d.get("hostname", "unknown") # Management IP (first entry) d = rest_get(session, "/mgmt/tm/sys/management-ip", fatal=False) items = d.get("items", []) mgmt_ip_cidr = items[0].get("name", "unknown") if items else "unknown" mgmt_ip = mgmt_ip_cidr.split("/")[0] # TMOS version d = rest_get(session, "/mgmt/tm/sys/version") entries = d.get("entries", {}) version = "unknown" for k, v in entries.items(): nested = v.get("nestedStats", {}).get("entries", {}) if "Version" in nested: version = nested["Version"].get("description", "unknown") break # ── UUID — try three endpoints in order ─────────────────────────── # 1. 
/mgmt/shared/device-availability (most reliable, returns UUID as key) uuid = "unknown" d = rest_get(session, "/mgmt/shared/device-availability", fatal=False) avail = d.get("deviceAvailability", {}) # Keys in deviceAvailability are the UUIDs; find first that looks like a UUID for candidate in avail.keys(): if re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', candidate, re.I): uuid = candidate break # 2. /mgmt/shared/identified-devices/config/device-info (VE / newer TMOS) if uuid == "unknown": d = rest_get(session, "/mgmt/shared/identified-devices/config/device-info", fatal=False) uuid = d.get("machineId", "unknown") # 3. /mgmt/tm/sys/hardware chassisId (physical appliances) if uuid == "unknown": d = rest_get(session, "/mgmt/tm/sys/hardware", fatal=False) hw_entries = d.get("entries", {}) for k, v in hw_entries.items(): nested = v.get("nestedStats", {}).get("entries", {}) if "chassisId" in nested: uuid = nested["chassisId"].get("description", "unknown") break else: # Still read hardware for platform info d = rest_get(session, "/mgmt/tm/sys/hardware", fatal=False) hw_entries = d.get("entries", {}) # ── Platform / model ────────────────────────────────────────────── # Try marketingName first (gives "BIG-IP Virtual Edition", "BIG-IP i5800" etc.) 
platform = "unknown" d2 = rest_get(session, "/mgmt/shared/identified-devices/config/device-info", fatal=False) platform = d2.get("marketingName", "") or d2.get("platform", "") if not platform or platform == "unknown": # Fallback to sys/hardware platform entry hw_entries = rest_get(session, "/mgmt/tm/sys/hardware", fatal=False).get("entries", {}) for k, v in hw_entries.items(): nested = v.get("nestedStats", {}).get("entries", {}) if "platform" in nested: platform = nested["platform"].get("description", "unknown") break # ── Current active license key ──────────────────────────────────── lic_d = rest_get(session, "/mgmt/tm/sys/license", fatal=False) lic_entries = lic_d.get("entries", {}) reg_key_active = "none" for k, v in lic_entries.items(): nested = v.get("nestedStats", {}).get("entries", {}) if "registrationKey" in nested: reg_key_active = nested["registrationKey"].get("description", "none") break info = { "hostname": hostname, "mgmt_ip": mgmt_ip, "uuid": uuid, "platform": platform or "unknown", "tmos_version": version, "current_reg_key": reg_key_active, "captured_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } print(green(" ✓ Hostname : {}".format(hostname))) print(green(" ✓ Mgmt IP : {}".format(mgmt_ip))) print(green(" ✓ UUID : {}".format(uuid))) print(green(" ✓ Platform : {}".format(platform or "unknown"))) print(green(" ✓ TMOS version : {}".format(version))) print(dim (" Active reg key : {}".format(reg_key_active))) return info # ── Step 3 — Registration key ───────────────────────────────────────────────── def ask_reg_key(): print(cyan("\n[3/6] Registration key")) while True: key = input(" Enter base registration key (XXXXX-XXXXX-XXXXX-XXXXX-XXXXXXX): ").strip().upper() if re.match(r'^[A-Z0-9]{5}-[A-Z0-9]{5}-[A-Z0-9]{5}-[A-Z0-9]{5}-[A-Z0-9]{7}$', key): return key print(yellow(" Format not recognised — expected 5 groups (5-5-5-5-7) separated by hyphens.")) # ── Step 4 — Generate dossier via REST bash utility ─────────────────────────── def 
generate_dossier(session, reg_key): print(cyan("\n[4/6] Generating dossier on BIG-IP via iControl REST ...")) print(dim (" POST /mgmt/tm/util/bash → get_dossier -b {}".format(reg_key))) body = { "command": "run", "utilCmdArgs": "-c 'get_dossier -b {}'".format(reg_key), } result = rest_post(session, "/mgmt/tm/util/bash", body) dossier = result.get("commandResult", "").strip() if not dossier: print(red(" get_dossier returned nothing — is the reg key valid?")) sys.exit(1) # Sanity: dossier should be a long base64-ish string if len(dossier) < 100: print(red(" Unexpected dossier output: {}".format(dossier))) sys.exit(1) print(green(" ✓ Dossier generated ({} chars)".format(len(dossier)))) return dossier # ── Step 5a — Online SOAP activation ───────────────────────────────────────── def _parse_soap_response(xml_text): """ Parse F5's getLicenseResponse SOAP envelope. F5 uses SOAP RPC-encoding with href/multiRef indirection: <getLicenseReturn href="#id0"/> ← points to the transaction <multiRef id="id0"> ... <state href="#id2"/> ... </multiRef> <multiRef id="id2">EULA_REQUIRED</multiRef> ← bare text = the state value All values are reached by following href="#idN" → multiRef id="idN".text Tags may also carry namespace prefixes (ns2:, ns3:, soapenc:) which are stripped when comparing local names. Returns dict: state, eula, license, fault_num, fault_text """ result = dict(state='', eula='', license='', fault_num='', fault_text='') try: root = ET.fromstring(xml_text) except ET.ParseError as e: result['state'] = 'ERROR' result['fault_text'] = 'XML parse error: {}'.format(e) return result # ── 1. 
Build id → element map for every multiRef ────────────────── refs = {} for el in root.iter(): local = el.tag.split('}')[-1] if '}' in el.tag else el.tag if local == 'multiRef': eid = el.get('id') if eid: refs[eid] = el def local_name(el): return el.tag.split('}')[-1] if '}' in el.tag else el.tag def deref(el): """Follow href to a multiRef and return its text; or return element's own text.""" if el is None: return '' href = (el.get('href') or '').lstrip('#') if href and href in refs: return (refs[href].text or '').strip() return (el.text or '').strip() def first_child(parent, name): for c in parent: if local_name(c) == name: return c return None # ── 2. Find the root LicenseTransaction element ──────────────────── # It is always the multiRef pointed to by <getLicenseReturn href="#id0"/> txn = refs.get('id0') if txn is None: # Fallback: find by xsi:type containing LicenseTransaction XSI = 'http://www.w3.org/1999/XMLSchema-instance' for el in refs.values(): xtype = el.get('{%s}type' % XSI, '') if 'LicenseTransaction' in xtype: txn = el break if txn is None: result['state'] = 'ERROR' result['fault_text'] = 'LicenseTransaction element not found in SOAP response' return result # ── 3. 
Extract fields, dereferencing hrefs as needed ────────────── result['state'] = deref(first_child(txn, 'state')) result['eula'] = deref(first_child(txn, 'eula')) result['license'] = deref(first_child(txn, 'license')) fault_el = first_child(txn, 'fault') if fault_el is not None: fault_ref = refs.get((fault_el.get('href') or '').lstrip('#')) if fault_ref is not None: result['fault_num'] = deref(first_child(fault_ref, 'faultNumber')) result['fault_text'] = deref(first_child(fault_ref, 'faultText')) if not result['state']: result['state'] = 'ERROR' result['fault_text'] = result['fault_text'] or 'Empty state in SOAP response' return result # Known terminal fault codes — no point retrying these _FATAL_FAULTS = { '51089': "Internal/PD key cannot be used in the Production environment", '51092': "Key already activated on a different unit — contact licensingsupport@f5.com", '51093': "Registration key not found", '51094': "Registration key has been revoked", } def _soap_call(dossier, eula="", debug=False): """ Single SOAP POST to activate.f5.com. Returns (state, eula_text, license_text, fault_msg). 
Possible state values returned: EULA_REQUIRED re-submit with eula= set to the EULA text we return LICENSE_RETURNED success — license_text contains the license FAULT_RETURNED hard fault from F5 — fault_msg has the detail ERROR connection error or unparseable response """ body = SOAP_ENVELOPE.format(dossier=dossier, eula=eula) headers = { "Content-Type": "text/xml; charset=utf-8", "SOAPAction": '""', } try: r = requests.post( ACTIVATE_ENDPOINT, data=body.encode("utf-8"), headers=headers, verify=True, timeout=60, ) except requests.exceptions.RequestException as e: return "ERROR", "", "", "Connection error: {}".format(e) if debug: print(dim("\n --- RAW SOAP RESPONSE (HTTP {}) ---".format(r.status_code))) print(dim(r.text[:3000])) print(dim(" --- END SOAP RESPONSE ---\n")) p = _parse_soap_response(r.text) fault_msg = " — ".join(filter(None, [p['fault_num'], p['fault_text']])).strip() return p['state'], p['eula'], p['license'], fault_msg def activate_online(dossier, reg_key): print(cyan("\n[5/6] Online activation → activate.f5.com SOAP")) eula_accepted = "" first_call = True for attempt in range(1, 11): print(" Attempt {}/10 ...".format(attempt), end=" ") sys.stdout.flush() state, eula_text, license_text, fault = _soap_call( dossier, eula_accepted, debug=first_call ) first_call = False print(dim("state={}".format(state))) # ── Success ──────────────────────────────────────────────────── if state == "LICENSE_RETURNED": if not license_text: print(yellow(" State is LICENSE_RETURNED but license text is empty — retrying.")) time.sleep(3) continue print(green(" ✓ License returned successfully!")) return license_text # ── EULA challenge — auto-accept and resubmit immediately ───── elif state == "EULA_REQUIRED": if eula_text: print(dim(" Auto-accepting EULA ({} lines).".format(len(eula_text.splitlines())))) eula_accepted = eula_text continue # resubmit immediately, no sleep # ── Hard fault from F5 (FAULT_RETURNED state) ───────────────── elif state == "FAULT_RETURNED": # Extract 
fault number for lookup fault_num = fault.split(' ')[0].strip().rstrip('—').strip() print(red("\n ✗ Activation fault: {}".format(fault))) if fault_num in _FATAL_FAULTS: print(red(" This error is not recoverable via script:")) print(red(" {}".format(_FATAL_FAULTS[fault_num]))) if fault_num == '51089': print(yellow("\n Hint: error 51089 means you used an internal F5 dev/lab")) print(yellow(" registration key (starts with 'I' or issued for internal use).")) print(yellow(" Use a real production or eval key from your F5 account.")) elif fault_num == '51092': print(yellow(" Email licensingsupport@f5.com with your reg key and chassis serial.")) sys.exit(1) # ── Transient / connection error — back off and retry ────────── elif state == "ERROR": print(red(" Error: {}".format(fault))) wait = min(2 ** attempt, 30) print(yellow(" Retrying in {}s ...".format(wait))) time.sleep(wait) else: print(yellow(" Unrecognised state '{}' — retrying in 3s ...".format(state))) time.sleep(3) print(red(" Activation failed after 10 attempts.")) sys.exit(1) # ── Step 5b — Offline ───────────────────────────────────────────────────────── def activate_offline(dossier, reg_key): print(cyan("\n[5/6] Offline activation")) # Save dossier file dossier_path = "{}.dossier".format(reg_key) with open(dossier_path, "w") as f: f.write(dossier) # ── Print dossier so the user can copy it directly from the terminal ── print(yellow("\n " + "─"*60)) print(bold(" DOSSIER — copy everything between the lines and paste")) print(bold(" it into https://activate.f5.com/license/")) print(yellow(" " + "─"*60)) print() print(dossier) print() print(yellow(" " + "─"*60)) print(dim(" (Also saved to file: {})".format(dossier_path))) print(""" Steps: 1. Copy the dossier text above. 2. Browse to https://activate.f5.com/license/ Paste into the dossier box, click Next, accept the EULA. 3. Copy the entire license text from the result page. 4. Paste it below and press Enter on a blank line. 
  Alternatively, from any internet-connected machine run:
    curl -s -X POST \\
      -F "dossier=$(cat {dp})" \\
      -F "submit=Next" \\
      https://activate.f5.com/license/license.do \\
    | grep -oP '(?<=<textarea[^>]*name="license"[^>]*>).*?(?=</textarea>)' \\
    | sed 's/&quot;/\"/g'
""".format(dp=dossier_path))

    # ── Wait for the user to paste the license back ───────────────────
    print(yellow(" " + "─"*60))
    print(bold(" LICENSE — paste the full license text below,"))
    print(bold(" then press Enter on a blank line to continue."))
    print(yellow(" " + "─"*60))
    lines = []
    while True:
        try:
            line = input("")
        except EOFError:
            break
        if line == "" and lines:
            break
        lines.append(line)
    license_text = "\n".join(lines).strip()
    if not license_text:
        print(red(" No license text entered — exiting without installing."))
        sys.exit(1)
    print(green(" ✓ License text received ({} chars).".format(len(license_text))))
    return license_text


# ── Step 6 — Push license back to BIG-IP via REST ────────────────────────────
def install_license(session, license_text, reg_key, device_info):
    print(cyan("\n[6/6] Pushing license to BIG-IP ..."))

    # Save license locally first
    lic_path = "{}.license".format(reg_key)
    with open(lic_path, "w") as f:
        f.write(license_text)
    print(green(" License saved locally: {}".format(lic_path)))

    # Write license text to /config/bigip.license via REST bash utility
    # We use a heredoc-style echo through the bash util
    # License text can contain special chars so we base64-encode the transfer
    import base64
    b64 = base64.b64encode(license_text.encode("utf-8")).decode("ascii")
    write_cmd = (
        "echo '{b64}' | base64 -d > /config/bigip.license"
    ).format(b64=b64)
    print(dim(" Writing /config/bigip.license via REST bash util ..."))
    body = {"command": "run", "utilCmdArgs": "-c '{}'".format(write_cmd)}
    rest_post(session, "/mgmt/tm/util/bash", body)

    # Run reloadlic
    print(dim(" Running reloadlic ..."))
    body = {"command": "run", "utilCmdArgs": "-c 'reloadlic'"}
    result = rest_post(session, "/mgmt/tm/util/bash", body,
fatal=False) reload_out = result.get("commandResult", "").strip() if reload_out: print(dim(" reloadlic output: {}".format(reload_out))) # Verify — poll license endpoint for up to 30s print(dim(" Verifying license is active ...")) for i in range(6): time.sleep(5) lic_d = rest_get(session, "/mgmt/tm/sys/license", fatal=False) entries = lic_d.get("entries", {}) for k, v in entries.items(): nested = v.get("nestedStats", {}).get("entries", {}) if "registrationKey" in nested: active_key = nested["registrationKey"].get("description", "") if active_key and active_key != "none": print(green(" ✓ License active — registered key: {}".format(active_key))) return lic_path, active_key print(dim(" ... waiting for license to apply ({}/6) ...".format(i + 1))) print(yellow(" Could not confirm license via REST — check 'tmsh show sys license' on the box.")) return lic_path, "unconfirmed" # ── Asset record ────────────────────────────────────────────────────────────── def save_asset_record(device_info, reg_key, lic_path): record = dict(device_info) record["reg_key"] = reg_key record["license_file"] = os.path.abspath(lic_path) safe_host = device_info["hostname"].replace(".", "_").replace(" ", "_") save_path = "f5_asset_{}.json".format(safe_host) with open(save_path, "w") as f: json.dump(record, f, indent=2) print(green("\n Asset record saved: {}".format(save_path))) print(cyan("\n Asset summary:")) col_w = max(len(k) for k in record) + 2 for k, v in sorted(record.items()): print(" {:{w}}: {}".format(k, v, w=col_w)) return save_path, record # ── Postman reference card ──────────────────────────────────────────────────── def print_postman_card(host, reg_key): base = "https://{}".format(host) print(cyan("\n" + "="*64)) print(bold(" Postman / REST Reference Card")) print("="*64) calls = [ ("Auth — token (preferred)", "POST {base}/mgmt/shared/authn/login".format(base=base), '{"username":"admin","password":"...","loginProviderName":"tmos"}', "Use returned .token.token as X-F5-Auth-Token header 
on all calls"), ("Auth — Basic (fallback)", "Authorization: Basic <base64(user:pass)>", "Content-Type: application/json", None), ("1. Get hostname", "GET {base}/mgmt/tm/sys/global-settings?$select=hostname".format(base=base), None, None), ("2. Get management IP", "GET {base}/mgmt/tm/sys/management-ip".format(base=base), None, None), ("3. Get TMOS version", "GET {base}/mgmt/tm/sys/version".format(base=base), None, None), ("4. Get current license", "GET {base}/mgmt/tm/sys/license".format(base=base), None, "entries[].nestedStats.entries.registrationKey.description"), ("5. Get UUID (preferred)", "GET {base}/mgmt/shared/device-availability".format(base=base), None, "Keys of .deviceAvailability are UUIDs e.g. 2527ff8a-9ec4-..."), ("6. Get platform / model", "GET {base}/mgmt/shared/identified-devices/config/device-info".format(base=base), None, ".marketingName e.g. 'BIG-IP Virtual Edition'"), ("7. Generate dossier", "POST {base}/mgmt/tm/util/bash".format(base=base), '{{"command":"run","utilCmdArgs":"-c \'get_dossier -b {key}\'"}} '.format(key=reg_key), "commandResult contains the dossier text"), ("8. SOAP call to activate.f5.com", "POST https://activate.f5.com/license/services/urn:com.f5.license.v5b.ActivationService", "Content-Type: text/xml; charset=utf-8 | SOAPAction: \"\"", "Body: SOAP_ENVELOPE from script with <dossier> and <eula> filled in"), ("9. Write license to BIG-IP", "POST {base}/mgmt/tm/util/bash".format(base=base), '{"command":"run","utilCmdArgs":"-c \'echo <b64> | base64 -d > /config/bigip.license\'"}', "base64-encode the license text to avoid quoting issues"), ("10. Reload license", "POST {base}/mgmt/tm/util/bash".format(base=base), '{"command":"run","utilCmdArgs":"-c \'reloadlic\'"}', None), ("11. 
Verify active license", "GET {base}/mgmt/tm/sys/license".format(base=base), None, "entries[].nestedStats.entries.registrationKey.description"), ] for item in calls: title, line1, line2, note = item print(yellow("\n " + title)) print(" " + line1) if line2: print(" " + line2) if note: print(dim(" ↳ " + note)) print() # ── Config file support ─────────────────────────────────────────────────────── def load_config(path): if path and os.path.exists(path): with open(path) as f: return json.load(f) return {} # ╔══════════════════════════════════════════════════════════════════════════════╗ # ║ MAPPER — BIG-IQ JSON ↔ My-F5 portal CSV ║ # ║ Modes: map (interactive assignment) | summary (SKU counts) ║ # ╚══════════════════════════════════════════════════════════════════════════════╝ _DEFAULT_ALLOWED = ["Ready to activate", "Ready to reassign"] # Chargeback field regexes _CB_UID = re.compile(r'UID=([^;|]+)') _CB_HN = re.compile(r'HN=([^;|]+)') _CB_IP = re.compile(r'IP=([^;|]+)') _STATUS_PRIORITY = { "active": 0, "ready to activate": 1, "ready to reassign": 2, } _REASON_LABELS = { "uid_field": "Matches UID column", "chargeback_uid": "Matches Chargeback UID", "explicit_hn_ip": "Matches hostname/IP columns", "chargeback_hn_ip":"Matches Chargeback hostname/IP", } _FATAL_FAULTS = { '51089': "Internal/PD key cannot be used in the Production environment", '51092': "Key already activated on a different unit — contact licensingsupport@f5.com", '51093': "Registration key not found", '51094': "Registration key has been revoked", } # ── Py2/3 CSV helpers ───────────────────────────────────────────────────────── def _csv_open_read(path): """Open a CSV file for reading in a Py2/3 compatible way.""" if sys.version_info[0] >= 3: return open(path, "r", newline="", encoding="utf-8-sig") else: return open(path, "rb") def _csv_open_write(path): """Open a CSV file for writing in a Py2/3 compatible way.""" if sys.version_info[0] >= 3: return open(path, "w", newline="", encoding="utf-8") else: # 
Python 2: open in binary mode so csv module handles line endings return open(path, "wb") # ── JSON loader ─────────────────────────────────────────────────────────────── def mapper_load_json(path): """Load a BIG-IQ pool-usage JSON export and return a flat list of device dicts.""" if sys.version_info[0] >= 3: fh = open(path, "r", encoding="utf-8") else: fh = open(path, "rb") with fh: data = json.load(fh) records = data.get("records") or [] pool_regkey = data.get("poolRegkey", data.get("regkey", "")) pool_name = data.get("poolName", "") version = data.get("version", "") devices = [] revoked_count = 0 for idx, rec in enumerate(records): # Skip revoked records. BIG-IQ uses two patterns: # 1. A "revoked" key with a timestamp value e.g. "revoked": "2026-06-01T14:03:04Z" # 2. A "status"/"licenseStatus" field set to "revoked", "cancelled" etc. # Either pattern means the grant was withdrawn and must be ignored. rec_revoked = (rec.get("revoked") or "").strip() rec_status = (rec.get("status") or rec.get("licenseStatus") or "").lower() rec_sku = (rec.get("sku") or "").lower() if rec_revoked or \ rec_status in ("revoked", "cancelled", "terminated", "expired") or \ "revoked" in rec_sku or "cancelled" in rec_sku: revoked_count += 1 continue devices.append({ "json_source": os.path.basename(path), "json_path": path, "json_index": idx, "pool_regkey": pool_regkey, "pool_name": pool_name, "product_version":version, "id": (rec.get("id") or ""), "address": (rec.get("address") or ""), "hostname": (rec.get("hostname") or ""), "sku": (rec.get("sku") or ""), "type": (rec.get("type") or ""), "uom": (rec.get("uom") or ""), "granted": (rec.get("granted") or ""), }) if revoked_count: print(yellow(" [{f}] Skipped {n} revoked/cancelled record(s).".format( f=os.path.basename(path), n=revoked_count))) return devices def mapper_expand_json_inputs(raw_list): """Expand a list of file/dir paths to individual .json file paths.""" paths = [] for entry in (raw_list or []): for token in entry.split(","): 
token = token.strip() if not token: continue if os.path.isdir(token): for root, _, files in os.walk(token): for name in files: if name.lower().endswith(".json"): paths.append(os.path.join(root, name)) else: paths.append(token) seen, ordered = set(), [] for p in paths: ap = os.path.abspath(p) if ap not in seen: seen.add(ap) ordered.append(ap) return ordered # ── Portal CSV loader ───────────────────────────────────────────────────────── def _extract_cb(chargeback): uid = hn = ip = "" if chargeback: m = _CB_UID.search(chargeback); uid = m.group(1).strip() if m else "" m = _CB_HN.search(chargeback); hn = m.group(1).strip() if m else "" m = _CB_IP.search(chargeback); ip = m.group(1).strip() if m else "" return uid, hn, ip def _norm_key_row(raw): """Normalise a raw CSV/block dict into a standard key row dict.""" norm = {} for k, v in (raw or {}).items(): if k is not None: norm[k.strip()] = (v or "").strip() def get(*names): for n in names: for actual in norm: if actual.lower() == n.lower(): return norm[actual] return "" chargeback = get("Chargeback") or get("chargeback_new","Chargeback New") or get("chargeback_existing") row = { "Registration Key": get("Registration Key", "Registration Key:"), "Subscription ID": get("Subscription ID"), "Product": get("Product"), "Capacity": get("Capacity"), "Chargeback": chargeback, "Status": get("Status"), "Expiration": get("Expiration"), "AddOns": get("AddOns", "Add Ons", "Add-Ons"), "_uid_field": get("uid", "UID"), "_hostname_field": get("hostname", "Hostname"), "_ip_field": get("ip", "IP", "IP Address", "IPAddress"), } # Back-fill UID/HN/IP from chargeback when explicit columns are empty cb_uid, cb_hn, cb_ip = _extract_cb(chargeback) if not row["_uid_field"] and cb_uid: row["_uid_field"] = cb_uid if not row["_hostname_field"] and cb_hn: row["_hostname_field"] = cb_hn if not row["_ip_field"] and cb_ip: row["_ip_field"] = cb_ip return row def mapper_load_csv(path): """Load a My-F5 portal CSV export, return deduplicated list of key 
dicts.""" rows = [] with _csv_open_read(path) as fh: # Py2: DictReader on bytes file; Py3: on text file reader = csv.DictReader(fh) for raw in reader: rows.append(_norm_key_row(raw)) # Deduplicate by Registration Key seen, deduped = set(), [] for row in rows: reg = row.get("Registration Key", "") if reg and reg not in seen: deduped.append(row) seen.add(reg) return deduped # ── Key helpers ─────────────────────────────────────────────────────────────── def _device_id(uid, hostname, ip): if uid: return uid if hostname and ip: return "HN:{}|IP:{}".format(hostname, ip) if hostname: return "HN:{}".format(hostname) if ip: return "IP:{}".format(ip) return "UNKNOWN-DEVICE" def _build_chargeback(existing, uid, hostname, ip, limit=255): core = "UID={};HN={};IP={}".format(uid or "", hostname or "", ip or "") existing = (existing or "").strip() if core and existing and core in existing: return existing[:limit] if not existing: return core[:limit] combined = "{} | {}".format(existing, core) if len(combined) <= limit: return combined keep = limit - len(core) - 3 return ("{}".format(existing[:keep]) + " | " + core)[:limit] if keep > 0 else core[:limit] def _prepare_used_map(keys): used = {} for key in keys: reg = key.get("Registration Key", "") if not reg: continue owners = set() uid = key.get("_uid_field", ""); hn = key.get("_hostname_field", "") ip = key.get("_ip_field", ""); cb = key.get("Chargeback", "") cb_uid, cb_hn, cb_ip = _extract_cb(cb) if uid: owners.add(uid) if hn and ip: owners.add("HN:{}|IP:{}".format(hn, ip)) if cb_uid: owners.add(cb_uid) if cb_hn and cb_ip: owners.add("HN:{}|IP:{}".format(cb_hn, cb_ip)) if owners: used.setdefault(reg, set()).update(owners) return used # Statuses that are never offered regardless of --allow-status _ALWAYS_SKIP_STATUSES = {"revoked", "cancelled", "expired", "terminated"} def _filter_eligible(keys, allowed_statuses): """ Return keys eligible for new assignment. Rules (applied in order): 1. 
Any key whose Status is in _ALWAYS_SKIP_STATUSES is silently dropped — revoked/cancelled keys must never be offered even if accidentally listed in --allow-status. 2. Key must match one of the allowed_statuses (case-insensitive). 3. Keys that already have UID/hostname/IP populated are treated as in-use and skipped UNLESS their status is exactly "ready to reassign". """ allow = set(s.lower() for s in (allowed_statuses or [])) seen, out = set(), [] for key in keys: reg = key.get("Registration Key", "") if not reg or reg in seen: continue sl = key.get("Status", "").lower() # Rule 1 — hard skip for revoked/cancelled etc. if sl in _ALWAYS_SKIP_STATUSES: seen.add(reg) continue # Rule 2 — must be in the allowed list if sl not in allow: seen.add(reg) continue # Rule 3 — skip already-assigned unless reassignable if any(key.get(f) for f in ("_uid_field", "_hostname_field", "_ip_field")) \ and sl != "ready to reassign": seen.add(reg) continue out.append(key) seen.add(reg) return out def _find_existing_matches(keys, uid, hostname, ip): matches = [] for key in keys: reg = key.get("Registration Key", "") if not reg: continue cb = key.get("Chargeback", "") cb_uid, cb_hn, cb_ip = _extract_cb(cb) f_uid = key.get("_uid_field", ""); f_hn = key.get("_hostname_field", "") f_ip = key.get("_ip_field", "") reason = None if uid and f_uid and f_uid == uid: reason = "uid_field" elif uid and cb_uid and cb_uid == uid: reason = "chargeback_uid" elif hostname and ip and f_hn==hostname and f_ip==ip: reason = "explicit_hn_ip" elif hostname and ip and cb_hn==hostname and cb_ip==ip: reason = "chargeback_hn_ip" if reason: matches.append((key, reason)) matches.sort(key=lambda x: _STATUS_PRIORITY.get(x[0].get("Status","").lower(), 99)) seen, unique = set(), [] for key, reason in matches: reg = key.get("Registration Key","") if reg and reg not in seen: unique.append((key, reason)) seen.add(reg) return unique # ── Interactive display helpers ─────────────────────────────────────────────── def 
_print_key_line(label, key, used_map, device_id, note=None): reg = key.get("Registration Key","") parts = [p for p in [reg, key.get("Product",""), key.get("Capacity",""), key.get("Status","")] if p] summary = " | ".join(parts) cb = key.get("Chargeback","") cb_snip = (" | CB: " + cb[:80] + ("..." if len(cb)>80 else "")) if cb else "" owners = used_map.get(reg, set()) other = [o for o in owners if o != device_id] owner_txt = "" if other: owner_txt = "assigned to {}".format(", ".join(sorted(other))) if note: label_txt = _REASON_LABELS.get(note, note) owner_txt = ("{}; {}".format(owner_txt, label_txt)) if owner_txt else label_txt tail = " [{}]".format(owner_txt) if owner_txt else "" print(" {}) {}{}{}".format(label, summary, cb_snip, tail)) def _confirm_reuse(key, device_id, used_map): reg = key.get("Registration Key","") others = [o for o in used_map.get(reg, set()) if o != device_id] if others: ans = input(" WARNING: {} already mapped to {}. Use anyway? [y/N]: " .format(reg, ", ".join(sorted(others)))).strip().lower() return ans in ("y","yes") return True def _find_by_regkey(keys, regkey): t = regkey.strip() for k in keys: if k.get("Registration Key","") == t: return k return None def _search_keys(all_keys, used_map, device_id): term = input(" Search (product / capacity / status / reg key): ").strip().lower() if not term: return None, None matches, seen = [], set() for key in all_keys: reg = key.get("Registration Key","") if not reg or reg in seen: continue hay = " ".join([reg, key.get("Product",""), key.get("Capacity",""), key.get("Status",""), key.get("Subscription ID",""), key.get("Chargeback","")]).lower() if term in hay: matches.append(key); seen.add(reg) if not matches: print(" No matches for '{}'.".format(term)) return None, None print(" Search results:") for i, k in enumerate(matches, 1): _print_key_line("S{}".format(i), k, used_map, device_id) sel = input(" Select S# or exact reg key (Enter to cancel): ").strip() if not sel: return None, None lo = 
sel.lower() if lo.startswith("s") and lo[1:].isdigit(): idx = int(lo[1:]) - 1 if 0 <= idx < len(matches): k = matches[idx] return (k, "manual_search") if _confirm_reuse(k, device_id, used_map) else (None, None) k = _find_by_regkey(all_keys, sel) if k and _confirm_reuse(k, device_id, used_map): return k, "manual_direct" print(" No key selected.") return None, None def _manual_entry(all_keys, used_map, device_id): reg = input(" Enter registration key (blank to cancel): ").strip() if not reg: return None k = _find_by_regkey(all_keys, reg) if k: return (k, "manual_direct") if _confirm_reuse(k, device_id, used_map) else None ans = input(" Key not in portal export. Create placeholder? [y/N]: ").strip().lower() if ans not in ("y","yes"): return None placeholder = {"Registration Key": reg, "Subscription ID": "", "Product": "", "Capacity": "", "Chargeback": "", "Status": "manual", "_uid_field": "", "_hostname_field": "", "_ip_field": ""} return placeholder, "manual_entry" def _interactive_select(device, existing_matches, eligible_keys, all_keys, used_map): uid = device.get("id","") hostname = device.get("hostname","") ip = device.get("address","") device_id = _device_id(uid, hostname, ip) existing_regs = set(pair[0].get("Registration Key","") for pair in existing_matches) candidate_keys, seen = [], set() for k in eligible_keys: reg = k.get("Registration Key","") if reg and reg not in seen and reg not in existing_regs: candidate_keys.append(k); seen.add(reg) while True: print(cyan("\n " + "─"*60)) print(" Device UID : {}".format(uid or "(none)")) print(" Hostname : {}".format(hostname or "(none)")) print(" Address : {}".format(ip or "(none)")) print(" SKU : {}".format(device.get("sku","") or "(none)")) print(" Source : {} (record #{})".format( device.get("json_source",""), device.get("json_index",0))) if existing_matches: print(yellow("\n Existing matches in portal data:")) for i, (k, reason) in enumerate(existing_matches, 1): _print_key_line("E{}".format(i), k, used_map, 
device_id, note=reason) else: print(dim(" No existing portal mappings detected.")) if candidate_keys: statuses = sorted(set(k.get("Status","") for k in candidate_keys)) print(yellow("\n Candidate keys ({}):".format(", ".join(statuses)))) for i, k in enumerate(candidate_keys, 1): _print_key_line(str(i), k, used_map, device_id) else: print(dim(" No eligible candidate keys. Use search or manual entry.")) print("\n E# keep existing | # select candidate | s search | m manual | Enter skip") choice = input(" > ").strip() if not choice: return None, "skipped", {} lo = choice.lower() if lo.startswith("e") and lo[1:].isdigit(): idx = int(lo[1:]) - 1 if 0 <= idx < len(existing_matches): k, reason = existing_matches[idx] if _confirm_reuse(k, device_id, used_map): return k, "kept_existing", {"existing_reason": reason} continue print(" Invalid existing selection.") continue if choice.isdigit(): idx = int(choice) - 1 if 0 <= idx < len(candidate_keys): k = candidate_keys[idx] if _confirm_reuse(k, device_id, used_map): return k, "new_assignment", {} continue print(" Invalid selection.") continue if lo == "s": k, mode_label = _search_keys(all_keys, used_map, device_id) if k: return k, mode_label, {} continue if lo == "m": result = _manual_entry(all_keys, used_map, device_id) if result: k, mode_label = result if _confirm_reuse(k, device_id, used_map): return k, mode_label, {} continue # Direct reg key typed k = _find_by_regkey(all_keys, choice) if k: if _confirm_reuse(k, device_id, used_map): return k, "manual_direct", {} continue print(" Unrecognised input.") # ── Output writer ───────────────────────────────────────────────────────────── _OUTPUT_FIELDS = [ "json_source", "json_index", "pool_regkey", "pool_name", "uid", "hostname", "ip", "sku", "type", "uom", "granted", "reg_key", "product", "capacity", "status", "subscription_id", "chargeback_existing", "chargeback_new", "mapping_mode", "existing_reason", ] def _write_mapping_csv(path, rows): with _csv_open_write(path) as fh: 
writer = csv.DictWriter(fh, fieldnames=_OUTPUT_FIELDS) writer.writeheader() for row in rows: writer.writerow({f: row.get(f, "") for f in _OUTPUT_FIELDS}) # ── Summary mode ────────────────────────────────────────────────────────────── def _run_summary(args): json_paths = mapper_expand_json_inputs(args.json) if not json_paths: print(red(" No JSON files found.")) sys.exit(1) devices = [] for p in json_paths: if not os.path.isfile(p): print(yellow(" Skipping missing file: {}".format(p))) continue devices.extend(mapper_load_json(p)) if not devices: print(red(" No device records found in JSON files.")) sys.exit(1) counts = {} for d in devices: sku = d.get("sku") or "UNKNOWN" counts[sku] = counts.get(sku, 0) + 1 print(bold("\n SKU usage summary ({} devices across {} files):".format( len(devices), len(json_paths)))) print(cyan(" {:<40} {}".format("SKU", "Count"))) print(dim(" " + "─"*50)) for sku in sorted(counts): print(" {:<40} {}".format(sku, counts[sku])) print(dim(" " + "─"*50)) print(bold(" {:<40} {}".format("TOTAL", len(devices)))) print() # Pass json paths so next stage (map) can pre-fill --json json_hint = ",".join(args.json) if args.json else None _next_step_prompt("summary", json_path=json_hint) # ── Map mode ────────────────────────────────────────────────────────────────── def _run_map(args): # Validate required args if not args.json: print(red(" --json is required in map mode.")) sys.exit(1) if not args.keys: print(red(" --keys (portal CSV) is required in map mode.")) sys.exit(1) if not args.out: print(red(" --out (output CSV path) is required in map mode.")) sys.exit(1) json_paths = mapper_expand_json_inputs(args.json) missing = [p for p in json_paths if not os.path.isfile(p)] if missing: print(red(" Missing JSON files:\n {}".format("\n ".join(missing)))) sys.exit(1) if not os.path.isfile(args.keys): print(red(" Keys file not found: {}".format(args.keys))) sys.exit(1) allow_statuses = args.allow_status or _DEFAULT_ALLOWED # Load data print(cyan("\n Loading 
BIG-IQ JSON records ...")) devices = [] for p in json_paths: batch = mapper_load_json(p) print(dim(" {} → {} records".format(os.path.basename(p), len(batch)))) devices.extend(batch) print(cyan("\n Loading portal CSV keys ...")) all_keys = mapper_load_csv(args.keys) eligible = _filter_eligible(all_keys, allow_statuses) used_map = _prepare_used_map(all_keys) print(green(" {} device records loaded from {} JSON file(s).".format( len(devices), len(json_paths)))) print(green(" {} registration keys loaded ({} eligible).".format( len(all_keys), len(eligible)))) print(dim(" Eligible statuses: {}".format(", ".join(allow_statuses)))) output_rows = [] for device in devices: uid = device.get("id","") hostname = device.get("hostname","") ip = device.get("address","") device_id = _device_id(uid, hostname, ip) existing = _find_existing_matches(all_keys, uid, hostname, ip) selection, mode_label, info = _interactive_select( device, existing, eligible, all_keys, used_map ) if selection: reg_key = selection.get("Registration Key","") cb_exist = selection.get("Chargeback","") cb_new = _build_chargeback(cb_exist, uid, hostname, ip) used_map.setdefault(reg_key, set()).add(device_id) ex_reason = _REASON_LABELS.get( (info or {}).get("existing_reason",""), (info or {}).get("existing_reason","")) else: reg_key = cb_exist = cb_new = ex_reason = "" output_rows.append({ "json_source": device.get("json_source",""), "json_index": device.get("json_index",""), "pool_regkey": device.get("pool_regkey",""), "pool_name": device.get("pool_name",""), "uid": uid, "hostname": hostname, "ip": ip, "sku": device.get("sku",""), "type": device.get("type",""), "uom": device.get("uom",""), "granted": device.get("granted",""), "reg_key": reg_key, "product": selection.get("Product","") if selection else "", "capacity": selection.get("Capacity","") if selection else "", "status": selection.get("Status","") if selection else "", "subscription_id": selection.get("Subscription ID","") if selection else "", 
"chargeback_existing": cb_exist, "chargeback_new": cb_new, "mapping_mode": mode_label, "existing_reason": ex_reason, }) _write_mapping_csv(args.out, output_rows) assigned = sum(1 for r in output_rows if r.get("reg_key")) skipped = len(output_rows) - assigned print(green("\n Wrote {} ({} assigned, {} skipped).".format( args.out, assigned, skipped))) _next_step_prompt("map", csv_path=args.out) # ── Next-step / resume prompt ───────────────────────────────────────────────── _CHAIN_NEXT = [None] # [0] = sys.argv to use for next run, or None _NEXT_STEPS = { "summary": ("map", "Map BIG-IQ records to portal reg keys"), "map": ("harvest", "Harvest dossiers from BIG-IPs (no internet needed)"), "harvest": ("preflight", "Activate dossiers at F5 portal, save license files"), "preflight": ("batch", "Push license files to BIG-IPs (confirm each)"), "batch": (None, "All done — check portal_updates.csv for My-F5 tags"), } # ── Next-step / resume prompt ───────────────────────────────────────────────── _NEXT_STEPS = { "summary": ("map", "Map BIG-IQ records to portal reg keys"), "map": ("harvest", "Harvest dossiers from BIG-IPs (no internet needed)"), "harvest": ("preflight", "Activate dossiers at F5 portal, save license files"), "preflight": ("batch", "Push license files to BIG-IPs (confirm each)"), "batch": (None, "All done — check portal_updates.csv for My-F5 tags"), } # Required args for each mode and how to prompt for them # Each entry: (flag, prompt text, default_value_or_None) _MODE_REQUIRED_ARGS = { "map": [ ("--json", "Path to BIG-IQ JSON export file(s)", None), ("--keys", "Path to My-F5 portal CSV export", None), ("--out", "Output mapping CSV filename", "mapping.csv"), ], "harvest": [ ("--csv", "Path to mapping CSV (from map mode)", None), ("--dossiers-dir", "Folder to save dossier files", "dossiers"), ], "preflight": [ ("--csv", "Path to mapping CSV", None), ("--dossiers-dir", "Folder containing dossier files", "dossiers"), ("--licenses-dir", "Folder to save license 
files", "licenses"),
    ],
    "batch": [
        ("--csv", "Path to mapping CSV (or _remaining.csv to resume)", None),
        ("--licenses-dir", "Folder containing pre-generated license files (Enter to skip)", ""),
    ],
    "activate": [],
    "summary": [
        ("--json", "Path to BIG-IQ JSON export file(s)", None),
    ],
}


def _next_step_prompt(current_mode, csv_path=None, dossiers_dir=None,
                      licenses_dir=None, json_path=None):
    """
    After a mode completes, offer to proceed to the next stage.
    Collects any missing required arguments interactively.
    Only shown when stdin is a tty (skipped in scripts/cron).
    """
    if not sys.stdin.isatty():
        return None

    next_mode, description = _NEXT_STEPS.get(current_mode, (None, ""))

    print("")
    print(cyan(" ─── What next? " + "─"*45))

    if not next_mode:
        print(green(" " + description))
        print(cyan(" " + "─"*60))
        return None

    print(green(" Suggested next step: --mode {}".format(next_mode)))
    print(dim(" {}".format(description)))
    print("")

    ans = input(" Run {} mode now? [Y/n]: ".format(next_mode)).strip().lower()
    if ans in ("n", "no"):
        print(cyan(" " + "─"*60))
        return None

    # ── Seed known values from this session ───────────────────────────
    known = {}
    if csv_path:
        known["--csv"] = csv_path
    if dossiers_dir:
        known["--dossiers-dir"] = dossiers_dir
    if licenses_dir:
        known["--licenses-dir"] = licenses_dir
    if json_path:
        known["--json"] = json_path

    # ── Special case: if next mode is map, check for an existing mapping ──
    if next_mode == "map":
        # Derive the default output filename the same way we would prompt for it
        json_hint = known.get("--json", "")
        if json_hint:
            jbase = os.path.splitext(
                os.path.basename(json_hint.split(",")[0]))[0]
            default_out = "{}_mapping.csv".format(jbase)
        else:
            default_out = "mapping.csv"
        # Also check the plain "mapping.csv" fallback
        candidates = [default_out, "mapping.csv"]
        existing_map = next((p for p in candidates if os.path.isfile(p)), None)
        if existing_map:
            print(yellow(" Found existing mapping file: {}".format(existing_map)))
            reuse = input(" Use it and skip map mode? [Y/n]: ").strip().lower()
            if reuse not in ("n", "no"):
                known["--csv"] = existing_map
                next_mode = "harvest"
                print(dim(" Skipping map — using {}".format(existing_map)))
                print(dim(" Jumping straight to --mode harvest"))

    # ── Collect any still-missing required args ────────────────────────
    required = _MODE_REQUIRED_ARGS.get(next_mode, [])
    for flag, prompt_text, default in required:
        # Already have it
        if flag in known and known[flag]:
            print(dim(" {} = {}".format(flag, known[flag])))
            continue

        # Suggest a sensible default for --out based on --json basename
        if flag == "--out" and "--json" in known:
            jbase = os.path.splitext(
                os.path.basename(known["--json"].split(",")[0]))[0]
            default = "{}_mapping.csv".format(jbase)

        # Show default
        display_default = " [{}]".format(default) if default else ""
        val = input(" {}{}: ".format(prompt_text, display_default)).strip()
        if not val and default is not None:
            val = default
        if val:
            known[flag] = val

    # ── Build the command ──────────────────────────────────────────────
    # sys.argv[0] is already the script path (e.g. "f5_license_tool.py"
    # or "/home/admin/f5_license_tool.py"). argparse reads sys.argv[1:]
    # so we set sys.argv = [script_path, "--mode", next_mode, ...flags...]
    # The display string shown to the user includes "python" for readability
    # but the actual sys.argv must NOT include "python" as element [0].
    script_path = sys.argv[0]
    new_argv = [script_path, "--mode", next_mode]
    for flag, _, _ in _MODE_REQUIRED_ARGS.get(next_mode, []):
        val = known.get(flag, "")
        if val:
            new_argv += [flag, val]

    # Display line for the user (readable, includes python)
    display_cmd = "python " + " ".join(new_argv)
    print("")
    print(dim(" Running: " + display_cmd))
    print(cyan(" " + "─"*60))

    _CHAIN_NEXT[0] = new_argv
    return "run_next"


# ── Shared filename helper ────────────────────────────────────────────────────

def _device_filename(hostname, ip, ext):
    """
    Build a filesystem-safe filename from hostname + IP.
    Format: <hostname>_<ip>.<ext>
    Falls back to whichever field is available.
    Characters unsafe for filenames are replaced with '-'.
    """
    hn = re.sub(r'[\\/:*?"<>|]', '-', (hostname or "").strip())
    ip = re.sub(r'[\\/:*?"<>|]', '-', (ip or "").strip())
    if hn and ip:
        stem = "{}_{}".format(hn, ip)
    elif hn:
        stem = hn
    elif ip:
        stem = ip
    else:
        stem = "unknown"
    return "{}.{}".format(stem, ext)


# ── Harvest mode ──────────────────────────────────────────────────────────────
# Connects to every BIG-IP in the mapping CSV, generates a dossier for its
# reg key, and saves <hostname>_<ip>.dossier to a folder.
# No internet access required — purely device-to-script.
#
# The dossier folder can then be:
#   a) Passed to F5 / taken to an internet-connected machine
#   b) Fed into preflight mode which calls activate.f5.com and saves .license files
#   c) The license folder then fed back into batch mode with --licenses-dir
#
# harvest also writes harvest_manifest.csv so preflight/batch know which
# dossier file maps to which reg key and device.

def _run_harvest(args):
    if not args.csv:
        print(red(" --csv is required in harvest mode."))
        sys.exit(1)
    if not os.path.isfile(args.csv):
        print(red(" CSV not found: {}".format(args.csv)))
        sys.exit(1)

    dossiers_dir = args.dossiers_dir or "dossiers"
    if not os.path.isdir(dossiers_dir):
        os.makedirs(dossiers_dir)
        print(dim(" Created dossier folder: {}".format(dossiers_dir)))

    retries = args.retries if args.retries is not None else 3
    dry_run = args.dry_run

    print(cyan("\n Loading CSV: {}".format(args.csv)))
    all_rows = _batch_load_csv(args.csv)

    actionable = []
    skipped_no_key = 0
    skipped_no_host = 0
    for row in all_rows:
        rk = row.get("reg_key", "").strip()
        host = row.get("ip", "") or row.get("hostname", "")
        if not rk:
            skipped_no_key += 1; continue
        if not host:
            skipped_no_host += 1; continue
        actionable.append(row)

    total = len(actionable)
    print(green(" {} actionable rows ({} no key, {} no host).".format(
        total, skipped_no_key, skipped_no_host)))

    if not total:
        print(yellow(" Nothing to do.")); return

    if dry_run:
        print(yellow("\n DRY RUN — no BIG-IP connections will be made."))
        for row in actionable:
            hn = row.get("hostname", "")
            ip = row.get("ip", "")
            rk = row.get("reg_key", "")
            fname = _device_filename(hn, ip, "dossier")
            print(dim(" {} → {}".format(rk, fname)))
        return

    shared_user, shared_pass = _batch_prompt_creds(args)

    counts = {"success": 0, "failed": 0, "skipped": 0}
    manifest = []  # rows for harvest_manifest.csv

    for idx, row in enumerate(actionable, 1):
        host = row.get("ip", "") or row.get("hostname", "")
        ip = row.get("ip", "")
        hostname = row.get("hostname", "")
        reg_key = row.get("reg_key", "")
        fname = _device_filename(hostname, ip, "dossier")
        fpath = os.path.join(dossiers_dir, fname)

        print(cyan("\n [{}/{}] {} reg_key={}".format(idx, total, host, reg_key)))

        # Already harvested — skip unless --force
        if os.path.isfile(fpath) and not args.force:
            size = os.path.getsize(fpath)
            print(dim(" ↷ Dossier already exists ({} bytes) — skipping".format(size)))
            manifest.append(_harvest_manifest_row(row, fname, fpath, "already_exists", ""))
            counts["skipped"] += 1
            continue

        # Connect
        session = None
        for attempt in range(1, retries + 1):
            if attempt > 1:
                wait = 2 ** attempt
                print(yellow(" Retry {}/{} in {}s ...".format(attempt, retries, wait)))
                time.sleep(wait)
            session = _batch_connect(host, shared_user, shared_pass, idx, total)
            if session:
                break

        if not session:
            msg = "Auth/connection failed after {} attempts".format(retries)
            print(red(" ✗ {}".format(msg)))
            manifest.append(_harvest_manifest_row(row, fname, "", "failed", msg))
            counts["failed"] += 1
            continue

        # Collect live identity to enrich the manifest
        try:
            device_info = collect_device_info(session)
            hostname = hostname or device_info.get("hostname", "")
            ip = ip or device_info.get("mgmt_ip", "")
            # Recalculate filename now we may have richer identity
            fname = _device_filename(hostname, ip, "dossier")
            fpath = os.path.join(dossiers_dir, fname)
        except SystemExit:
            pass  # non-fatal — carry on with what we have

        # Generate dossier
        try:
            dossier = generate_dossier(session, reg_key)
        except SystemExit:
            msg = "get_dossier failed"
            print(red(" ✗ {}".format(msg)))
            manifest.append(_harvest_manifest_row(row, fname, "", "failed", msg))
            counts["failed"] += 1
            continue

        with open(fpath, "w") as fh:
            fh.write(dossier)
        print(green(" ✓ Saved → {}".format(fpath)))
        manifest.append(_harvest_manifest_row(row, fname, fpath, "success", ""))
        counts["success"] += 1

    # ── Write manifest ─────────────────────────────────────────────────
    manifest_path = os.path.join(dossiers_dir, "harvest_manifest.csv")
    _write_harvest_manifest(manifest_path, manifest)

    print(cyan("\n" + "="*64))
    print(bold(" Harvest complete — {} device(s)".format(total)))
    print(" {:20s}: {}".format("Dossiers saved", counts["success"]))
    print(" {:20s}: {}".format("Already existed", counts["skipped"]))
    print(" {:20s}: {}".format("Failed", counts["failed"]))
    print(" {:20s}: {}".format("Dossier folder", dossiers_dir))
    print(" {:20s}: {}".format("Manifest", manifest_path))
    print("="*64)
    print(dim("\n Next step (internet-connected machine):"))
    print(dim(" python f5_license_tool.py --mode preflight \\"))
    print(dim(" --csv {} --dossiers-dir {} --licenses-dir ./licenses".format(
        args.csv, dossiers_dir)))

    _next_step_prompt("harvest", csv_path=args.csv, dossiers_dir=dossiers_dir)


def _harvest_manifest_row(csv_row, fname, fpath, status, message):
    return {
        "reg_key": csv_row.get("reg_key", ""),
        "hostname": csv_row.get("hostname", ""),
        "ip": csv_row.get("ip", ""),
        "sku": csv_row.get("sku", ""),
        "dossier_file": fname,
        "dossier_path": fpath,
        "status": status,
        "message": message,
    }


def _write_harvest_manifest(path, rows):
    fields = ["reg_key", "hostname", "ip", "sku",
              "dossier_file", "dossier_path", "status", "message"]
    with _csv_open_write(path) as fh:
        w = csv.DictWriter(fh, fieldnames=fields, extrasaction="ignore")
        w.writeheader()
        for row in rows:
            w.writerow({f: row.get(f, "") for f in fields})


# ── Preflight mode ────────────────────────────────────────────────────────────
# Reads dossier files from --dossiers-dir (produced by harvest) OR dossier
# column in the CSV, calls activate.f5.com SOAP, saves
# <hostname>_<ip>.license to --licenses-dir.
# No BIG-IP connection needed.
#
# Also writes portal_updates.csv for manual customer-tag updates in My-F5.

def _run_preflight(args):
    if not args.csv:
        print(red(" --csv is required in preflight mode."))
        sys.exit(1)
    if not os.path.isfile(args.csv):
        print(red(" CSV not found: {}".format(args.csv)))
        sys.exit(1)

    licenses_dir = args.licenses_dir or "licenses"
    dossiers_dir = args.dossiers_dir or ""

    if not os.path.isdir(licenses_dir):
        os.makedirs(licenses_dir)
        print(dim(" Created license folder: {}".format(licenses_dir)))

    print(cyan("\n Loading CSV: {}".format(args.csv)))
    all_rows = _batch_load_csv(args.csv)

    # Deduplicate by reg_key
    seen_keys = set()
    actionable = []
    skipped_no_key = 0
    for row in all_rows:
        rk = row.get("reg_key", "").strip()
        if not rk:
            skipped_no_key += 1; continue
        if rk in seen_keys:
            continue
        seen_keys.add(rk)
        actionable.append(row)

    total = len(actionable)
    print(green(" {} unique reg keys ({} rows skipped — no key).".format(
        total, skipped_no_key)))

    if not total:
        print(yellow(" Nothing to do.")); return

    if args.dry_run:
        print(yellow("\n DRY RUN — no SOAP calls will be made."))
        for row in actionable:
            hn = row.get("hostname", ""); ip = row.get("ip", "")
            rk = row.get("reg_key", "")
            print(dim(" {} → {}".format(rk, _device_filename(hn, ip, "license"))))
        return

    counts = {"success": 0, "already_exists": 0, "failed": 0}
    result_map = {}   # reg_key → result dict
    portal_rows = []  # for portal_updates.csv

    for idx, row in enumerate(actionable, 1):
        reg_key = row.get("reg_key", "")
        hostname = row.get("hostname", "")
        ip = row.get("ip", "")
        sku = row.get("sku", "")
        lic_fname = _device_filename(hostname, ip, "license")
        lic_path = os.path.join(licenses_dir, lic_fname)
        host_hint = hostname or ip or "?"

        print(cyan("\n [{}/{}] {} key={} → {}".format(
            idx, total, host_hint, reg_key, lic_fname)))

        # Already exists — skip unless --force
        if os.path.isfile(lic_path) and not args.force:
            size = os.path.getsize(lic_path)
            msg = "License file already exists ({} bytes)".format(size)
            print(dim(" ↷ {}".format(msg)))
            result_map[reg_key] = {"status": "already_exists",
                                   "message": msg, "license_file": lic_path}
            counts["already_exists"] += 1
            portal_rows.append(_portal_row(reg_key, hostname, ip, sku, lic_path))
            continue

        # Find dossier — priority: dossiers_dir file → inline CSV column
        dossier = ""
        if dossiers_dir:
            dos_fname = _device_filename(hostname, ip, "dossier")
            dos_path = os.path.join(dossiers_dir, dos_fname)
            if os.path.isfile(dos_path):
                with open(dos_path) as fh:
                    dossier = fh.read().strip()
                print(dim(" Reading dossier: {}".format(dos_path)))

        if not dossier:
            dossier = row.get("dossier", "").strip()

        if not dossier:
            msg = ("No dossier found. Run --mode harvest first, or ensure "
                   "the CSV dossier column is populated.")
            print(yellow(" ⚠ SKIP: {}".format(msg)))
            result_map[reg_key] = {"status": "skipped_no_dossier",
                                   "message": msg, "license_file": ""}
            counts["failed"] += 1
            continue

        # SOAP activation
        print(dim(" Calling activate.f5.com ..."))
        try:
            license_text = activate_online(dossier, reg_key)
        except SystemExit:
            msg = "SOAP activation failed"
            print(red(" ✗ {}".format(msg)))
            result_map[reg_key] = {"status": "failed",
                                   "message": msg, "license_file": ""}
            counts["failed"] += 1
            continue

        with open(lic_path, "w") as fh:
            fh.write(license_text)

        msg = "Saved → {}".format(lic_path)
        print(green(" ✓ {}".format(msg)))
        result_map[reg_key] = {"status": "success",
                               "message": msg, "license_file": lic_path}
        counts["success"] += 1
        portal_rows.append(_portal_row(reg_key, hostname, ip, sku, lic_path))

    # ── Summary ────────────────────────────────────────────────────────
    print(cyan("\n" + "="*64))
    print(bold(" Preflight complete — {} key(s)".format(total)))
    print(" {:25s}: {}".format("License files created", counts["success"]))
    print(" {:25s}: {}".format("Already existed (skipped)", counts["already_exists"]))
    print(" {:25s}: {}".format("Failed / no dossier", counts["failed"]))
    print(" {:25s}: {}".format("Output folder", licenses_dir))
    print("="*64)

    # ── Results CSV ────────────────────────────────────────────────────
    results_csv = os.path.join(licenses_dir, "preflight_results.csv")
    with _csv_open_write(results_csv) as fh:
        fields = ["reg_key", "ip", "hostname", "sku",
                  "status", "message", "license_file"]
        writer = csv.DictWriter(fh, fieldnames=fields, extrasaction="ignore")
        writer.writeheader()
        for row in all_rows:
            rk = row.get("reg_key", "")
            res = result_map.get(rk, {"status": "not_processed",
                                      "message": "", "license_file": ""})
            writer.writerow({
                "reg_key": rk,
                "ip": row.get("ip", ""),
                "hostname": row.get("hostname", ""),
                "sku": row.get("sku", ""),
                "status": res["status"],
                "message": res["message"],
                "license_file": res["license_file"],
            })
    print(green(" Results: {}".format(results_csv)))

    # ── Portal updates CSV ─────────────────────────────────────────────
    if portal_rows:
        portal_csv = os.path.join(licenses_dir, "portal_updates.csv")
        _write_portal_updates(portal_csv, portal_rows)
        print(green(" Portal updates: {}".format(portal_csv)))
        print(dim(" Open portal_updates.csv to copy-paste customer tags into My-F5."))

    print(dim("\n To push licenses to BIG-IPs when ready:"))
    print(dim(" python f5_license_tool.py --mode batch \\"))
    print(dim(" --csv {} --licenses-dir {}".format(args.csv, licenses_dir)))

    _next_step_prompt("preflight", csv_path=args.csv,
                      dossiers_dir=getattr(args, "dossiers_dir", None),
                      licenses_dir=licenses_dir)


def _portal_row(reg_key, hostname, ip, sku, license_file):
    """Build one row for portal_updates.csv."""
    customer_tag = "UID=;HN={};IP={}".format(hostname, ip)
    return {
        "reg_key": reg_key,
        "hostname": hostname,
        "ip": ip,
        "sku": sku,
        "license_file": license_file,
        "customer_tag": customer_tag,
        "notes": "Paste customer_tag into My-F5 portal Chargeback field for this reg key",
    }


def _write_portal_updates(path, rows):
    fields = ["reg_key", "hostname", "ip", "sku",
              "license_file", "customer_tag", "notes"]
    with _csv_open_write(path) as fh:
        writer = csv.DictWriter(fh, fieldnames=fields, extrasaction="ignore")
        writer.writeheader()
        for row in rows:
            writer.writerow({f: row.get(f, "") for f in fields})


# ── Batch mode ────────────────────────────────────────────────────────────────
# Reads the CSV produced by --mode map (or any CSV with ip/hostname + reg_key
# columns) and activates each BIG-IP in sequence.
#
# CSV columns used:
#   ip        management IP of the BIG-IP (preferred)
#   hostname  used as fallback if ip is blank
#   reg_key   registration key to activate
#   (all other columns are preserved in the results CSV)
#
# Results CSV adds:
#   batch_status      success | skipped | failed | already_licensed
#   batch_message     human-readable detail
#   batch_active_key  reg key confirmed active after reloadlic

_BATCH_RESULTS_FIELDS = [
    "ip", "hostname", "reg_key",
    "batch_status", "batch_message", "batch_active_key",
]


def _batch_load_csv(path):
    """Read the mapping CSV. Returns list of row dicts."""
    rows = []
    with _csv_open_read(path) as fh:
        reader = csv.DictReader(fh)
        for row in reader:
            # Normalise key names to lowercase stripped
            norm = {k.strip().lower(): (v or "").strip()
                    for k, v in row.items() if k is not None}
            rows.append(norm)
    return rows


def _batch_prompt_creds(args):
    """
    Ask for shared credentials once.
    Returns (user, password). Empty string means 'ask per device'.
    """
    print(cyan("\n Batch credentials"))
    print(" Press Enter to leave blank and be prompted per device instead.\n")
    user = (os.environ.get("F5_USER")
            or (args.user if args.user else None)
            or input(" Shared username [admin]: ").strip()
            or "admin")
    password = (os.environ.get("F5_PASS")
                or (args.password if args.password else None)
                or getpass.getpass(" Shared password (Enter to prompt per device): "))
    return user, password


def _batch_connect(host, shared_user, shared_pass, device_num, total):
    """
    Try to make a REST session with shared creds.
    If that fails (401/connection) and shared creds were provided,
    fall back to prompting the operator for this specific device.
    Returns session or None on unrecoverable failure.
    """
    def _try(user, pw):
        try:
            return make_session(host, user, pw)
        except SystemExit:
            return None

    session = _try(shared_user, shared_pass)
    if session:
        return session

    # Shared creds failed — prompt for this device
    print(yellow(" Shared credentials failed for {} — enter device-specific credentials.".format(host)))
    user = input(" Username [admin]: ").strip() or "admin"
    pw = getpass.getpass(" Password: ")
    return _try(user, pw)


def _batch_write_results(path, all_rows, result_map):
    """
    Write a results CSV merging original row data with batch outcomes.
    Preserves all original columns and appends batch_ columns.
    """
    if not all_rows:
        return

    # Collect all column names from original rows
    orig_fields = []
    seen_f = set()
    for row in all_rows:
        for k in row.keys():
            if k not in seen_f:
                orig_fields.append(k)
                seen_f.add(k)

    # Ensure batch columns are at the end, not duplicated
    extra = [f for f in _BATCH_RESULTS_FIELDS if f not in seen_f]
    fieldnames = orig_fields + extra

    with _csv_open_write(path) as fh:
        writer = csv.DictWriter(fh, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        for row in all_rows:
            key = row.get("ip") or row.get("hostname") or ""
            outcome = result_map.get(key, {})
            merged = dict(row)
            merged["batch_status"] = outcome.get("status", "not_processed")
            merged["batch_message"] = outcome.get("message", "")
            merged["batch_active_key"] = outcome.get("active_key", "")
            writer.writerow({f: merged.get(f, "") for f in fieldnames})


def _run_batch(args):
    if not args.csv:
        print(red(" --csv is required in batch mode."))
        sys.exit(1)
    if not os.path.isfile(args.csv):
        print(red(" CSV not found: {}".format(args.csv)))
        sys.exit(1)

    retries = args.retries if args.retries is not None else 3
    out_path = args.out or args.csv.replace(".csv", "_batch_results.csv")
    remaining_path = args.csv.replace(".csv", "_remaining.csv")
    dry_run = args.dry_run
    licenses_dir = args.licenses_dir or ""

    print(cyan("\n Loading mapping CSV: {}".format(args.csv)))
    all_rows = _batch_load_csv(args.csv)

    actionable = []
    skipped_no_key = 0
    skipped_no_host = 0
    for row in all_rows:
        reg_key = row.get("reg_key", "")
        host = row.get("ip", "") or row.get("hostname", "")
        if not reg_key:
            skipped_no_key += 1; continue
        if not host:
            skipped_no_host += 1; continue
        actionable.append(row)

    total = len(actionable)
    print(green(" {} rows total — {} actionable, {} no key, {} no host.".format(
        len(all_rows), total, skipped_no_key, skipped_no_host)))

    if licenses_dir:
        print(dim(" License folder: {} (pre-generated files used where available)".format(
            licenses_dir)))

    if not actionable:
        print(yellow(" Nothing to do.")); return

    if dry_run:
        print(yellow("\n DRY RUN — no connections will be made."))
        for row in actionable:
            h = row.get("ip", "") or row.get("hostname", "")
            rk = row.get("reg_key", "")
            hn = row.get("hostname", ""); ip = row.get("ip", "")
            lic_tag = ""
            if licenses_dir:
                lp = os.path.join(licenses_dir, _device_filename(hn, ip, "license"))
                lic_tag = " [pre-licensed]" if os.path.isfile(lp) else " [SOAP needed]"
            print(dim(" {} -> {}{}".format(h, rk, lic_tag)))
        return

    shared_user, shared_pass = _batch_prompt_creds(args)

    result_map = {}
    counts = {"success": 0, "failed": 0, "already_licensed": 0, "skipped": 0}
    completed = []
    portal_rows = []

    for idx, row in enumerate(actionable, 1):
        host = row.get("ip", "") or row.get("hostname", "")
        ip = row.get("ip", "")
        hostname = row.get("hostname", "")
        reg_key = row.get("reg_key", "")
        sku = row.get("sku", "")

        print(cyan("\n[{}/{}] {} -> {}".format(idx, total, host, reg_key)))

        # Per-device confirmation
        ans = input(" License this device? [Y/n]: ").strip().lower()
        if ans in ("n", "no"):
            print(yellow(" Skipped by operator."))
            result_map[host] = {"status": "skipped_by_operator",
                                "message": "Skipped by operator", "active_key": ""}
            counts["skipped"] += 1
            continue

        # Connect with retries
        session = None
        for attempt in range(1, retries + 1):
            if attempt > 1:
                wait = 2 ** attempt
                print(yellow(" Retry {}/{} in {}s ...".format(attempt, retries, wait)))
                time.sleep(wait)
            session = _batch_connect(host, shared_user, shared_pass, idx, total)
            if session:
                break

        if not session:
            msg = "Auth/connection failed after {} attempts".format(retries)
            print(red(" X {}".format(msg)))
            result_map[host] = {"status": "failed", "message": msg, "active_key": ""}
            counts["failed"] += 1
            continue

        # Collect device info
        try:
            device_info = collect_device_info(session)
            hostname = hostname or device_info.get("hostname", "")
            ip = ip or device_info.get("mgmt_ip", "")
            current_key = device_info.get("current_reg_key", "")
            if current_key and current_key == reg_key:
                msg = "Already licensed with this key"
                print(green(" OK {}".format(msg)))
                result_map[host] = {"status": "already_licensed",
                                    "message": msg, "active_key": current_key}
                counts["already_licensed"] += 1
                completed.append(row)
                portal_rows.append(_portal_row(reg_key, hostname, ip, sku, ""))
                _write_remaining_csv(remaining_path,
                                     [r for r in actionable if r not in completed],
                                     all_rows)
                continue
            if current_key and current_key not in ("none", ""):
                print(yellow(" Current key: {} (will be replaced)".format(current_key)))
        except SystemExit:
            msg = "Failed to collect device info"
            print(red(" X {}".format(msg)))
            result_map[host] = {"status": "failed", "message": msg, "active_key": ""}
            counts["failed"] += 1
            continue

        # Get license text — pre-generated file first, then SOAP
        license_text = ""
        source = ""
        if licenses_dir:
            lic_fname = _device_filename(hostname, ip, "license")
            lic_file = os.path.join(licenses_dir, lic_fname)
            if os.path.isfile(lic_file):
                with open(lic_file) as fh:
                    license_text = fh.read().strip()
                source = "pre-generated ({})".format(lic_fname)
                print(dim(" Using: {}".format(lic_fname)))

        if not license_text:
            try:
                dossier = generate_dossier(session, reg_key)
            except SystemExit:
                msg = "Dossier generation failed"
                print(red(" X {}".format(msg)))
                result_map[host] = {"status": "failed", "message": msg, "active_key": ""}
                counts["failed"] += 1
                continue
            try:
                license_text = activate_online(dossier, reg_key)
                source = "SOAP"
            except SystemExit:
                msg = "SOAP activation failed"
                print(red(" X {}".format(msg)))
                result_map[host] = {"status": "failed", "message": msg, "active_key": ""}
                counts["failed"] += 1
                continue

        # Push license
        try:
            _, active_key = install_license(session, license_text, reg_key, device_info)
            msg = "Licensed via {}".format(source)
            print(green(" OK {} (active: {})".format(msg, active_key)))
            result_map[host] = {"status": "success", "message": msg,
                                "active_key": active_key}
            counts["success"] += 1
            completed.append(row)
            portal_rows.append(_portal_row(reg_key, hostname, ip, sku, ""))

            # Update remaining CSV after every success
            remaining = [r for r in actionable if r not in completed]
            _write_remaining_csv(remaining_path, remaining, all_rows)
            print(dim(" Remaining: {} device(s) -> {}".format(
                len(remaining), remaining_path)))
        except SystemExit:
            msg = "License push failed"
            print(red(" X {}".format(msg)))
            result_map[host] = {"status": "failed", "message": msg, "active_key": ""}
            counts["failed"] += 1

    # Summary
    remaining_count = total - counts["success"] - counts["already_licensed"]
    print(cyan("\n" + "="*64))
    print(bold(" Batch complete — {} device(s) processed".format(total)))
    print(" {:25s}: {}".format("Successfully licensed", counts["success"]))
    print(" {:25s}: {}".format("Already correct", counts["already_licensed"]))
    print(" {:25s}: {}".format("Skipped by operator", counts["skipped"]))
    print(" {:25s}: {}".format("Failed", counts["failed"]))
    print(" {:25s}: {}".format("Still remaining", remaining_count))
    print("="*64)

    _batch_write_results(out_path, all_rows, result_map)
    print(green(" Results: {}".format(out_path)))
    if remaining_count > 0:
        print(green(" Remaining: {} (re-run with this to continue)".format(
            remaining_path)))

    if portal_rows:
        portal_csv = out_path.replace(".csv", "_portal_updates.csv")
        _write_portal_updates(portal_csv, portal_rows)
        print(green(" Portal: {}".format(portal_csv)))
        print(dim(" Paste customer_tag column into My-F5 Chargeback field per reg key."))

    _next_step_prompt("batch", csv_path=getattr(args, "out", None) or args.csv)


def _write_remaining_csv(path, remaining_rows, all_rows):
    if not all_rows:
        return
    fields = []
    seen = set()
    for row in all_rows:
        for k in row.keys():
            if k not in seen:
                fields.append(k); seen.add(k)
    with _csv_open_write(path) as fh:
        writer = csv.DictWriter(fh, fieldnames=fields, extrasaction="ignore")
        writer.writeheader()
        for row in remaining_rows:
            writer.writerow({f: row.get(f, "") for f in fields})


# ── Main ──────────────────────────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(
        description="F5 BIG-IP License Tool — activate | harvest | preflight | batch | map | summary",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""\
            Modes:
              activate (default)  Connect to one BIG-IP, generate dossier, activate, push.
              harvest     Connect to every BIG-IP in CSV, save <hostname_ip>.dossier files.
                          No internet needed. Hand folder to F5 or take to connected machine.
              preflight   Read dossier files, call activate.f5.com, save <hostname_ip>.license.
                          No BIG-IP connection needed. Customer chooses when to apply.
              batch       Connect to every BIG-IP, push licenses one at a time (confirm each).
                          Uses --licenses-dir files if available, else generates dossier + SOAP.
                          Writes remaining CSV after each success so you can stop and resume.
              map         Interactively map BIG-IQ JSON device records to portal CSV keys.
              summary     Print SKU counts from BIG-IQ JSON exports.

            Full air-gap workflow (no internet on BIG-IP network):
              1. map -> mapping.csv
              2. harvest --csv mapping.csv --dossiers-dir ./dossiers
                 [take dossiers folder to internet-connected machine]
              3. preflight --csv mapping.csv --dossiers-dir ./dossiers --licenses-dir ./licenses
                 [take licenses folder back to customer]
              4. batch --csv mapping.csv --licenses-dir ./licenses
                 [confirm each device, remaining CSV shrinks to zero]
                 -> portal_updates.csv for manual My-F5 customer tag entry

            Direct workflow (BIG-IP network has internet access):
              1. map -> 2. batch (dossier + SOAP + push all in one pass)

            Env vars: F5_HOST F5_USER F5_PASS

            Examples:
              python f5_license_tool.py
              python f5_license_tool.py --host 10.0.1.1 --reg-key XXXXX-...
              python f5_license_tool.py --mode harvest --csv mapping.csv --dossiers-dir ./dossiers
              python f5_license_tool.py --mode preflight --csv mapping.csv \\
                  --dossiers-dir ./dossiers --licenses-dir ./licenses
              python f5_license_tool.py --mode batch --csv mapping.csv --licenses-dir ./licenses
              python f5_license_tool.py --mode batch --csv mapping_remaining.csv --licenses-dir ./licenses
              python f5_license_tool.py --mode map --json export.json --keys portal.csv --out map.csv
              python f5_license_tool.py --mode summary --json export.json
            """),
    )

    # ── Common ───────────────────────────────────────────────────────────
    parser.add_argument("--mode",
                        choices=["activate", "harvest", "preflight", "batch", "map", "summary"],
                        default="activate",
                        help="Tool mode (default: activate)")

    # ── Credentials (activate + harvest + batch) ─────────────────────────
    parser.add_argument("--host", help="BIG-IP management IP or hostname")
    parser.add_argument("--user", help="BIG-IP username (shared across batch/harvest)")
    parser.add_argument("--password", help="BIG-IP password (prefer F5_PASS env var)")
    parser.add_argument("--config", help="JSON config file {host, user, password}")

    # ── activate-only ─────────────────────────────────────────────────────
    parser.add_argument("--reg-key", help="Base registration key (activate, skip prompt)")
    parser.add_argument("--offline", action="store_true",
                        help="Offline dossier mode: manually paste license back")
    parser.add_argument("--no-install", action="store_true",
                        help="Save license file locally, do NOT push to BIG-IP")

    # ── harvest / preflight / batch shared ───────────────────────────────
    parser.add_argument("--csv", metavar="PATH",
                        help="Mapping CSV (from map mode) — input for harvest/preflight/batch")
    parser.add_argument("--dossiers-dir", metavar="PATH", dest="dossiers_dir",
                        help="Folder for .dossier files (harvest output / preflight input)")
    parser.add_argument("--licenses-dir", metavar="PATH", dest="licenses_dir",
                        help="Folder for .license files (preflight output / batch input)")
    parser.add_argument("--force", action="store_true",
                        help="Re-harvest/re-activate even if output file already exists")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be done without making any connections")
    parser.add_argument("--retries", type=int, default=3, metavar="N",
                        help="Per-device retry attempts (harvest/batch, default: 3)")

    # ── map / summary ─────────────────────────────────────────────────────
    parser.add_argument("--json", action="append", metavar="PATH",
                        help="BIG-IQ JSON export (map/summary, repeat or comma-separate)")
    parser.add_argument("--keys", metavar="PATH",
                        help="My-F5 portal CSV export (map mode)")
    parser.add_argument("--out", metavar="PATH",
                        help="Output CSV (map/batch results override)")
    parser.add_argument("--allow-status", action="append", dest="allow_status",
                        metavar="STATUS",
                        help="Eligible key statuses (map, repeatable). "
                             "Default: Ready to activate, Ready to reassign")

    # No arguments → show friendly mode menu instead of cryptic usage line
    if len(sys.argv) == 1:
        print(bold(BANNER))
        print(bold(" Quick-start — choose a mode:"))
        print("")
        print(" 1) activate License a single BIG-IP (prompts for host + reg key)")
        print(" 2) harvest Collect dossiers from all BIG-IPs in a CSV (no internet needed)")
        print(" 3) preflight Activate dossiers at F5 portal, save .license files (no BIG-IP)")
        print(" 4) batch Push license files to BIG-IPs one at a time (confirm each)")
        print(" 5) map Map BIG-IQ JSON device records to portal reg keys (interactive)")
        print(" 6) summary Count devices per SKU from BIG-IQ JSON exports")
        print(" 7) help Show full usage and all flags")
        print("")
        choice = input(" Enter mode number or name [1]: ").strip()
        mode_map = {
            "1": "activate", "2": "harvest", "3": "preflight",
            "4": "batch", "5": "map", "6": "summary", "7": "help",
            "activate": "activate", "harvest": "harvest",
            "preflight": "preflight", "batch": "batch",
            "map": "map", "summary": "summary", "help": "help",
            "": "activate",
        }
        chosen = mode_map.get(choice.lower())
        if chosen is None:
            print(red(" Unknown choice — showing full help."))
            parser.print_help()
            sys.exit(0)
        if chosen == "help":
            parser.print_help()
            sys.exit(0)
        sys.argv = [sys.argv[0], "--mode", chosen]

    args = parser.parse_args()
    print(bold(BANNER))

    # ── Branch — wrapped in a loop so _next_step_prompt can chain modes ───
    while True:
        if args.mode == "summary":
            _run_summary(args); break
        if args.mode == "map":
            _run_map(args); break
        if args.mode == "harvest":
            _run_harvest(args); break
        if args.mode == "preflight":
            _run_preflight(args); break
        if args.mode == "batch":
            _run_batch(args); break

        # ── activate (default) ────────────────────────────────────────────
        cfg = load_config(args.config)
        if cfg:
            args.host = args.host or cfg.get("host")
            args.user = args.user or cfg.get("user")
            args.password = args.password or cfg.get("password")

        host, user, password = prompt_connection(args)
        session = make_session(host, user, password)
        device_info = collect_device_info(session)
        reg_key = args.reg_key or ask_reg_key()
        dossier = generate_dossier(session, reg_key)

        local_dossier = "{}.dossier".format(reg_key)
        with open(local_dossier, "w") as f:
            f.write(dossier)
        print(dim(" (Dossier saved locally: {})".format(local_dossier)))

        if args.offline:
            license_text = activate_offline(dossier, reg_key)
        else:
            license_text = activate_online(dossier, reg_key)

        lic_path = "{}.license".format(reg_key)
        active_key = "not-installed"

        if args.no_install:
            with open(lic_path, "w") as f:
                f.write(license_text)
            print(yellow("\n --no-install: license saved to {} but NOT pushed.".format(lic_path)))
            print(yellow(" scp {} root@{}:/config/bigip.license && ssh root@{} reloadlic".format(
                lic_path, host, host)))
        else:
            lic_path, active_key = install_license(session, license_text, reg_key, device_info)

        device_info["confirmed_reg_key"] = active_key
        save_path, _ = save_asset_record(device_info, reg_key, lic_path)
        print_postman_card(host, reg_key)

        print(green("="*64))
        print(green(" Done."))
        print(green(" Asset JSON : {}".format(save_path)))
        print(green(" License : {}".format(lic_path)))
        print(green(" Dossier : {}".format(local_dossier)))
        print(green("="*64 + "\n"))
        break  # activate always exits after one run

    # ── Chain to next mode if _next_step_prompt requested it ─────────────
    if _CHAIN_NEXT[0] is not None:
        next_argv = _CHAIN_NEXT[0]
        _CHAIN_NEXT[0] = None  # consume so we don't loop forever
        sys.argv = next_argv
        print(bold("\n ── Continuing: {} ──\n".format(
            " ".join(next_argv[2:4]))))
        main()  # re-enter once; depth never > 6 (one per mode)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.stderr.write("\nAborted.\n")
        sys.exit(1)

Ivanti MDM Core & F5 LTM/ASM with mTLS
Folks,

One of our customers uses Ivanti MDM to manage mobile phones, both iOS and Android. Because of a new requirement, we have decided to place an F5 BIG-IP in front of the MDM Core server, which is located in the DMZ. Ivanti exposes a few sets of URIs: one set does not require mTLS, while the second set requires mTLS on the client side of the BIG-IP full proxy.

Has anybody seen or done this before? Has anybody implemented an MDM behind LTM/ASM, where the BIG-IP functions more like a MITM than just a TCP load balancer? What is the recommended approach? Any advice or recommendations are greatly appreciated.

Appliance: BIG-IP Tenant on r4600
TMOS: 16.x

A Method for Auth and SSO
Recently, we discovered that CyberArk has moved from its traditional HTML-based authentication page to a new JavaScript-based one, so our client-initiated SSO method no longer works. The WebSSO process cannot identify the HTML form objects because there is no HTML form anymore. The new design relies on a set of JavaScript files that coordinate the client browser to send the data required to log in. I have never been much interested in JS, and I could not pinpoint where the user credentials come into play either.

I found another method to make SSO work again. It is very basic and relies on the sideband approach, but I prefer to use the HTTP Auth agent rather than a sideband iRule. Since the "HTTP Auth" profile can store the HTTP status code along with the cookies of the HTTP request we made, we can use it for basic jobs as a "sideband HTTP requestor".

Long story short, we send a crafted login request to the auth page, and it returns a couple of cookies[1] if the credentials are valid. Then we send those cookies to the client in a response. That is all.

An iRule with two distinct functions is good enough for this particular job. One function prepares the JSON payload that we send to the web service, and the other parses the cookies from the web service's response.

You need a custom "HTTP Auth" profile; see [2] for an example. The HTTP Auth profile can be used only with HTTP services, not HTTPS. To use it for sending and receiving HTTP messages to an HTTPS web service, you need an HTTP-to-HTTPS virtual server that translates requests and responses. In my example[2], I send the HTTP requests through a fake virtual server listening on 54.54.54.54:80. The CyberArk servers are in the pool behind this virtual server.

I first used this method for Grafana around a year ago, and it is still working. Grafana has a similar login page that relies on JS.
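For readers who want to check the two steps outside of BIG-IP, here is a minimal Python sketch of the same logic: building the JSON login payload and keeping only the cookies whose names start with "CA". It is not part of the original iRule. The function names `build_login_payload` and `select_vault_cookies` are hypothetical, and the sketch assumes the response cookies arrive as a single CRLF-separated string.

```python
import json


def build_login_payload(username, password):
    """Serialize the credentials as the JSON body of the login request."""
    # json.dumps escapes quotes and backslashes inside the values.
    return json.dumps({"username": username, "password": password})


def select_vault_cookies(raw_cookies, prefix="CA"):
    """Keep only the cookies whose name starts with `prefix`."""
    # Assumption: the cookies arrive as one CRLF-separated string.
    parts = [part.strip() for part in raw_cookies.split("\r\n")]
    return [part for part in parts if part.startswith(prefix)]


# Illustrative values only; real cookie values come from the login response.
payload = build_login_payload("alice", 'p"ss')
raw = ("CA11111=abc; path=/PasswordVault/\r\n"
       "Other=1\r\n"
       "CA22222=def; path=/PasswordVault/\r\n")
cookies = select_vault_cookies(raw)
print(payload)
print(cookies)
```

Using a JSON serializer instead of plain string substitution keeps the payload valid when a password contains a double quote or a backslash.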
Here is my iRule:

when CLIENT_ACCEPTED {
    ACCESS::restrict_irule_events disable
}

when ACCESS_POLICY_COMPLETED {
    if { [ACCESS::session data get {session.policy.result}] == "allow" } {
        log local0. "APM Session Started Successfully in [ACCESS::session data get {session.user.sessionid}] for [ACCESS::session data get {session.logon.last.username}]"
        log local0. "APM DEBUG: Policy Complete Cookies: $respCookie_0 $respCookie_1 $respCookie_2"
        # Redirect the client to the application and hand over the cookies
        # collected from the HTTP Auth response
        ACCESS::respond 302 Location "https://testpam.example.com/PasswordVault/v10/Accounts" \
            "Connection" "close" \
            "Set-Cookie" ${respCookie_0} \
            "Set-Cookie" ${respCookie_1} \
            "Set-Cookie" ${respCookie_2}
    }
}

when HTTP_REQUEST {
    if { [HTTP::has_responded] } {
        return
    }
    if { [string tolower [HTTP::path]] == "/logoff" } {
        set sid [ACCESS::session data get {session.user.sessionid}]
        log local0. "Logging out from [ACCESS::session data get {session.user.sessionid}] for [ACCESS::session data get {session.logon.last.username}]"
        # Expire the three application cookies, then remove the APM session
        HTTP::respond 302 noserver Location "https://testpam.example.com/PasswordVault/v10" \
            "Connection" "close" \
            "Set-Cookie" "CA11111=; expires=Thu, 01-Jan-1970 00:00:00 GMT; path=/PasswordVault/; secure; HttpOnly; SameSite=Strict" \
            "Set-Cookie" "CA22222=; expires=Thu, 01-Jan-1970 00:00:00 GMT; path=/PasswordVault/; secure; HttpOnly; SameSite=Strict" \
            "Set-Cookie" "CA66666=; expires=Thu, 01-Jan-1970 00:00:00 GMT; path=/PasswordVault/; secure; HttpOnly; SameSite=Strict"
        ACCESS::session remove -sid $sid
    }
}

when ACCESS_POLICY_AGENT_EVENT {
    if { [ACCESS::policy agent_id] == "LoginSessionCreate" } {
        # Generate the JSON payload to send to CyberArk v10
        set uname [ACCESS::session data get {session.logon.last.username}]
        set passwd [ACCESS::session data get -secure {session.sso.token.last.password}]
        # Debug only: this line writes the password to the log in clear text
        log local0. "APM DEBUG: User: $uname : $passwd"
        set payload {{"username":"UUUU","password":"PPPP"}}
        set cred "UUUU $uname PPPP $passwd"
        set payload [string map "$cred" $payload]
        # Debug only: the payload also contains the password
        log local0. "APM DEBUG: Payload $payload"
        ACCESS::session data set session.custom.http.payload $payload
    }
    if { [ACCESS::policy agent_id] == "CookiePreperation" } {
        #### HTTP Auth ####
        if { ([ACCESS::session data get {session.http.last.response_cookie}] != "") && ([ACCESS::session data get {session.http.last.response_status}] == 200) } {
            # HTTP Auth succeeded
            set cookies [ACCESS::session data get {session.http.last.response_cookie}]
            log local0. "APM DEBUG: Raw Cookies: $cookies"
            # Replace the cookie separator with "@" and split on it
            set cookies [string trimright [string map { \\r\\n @ } $cookies] "@"]
            set cookies [split $cookies "@"]
            log local0. "APM DEBUG: Cookies Now: $cookies"
            set listCount 0
            foreach cookie $cookies {
                # Keep only the cookies whose names start with "CA"
                if { ![string match CA* $cookie] } {
                    continue
                }
                log local0. "APM DEBUG: listCount: $listCount Cookie: $cookie"
                set respCookie_${listCount} $cookie
                incr listCount
            }
            log local0. "APM DEBUG: Total listCount: $listCount RespCookie: $respCookie_0 $respCookie_1 $respCookie_2"
        }
    }
}

I have also attached a screenshot of the APM policy. In that APM policy, "GrafanaLogin" is the HTTP Auth agent. The logging lines in the iRule can be suppressed as needed. I hope this is helpful for someone.

[1]: Cookie names are: "CA11111", "CA22222", "CA66666"

[2]:
apm aaa http /Common/CyberArk_Login {
    auth-type custom-post
    connection-timeout 3
    content-type none
    custom-body "%{session.custom.http.payload}"
    form-action http://54.54.54.54/PasswordVault/api/login/
    headers {
        header0 {
            name Content-Type
            value application/json
        }
    }
    request-timeout 5
    success-match-type cookie
    success-match-value CA11111
}

May the source be with you...

What's new in BIG-IP v21.1?
Introduction

F5 has officially released BIG-IP v21.1, delivering cutting-edge innovations designed to meet the dynamic needs of businesses and organizations. This version introduces advanced features such as quantum-resistant cryptography, AI-driven enhancements, protocol protection, and significant strides in software modernization. Packed with fixes and powerful new capabilities, BIG-IP v21.1 strengthens the F5 Application Delivery and Security Platform (ADSP) by improving delivery, security, and deployment, ensuring your applications remain fast, secure, and simple to manage.

PQC Readiness

Support for Additional NIST-Compliant PQC Key Exchanges

Building upon the groundwork laid in v17.5.0, BIG-IP v21.1 introduces expanded Post Quantum Cryptography (PQC) support. This release adds SecP + ML-KEM key exchanges, combining traditional cryptographic methods with quantum-resistant algorithms for hybrid cryptography.

New Supported Key Exchanges:
SecP256r1ML-KEM-768
SecP384r1ML-KEM-1024

These enhancements benefit organizations in government and regulated industries adhering to NIST guidelines and FIPS standards. Both client- and server-side connections are supported alongside SSL Forward Proxy use cases.

Quantum-Resistant TLS/SSL VPN Tunneling

As quantum computing emerges, traditional encryption methods face threats from advanced computational power. To counteract this, BIG-IP Zero Trust Access (formerly BIG-IP APM) introduces quantum-resistant TLS/SSL VPN tunneling with X25519 + ML-KEM-768 hybrid key exchanges. This solution ensures NIST compliance while securing modern VPN tunnels.

AI Workload Delivery, Security, and Access Enhancements

Expanded Security and Delivery for Model Context Protocol (MCP)

BIG-IP v21.1 enhances MCP support to ensure secure and consistent communication between AI models, applications, and data sources.
MCP Protection: BIG-IP Advanced WAF now inspects MCP traffic, shielding AI workflows from emerging threats such as tool poisoning, secret exposure, and injection attacks detailed in the OWASP MCP Top 10. A new Security Policy template called MCP Protection Policy has been added. A new Blocking Page Response type has been added. The MCP Session ID is included in the response headers, and the MCP Request ID is included in the response body.

[Screenshots: MCP Protection Policy template; blocking page response settings; sample response for an "Echo tool"; sample rejected response]

MCP Session Persistence: New session persistence for MCP traffic ensures smoother workflows by consistently routing session requests to the correct server. A new MCP Persistence Profile, aimcp, has been added.

Optimized Agent-to-Agent Connectivity

The introduction of experimental support for the Agent2Agent (A2A) protocol optimizes communication between AI agents, ensuring interoperability across fragmented platforms.

Features Include:
Load balancing for A2A traffic.
Governance via iRules-based logging and visibility.

While experimental in v21.1, future releases promise full support with expanded management capabilities.

Seamless, Secure Access for AI Agents

The Dynamic Client Registration (DCR) capability expedites access requests for agentic AI systems. Agents can register themselves programmatically with BIG-IP Zero Trust Access, eliminating manual steps and streamlining workflows via API-driven automation.

[Figure: F5 BIG-IP Zero Trust Access enables dynamic client registration to expedite access requests.]

Modern API and Protocol Protection

HTTP/3 Protocol Traffic Security

With HTTP/3 adoption expected to surpass HTTP/2 soon, BIG-IP Advanced WAF brings cutting-edge protection for HTTP/3 traffic, matching the security levels of earlier protocol versions. Currently limited to client-side protection, server-side capabilities will follow in subsequent releases.
OpenAPI 3.1 Specification Support

BIG-IP Advanced WAF now protects APIs defined by OpenAPI 3.1, learning expected endpoints, data types, and security requirements while blocking improper requests, undocumented endpoint abuse, and API-specific attacks. Previous OpenAPI versions (2.0, 3.0) remain supported.

BIG-IP TMOS Software Modernization

DNS Enhancements

Multiple Response Policy Zone (RPZ) Feed Zones

DNS security and efficiency are strengthened with the ability to configure and consolidate multiple RPZ feeds into a single DNS cache profile.

Enhanced DNS Threat Mitigation

Improved granularity allows IP-based blocking for malicious domains and dynamic responses to regional compliance mandates.

Flexible DNS Response Actions

Organizations gain flexibility to block, redirect, or manage traffic dynamically, streamlining DNS-level policy management.

Introducing the New BIG-IP Declarative API

Designed for modern application environments, the new BIG-IP Declarative API (in Alpha state) offers:
Integrated lifecycle management.
Per-app scalability for simplified configurations.
Broadened automation capabilities with near real-time deployment.

This marks a significant upgrade over AS3, empowering faster and more efficient automation workflows.

Continued Control Plane Enhancements

Control plane improvements focus on reliability, performance, and resource efficiency through upgrades to MCPd, iControl REST, and the BigD daemon. These enhancements:
Speed up iControl REST API requests by up to 10%.
Boost control-plane resilience under low-memory conditions.
Enable multi-threaded scalability for BigD health monitoring.

Expect recurring advancements across future BIG-IP versions.
New Features in SSL Orchestrator v14

Policy-based Dynamic Egress Routing

SSL Orchestrator introduces policy-based dynamic egress routing, which lets you define egress routes based on traffic conditions directly within the policy definition, avoiding the need to configure complex layers involving multiple topologies and iRules. This feature is supported for Outbound and Inbound Gateway topologies. You can create a policy in BIG-IP LTM tailored to your routing requirements and then attach the policy to the SSL Orchestrator Virtual Server. Traffic is then dynamically routed to the appropriate egress route based on the configuration defined within the LTM policy.

L2 Devices Scalability

Previously, SSL Orchestrator supported up to 8 physical devices per L2 Inspection Service. Now, SSL Orchestrator supports up to 50 devices per L2 Inspection Service, enabling greater scalability and flexibility.

Inspection Service Persistence

SSL Orchestrator now supports inspection service persistence, which enables client connections to consistently flow through the same inspection service, allowing it to track the entire user application flow seamlessly. This feature is supported for L2, L3, HTTP, and Advanced WAF off-box inspection services. A new Default Persistence Profile dropdown has been added to the Services page:
Destination Address Affinity
Hash
Host (specific to HTTP services)
Source Address Affinity
SSL (for TCP Virtual Servers of L2/L3 services)
Universal

New Forcepoint URLDB Categories

The following new Forcepoint URLDB categories have been added:
Cryptocurrency (235): sites that provide digital currencies and decentralized financial services. Includes platforms facilitating cryptocurrency trading, wallets, ICOs, and blockchain-based financial services.
Crypto Mining (236): sites that promote mining pools or enable crypto mining, including software downloads and browser-based mining scripts.
Relaxed Protocol Compliance Rules for External Sites

SSL Orchestrator now supports Relaxed HTTP Protocol Compliance by allowing you to select Transparent HTTP profiles to ease enforcement for non-compliant websites. This approach eliminates protocol disruptions and provides flexibility for forward proxy scenarios. The L7 Profile dropdown is introduced in Outbound Topology settings in the Interception Rules screen, which enables you to select a reverse or transparent HTTP Profile. By default, the topology-specific HTTP profile is used.

HTTP Service: The Proxy Type dropdown is introduced. When you select Proxy Type as Transparent, the HTTP Profile dropdown appears, allowing you to select the required transparent HTTP Profile.

New Features in Zero Trust Access

IPsec VPN Support

Added support for Access IPsec VPN Tunnels, to meet global security standards and enable the transition from SSL/TLS-VPNs to IPsec VPNs. Clients can now connect to BIG-IP using the Windows Edge Client or F5 Access for macOS, establish an IPsec tunnel, and securely access the backend network. A new field, VPN Type, is introduced in the Connectivity Profile screen. When you set it to IPsec, the system automatically generates an Access IPsec Policy.

HTTP Connector Support Added to Per-Session Policies in APM

Support for the HTTP Connector in per-session policies is now available in F5 BIG-IP Access Policy Manager (APM). This feature enables administrators to send HTTP requests to external services during session establishment and use the response for authentication, authorization, and access control decisions.

Dynamic Client Registration (DCR) Support

This release adds support for OAuth 2.0 Dynamic Client Registration (RFC 7591). Administrators can enable DCR on OAuth profiles to allow authorized clients to dynamically register using an Initial Access Token (IAT).
The feature includes support for the Client Credentials grant type, configurable client authentication settings, client secret expiration, and enhanced logging.

Custom Logging Preferences for Windows Edge Client

The Windows Edge Client now offers custom logging preferences, giving you enhanced control over log verbosity to improve both security and flexibility. You can select the required log level from the APM Client Log Level drop-down in General Settings while creating a connectivity profile.

Native Support for SAML Authentication for Windows

APM clients now support native SAML authentication, significantly improving user experience, maintainability, and overall supportability. Edge Client on macOS and Windows can leverage the system's default browser to authenticate users with identity providers (IdPs), enabling modern authentication mechanisms such as FIDO2 and Microsoft Entra ID device authentication. To enable this feature, select the Enable System Browser checkbox in Desktop Client Settings while creating a Connectivity Profile from Access > Connectivity / VPN > Connectivity > Profiles in BIG-IP.

Auto-Upgrade Machine Tunnel Service

Windows Edge Clients can now automatically upgrade the F5 Machine Tunnel Service when a newer version is available on BIG-IP and the auto-upgrade feature is enabled. Additionally, if the Machine Tunnel service is running before the upgrade, it continues to run after the upgrade completes without affecting existing VPN configuration settings.

Endpoint Inspection Support on Ubuntu with ARM64

Endpoint Inspection is now supported on Ubuntu with ARM64, allowing seamless management and inspection of endpoints on Linux ARM64 platforms.

Conclusion

Upgrade to BIG-IP v21.1 to unlock a new wave of features that enhance application delivery, security, and management. From PQC readiness and dynamic AI solutions to cutting-edge protocol protection, this release propels BIG-IP capabilities forward.
Related Content

BIG-IP v21.1 Release Notes
Live Webinar on BIG-IP v21.1 Features
Blog: F5 BIG-IP v21.1 is now generally available, bringing PQC and AI security enhancements

Feel free to reach out for additional resources or clarification. Happy upgrading!

Intent-Based Load Balancing for AI Traffic on BIG-IP
Intent-Based Load Balancing is a native BIG-IP AI Gateway project that routes OpenAI-compatible traffic based on user intent, Virtual Keys, routing policy, and backend model configuration. It uses iApps LX, iRules, iRules LX, and BIG-IP LTM pools to classify, route, block, or locally respond to AI requests without requiring an external control-plane service.
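To make the classify, route, block, or respond locally decision more concrete, here is a minimal Python sketch of how such a decision could be structured. It is not the project's code, which runs as iApps LX, iRules, and iRules LX on BIG-IP. The keyword table, the policy layout, the pool names, and the function names are all hypothetical, and keyword matching merely stands in for whatever classifier the project actually uses.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

# Hypothetical intent table: intent name -> keywords that signal it.
INTENT_KEYWORDS: Dict[str, List[str]] = {
    "code": ["function", "compile", "stack trace"],
    "summarize": ["summarize", "tl;dr"],
}


@dataclass
class Decision:
    action: str                 # "route", "block", or "respond"
    pool: Optional[str] = None  # backend pool name when action == "route"
    detail: str = ""            # reason, or canned text for a local response


def classify_intent(messages: List[dict]) -> str:
    """Guess the intent from the last user message of a chat request."""
    texts = [m.get("content") for m in messages if m.get("role") == "user"]
    texts = [t for t in texts if isinstance(t, str)]
    text = texts[-1].lower() if texts else ""
    for intent, keywords in INTENT_KEYWORDS.items():
        if any(keyword in text for keyword in keywords):
            return intent
    return "general"


def decide(body: dict, virtual_key: str, policy: Dict[str, dict]) -> Decision:
    """Apply the per-key routing policy to an OpenAI-compatible request body."""
    key_policy = policy.get(virtual_key)
    if key_policy is None:
        return Decision("block", detail="unknown virtual key")
    intent = classify_intent(body.get("messages", []))
    if intent in key_policy.get("blocked_intents", []):
        return Decision("block", detail=f"intent '{intent}' not allowed")
    canned = key_policy.get("local_responses", {})
    if intent in canned:
        return Decision("respond", detail=canned[intent])
    pools = key_policy.get("pools", {})
    pool = pools.get(intent, pools.get("default"))
    if pool is None:
        return Decision("block", detail="no pool configured")
    return Decision("route", pool=pool, detail=f"intent={intent}")


if __name__ == "__main__":
    policy = {
        "vk-team-a": {
            "blocked_intents": ["summarize"],
            "pools": {"code": "pool_code_llm", "default": "pool_general_llm"},
        }
    }
    body = {"messages": [{"role": "user", "content": "Why does this function fail to compile?"}]}
    print(decide(body, "vk-team-a", policy))
```

Keeping the policy as plain data keyed by Virtual Key means routing changes do not require touching the decision logic, which mirrors the idea of driving behavior from routing policy and backend model configuration.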
F5 LTM Virtual Server IP NAT Configuration
If the firewall team needs to configure a NAT server mapping between my virtual server IP and one public IP, and the connection is outbound only, should I give the security team the virtual server IP or the F5 self IP for the NAT mapping? My understanding is that I should give them the self IP, since the F5 changes the source IP to the self IP for outbound traffic.