Created
June 16, 2014 01:09
-
-
Save wyattanderson/a208734ffc8e29dca371 to your computer and use it in GitHub Desktop.
UFW Salt Module
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
root@ufw-test:/srv/salt/_modules# salt-call --local ufw_firewall.set_enabled | |
local: | |
Firewall is active and enabled on system startup | |
root@ufw-test:/srv/salt/_modules# salt-call --local ufw_firewall.status | |
local: | |
---------- | |
app_policy: | |
skip | |
default: | |
---------- | |
incoming: | |
deny | |
outgoing: | |
allow | |
routed: | |
disabled | |
logging: | |
on (low) | |
status: | |
active |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gettext | |
import re | |
import textwrap | |
from functools import wraps | |
import salt.utils | |
import ufw.frontend | |
import ufw.common | |
def __virtual__(): | |
if not salt.utils.which('ufw'): | |
return False | |
return True | |
class _Deserializers(object): | |
@staticmethod | |
def paren_kv(value): | |
pairs = value.split(', ') | |
reversed_pairs = (re.match(r"(\w+) \((\w+)\)", pair).groups() for pair | |
in pairs) | |
return dict((p[1], p[0]) for p in reversed_pairs) | |
def with_ui(f): | |
"""Ensures that the wrapped function will be passed an instance of | |
``UFWFrontend`` as its first positional argument, with ``dryrun`` set | |
accordingly.""" | |
@wraps(f) | |
def wrapper(*args, **kwargs): | |
dryrun = False | |
if 'dryrun' in kwargs: | |
dryrun = kwargs.pop('dryrun') | |
gettext.install(ufw.common.programName) | |
ui = ufw.frontend.UFWFrontend(dryrun) | |
if not args: | |
args = [] | |
return f(ui, *args, **kwargs) | |
return wrapper | |
@with_ui | |
def status(ui, **kwargs): | |
"""Parses the UFW status output into a dictionary suitable for machine | |
consumption.""" | |
_STATUSLINE_MAP = { | |
'Status': dict(key='status'), | |
'Logging': dict(key='logging'), | |
'Default': dict(key='default', deserializer='paren_kv'), | |
'New profiles': dict(key='app_policy'), | |
} | |
status_lines = ui.get_status(verbose=True, show_count=True).split('\n') | |
ret = {} | |
while status_lines: | |
line = status_lines.pop(0) | |
if not line: | |
break | |
key, value = line.split(': ', 1) | |
mapping = _STATUSLINE_MAP[key] | |
if 'deserializer' in mapping: | |
value = getattr(_Deserializers, mapping['deserializer'])(value) | |
ret[_STATUSLINE_MAP[key]['key']] = value | |
return ret | |
@with_ui | |
def set_enabled(ui, enabled=True, **kwargs): | |
return ui.set_enabled(enabled) | |
@with_ui | |
def set_default_policy(ui, policy, direction, **kwargs): | |
return ui.set_default_policy(policy, direction) | |
@with_ui | |
def set_loglevel(ui, level, **kwargs): | |
return ui.set_loglevel(level) | |
@with_ui | |
def get_show_raw(ui, rules_type="raw", **kwargs): | |
return ui.get_show_raw(rules_type) | |
@with_ui | |
def set_default_application_policy(ui, policy, **kwargs): | |
return ui.set_default_application_policy(policy) | |
@with_ui | |
def get_application_list(ui, **kwargs): | |
app_list_lines = ui.get_application_list().split('\n') | |
# Pop the header line | |
app_list_lines.pop(0) | |
return tuple((line.strip() for line in app_list_lines)) | |
@with_ui | |
def get_application_info(ui, pname, **kwargs): | |
app_info_lines = ui.get_application_info(pname).split('\n', 2) | |
_APP_INFO_MAPPING = { | |
'Profile': dict(key='profile'), | |
'Title': dict(key='title'), | |
'Description': dict(key='description'), | |
} | |
ret = {} | |
should_parse_ports = False | |
while app_info_lines: | |
line = app_info_lines.pop(0) | |
if not line: | |
break | |
if should_parse_ports: | |
# TODO: parse ports into a list | |
break | |
key, value = line.split(': ', 1) | |
if key == 'Description': | |
value, rest = value.split('\n\n') | |
app_info_lines.append(rest) | |
should_parse_ports = True | |
mapping = _APP_INFO_MAPPING[key] | |
if 'deserializer' in mapping: | |
value = getattr(_Deserializers, mapping['deserializer'])(value) | |
ret[mapping['key']] = value | |
return ret | |
@with_ui | |
def reset(ui, **kwargs): | |
# Doesn't really make any sense to prompt in this context; we'll just | |
# always force a reset. | |
return ui.reset(force=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment