Last active
May 6, 2022 10:53
-
-
Save sp3c73r2038/1c9dfc42a67364db12f1d7cb636d7f16 to your computer and use it in GitHub Desktop.
Minimal API client for FortiGate VPN local users
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import logging | |
import requests | |
# Module-level logger named after this module; the client below uses it for
# its informational messages.
LOGGER = logging.getLogger(__name__) | |
def join(*args):
    """
    Join string segments into one ``/``-separated path or URL.

    Leading and trailing slashes are stripped from every segment before
    joining, so ``join('http://host/', '/api/')`` gives ``'http://host/api'``.
    Slashes inside a segment (such as the ``//`` after a URL scheme) are
    left untouched.

    :param args: string segments, in the order they should appear.
    :return: the stripped segments joined with ``/``; an empty string when
        no segments are given.
    """
    return '/'.join(segment.strip('/') for segment in args)
class FortiAPI:
    """
    A simplified client for local user/group management.

    Only the local user (``/api/v2/cmdb/user/local``) and user group
    (``/api/v2/cmdb/user/group``) endpoints are covered.  A session is
    created and ``login()`` is called as soon as the object is constructed.
    """

    def __init__(self, url, username, password, tlsCert=None):
        """
        Store the connection settings, open a session and log in.

        :param url: base URL of the device; endpoint paths are appended to it.
        :param username: account name sent to ``logincheck``.
        :param password: secret sent to ``logincheck``.
        :param tlsCert: optional value assigned to the session's ``verify``
            attribute (TLS certificate verification setting).
        :raises RuntimeError: if the login request returns an unexpected
            status code.
        """
        self.url = url
        self.username = username
        self.password = password
        self.session = requests.Session()
        if tlsCert is not None:
            LOGGER.info('>> using tls cert: %s', tlsCert)
            self.session.verify = tlsCert
        self.login()

    def _handleResp(self, resp, prompt='request failed', expected=(200,)):
        """
        Record the CSRF token carried by a response and check its status.

        :param resp: response object returned by the session.
        :param prompt: prefix of the error message on an unexpected status.
        :param expected: collection of acceptable HTTP status codes (the
            default is an immutable tuple so it cannot be altered between
            calls).
        :raises RuntimeError: if ``resp.status_code`` is not in ``expected``;
            the message is ``'<prompt>: <response text>'``.
        """
        # handle csrf token
        for cookie in resp.cookies:
            if cookie.name == 'ccsrftoken':
                token = cookie.value
                # The cookie value is presumably wrapped in quotes (the
                # earlier code always dropped the first and last character).
                # Only strip when a matching quote pair is really present so
                # an unquoted token is not truncated.
                if (len(token) >= 2 and token[0] == token[-1]
                        and token[0] in '"\''):
                    token = token[1:-1]
                self.session.headers.update({'X-CSRFTOKEN': token})
        if resp.status_code not in expected:
            raise RuntimeError('{}: {}'.format(prompt, resp.text))

    def _handleJSON(self, resp, default=None, single=False):
        """
        Extract the ``results`` entry from a JSON response body.

        :param resp: response object whose ``json()`` yields a mapping.
        :param default: value returned when the body has no ``results`` key.
        :param single: when true and ``results`` is a list, return only its
            first element.
        :return: the ``results`` value, its first element, or ``default``.
        :raises RuntimeError: if ``single`` is true and ``results`` is an
            empty list.  ``RuntimeError`` is used (instead of letting an
            ``IndexError`` escape) because callers in this class detect a
            missing object by catching ``RuntimeError``.
        """
        rt = resp.json().get('results', default)
        if single and isinstance(rt, list):
            if not rt:
                raise RuntimeError('empty result list in response')
            rt = rt[0]
        return rt

    def login(self):
        """
        Authenticate the session by posting the credentials to ``logincheck``.

        :raises RuntimeError: if the response status is not 200.
        """
        # NOTE(review): only the HTTP status is checked here; confirm whether
        # the device also signals a rejected login in the response body.
        url = join(self.url, 'logincheck')
        resp = self.session.post(url, {
            'username': self.username,
            'secretkey': self.password,
        })
        self._handleResp(resp)

    def getUserSchema(self):
        """
        Fetch the local-user endpoint with ``action=schema``.

        :return: the ``results`` value of the response, or ``None`` if absent.
        :raises RuntimeError: if the response status is not 200.
        """
        url = join(self.url, '/api/v2/cmdb/user/local')
        resp = self.session.get(url, params={'action': 'schema'})
        self._handleResp(resp)
        return self._handleJSON(resp)

    def getUsers(self):
        """
        Fetch all local users.

        :return: the ``results`` value of the response, or ``None`` if absent.
        :raises RuntimeError: if the response status is not 200.
        """
        url = join(self.url, '/api/v2/cmdb/user/local')
        resp = self.session.get(url)
        self._handleResp(resp)
        return self._handleJSON(resp)

    def getUser(self, name):
        """
        Fetch a single local user by name.

        :param name: user name; must not be empty.
        :return: the first element of the ``results`` list.
        :raises ValueError: if ``name`` is empty.
        :raises RuntimeError: if the response status is not 200 or the result
            list is empty.
        """
        if not name:
            raise ValueError('name must not be empty')
        url = join(self.url, '/api/v2/cmdb/user/local', name)
        resp = self.session.get(url)
        self._handleResp(resp, prompt='cannot get user')
        return self._handleJSON(resp, single=True)

    def getGroups(self):
        """
        Fetch all user groups.

        :return: the ``results`` value of the response, or ``None`` if absent.
        :raises RuntimeError: if the response status is not 200.
        """
        url = join(self.url, '/api/v2/cmdb/user/group')
        resp = self.session.get(url)
        self._handleResp(resp)
        return self._handleJSON(resp)

    def getGroup(self, name):
        """
        Fetch a single user group by name.

        :param name: group name; must not be empty.
        :return: the first element of the ``results`` list.
        :raises ValueError: if ``name`` is empty.
        :raises RuntimeError: if the response status is not 200 or the result
            list is empty.
        """
        if not name:
            raise ValueError('name must not be empty')
        url = join(self.url, '/api/v2/cmdb/user/group', name)
        resp = self.session.get(url)
        self._handleResp(resp, prompt='cannot get group')
        return self._handleJSON(resp, single=True)

    def addUserToGroup(self, user, group):
        """
        Add a local user to a group; do nothing if already a member.

        The group is read, the user is appended to its ``member`` list and
        the whole group object is written back with a PUT request.

        :param user: user name; must not be empty and must exist.
        :param group: group name; must not be empty and must exist.
        :raises ValueError: if ``user`` or ``group`` is empty.
        :raises RuntimeError: if the user or group cannot be fetched, or the
            update request fails.
        """
        if not user or not group:
            raise ValueError('user and group must not be empty')
        # check user existence
        self.getUser(user)
        g = self.getGroup(group)
        memberNames = {m['name'] for m in g['member']}
        if user in memberNames:
            LOGGER.info('user %s already in group %s', user, group)
            return
        g['member'].append({
            'name': user,
            'q_origin_key': user,
        })
        url = join(self.url, '/api/v2/cmdb/user/group', group)
        resp = self.session.put(url, json=g)
        self._handleResp(resp, prompt='add user to group failed')

    def removeUserFromGroup(self, user, group):
        """
        Remove a local user from a group; do nothing if not a member.

        The group is read, the user's entry is filtered out of its ``member``
        list and the whole group object is written back with a PUT request.
        The remaining entries are sent back exactly as they were received
        (same order, same fields), mirroring what ``addUserToGroup`` does.

        :param user: user name; must not be empty and must exist.
        :param group: group name; must not be empty and must exist.
        :raises ValueError: if ``user`` or ``group`` is empty.
        :raises RuntimeError: if the user or group cannot be fetched, or the
            update request fails.
        """
        if not user or not group:
            raise ValueError('user and group must not be empty')
        # check user existence
        self.getUser(user)
        g = self.getGroup(group)
        members = g['member']
        remaining = [m for m in members if m['name'] != user]
        if len(remaining) == len(members):
            LOGGER.info('user %s is already absent from group %s', user, group)
            return
        g['member'] = remaining
        url = join(self.url, '/api/v2/cmdb/user/group', group)
        resp = self.session.put(url, json=g)
        self._handleResp(resp, prompt='remove user from group failed')

    def createUser(self, user, email, password):
        """
        Create a password-type local user; do nothing if it already exists.

        :param user: user name; must not be empty.
        :param email: value sent as ``email-to``; must not be empty.
        :param password: value sent as ``passwd``; must not be empty.
        :raises ValueError: if any argument is empty.
        :raises RuntimeError: if the create request returns an unexpected
            status code or its body's ``status`` is not ``'success'``.
        """
        if not user or not email or not password:
            raise ValueError('user, email and password must not be empty')
        # NOTE(review): any RuntimeError from getUser (not only "not found")
        # is treated as "user absent" - confirm this is intended.
        try:
            self.getUser(user)
        except RuntimeError:
            pass
        else:
            LOGGER.info('>> user %s already exists', user)
            return
        url = join(self.url, '/api/v2/cmdb/user/local')
        data = {
            'name': user,
            'email-to': email,
            'passwd': password,
            'type': 'password',
        }
        resp = self.session.post(url, json=data)
        self._handleResp(resp, prompt='create user failed')
        body = resp.json()
        if body.get('status') != 'success':
            raise RuntimeError('create user failed: {}'.format(body))

    def deleteUser(self, user):
        """
        Delete a local user; do nothing if it cannot be fetched.

        :param user: user name.
        :raises ValueError: if ``user`` is empty (raised by ``getUser``).
        :raises RuntimeError: if the delete request returns an unexpected
            status code.
        """
        # NOTE(review): any RuntimeError from getUser (not only "not found")
        # is treated as "user absent" - confirm this is intended.
        try:
            self.getUser(user)
        except RuntimeError:
            LOGGER.info(">> user %s is already absent", user)
            return
        url = join(self.url, '/api/v2/cmdb/user/local', user)
        resp = self.session.delete(url)
        self._handleResp(resp, prompt='delete user failed')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment