Created
September 13, 2024 05:57
-
-
Save hzbd/34b59034339b54828457b1b6a2d393eb to your computer and use it in GitHub Desktop.
easyrsa generate kit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import logging
import os
import ssl
from datetime import datetime
from os import path
from shutil import rmtree
from subprocess import PIPE, CalledProcessError, Popen

import OpenSSL
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)

# Default working directory; the PKI is created in the "pki" sub-directory.
# NOTE(review): /tmp is usually shared between users -- confirm this is an
# acceptable default location for CA and private key material.
OVPN_DIR = "/tmp/openvpn"

# Default locations of the external command line tools that are invoked.
OVPN_BIN = "/usr/sbin/openvpn"
EASYRSA_BIN = "/usr/share/easy-rsa/easyrsa"
OPENSSL_BIN = "/usr/bin/openssl"
def asntime_to_datetime(asn_time):
    """Convert an ASN.1 time string into a naive ``datetime``.

    Two encodings are understood:

    * UTCTime, ``YYMMDDHHMMSSZ`` (13 characters); the two-digit year is
      interpreted as ``2000 + YY``.
    * GeneralizedTime, ``YYYYMMDDHHMMSSZ`` (15 characters).

    Args:
        asn_time: The time as ``str`` (``bytes`` are decoded as ASCII).

    Returns:
        The parsed ``datetime`` (no tzinfo attached), or ``None`` when the
        value is empty, is not a string, does not end in ``Z``, has an
        unexpected length, or contains non-digit / out-of-range fields.
    """
    if isinstance(asn_time, (bytes, bytearray)):
        asn_time = asn_time.decode("ascii", errors="replace")
    if not isinstance(asn_time, str) or not asn_time.endswith("Z"):
        return None

    if len(asn_time) == 13:
        year_width = 2
    elif len(asn_time) == 15:
        year_width = 4
    else:
        return None

    digits = asn_time[:-1]
    if not digits.isdigit():
        return None

    try:
        year = int(digits[:year_width])
        if year_width == 2:
            year += 2000
        month, day, hour, minute, second = (
            int(digits[pos:pos + 2])
            for pos in range(year_width, year_width + 10, 2)
        )
        # datetime() itself rejects impossible values such as month 13.
        return datetime(year, month, day, hour, minute, second)
    except ValueError:
        return None
class EasyRSA(object):
    """Drive the ``easyrsa``, ``openssl`` and ``openvpn`` command line tools.

    Every command is executed with ``openvpn_dir`` as working directory and
    with the ``EASYRSA_*`` variables from ``self.env`` layered on top of the
    process environment. The PKI lives in ``<openvpn_dir>/pki``.
    """

    # strptime pattern for the 15-character ``YYYYMMDDHHMMSSZ`` timestamps
    # parsed in get_cert_expiry().
    DATE_FORMAT = '%Y%m%d%H%M%SZ'

    def __init__(self, openvpn_dir=None, openvpn_bin=None, easyrsa_bin=None, openssl_bin=None, env=None):
        """Resolve tool locations, create the PKI directory and build the env.

        Args:
            openvpn_dir: Working directory; defaults to ``OVPN_DIR``.
            openvpn_bin: Path of the openvpn binary; defaults to ``OVPN_BIN``.
            easyrsa_bin: Path of the easyrsa script. When omitted, the
                ``EASYRSA_BIN`` environment variable is used, then the
                module default ``EASYRSA_BIN``.
            openssl_bin: Path of the openssl binary; defaults to
                ``OPENSSL_BIN``.
            env: Optional mapping of variables that override the defaults
                listed below.

        Raises:
            RuntimeError: If no easyrsa location could be determined.
        """
        # The resolved value must be kept; it was previously overwritten
        # with the raw argument, which discarded the fallback.
        self.easyrsa_bin = easyrsa_bin or os.environ.get("EASYRSA_BIN", EASYRSA_BIN)
        if not self.easyrsa_bin:
            raise RuntimeError(
                "can not locate EASYRSA tool, consider set environment variable EASYRSA_BIN"
            )
        self.openssl_bin = openssl_bin if openssl_bin is not None else OPENSSL_BIN
        self.openvpn_bin = openvpn_bin if openvpn_bin is not None else OVPN_BIN
        self.openvpn_dir = openvpn_dir if openvpn_dir is not None else OVPN_DIR
        self.pki_dir = os.path.join(self.openvpn_dir, "pki")

        try:
            os.makedirs(self.pki_dir, exist_ok=True)
            logger.info("PKI directory %s created successfully.", self.pki_dir)
        except OSError as e:
            # Best effort: the failure is only logged here.
            logger.warning("Failed to create PKI directory %s. Error: %s", self.pki_dir, e)

        default_env = {
            "EASYRSA_BATCH": "yes",
            "EASYRSA_DIGEST": "sha512",
            "EASYRSA_RAND_SN": "yes",
            "EASYRSA_NS_SUPPORT": "yes",
            "EASYRSA_ALGO": "rsa",
            "EASYRSA_KEY_SIZE": "2048",
            "EASYRSA_CA_EXPIRE": "3650",
            "EASYRSA_CERT_EXPIRE": "3650",
            "EASYRSA_CERT_RENEW": "30",
            "EASYRSA_CRL_DAYS": "180",
            "EASYRSA_DN": "cn_only",
            "EASYRSA_REQ_COUNTRY": "CN",
            "EASYRSA_REQ_PROVINCE": "SH",
            "EASYRSA_REQ_CITY": "Shanghai",
            "EASYRSA_REQ_ORG": "Fuxi,Co",
            # NOTE(review): this value looks like an e-mail obfuscation
            # placeholder rather than a real address -- confirm.
            "EASYRSA_REQ_EMAIL": "[email protected]",
            "EASYRSA_REQ_OU": "NetworkSIG",
            "EASYRSA_PKI": self.pki_dir,
        }
        self.env = dict(default_env)
        if env:
            self.env.update(env)

        self.file_dh = path.join(self.pki_dir, "dh.pem")
        self.file_ca_crt = path.join(self.pki_dir, "ca.crt")
        # Filled in by build_server_cert() once the server CN is known.
        self.file_server_cert = ""
        self.file_server_key = ""
        self.file_tls_crypt = path.join(self.pki_dir, "tls-crypt.key")
        self.file_crl = path.join(self.pki_dir, "crl.pem")
        self.file_index = path.join(self.pki_dir, "index.txt")

    @staticmethod
    def _require_cn(cn):
        """Return ``cn`` unchanged, raising ``ValueError`` when it is empty."""
        if not cn:
            raise ValueError("CN must be specified")
        return cn

    def _run(self, args, env=None):
        """Run an external command and wait for it to finish.

        The child environment is ``os.environ``, overlaid with ``self.env``,
        overlaid with ``env``.

        Args:
            args: Argument list passed to ``Popen``.
            env: Optional per-call variables.

        Returns:
            ``True`` when the command exits with status 0.

        Raises:
            CalledProcessError: On a non-zero exit status; the decoded
                stderr is available as both ``output`` and ``stderr``.
        """
        current_env = os.environ.copy()
        current_env.update(self.env)
        if env:
            current_env.update(env)

        p = Popen(args, stdout=PIPE, stderr=PIPE, cwd=self.openvpn_dir, env=current_env)
        stdout, stderr = p.communicate()
        retcode = p.returncode
        if retcode:
            # errors="replace" keeps the readable part of the message
            # instead of discarding all of it on a single bad byte.
            stderr_str = stderr.decode("utf-8", errors="replace")
            logger.error("command returned error: %s", stderr_str)
            raise CalledProcessError(retcode, args, output=stderr_str, stderr=stderr_str)

        logger.info(
            "command execution completed with stdout: %s",
            stdout.decode("utf-8", errors="replace"),
        )
        return True

    def _load(self, file_path):
        """Return the raw bytes of ``file_path``; log and re-raise ``OSError``."""
        try:
            with open(file_path, 'rb') as file:
                return file.read()
        except OSError as e:
            logger.error("Error reading file %s: %s", file_path, e)
            raise

    def _write_vars_file(self, vars_dict, output_path):
        """Write ``vars_dict`` to ``output_path`` as ``set_var`` lines.

        The file starts with a one-line ``# @buildAt`` comment header.
        """
        build_at = datetime.now()
        with open(output_path, 'w') as f:
            # The whole header must sit on the comment line and end with a
            # newline so that it cannot merge with the first set_var entry.
            f.write('# @buildAt {} by Fuxi\n'.format(build_at))
            for key, value in vars_dict.items():
                f.write(f'set_var {key} "{value}"\n')
        logger.warning("Vars(%s) file written variables: %s", output_path, vars_dict)

    def _read_file_by_dict(self, files_dict):
        """Read several text files into a dict.

        Args:
            files_dict: Mapping of result key -> file path.

        Returns:
            Mapping of the same keys to the stripped file content, or to the
            string ``"File not found: <path>"`` when the path does not exist.
        """
        json_data = {}
        for filename, filepath in files_dict.items():
            if os.path.exists(filepath):
                with open(filepath, 'r') as file:
                    json_data[filename] = file.read().strip()
            else:
                json_data[filename] = "File not found: {}".format(filepath)
        return json_data

    def gen_rnd(self):
        """Run ``openssl rand -writerand`` targeting ``<pki_dir>/.rnd``."""
        logger.info("Gen random file.")
        file = path.join(self.pki_dir, ".rnd")
        args = [self.openssl_bin, "rand", "-writerand", file]
        return self._run(args)

    def init_pki(self):
        """Run ``easyrsa init-pki``."""
        logger.info("Initializing public key infrastructure (PKI)")
        args = [self.easyrsa_bin, "init-pki"]
        return self._run(args)

    def remove_pki(self):
        """Delete the PKI directory tree (``shutil.rmtree``); returns ``None``."""
        logger.warning("Clean all easyrsa data.")
        return rmtree(self.pki_dir)

    def build_ca(self, cn="Root"):
        """Run ``easyrsa build-ca nopass`` with ``EASYRSA_REQ_CN`` set to ``cn``."""
        logger.info("Building certificiate authority (CA)")
        env = {"EASYRSA_REQ_CN": cn}
        args = [self.easyrsa_bin, "build-ca", "nopass"]
        return self._run(args, env=env)

    def gen_dh(self):
        """Run ``easyrsa gen-dh`` unless ``self.file_dh`` already exists.

        Returns:
            ``True`` after a successful run, ``None`` when the file already
            exists and nothing was executed.
        """
        logger.info("Generating Diffie-Hellman (DH/2048) parameters")
        if path.exists(self.file_dh):
            logger.warning("DH already exists: %s", self.file_dh)
            return None
        args = [self.easyrsa_bin, "gen-dh"]
        return self._run(args)

    def pre_gen_dh(self, conext):
        """Write pre-generated DH parameters to ``self.file_dh``.

        Args:
            conext: Text content of the DH file.

        Raises:
            TypeError: If ``conext`` is not a ``str``. The check runs before
                the file is opened so an existing file is not truncated.
        """
        if not isinstance(conext, str):
            raise TypeError("DH content must be a str, got {}".format(type(conext).__name__))
        logger.info("Pre generating Diffie-Hellman(DH/2048) file: %s", self.file_dh)
        with open(self.file_dh, 'w') as f:
            f.write(conext)

    def build_server_cert(self, cn=None):
        """Run ``easyrsa build-server-full <cn> nopass``.

        Also records the expected certificate and key paths in
        ``self.file_server_cert`` / ``self.file_server_key``.

        Raises:
            ValueError: If ``cn`` is empty.
        """
        self._require_cn(cn)
        logger.info("Building server certificiate")
        args = [self.easyrsa_bin, "build-server-full", cn, "nopass"]
        self.file_server_cert = path.join(self.pki_dir, "issued/{}.crt".format(cn))
        self.file_server_key = path.join(self.pki_dir, "private/{}.key".format(cn))
        return self._run(args)

    def build_client_cert(self, cn=None):
        """Run ``easyrsa build-client-full <cn> nopass``.

        Raises:
            ValueError: If ``cn`` is empty.
        """
        self._require_cn(cn)
        logger.info("Builing client certificiate")
        args = [self.easyrsa_bin, "build-client-full", cn, "nopass"]
        return self._run(args)

    def build_tls_crypt_key(self):
        """Run ``openvpn --genkey secret <file_tls_crypt>``."""
        logger.info("Generating tls crypt key")
        args = [self.openvpn_bin, "--genkey", "secret", self.file_tls_crypt]
        return self._run(args)

    def renew_cert(self, cn):
        """Run ``easyrsa renew <cn> nopass``.

        !!! Command can only be used when there are 30 days left
        in the certificate's validity period.
        "EASYRSA_CERT_RENEW": "30"  # days

        Raises:
            ValueError: If ``cn`` is empty.
        """
        self._require_cn(cn)
        logger.info("Renew certificiate")
        args = [self.easyrsa_bin, "renew", cn, "nopass"]
        return self._run(args)

    def gen_crl(self):
        """Run ``easyrsa gen-crl``."""
        logger.info("Generating certificate revocation list (CRL)")
        args = [self.easyrsa_bin, "gen-crl"]
        return self._run(args)

    def revoke_client_cert(self, cn):
        """Revoke the certificate for ``cn`` and regenerate the CRL.

        This is best effort: failures are logged instead of raised.

        Returns:
            ``True`` when both the revocation and the CRL generation
            succeeded, ``None`` otherwise.
        """
        if not cn:
            logger.warning("Certificate not exists: %s", "CN must be specified")
            return None

        try:
            self._run([self.easyrsa_bin, "revoke", cn])
        except (CalledProcessError, OSError) as error:
            logger.warning("Certificate not exists: %s", error)
            return None

        # Reported separately so a CRL problem is not mistaken for a
        # missing certificate.
        try:
            return self.gen_crl()
        except (CalledProcessError, OSError) as error:
            logger.error("Certificate %s revoked but CRL generation failed: %s", cn, error)
            return None

    def init_all(self, dh_conext=None, server_cn=None):
        """Create a complete PKI: CA, DH parameters, server cert, TLS key, CRL.

        Args:
            dh_conext: Pre-generated DH parameters (text). When ``None`` the
                parameters are generated with ``gen_dh()`` instead.
            server_cn: Common name of the server certificate (required).

        Raises:
            ValueError: If ``server_cn`` is empty. Checked first so that no
                command runs when the call cannot complete.
        """
        self._require_cn(server_cn)
        self.gen_rnd()
        self.init_pki()
        self.build_ca()
        if dh_conext is None:
            self.gen_dh()
        else:
            self.pre_gen_dh(conext=dh_conext)
        self.build_server_cert(cn=server_cn)
        self.build_tls_crypt_key()
        self.gen_crl()

    def get_cert_expiry(self, file, direct=False):
        """Return the ``notAfter`` time of a PEM certificate.

        Args:
            file: Path of the certificate, or the PEM data itself when
                ``direct`` is true.
            direct: Treat ``file`` as certificate content instead of a path.

        Returns:
            A naive ``datetime`` parsed with ``DATE_FORMAT``, or ``None``
            when the certificate or its expiry field cannot be parsed.

        Raises:
            OSError: If ``direct`` is false and the file cannot be read.
        """
        cert_context = file if direct else self._load(file)
        try:
            x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_context)
        except OpenSSL.crypto.Error as error:
            logger.error("Error parsing certificate: %s", error)
            return None

        not_after = x509.get_notAfter()
        if not_after is None:
            logger.error("Error parsing certificate: %s", "no notAfter field")
            return None
        try:
            return datetime.strptime(not_after.decode(), self.DATE_FORMAT)
        except ValueError as error:
            logger.error("Error parsing certificate: %s", error)
            return None

    def get_client_certs(self):
        """Parse ``index.txt`` into a list of certificate records.

        STATUS: V:Active, R:Revoked, E:Expired, S:Suspended, C:Ceased

        Every row of the index with a non-empty status and at least six
        tab-separated columns becomes one dict with the keys ``status``,
        ``expire_at``, ``revoke_at``, ``serial``, ``reason``, ``subject``,
        ``identity`` (the ``/KEY=value`` parts of the subject) and ``name``
        (the ``CN`` entry of ``identity``, ``None`` if absent). Blank or
        short rows are skipped.

        NOTE(review): ``reason`` is taken from the fifth column -- confirm
        that this column really carries the revocation reason.
        """
        certs = []
        with open(self.file_index, newline="") as indexfile:
            for line in csv.reader(indexfile, dialect="excel-tab"):
                if len(line) < 6 or not line[0]:
                    continue

                identity = {}
                for part in line[5].split("/"):
                    if not part.strip():
                        continue
                    # partition() keeps values that themselves contain "=".
                    key, sep, value = part.partition("=")
                    if not sep:
                        continue
                    identity[key] = value.strip()

                # Anything after a comma in the revocation column is not
                # part of the timestamp, so only the first piece is parsed.
                revoked_at = line[2].partition(",")[0]

                certs.append({
                    "status": line[0],
                    "expire_at": asntime_to_datetime(line[1]),
                    "revoke_at": asntime_to_datetime(revoked_at),
                    "serial": line[3],
                    "reason": line[4],
                    "subject": line[5],
                    "identity": identity,
                    "name": identity.get("CN"),
                })
        return certs

    def export_crl_json(self):
        """Return the CRL file content as a dict.

        The content is available under ``crl_verify`` (the key also used by
        ``export_init_json``) and, for backward compatibility, under the
        historical misspelling ``ctl_verify``.
        """
        json_data = self._read_file_by_dict({"crl_verify": self.file_crl})
        json_data["ctl_verify"] = json_data["crl_verify"]
        return json_data

    def export_init_json(self):
        """Return the server-side PKI files plus the CA expiry.

        Returns:
            Dict with the text of ``dh``, ``ca``, ``server_cert``,
            ``server_key``, ``tls_crypt``, ``crl_verify`` and ``index``
            (or a "File not found" marker), and ``ca_expiry_at`` as a
            ``datetime`` or ``None``.

        Raises:
            OSError: If the CA certificate cannot be read.
        """
        ca_expiry_at = self.get_cert_expiry(self.file_ca_crt)
        item_dict = {
            "dh": self.file_dh,
            "ca": self.file_ca_crt,
            "server_cert": self.file_server_cert,
            "server_key": self.file_server_key,
            "tls_crypt": self.file_tls_crypt,
            "crl_verify": self.file_crl,
            "index": self.file_index,
        }
        json_data = self._read_file_by_dict(item_dict)
        json_data['ca_expiry_at'] = ca_expiry_at
        return json_data

    def export_client_json(self, cn=None):
        """Return the certificate and key of client ``cn`` plus its expiry.

        Returns:
            Dict with ``client_cert`` and ``client_key`` (file text or a
            "File not found" marker) and ``client_cert_expiry_at`` as a
            ``datetime`` or ``None``.

        Raises:
            OSError: If the client certificate cannot be read.
        """
        client_cert = path.join(self.pki_dir, "issued/{}.crt".format(cn))
        client_key = path.join(self.pki_dir, "private/{}.key".format(cn))
        cert_expiry_at = self.get_cert_expiry(client_cert)
        item_dict = {"client_cert": client_cert, "client_key": client_key}
        json_data = self._read_file_by_dict(item_dict)
        json_data['client_cert_expiry_at'] = cert_expiry_at
        return json_data
def main():
    """Demo entry point: export the certificate bundle of client ``user3``.

    A PKI is typically prepared beforehand with calls such as::

        maker.gen_rnd()
        maker.init_pki()
        maker.gen_dh()
        maker.build_ca()
        maker.build_server_cert(cn="server_10010101")
        maker.build_client_cert(cn="user1")
        maker.build_tls_crypt_key()
        maker.gen_crl()
        maker.revoke_client_cert(cn="user1")
        maker.get_client_certs()
        maker.export_init_json()
    """
    # Without a configured handler the INFO messages of the helpers are
    # dropped; only WARNING and above would reach stderr.
    logging.basicConfig(level=logging.INFO)

    maker = EasyRSA(
        openvpn_dir=OVPN_DIR,
        openvpn_bin=OVPN_BIN,
        easyrsa_bin=EASYRSA_BIN,
        openssl_bin=OPENSSL_BIN,
    )
    try:
        client_ret = maker.export_client_json(cn="user3")
    except OSError as error:
        # Reading the client certificate failed (for example it was never
        # issued): exit with a short message instead of a traceback.
        raise SystemExit("cannot export client 'user3': {}".format(error))
    print(client_ret)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment