Skip to content

Instantly share code, notes, and snippets.

@hzbd
Created September 13, 2024 05:57
Show Gist options
  • Save hzbd/34b59034339b54828457b1b6a2d393eb to your computer and use it in GitHub Desktop.
Save hzbd/34b59034339b54828457b1b6a2d393eb to your computer and use it in GitHub Desktop.
easyrsa generate kit
import logging
import os
from datetime import datetime
from os import path
from shutil import rmtree
from subprocess import PIPE, CalledProcessError, Popen
import OpenSSL
import ssl
logger = logging.getLogger(__name__)
OVPN_DIR = "/tmp/openvpn"
OVPN_BIN = "/usr/sbin/openvpn"
EASYRSA_BIN = "/usr/share/easy-rsa/easyrsa"
OPENSSL_BIN = "/usr/bin/openssl"
def asntime_to_datetime(asn_time):
if len(asn_time) != 13 and asn_time[12:] != "Z":
return None
return datetime(
year=2000 + int(asn_time[:2]),
month=int(asn_time[2:4]),
day=int(asn_time[4:6]),
hour=int(asn_time[6:8]),
minute=int(asn_time[8:10]),
second=int(asn_time[10:12]),
)
class EasyRSA(object):
def __init__(self, openvpn_dir=None, openvpn_bin=None, easyrsa_bin=None, openssl_bin=None, env={}):
self.easyrsa_bin = easyrsa_bin or os.environ.get("EASYRSA_BIN", EASYRSA_BIN)
if self.easyrsa_bin is None:
raise Exception(
"can not locate EASYRSA tool, consider set environment variable EASYRSA"
)
self.easyrsa_bin = easyrsa_bin
self.openssl_bin = openssl_bin if openssl_bin is not None else OPENSSL_BIN
self.openvpn_bin = openvpn_bin if openvpn_bin is not None else OVPN_BIN
self.openvpn_dir = openvpn_dir if openvpn_dir is not None else OVPN_DIR
self.pki_dir = os.path.join(self.openvpn_dir, "pki")
self.DATE_FORMAT = '%Y%m%d%H%M%SZ'
try:
os.makedirs(self.pki_dir, exist_ok=True)
logger.info(f"PKI directory {self.pki_dir} created successfully.")
except OSError as e:
logger.warning(f"Failed to create PKI directory {self.pki_dir}. Error: {e}")
default_env = {
"EASYRSA_BATCH": "yes",
"EASYRSA_DIGEST": "sha512",
"EASYRSA_RAND_SN": "yes",
"EASYRSA_NS_SUPPORT": "yes",
"EASYRSA_ALGO": "rsa",
"EASYRSA_KEY_SIZE": "2048",
"EASYRSA_CA_EXPIRE": "3650",
"EASYRSA_CERT_EXPIRE": "3650",
"EASYRSA_CERT_RENEW": "30",
"EASYRSA_CRL_DAYS": "180",
"EASYRSA_DN": "cn_only",
"EASYRSA_REQ_COUNTRY": "CN",
"EASYRSA_REQ_PROVINCE": "SH",
"EASYRSA_REQ_CITY": "Shanghai",
"EASYRSA_REQ_ORG": "Fuxi,Co",
"EASYRSA_REQ_EMAIL": "[email protected]",
"EASYRSA_REQ_OU": "NetworkSIG",
"EASYRSA_PKI": self.pki_dir,
}
self.env = dict(default_env, **env)
self.file_dh = path.join(self.pki_dir, "dh.pem")
self.file_ca_crt = path.join(self.pki_dir, "ca.crt")
self.file_server_cert = ""
self.file_server_key = ""
self.file_tls_crypt = path.join(self.pki_dir, "tls-crypt.key")
self.file_crl = path.join(self.pki_dir, "crl.pem")
self.file_index = path.join(self.pki_dir, "index.txt")
# self.file_openssl = path.join(self.pki_dir, "openssl-easyrsa.cnf")
def _run(self, args, env={}):
default_env = self.env.copy()
sys_env = os.environ.copy()
sys_env.update(default_env)
current_env = dict(sys_env, **env)
p = Popen(args, stdout=PIPE, stderr=PIPE, cwd=self.openvpn_dir, env=current_env)
stdout, stderr = p.communicate()
retcode = p.poll()
if retcode:
try:
stderr_str = stderr.decode("utf-8")
except UnicodeDecodeError:
stderr_str = "Error decoding stderr"
logger.error("command returned error: %s", stderr_str)
raise CalledProcessError(retcode, args, output=stderr_str)
else:
logger.info(
"command execution completed with stdout: %s", stdout.decode("utf-8")
)
return True
def _load(self, file_path):
try:
with open(file_path, 'rb') as file:
return file.read()
except IOError as e:
logging.error(f"Error reading file {file_path}: {e}")
raise
def _write_vars_file(self, vars_dict, output_path):
build_at = datetime.now()
with open(output_path, 'w') as f:
f.write('# @buildAt {}\n by Fuxi'.format(build_at))
for key, value in vars_dict.items():
f.write(f'set_var {key} "{value}"\n')
logger.warning(f"Vars({output_path}) file written variables: {vars_dict}")
def _read_file_by_dict(self, files_dict):
json_data = {}
for filename, filepath in files_dict.items():
if os.path.exists(filepath):
with open(filepath, 'r') as file:
content = file.read().strip()
json_data[filename] = content
else:
json_data[filename] = "File not found: {}".format(filepath)
return json_data
def gen_rnd(self):
logger.info("Gen random file.")
file = path.join(self.pki_dir, ".rnd")
args = [self.openssl_bin, "rand", "-writerand", file]
return self._run(args)
def init_pki(self):
logger.info("Initializing public key infrastructure (PKI)")
args = [self.easyrsa_bin, "init-pki"]
return self._run(args)
def remove_pki(self):
logger.warning("Clean all easyrsa data.")
return rmtree(self.pki_dir)
def build_ca(self, cn="Root"):
logger.info("Building certificiate authority (CA)")
env = {"EASYRSA_REQ_CN": cn}
args = [self.easyrsa_bin, "build-ca", "nopass"]
return self._run(args, env=env)
def gen_dh(self):
logger.info("Generating Diffie-Hellman (DH/2048) parameters")
if path.exists(self.file_dh):
logger.warning("DH already exists:", self.file_dh)
else:
args = [self.easyrsa_bin, "gen-dh"]
return self._run(args)
def pre_gen_dh(self, conext):
logger.info("Pre generating Diffie-Hellman(DH/2048) file: {}".format(self.file_dh))
with open(self.file_dh, 'w') as f:
f.write(conext)
def build_server_cert(self, cn=None):
# vars_file = path.join(self.pki_dir, "vars")
# logger.info("Creating a customized vars file: %s" % vars_file)
# self._write_vars_file(self.env, vars_file)
logger.info("Building server certificiate")
if cn is None:
raise "CN must be specified"
args = [self.easyrsa_bin, "build-server-full", cn, "nopass"]
self.file_server_cert = path.join(self.pki_dir, "issued/{}.crt".format(cn))
self.file_server_key = path.join(self.pki_dir, "private/{}.key".format(cn))
return self._run(args)
def build_client_cert(self, cn=None):
logger.info("Builing client certificiate")
if cn is None:
raise "CN must be specified"
args = [self.easyrsa_bin, "build-client-full", cn, "nopass"]
return self._run(args)
def build_tls_crypt_key(self):
logger.info("Generating tls crypt key")
args = [self.openvpn_bin, "--genkey", "secret", self.file_tls_crypt]
return self._run(args)
def renew_cert(self, cn):
"""
!!! Command can only be used when there are 30 days left
in the certificate's validity period.
"EASYRSA_CERT_RENEW": "30" # days
"""
logger.info("Renew certificiate")
if cn is None:
raise "CN must be specified"
args = [self.easyrsa_bin, "renew", cn, "nopass"]
return self._run(args)
def gen_crl(self):
logger.info("Generating certificate revocation list (CRL)")
args = [self.easyrsa_bin, "gen-crl"]
return self._run(args)
def revoke_client_cert(self, cn):
try:
if cn is None:
raise "CN must be specified"
args = [self.easyrsa_bin, "revoke", cn]
self._run(args)
return self.gen_crl()
except Exception as error:
logger.warning(f"Certificate not exists: {error}")
def init_all(self, dh_conext=None, server_cn=None):
self.gen_rnd()
self.init_pki()
self.build_ca()
self.pre_gen_dh(conext=dh_conext)
self.build_server_cert(cn=server_cn)
self.build_tls_crypt_key()
self.gen_crl()
def get_cert_expiry(self, file, direct=False):
if not direct:
cert_context = self._load(file)
else:
cert_context = file
try:
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_context)
expiry_date = x509.get_notAfter().decode()
return datetime.strptime(expiry_date, self.DATE_FORMAT)
except OpenSSL.crypto.Error as error:
logging.error(f"Error parsing certificate: {error}")
return None
def get_client_certs(self):
"""
STATUS: V:Active, R:Revoked, E:Expired, S:Suspended, C:Ceased
"""
import csv
certs = []
with open(self.file_index) as indexfile:
for line in csv.reader(indexfile, dialect="excel-tab"):
if line[0]:
cert = {}
cert["status"] = line[0]
cert["expire_at"] = asntime_to_datetime(line[1])
cert["revoke_at"] = asntime_to_datetime(line[2])
cert["serial"] = line[3]
cert["reason"] = line[4]
cert["subject"] = line[5]
cert["identity"] = {}
for i in line[5].split("/"):
if len(i.strip()) == 0:
continue
k, v = i.split("=")
cert["identity"][k] = v.strip()
cert["name"] = cert["identity"]["CN"]
certs.append(cert)
return certs
def export_crl_json(self):
files_dict = {"ctl_verify": self.file_crl}
json_data = self._read_file_by_dict(files_dict)
return json_data
def export_init_json(self):
ca_expiry_at = self.get_cert_expiry(self.file_ca_crt)
item_dict = {
"dh": self.file_dh,
"ca": self.file_ca_crt,
"server_cert": self.file_server_cert,
"server_key": self.file_server_key,
"tls_crypt": self.file_tls_crypt,
"crl_verify": self.file_crl,
"index": self.file_index,
}
json_data = self._read_file_by_dict(item_dict)
json_data['ca_expiry_at'] = ca_expiry_at
return json_data
def export_client_json(self, cn=None):
client_cert = path.join(self.pki_dir, "issued/{}.crt".format(cn))
client_key = path.join(self.pki_dir, "private/{}.key".format(cn))
cert_expiry_at = self.get_cert_expiry(client_cert)
item_dict = {"client_cert": client_cert, "client_key": client_key}
json_data = self._read_file_by_dict(item_dict)
json_data['client_cert_expiry_at'] = cert_expiry_at
return json_data
if __name__ == "__main__":
maker = EasyRSA(
openvpn_dir=OVPN_DIR,
openvpn_bin=OVPN_BIN,
easyrsa_bin=EASYRSA_BIN,
openssl_bin=OPENSSL_BIN
)
# maker.gen_rnd()
# maker.init_pki()
# maker.gen_dh()
# maker.build_ca()
# maker.build_server_cert(cn="server_10010101")
# print(maker.build_client_cert(cn="user1"))
# maker.build_client_cert(cn="user2")
# maker.build_client_cert(cn="user3")
# maker.build_tls_crypt_key()
# maker.gen_crl()
# print(maker.revoke_client_cert(cn="user1"))
# print(maker.get_client_certs())
# ret = maker.export_init_json()
# print(ret)
client_ret = maker.export_client_json(cn="user3")
print(client_ret)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment