Last active
November 24, 2024 01:40
-
-
Save mildsunrise/adb4068650484d9fe354a3ee4238eed3 to your computer and use it in GitHub Desktop.
π Open source implementation of FNMT's certificate configurator (https://twitter.com/mild_sunrise/status/1585611873860440067)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
Open source implementation of FNMT's certificate configurator v4.0.6 | |
<https://www.sede.fnmt.gob.es/descargas/descarga-software/instalacion-software-generacion-de-claves> | |
No warranty provided; use this ONLY if you know what you're doing. | |
Usage: ./fnmt_handle.py <fnmtcr URL> | |
Fulfills the request indicated by the URL, sending request to answer operation as completed if there are no errors. | |
For the fnmtcr://request phase, the generated private key is written, unencrypted, to "privkey.pem" in current directory. | |
For the fnmtcr://install phase, the received PKCS#7 / X.509 blob is written to "cert.der" in current directory. | |
After both 'request' and 'install' are done, you can build a PKCS#12 store from them: | |
$ openssl pkcs7 -print_certs -inform der -in cert.der -out certs.cer | |
$ openssl pkcs12 -export -in certs.cer -inkey privkey.pem -out certificate.p12 | |
''' | |
import sys | |
import gzip | |
from cryptography.x509 import CertificateSigningRequestBuilder, Name | |
from cryptography.hazmat.primitives import serialization, ciphers, asymmetric, hashes | |
from urllib.parse import urlparse, parse_qs, urlencode, unquote | |
from urllib.request import urlopen, Request | |
from base64 import urlsafe_b64encode, urlsafe_b64decode | |
from xml.etree import ElementTree | |
try: | |
from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES | |
except ImportError: | |
from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES | |
# MAIN CODE | |
def __main__():
    '''
    Command-line entry point.

    Expects exactly one argument, an fnmtcr:// URL. The URL's network location
    selects the handler ('request' or 'install') and its query string supplies
    the parameters. If an 'rtservlet' parameter is present, additional
    parameters are first fetched from that servlet. The handler's result is
    finally submitted through send_data().

    Prints the module usage text and exits with status 2 when the argument
    count is wrong; raises AssertionError for unsupported URLs.
    '''
    args = sys.argv[1:]
    if len(args) != 1:
        print(__doc__.strip(), file=sys.stderr)
        # sys.exit rather than the bare `exit` helper: the latter is injected
        # by the `site` module for interactive use and is not always defined
        # (e.g. `python -S`, frozen executables).
        sys.exit(2)
    url, = args

    url = urlparse(url)
    assert url.scheme == 'fnmtcr', f'Unexpected URL schema: {repr(url.scheme)}'
    handlers = { 'request': do_request, 'install': do_install }
    assert url.netloc in handlers, f'Unimplemented operation {repr(url.netloc)}, try with official configurator'
    handler = handlers[url.netloc]
    assert url.path == '', f'Unexpected URL path: {repr(url.path)}'
    params = ensure_unique(parse_qs(url.query))

    # the URL may only carry a pointer to the real parameters
    if 'rtservlet' in params:
        retrieve_parameters(params)

    # split off the transport-level parameters; the handler only sees the rest
    common_params = {}
    for k in { 'rtservlet', 'stservlet', 'key', 'fileid' }:
        if k in params:
            common_params[k] = params.pop(k)

    answer_data = handler(params)
    send_data(answer_data, common_params)
# HANDLERS | |
def do_request(params: dict[str, str]) -> bytes:
    '''
    Handle the fnmtcr://request phase.

    Generates an RSA private key (public exponent 65537, size taken from the
    'keylength' parameter, default 2048), writes it unencrypted as PKCS#8 PEM
    to "privkey.pem" in the current directory and returns a DER-encoded
    certificate signing request signed with SHA-256.

    `params` is consumed: 'keytype', 'keylength' and 'forcecard' are popped.
    Raises AssertionError if the key type is not RSA, if 'forcecard' is
    'true', or if any other parameter is left over. Raises FileExistsError if
    "privkey.pem" already exists (the file is never overwritten).
    '''
    import os  # only needed for the restrictive opener below

    keytype = params.pop('keytype', 'rsa').lower()
    keylength = int(params.pop('keylength', '2048'))
    forcecard = params.pop('forcecard', None)
    supported = keytype == 'rsa' and forcecard != 'true' and not params
    assert supported, 'Unimplemented "request" parameters, try with official configurator'

    key = asymmetric.rsa.generate_private_key(65537, keylength)

    def owner_only(path, flags):
        # The key is stored unencrypted, so create the file readable and
        # writable by the owner only instead of relying on the umask.
        return os.open(path, flags, 0o600)

    pem = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    )
    # 'x' mode: refuse to clobber a previously generated key
    with open('privkey.pem', 'xb', opener=owner_only) as f:
        f.write(pem)

    builder = CertificateSigningRequestBuilder(Name.from_rfc4514_string('CN=foo.bar.com'))
    return builder.sign(key, hashes.SHA256()).public_bytes(serialization.Encoding.DER)
def do_install(params: dict[str, str]) -> bytes:
    '''
    Handle the fnmtcr://install phase.

    Pops the 'cert' parameter, decodes it from URL-safe base64, gunzips it
    when it is valid GZIP data (otherwise keeps it as is) and writes the
    result to "cert.der" in the current directory, refusing to overwrite an
    existing file. Any remaining parameters only trigger a warning.

    Returns the literal answer b'OK'. Raises AssertionError when 'cert' is
    missing.
    '''
    assert 'cert' in params, '"cert" parameter not present, I do not know what to do'
    encoded = params.pop('cert')
    if params:
        print(f'warning: Unknown parameters: {params}')

    blob = urlsafe_b64decode(encoded)
    try:
        payload = gzip.decompress(blob)
    except gzip.BadGzipFile:
        print('note: Invalid GZIP, assuming not compressed.')
        payload = blob

    with open('cert.der', 'xb') as out:
        out.write(payload)
    return b'OK'
# SERVER COMMUNICATION | |
def retrieve_parameters(params: dict[str, str]):
    '''
    Fetch additional parameters from the retrieval servlet and merge them
    into `params` (in place).

    Issues a GET to params['rtservlet'] with op=get, v=1 and id=params['fileid'].
    The response is deciphered with params['key'] when a non-empty key is
    present, then parsed as XML whose children must all be childless <e>
    elements carrying exactly the attributes 'k' and 'v'; each one sets
    params[k] to the URL-unquoted v.

    Raises KeyError for missing 'rtservlet'/'fileid' and AssertionError for
    an error response or an unexpected XML node.
    '''
    servlet = params['rtservlet']
    print('requesting additional parameters to:', repr(servlet))

    query = { 'op': 'get', 'v': '1', 'id': params['fileid'] }
    request = Request(validate_url(servlet + '?' + urlencode(query)), method='GET')
    with urlopen(request) as resp:
        text = resp.read().decode()
    assert not text.upper().startswith('ERR'), f'error response from server: {text}'

    # the payload is only encrypted when the URL supplied a key
    if params.get('key'):
        text = decipher_data(text, params['key'].encode()).decode()

    for node in ElementTree.fromstring(text):
        assert node.tag == 'e' and set(node.attrib) == {'k', 'v'} and not list(node), f'unexpected node {node}'
        name, value = node.attrib['k'], node.attrib['v']
        params[name] = unquote(value)
def send_data(data: bytes, params: dict[str, str]):
    '''
    Submit the handler's answer to the storage servlet.

    `data` is encoded with cipher_data() using params['key'] (no encryption
    when the key is absent or empty) and POSTed to params['stservlet'] as a
    form with op=put, v=1_0, id=params['fileid'] and dat=<encoded data>.

    Raises KeyError for missing 'stservlet'/'fileid' and AssertionError when
    the server does not answer with "OK".
    '''
    target = params['stservlet']
    print('submitting completion request to:', repr(target))

    secret = params.get('key', '').encode()
    form = { 'op': 'put', 'v': '1_0', 'id': params['fileid'], 'dat': cipher_data(data, secret) }
    request = Request(validate_url(target), urlencode(form).encode('ascii'), method='POST')
    with urlopen(request) as resp:
        reply = resp.read()

    print(f'server response: status={resp.code}, body={reply}')
    assert reply.strip() == b'OK', 'unexpected response from server'
# Block cipher applied (in ECB mode) by cipher_data() / decipher_data() to the
# data exchanged with the servlets.
CIPHER_ALG = TripleDES
def cipher_data(data: bytes, key: bytes) -> str:
    '''
    Encode `data` for transmission; counterpart of decipher_data().

    With an empty key the result is just the URL-safe base64 of `data`.
    Otherwise `data` is zero-padded up to a whole number of cipher blocks,
    encrypted with CIPHER_ALG in ECB mode, and returned as
    "<padding length>.<URL-safe base64 of the ciphertext>".

    Raises AssertionError if a non-empty key is not exactly 8 bytes long.
    '''
    if key:
        assert len(key) == 8, 'invalid key length'
        block_bytes = CIPHER_ALG.block_size // 8  # block_size is in bits
        pad = -len(data) % block_bytes
        padded = data + b'\0' * pad
        encryptor = ciphers.Cipher(CIPHER_ALG(key), ciphers.modes.ECB()).encryptor()
        ciphertext = encryptor.update(padded) + encryptor.finalize()
        # the receiver needs the padding length to strip the zero bytes again
        return f'{pad}.' + urlsafe_b64encode(ciphertext).decode('ascii')
    return urlsafe_b64encode(data).decode('ascii')
def decipher_data(enc: str, key: bytes) -> bytes:
    '''
    Decode data received from the server; counterpart of cipher_data().

    With an empty key, `enc` is plain URL-safe base64. Otherwise it must look
    like "<padding length>.<URL-safe base64 ciphertext>", where the padding
    length is a decimal number below 8; the ciphertext is decrypted with
    CIPHER_ALG in ECB mode and that many trailing bytes are dropped.

    Raises AssertionError if a non-empty key is not exactly 8 bytes long
    ('invalid key length') or if the "<padding length>." prefix is missing or
    malformed ('invalid encrypted data').
    '''
    if not key:
        return urlsafe_b64decode(enc)
    assert len(key) == 8, 'invalid key length'

    prefix, separator, payload = enc.partition('.')
    # isascii() is required on top of isdigit(): characters such as '²' count
    # as digits but make int() raise ValueError instead of the error below.
    if not (separator and prefix.isascii() and prefix.isdigit() and int(prefix) < 8):
        raise AssertionError('invalid encrypted data')
    padding_len = int(prefix)

    data = urlsafe_b64decode(payload)
    cipher = ciphers.Cipher(CIPHER_ALG(key), ciphers.modes.ECB()).decryptor()
    data = cipher.update(data) + cipher.finalize()
    return data[:len(data) - padding_len]
# OTHER | |
def ensure_unique(params: dict[str, list[str]]) -> dict[str, str]:
    '''
    Flatten a parse_qs()-style mapping (name -> list of values) into a plain
    name -> value mapping.

    Raises AssertionError for the first parameter that does not have exactly
    one value.
    '''
    for name, values in params.items():
        assert len(values) == 1, f'Unexpected duplicate parameter {repr(name)}'
    return {name: values[0] for name, values in params.items()}
def validate_url(url: str) -> str:
    '''
    Guard against sending requests to unexpected servers.

    URLs using https whose host name ends in '.fnmt.es' or '.fnmt.gob.es' are
    accepted silently. For anything else the user is asked interactively and
    must answer exactly 'y'.

    Returns `url` unchanged. Raises AssertionError if the user rejects the
    request.
    '''
    parsed = urlparse(url)
    allowed_suffixes = ('.fnmt.es', '.fnmt.gob.es')
    # hostname is None when the URL has no network location; treat that as an
    # untrusted host (so the user gets asked) instead of crashing on .lower()
    hostname = (parsed.hostname or '').lower()
    trusted = parsed.scheme.lower() == 'https' and hostname.endswith(allowed_suffixes)
    if not trusted:
        answer = input(f'allow request to {repr(parsed.hostname)}? [y/n]: ')
        assert answer == 'y', 'user rejected request'
    return url
# Run the configurator only when executed as a script, not when imported.
if __name__ == '__main__':
    __main__()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment