Skip to content

Instantly share code, notes, and snippets.

@wyattanderson
Created June 16, 2014 01:09
Show Gist options
  • Save wyattanderson/a208734ffc8e29dca371 to your computer and use it in GitHub Desktop.
Save wyattanderson/a208734ffc8e29dca371 to your computer and use it in GitHub Desktop.
UFW Salt Module
root@ufw-test:/srv/salt/_modules# salt-call --local ufw_firewall.set_enabled
local:
Firewall is active and enabled on system startup
root@ufw-test:/srv/salt/_modules# salt-call --local ufw_firewall.status
local:
----------
app_policy:
skip
default:
----------
incoming:
deny
outgoing:
allow
routed:
disabled
logging:
on (low)
status:
active
import gettext
import re
import textwrap
from functools import wraps
import salt.utils
import ufw.frontend
import ufw.common
def __virtual__():
    """Report whether this module should be made available.

    Returns:
        ``True`` when ``salt.utils.which('ufw')`` yields a truthy value,
        ``False`` otherwise.
    """
    return bool(salt.utils.which('ufw'))
class _Deserializers(object):
@staticmethod
def paren_kv(value):
pairs = value.split(', ')
reversed_pairs = (re.match(r"(\w+) \((\w+)\)", pair).groups() for pair
in pairs)
return dict((p[1], p[0]) for p in reversed_pairs)
def with_ui(f):
    """Decorate ``f`` so it receives a ``UFWFrontend`` as its first argument.

    The wrapper removes an optional ``dryrun`` keyword (default ``False``)
    from the call and hands it to the ``UFWFrontend`` constructor. All
    remaining positional and keyword arguments are forwarded to ``f``
    after the frontend instance.

    Args:
        f: Callable whose first positional parameter is the frontend.

    Returns:
        The wrapping callable, carrying ``f``'s metadata via ``wraps``.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        dry_run_flag = kwargs.pop('dryrun', False)
        # Install the ``_`` translation builtin for ufw's text domain before
        # building the frontend (same order as before; presumably the
        # frontend relies on it -- confirm against the ufw sources).
        gettext.install(ufw.common.programName)
        frontend = ufw.frontend.UFWFrontend(dry_run_flag)
        return f(frontend, *args, **kwargs)
    return wrapper
@with_ui
def status(ui, **kwargs):
    """Turn the header of the verbose UFW status report into a dictionary.

    Lines are read from the top of the report until the first empty line
    (or the end of the text). Each line is split on the first ``': '``
    and its label is translated to an output key: ``status``, ``logging``,
    ``default`` (parsed into a dict by ``_Deserializers.paren_kv``) and
    ``app_policy``.

    Args:
        ui: Frontend instance supplied by ``with_ui``.
        **kwargs: Accepted and ignored.

    Returns:
        A dict holding one entry per header line that was read.

    Raises:
        ValueError: If a header line contains no ``': '`` separator.
        KeyError: If a header label is not one of the known labels.
    """
    header_fields = {
        'Status': {'key': 'status'},
        'Logging': {'key': 'logging'},
        'Default': {'key': 'default', 'deserializer': 'paren_kv'},
        'New profiles': {'key': 'app_policy'},
    }
    report = ui.get_status(verbose=True, show_count=True)
    parsed = {}
    for line in report.split('\n'):
        if not line:
            # The first blank line ends the header section.
            break
        label, text = line.split(': ', 1)
        spec = header_fields[label]
        if 'deserializer' in spec:
            text = getattr(_Deserializers, spec['deserializer'])(text)
        parsed[spec['key']] = text
    return parsed
@with_ui
def set_enabled(ui, enabled=True, **kwargs):
    """Forward ``enabled`` to ``UFWFrontend.set_enabled``.

    Args:
        ui: Frontend instance supplied by ``with_ui``.
        enabled: Value handed to the frontend; defaults to ``True``.
        **kwargs: Accepted and ignored.

    Returns:
        Whatever the frontend call returns.
    """
    outcome = ui.set_enabled(enabled)
    return outcome
@with_ui
def set_default_policy(ui, policy, direction, **kwargs):
    """Forward ``policy`` and ``direction`` to the frontend.

    Args:
        ui: Frontend instance supplied by ``with_ui``.
        policy: First argument for ``UFWFrontend.set_default_policy``.
        direction: Second argument for the same call.
        **kwargs: Accepted and ignored.

    Returns:
        Whatever the frontend call returns.
    """
    outcome = ui.set_default_policy(policy, direction)
    return outcome
@with_ui
def set_loglevel(ui, level, **kwargs):
    """Forward ``level`` to ``UFWFrontend.set_loglevel``.

    Args:
        ui: Frontend instance supplied by ``with_ui``.
        level: Value handed to the frontend.
        **kwargs: Accepted and ignored.

    Returns:
        Whatever the frontend call returns.
    """
    outcome = ui.set_loglevel(level)
    return outcome
@with_ui
def get_show_raw(ui, rules_type="raw", **kwargs):
    """Forward ``rules_type`` to ``UFWFrontend.get_show_raw``.

    Args:
        ui: Frontend instance supplied by ``with_ui``.
        rules_type: Value handed to the frontend; defaults to ``"raw"``.
        **kwargs: Accepted and ignored.

    Returns:
        Whatever the frontend call returns.
    """
    listing = ui.get_show_raw(rules_type)
    return listing
@with_ui
def set_default_application_policy(ui, policy, **kwargs):
    """Forward ``policy`` to ``UFWFrontend.set_default_application_policy``.

    Args:
        ui: Frontend instance supplied by ``with_ui``.
        policy: Value handed to the frontend.
        **kwargs: Accepted and ignored.

    Returns:
        Whatever the frontend call returns.
    """
    outcome = ui.set_default_application_policy(policy)
    return outcome
@with_ui
def get_application_list(ui, **kwargs):
    """Return the frontend's application list as a tuple of stripped lines.

    Args:
        ui: Frontend instance supplied by ``with_ui``.
        **kwargs: Accepted and ignored.

    Returns:
        A tuple with one whitespace-stripped string per line of the
        frontend's output, excluding the first (header) line.
    """
    listing = ui.get_application_list().split('\n')
    # Index 0 is the header line; everything after it is kept.
    return tuple(entry.strip() for entry in listing[1:])
@with_ui
def get_application_info(ui, pname, **kwargs):
    """Parse the frontend's application-profile report into a dictionary.

    The report is split on its first two newlines only, so the third chunk
    holds the ``Description`` line together with everything that follows
    it. Each chunk is split on the first ``': '`` and its label is mapped
    to an output key: ``profile``, ``title`` or ``description``. The
    description is the text up to the first blank line; whatever follows
    (the ports section) is currently discarded.

    Args:
        ui: Frontend instance supplied by ``with_ui``.
        pname: Profile name handed to ``UFWFrontend.get_application_info``.
        **kwargs: Accepted and ignored.

    Returns:
        A dict with one entry per recognised line read before parsing
        stopped (at an empty chunk or after the description).

    Raises:
        ValueError: If a chunk contains no ``': '`` separator.
        KeyError: If a chunk's label is not one of the known labels.
    """
    _APP_INFO_MAPPING = {
        'Profile': dict(key='profile'),
        'Title': dict(key='title'),
        'Description': dict(key='description'),
    }

    app_info_lines = ui.get_application_info(pname).split('\n', 2)

    ret = {}
    for line in app_info_lines:
        if not line:
            break

        key, value = line.split(': ', 1)
        is_description = key == 'Description'
        if is_description:
            # partition() cuts at the FIRST blank line and never fails,
            # whereas unpacking an unbounded split('\n\n') raised ValueError
            # when the text held zero or several blank-line separators.
            value = value.partition('\n\n')[0]

        mapping = _APP_INFO_MAPPING[key]
        if 'deserializer' in mapping:
            value = getattr(_Deserializers, mapping['deserializer'])(value)
        ret[mapping['key']] = value

        if is_description:
            # Everything after the description is the ports section.
            # TODO: parse ports into a list
            break

    return ret
@with_ui
def reset(ui, **kwargs):
    """Call ``UFWFrontend.reset`` with ``force=True``.

    Prompting makes no sense in this context, so the reset is always
    forced.

    Args:
        ui: Frontend instance supplied by ``with_ui``.
        **kwargs: Accepted and ignored.

    Returns:
        Whatever the frontend call returns.
    """
    outcome = ui.reset(force=True)
    return outcome
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment