|
#!/usr/bin/env python3 |
|
#-*- coding: utf-8 -*- |
|
|
|
import argparse
import json
import re
import sys

import requests
|
|
|
# -- edit here
# Cloudflare API credentials and the id of the zone whose rule is managed.
email = 'your-api-email'
zone_id = 'your-zone-id'
auth_key = 'your-auth-key'

# When True, dLog() prints diagnostic output.
debug = False
# -- stop editing here

# Command line: <add|delete> <ip>
parser = argparse.ArgumentParser()
parser.add_argument('action', choices=('add', 'delete'))
parser.add_argument('ip', type=str)
args = parser.parse_args()

# Headers attached to every request made through the session below.
headers = {
  'X-Auth-Email': email,
  'X-Auth-Key': auth_key,
  'Content-Type': 'application/json',
}

# Query parameters used by get_rule() when listing firewall rules.
params = (
  ('description', 'fail2ban'),
)

api_url = 'https://api.cloudflare.com/client/v4/zones/{0}/firewall/rules'.format(zone_id)

s = requests.Session()
s.headers.update(headers)
# Timeout, in seconds, applied to every request sent through `s`.
s.timeout = 1

# requests does not read a `timeout` attribute from a Session: the timeout is
# a per-request argument, so the assignment above alone would let every call
# wait indefinitely. Session.get/post/put all delegate to Session.request, so
# wrapping it applies the configured value by default while still allowing an
# explicit timeout= at the call site. A request that exceeds the timeout now
# raises requests.exceptions.Timeout.
_session_request = s.request


def _request_with_timeout(method, url, **kwargs):
  """Forward to the session's request method with s.timeout as the default."""
  kwargs.setdefault('timeout', s.timeout)
  return _session_request(method, url, **kwargs)


s.request = _request_with_timeout
|
|
|
def dLog(string):
  """Print `string` to stdout, but only when the module-level `debug` flag is set."""
  if not debug:
    return
  print(string)
|
|
|
def create_rule():
  """Create the 'fail2ban' block rule and return its identifiers.

  POSTs a single rule definition to `api_url` through the shared session `s`.

  Returns:
    A tuple (rule id, filter id, filter expression) taken from the first
    entry of the response's 'result' list.

  Exits the process with status 1 when the response is not HTTP 200 or its
  'success' field is falsy.
  """
  # The rule is created with one hard-coded address in its set (presumably a
  # placeholder so the expression is never empty -- confirm with the author).
  rule_definition = '[{ "filter": { "expression":"(ip.src in {127.0.0.235})" }, "action": "block", "description": "fail2ban" }]'
  response = s.post(api_url, data=rule_definition)

  created = response.status_code == 200 and response.json()['success']
  if not created:
    dLog('Rule creation failed: {0}'.format(str(response.json())))
    sys.exit(1)

  body = response.json()
  dLog('Rule creation succesful: {0}'.format(str(body)))
  rule = body['result'][0]
  rule_filter = rule['filter']
  return rule['id'], rule_filter['id'], rule_filter['expression']
|
|
|
def get_rule():
  """Return the identifiers of the existing rule, creating it when absent.

  Queries `api_url` with `params` through the shared session `s`.

  Returns:
    A tuple (rule id, filter id, filter expression). It comes from the first
    entry of the response's 'result' list, or from create_rule() when the
    response is not HTTP 200 or that list is empty.
  """
  response = s.get(api_url, params=params)

  # The JSON body is only inspected for HTTP 200 responses.
  if response.status_code != 200 or not response.json()['result']:
    dLog('Rule doesn\'t exist. Creating...')
    return create_rule()

  dLog('Rule exists!')
  rule = response.json()['result'][0]
  rule_filter = rule['filter']
  return rule['id'], rule_filter['id'], rule_filter['expression']
|
|
|
def update_rule(ips):
  """Rewrite the filter expression so it matches exactly `ips`, then exit.

  Sends a PUT for the filter identified by the module-level `filter_id`
  (in zone `zone_id`) through the shared session `s`.

  Args:
    ips: iterable of address strings; they are joined with single spaces
      into an `ip.src in {...}` expression.

  This function never returns: it calls sys.exit(0) when the API answers
  with HTTP 200 and sys.exit(1) otherwise.
  """
  expression = '(ip.src in {%s})' % ' '.join(ips)

  # Serialize with json.dumps instead of string interpolation so that every
  # value is quoted and escaped correctly; a quote or backslash in any value
  # would otherwise yield an invalid request body.
  data = json.dumps({
    'id': filter_id,
    'paused': False,
    'description': 'fail2ban',
    'expression': expression,
  })

  filter_url = 'https://api.cloudflare.com/client/v4/zones/{0}/filters/{1}'.format(zone_id, filter_id)

  dLog('Data: {0}'.format(data))
  dLog('URL: {0}'.format(filter_url))

  d = s.put(filter_url, data=data)

  # The body is used for diagnostics only. Fall back to the raw text when it
  # is not JSON, so a non-JSON error page cannot mask the real failure with a
  # decoding exception.
  try:
    body = d.json()
  except ValueError:
    body = d.text

  if d.status_code == 200:
    dLog('Filter update succeeded: {0}'.format(str(body)))
    print('Action: {0} {1} from filter returned success'.format(args.action, args.ip))
    sys.exit(0)

  dLog('Filter update failed: {0}'.format(str(body)))
  # Report the failure even when debug output is off; success is always
  # reported, so a silent failure would be indistinguishable from a hang.
  print('Action: {0} {1} failed (HTTP {2})'.format(args.action, args.ip, d.status_code), file=sys.stderr)
  sys.exit(1)
|
|
|
# Main flow: look up (or create) the rule, then add or remove the requested
# IP from the address set in its filter expression.
rule_id, filter_id, expression = get_rule()

# The managed addresses are the whitespace-separated tokens between the first
# pair of braces in the expression.
m = re.search('{(.+?)}', expression)
if not m:
  print('could not retrieve a list of IPs from the expression?')
  sys.exit(1)
ips = m[1].split()

already_listed = args.ip in ips

if args.action == 'add':
  if already_listed:
    print('IP {0} already exists in the filter expression'.format(args.ip))
    sys.exit(1)
  ips.append(args.ip)
  update_rule(ips)  # exits the process itself

elif args.action == 'delete':
  if not already_listed:
    print('IP {0} does not exist in the filter expression'.format(args.ip))
    sys.exit(1)
  ips.remove(args.ip)
  update_rule(ips)  # exits the process itself

else:
  # argparse restricts `action` to add/delete, so this is only a safety net.
  print('?!')
|
|
|
# vim: set sw=2 tabstop=2 expandtab : |