Created
March 18, 2026 10:32
-
-
Save jborean93/d999edb6b51ac9bd54040b9e7e230eb9 to your computer and use it in GitHub Desktop.
CredSSP Test Server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # Copyright: (c) 2026, Jordan Borean (@jborean93) <jborean93@gmail.com> | |
| # MIT License (see LICENSE or https://opensource.org/licenses/MIT) | |
| # /// script | |
| # dependencies = [ | |
| # "gssapi", | |
| # "krb5", | |
| # "cryptography", | |
| # "pyasn1", | |
| # ] | |
| # /// | |
| from __future__ import annotations | |
| import argparse | |
| import base64 | |
| import datetime | |
| import enum | |
| import getpass | |
| import os | |
| import pathlib | |
| import ssl | |
| import sys | |
| import tempfile | |
| import traceback | |
| from http.server import HTTPServer, BaseHTTPRequestHandler | |
| import krb5 | |
| import gssapi | |
| from cryptography import x509 | |
| from cryptography.x509.oid import NameOID | |
| from cryptography.hazmat.primitives import hashes, serialization | |
| from cryptography.hazmat.primitives.asymmetric import rsa | |
| from pyasn1.codec.der import decoder as der_decoder, encoder as der_encoder | |
| from pyasn1.type import univ, namedtype, namedval, tag, char, useful | |
class KnownOID(str, enum.Enum):
    """Security-mechanism OIDs that this script can label by name.

    Each member's value is the OID in dotted-decimal string form.
    """

    SPNEGO = "1.3.6.1.5.5.2"
    KERBEROS_5 = "1.2.840.113554.1.2.2"
    KERBEROS_5_U2U = "1.2.840.113554.1.2.2.3"
    MS_KERBEROS = "1.2.840.48018.1.2.2"
    NTLM = "1.3.6.1.4.1.311.2.2.10"
    NEGOEX = "1.3.6.1.4.1.311.2.2.30"
    IAKERB = "1.3.6.1.5.2.5"

    @classmethod
    def get_name(cls, oid_str: str) -> str:
        """Return a display label for a dotted-decimal OID.

        Args:
            oid_str: OID in dotted-decimal form

        Returns:
            "<Title Cased Member Name> (<oid>)" when the OID is a member of
            this enum, otherwise the input string unchanged
        """
        try:
            member = cls(oid_str)
        except ValueError:
            # Not one of the members above: fall back to the raw OID.
            return oid_str
        spaced_name = member.name.replace('_', ' ')
        return f"{spaced_name.title()} ({oid_str})"
class KerberosPrincipalNameType(enum.IntEnum):
    """Numeric name-type values carried in a Kerberos PrincipalName."""

    UNKNOWN = 0
    NT_PRINCIPAL = 1
    NT_SRV_INST = 2
    NT_SRV_HST = 3
    NT_SRV_XHST = 4
    NT_UID = 5
    NT_X500_PRINCIPAL = 6
    NT_SMTP_NAME = 7
    NT_ENTERPRISE_PRINCIPAL = 10

    @classmethod
    def get_name(cls, type_id: int) -> str:
        """Return a display label for a principal name-type number.

        Args:
            type_id: Numeric name-type value

        Returns:
            "<MEMBER_NAME> (<type_id>)" for a value defined above, or
            "Unknown (<type_id>)" for any other value
        """
        try:
            return f"{cls(type_id).name} ({type_id})"
        except ValueError:
            # Value has no member in this enum.
            return f"Unknown ({type_id})"
class NegState(enum.IntEnum):
    """Values of the negState field in a SPNEGO NegTokenResp."""

    ACCEPT_COMPLETED = 0
    ACCEPT_INCOMPLETE = 1
    REJECT = 2
    REQUEST_MIC = 3

    @classmethod
    def get_name(cls, state: int) -> str:
        """Return a display label for a negState number.

        Args:
            state: Numeric negState value

        Returns:
            The member name lower-cased with hyphens in place of
            underscores, followed by " (<state>)"; "unknown (<state>)" for
            a value that is not defined above
        """
        try:
            member_name = cls(state).name
        except ValueError:
            return f"unknown ({state})"
        return f"{member_name.lower().replace('_', '-')} ({state})"
class MechType(univ.ObjectIdentifier):
    """OBJECT IDENTIFIER naming a single security mechanism.

    ASN.1 Definition:
        MechType ::= OBJECT IDENTIFIER
    """
class MechTypeList(univ.SequenceOf):
    """Sequence whose elements are MechType OIDs.

    ASN.1 Definition:
        MechTypeList ::= SEQUENCE OF MechType
    """

    # Every element of the sequence is decoded/encoded as a MechType.
    componentType = MechType()
class ContextFlags(univ.BitString):
    """SPNEGO context flags with named bit positions.

    ASN.1 Definition:
        ContextFlags ::= BIT STRING {
            delegFlag       (0),
            mutualFlag      (1),
            replayFlag      (2),
            sequenceFlag    (3),
            anonFlag        (4),
            confFlag        (5),
            integFlag       (6)
        } (SIZE (32))
    """

    # Flag names are listed in bit order, so the bit number is the position.
    namedValues = namedval.NamedValues(*(
        (flag_name, bit_no)
        for bit_no, flag_name in enumerate((
            'delegFlag',
            'mutualFlag',
            'replayFlag',
            'sequenceFlag',
            'anonFlag',
            'confFlag',
            'integFlag',
        ))
    ))
class APOptions(univ.BitString):
    """Option bits of a Kerberos AP-REQ with named bit positions.

    ASN.1 Definition:
        APOptions ::= KerberosFlags
            -- reserved(0),
            -- use-session-key(1),
            -- mutual-required(2)
    """

    # Option names are listed in bit order, so the bit number is the position.
    namedValues = namedval.NamedValues(*(
        (option_name, bit_no)
        for bit_no, option_name in enumerate((
            'reserved',
            'use-session-key',
            'mutual-required',
        ))
    ))
class NegTokenInit(univ.Sequence):
    """First SPNEGO token, listing the mechanisms offered.

    ASN.1 Definition:
        NegTokenInit ::= SEQUENCE {
            mechTypes       [0] MechTypeList,
            reqFlags        [1] ContextFlags  OPTIONAL,
            mechToken       [2] OCTET STRING  OPTIONAL,
            mechListMIC     [3] OCTET STRING  OPTIONAL,
            ...
        }
    """

    # Rows are (field kind, field name, untagged prototype). The explicit
    # context tag number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        field_kind(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_kind, field_name, prototype) in enumerate((
            (namedtype.NamedType, 'mechTypes', MechTypeList()),
            (namedtype.OptionalNamedType, 'reqFlags', ContextFlags()),
            (namedtype.OptionalNamedType, 'mechToken', univ.OctetString()),
            (namedtype.OptionalNamedType, 'mechListMIC', univ.OctetString()),
        ))
    ))
class NegTokenResp(univ.Sequence):
    """SPNEGO response token; every field is optional.

    ASN.1 Definition:
        NegTokenResp ::= SEQUENCE {
            negState       [0] ENUMERATED {
                accept-completed    (0),
                accept-incomplete   (1),
                reject              (2),
                request-mic         (3)
            }                                 OPTIONAL,
            supportedMech   [1] MechType      OPTIONAL,
            responseToken   [2] OCTET STRING  OPTIONAL,
            mechListMIC     [3] OCTET STRING  OPTIONAL,
            ...
        }
    """

    # Rows are (field name, untagged prototype). The explicit context tag
    # number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        namedtype.OptionalNamedType(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_name, prototype) in enumerate((
            ('negState', univ.Enumerated()),
            ('supportedMech', MechType()),
            ('responseToken', univ.OctetString()),
            ('mechListMIC', univ.OctetString()),
        ))
    ))
class NegotiationToken(univ.Choice):
    """SPNEGO wrapper that holds either a NegTokenInit or a NegTokenResp.

    ASN.1 Definition:
        NegotiationToken ::= CHOICE {
            negTokenInit    [0] NegTokenInit,
            negTokenResp    [1] NegTokenResp
        }
    """

    # Rows are (alternative name, untagged prototype). The explicit context
    # tag number of each alternative equals its row index.
    componentType = namedtype.NamedTypes(*(
        namedtype.NamedType(
            alternative_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (alternative_name, prototype) in enumerate((
            ('negTokenInit', NegTokenInit()),
            ('negTokenResp', NegTokenResp()),
        ))
    ))
class KerberosString(char.GeneralString):
    """String type used by the Kerberos structures in this file.

    ASN.1 Definition:
        KerberosString ::= GeneralString (IA5String)
    """
class PrincipalName(univ.Sequence):
    """Kerberos principal: a numeric name type plus its name components.

    ASN.1 Definition:
        PrincipalName ::= SEQUENCE {
            name-type       [0] Int32,
            name-string     [1] SEQUENCE OF KerberosString
        }
    """

    # Rows are (field name, untagged prototype). The explicit context tag
    # number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        namedtype.NamedType(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_name, prototype) in enumerate((
            ('name-type', univ.Integer()),
            ('name-string', univ.SequenceOf(componentType=KerberosString())),
        ))
    ))
class KerbU2UTgtRequest(univ.Sequence):
    """Kerberos user-to-user request asking the peer for its TGT.

    ASN.1 Definition:
        KERB-TGT-REQUEST ::= SEQUENCE {
            pvno            [0] INTEGER,
            msg-type        [1] INTEGER,
            server-name     [2] PrincipalName OPTIONAL,
            realm           [3] Realm OPTIONAL
        }
    """

    # Rows are (field kind, field name, untagged prototype). The explicit
    # context tag number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        field_kind(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_kind, field_name, prototype) in enumerate((
            (namedtype.NamedType, 'pvno', univ.Integer()),
            (namedtype.NamedType, 'msg-type', univ.Integer()),
            (namedtype.OptionalNamedType, 'server-name', PrincipalName()),
            (namedtype.OptionalNamedType, 'realm', KerberosString()),
        ))
    ))
class KerbU2UTgtReply(univ.Sequence):
    """Kerberos user-to-user reply carrying a ticket.

    The ticket field is modelled as ANY, so its content is not parsed by
    this spec.

    ASN.1 Definition:
        KERB-TGT-REPLY ::= SEQUENCE {
            pvno            [0] INTEGER,
            msg-type        [1] INTEGER,
            ticket          [2] Ticket
        }
    """

    # Rows are (field name, untagged prototype). The explicit context tag
    # number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        namedtype.NamedType(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_name, prototype) in enumerate((
            ('pvno', univ.Integer()),
            ('msg-type', univ.Integer()),
            ('ticket', univ.Any()),
        ))
    ))
class KerberosTime(useful.GeneralizedTime):
    """Timestamp type used by the Kerberos structures in this file.

    ASN.1 Definition:
        KerberosTime ::= GeneralizedTime -- with no fractional seconds
    """
class Microseconds(univ.Integer):
    """Microsecond part of a Kerberos timestamp.

    The 0..999999 range from the ASN.1 definition is not enforced by this
    class.

    ASN.1 Definition:
        Microseconds ::= INTEGER (0..999999)
    """
class KrbError(univ.Sequence):
    """Kerberos KRB-ERROR message.

    ASN.1 Definition:
        KRB-ERROR ::= [APPLICATION 30] SEQUENCE {
            pvno            [0] INTEGER (5),
            msg-type        [1] INTEGER (30),
            ctime           [2] KerberosTime OPTIONAL,
            cusec           [3] Microseconds OPTIONAL,
            stime           [4] KerberosTime,
            susec           [5] Microseconds,
            error-code      [6] Int32,
            crealm          [7] Realm OPTIONAL,
            cname           [8] PrincipalName OPTIONAL,
            realm           [9] Realm -- service realm --,
            sname           [10] PrincipalName -- service name --,
            e-text          [11] KerberosString OPTIONAL,
            e-data          [12] OCTET STRING OPTIONAL
        }
    """

    # Rows are (field kind, field name, untagged prototype). The explicit
    # context tag number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        field_kind(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_kind, field_name, prototype) in enumerate((
            (namedtype.NamedType, 'pvno', univ.Integer()),
            (namedtype.NamedType, 'msg-type', univ.Integer()),
            (namedtype.OptionalNamedType, 'ctime', KerberosTime()),
            (namedtype.OptionalNamedType, 'cusec', Microseconds()),
            (namedtype.NamedType, 'stime', KerberosTime()),
            (namedtype.NamedType, 'susec', Microseconds()),
            (namedtype.NamedType, 'error-code', univ.Integer()),
            (namedtype.OptionalNamedType, 'crealm', KerberosString()),
            (namedtype.OptionalNamedType, 'cname', PrincipalName()),
            (namedtype.NamedType, 'realm', KerberosString()),
            (namedtype.NamedType, 'sname', PrincipalName()),
            (namedtype.OptionalNamedType, 'e-text', KerberosString()),
            (namedtype.OptionalNamedType, 'e-data', univ.OctetString()),
        ))
    ))

    # The whole SEQUENCE is wrapped in an explicit [APPLICATION 30] tag.
    tagSet = univ.Sequence.tagSet.tagExplicitly(
        tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 30)
    )
class EncryptedData(univ.Sequence):
    """Kerberos container for an encrypted blob and its key metadata.

    ASN.1 Definition:
        EncryptedData ::= SEQUENCE {
            etype   [0] Int32,
            kvno    [1] UInt32 OPTIONAL,
            cipher  [2] OCTET STRING
        }
    """

    # Rows are (field kind, field name, untagged prototype). The explicit
    # context tag number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        field_kind(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_kind, field_name, prototype) in enumerate((
            (namedtype.NamedType, 'etype', univ.Integer()),
            (namedtype.OptionalNamedType, 'kvno', univ.Integer()),
            (namedtype.NamedType, 'cipher', univ.OctetString()),
        ))
    ))
class Ticket(univ.Sequence):
    """Kerberos ticket.

    ASN.1 Definition:
        Ticket ::= [APPLICATION 1] SEQUENCE {
            tkt-vno         [0] INTEGER (5),
            realm           [1] Realm,
            sname           [2] PrincipalName,
            enc-part        [3] EncryptedData
        }
    """

    # Rows are (field name, untagged prototype). The explicit context tag
    # number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        namedtype.NamedType(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_name, prototype) in enumerate((
            ('tkt-vno', univ.Integer()),
            ('realm', KerberosString()),
            ('sname', PrincipalName()),
            ('enc-part', EncryptedData()),
        ))
    ))

    # The whole SEQUENCE is wrapped in an explicit [APPLICATION 1] tag.
    tagSet = univ.Sequence.tagSet.tagExplicitly(
        tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 1)
    )
class KerberosApReq(univ.Sequence):
    """Kerberos AP-REQ message.

    NOTE(review): the tagSet below includes the [APPLICATION 14] wrapper, so
    this spec appears to expect the full AP-REQ rather than a bare inner
    SEQUENCE - confirm against the decode call sites.

    ASN.1 Definition:
        AP-REQ ::= [APPLICATION 14] SEQUENCE {
            pvno            [0] INTEGER (5),
            msg-type        [1] INTEGER (14),
            ap-options      [2] APOptions,
            ticket          [3] Ticket,
            authenticator   [4] EncryptedData
        }
    """

    # Rows are (field name, untagged prototype). The explicit context tag
    # number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        namedtype.NamedType(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_name, prototype) in enumerate((
            ('pvno', univ.Integer()),
            ('msg-type', univ.Integer()),
            ('ap-options', APOptions()),
            ('ticket', Ticket()),
            ('authenticator', EncryptedData()),
        ))
    ))

    # The whole SEQUENCE is wrapped in an explicit [APPLICATION 14] tag.
    tagSet = univ.Sequence.tagSet.tagExplicitly(
        tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 14)
    )
class NegoToken(univ.Sequence):
    """Single-field wrapper around one negotiation token.

    ASN.1 Definition:
        NegoToken ::= SEQUENCE {
            negoToken [0] OCTET STRING
        }
    """

    componentType = namedtype.NamedTypes(
        namedtype.NamedType(
            'negoToken',
            # Explicit context tag [0] around the OCTET STRING.
            univ.OctetString().subtype(
                explicitTag=tag.Tag(
                    tag.tagClassContext,
                    tag.tagFormatConstructed,
                    0,
                ),
            ),
        ),
    )
class NegoData(univ.SequenceOf):
    """Sequence whose elements are NegoToken structures.

    ASN.1 Definition:
        NegoData ::= SEQUENCE OF NegoToken
    """

    # Every element of the sequence is decoded/encoded as a NegoToken.
    componentType = NegoToken()
class TSCredentials(univ.Sequence):
    """Credential envelope found inside authInfo after decryption.

    ASN.1 Definition:
        TSCredentials ::= SEQUENCE {
            credType    [0] INTEGER,
            credentials [1] OCTET STRING
        }
    """

    # Rows are (field name, untagged prototype). The explicit context tag
    # number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        namedtype.NamedType(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_name, prototype) in enumerate((
            ('credType', univ.Integer()),
            ('credentials', univ.OctetString()),
        ))
    ))
class TSPasswordCreds(univ.Sequence):
    """Password credentials (credType 1): domain, user name and password.

    ASN.1 Definition:
        TSPasswordCreds ::= SEQUENCE {
            domainName  [0] OCTET STRING,
            userName    [1] OCTET STRING,
            password    [2] OCTET STRING
        }
    """

    # Field names in tag order; each is an OCTET STRING wrapped in an
    # explicit context tag whose number equals the field's position.
    componentType = namedtype.NamedTypes(*(
        namedtype.NamedType(
            field_name,
            univ.OctetString().subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, field_name in enumerate(('domainName', 'userName', 'password'))
    ))
class TSRequest(univ.Sequence):
    """Top-level CredSSP message.

    Every field, including version, is modelled as optional here.

    ASN.1 Definition:
        TSRequest ::= SEQUENCE {
            version     [0] INTEGER,
            negoTokens  [1] NegoData OPTIONAL,
            authInfo    [2] OCTET STRING OPTIONAL,
            pubKeyAuth  [3] OCTET STRING OPTIONAL,
            errorCode   [4] INTEGER OPTIONAL,
            clientNonce [5] OCTET STRING OPTIONAL
        }
    """

    # Rows are (field name, untagged prototype). The explicit context tag
    # number of each field equals its row index.
    componentType = namedtype.NamedTypes(*(
        namedtype.OptionalNamedType(
            field_name,
            prototype.subtype(
                explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, tag_no)
            ),
        )
        for tag_no, (field_name, prototype) in enumerate((
            ('version', univ.Integer()),
            ('negoTokens', NegoData()),
            ('authInfo', univ.OctetString()),
            ('pubKeyAuth', univ.OctetString()),
            ('errorCode', univ.Integer()),
            ('clientNonce', univ.OctetString()),
        ))
    ))
def extract_initial_context_token(
    token: bytes,
    context: str = "",
    silent: bool = False
) -> tuple[str, bytes]:
    """Split an InitialContextToken into its mechanism OID and payload.

    The first byte (the outer tag) is skipped without being checked, the
    outer length field is stepped over, the OID that follows is decoded and
    everything after the OID is returned untouched.

    Args:
        token: InitialContextToken bytes
        context: Context label prefix for display (e.g., "Incoming", "Outgoing")
        silent: If True, suppress printing

    Returns:
        Tuple of (thisMech OID string, inner bytes)

    Raises:
        ValueError: If the byte after the outer length is not the OID tag
    """
    header = f"[{context} InitialContextToken]" if context else "[InitialContextToken]"

    # Byte 1 starts the outer length. With the high bit set, the low 7 bits
    # give the number of additional length octets to step over.
    first_length_octet = token[1]
    extra_length_octets = first_length_octet & 0x7F if first_length_octet & 0x80 else 0
    oid_start = 2 + extra_length_octets

    oid_tag = token[oid_start]
    if oid_tag != 0x06:
        raise ValueError(f"{header} Expected OID tag, got 0x{oid_tag:02x}")

    # The OID length is read as a single octet: tag + length + content.
    oid_end = oid_start + 2 + token[oid_start + 1]
    this_mech = str(der_decoder.decode(token[oid_start:oid_end])[0])
    inner_token = token[oid_end:]

    if not silent:
        print(header)
        print(f" thisMech: {KnownOID.get_name(this_mech)}")
        print(f" innerContextToken: {inner_token.hex()}")
        print()

    return this_mech, inner_token
def _pretty_print_by_position(asn1_obj, indent, prefix):
    """Print each component of ``asn1_obj`` by index, labelled ``[idx]``."""
    for idx in range(len(asn1_obj)):
        try:
            component = asn1_obj[idx]
            if component is not None:
                pretty_print_asn1(component, indent + 1, f"[{idx}]")
        except Exception as e:
            # Best effort: report the unreadable component and keep going.
            print(f"{prefix} [Error reading component {idx}: {e}]")


def pretty_print_asn1(asn1_obj, indent: int = 0, name: str = "") -> None:
    """Pretty print ASN.1 structure showing only field names and values.

    Scalars are printed on one line as "<name>: <value>" (or just the value
    when no name is given). Containers print "<name>:" and then recurse into
    their components one indentation level deeper.

    Args:
        asn1_obj: ASN.1 object to print
        indent: Current indentation level
        name: Name/label for this object
    """
    prefix = " " * indent

    def emit(rendered) -> None:
        # One-line output for scalar values, with the label when present.
        print(f"{prefix}{name}: {rendered}" if name else f"{prefix}{rendered}")

    def emit_heading() -> None:
        # Containers only print a heading when they have a label.
        if name:
            print(f"{prefix}{name}:")

    # The branch order matters: the specific types are tested before the
    # general ones (Enumerated before Integer, Choice before Sequence/Set).
    # NOTE(review): this relies on pyasn1's class hierarchy - confirm before
    # reordering.
    if isinstance(asn1_obj, univ.Enumerated):
        # Handle Enumerated (for NegTokenResp.negState)
        enum_val = int(asn1_obj)
        if name.endswith(" negState"):
            emit(NegState.get_name(enum_val))
        else:
            emit(str(enum_val))

    elif isinstance(asn1_obj, (univ.Integer, univ.Boolean)):
        int_display = int(asn1_obj)
        if name == '[0] name-type':
            # PrincipalName.name-type gets its symbolic name added.
            int_display = KerberosPrincipalNameType.get_name(int_display)
        emit(int_display)

    elif isinstance(asn1_obj, char.GeneralString):
        emit(bytes(asn1_obj).decode('utf-8'))

    elif isinstance(asn1_obj, univ.OctetString):
        emit(bytes(asn1_obj).hex())

    elif isinstance(asn1_obj, univ.ObjectIdentifier):
        emit(KnownOID.get_name(str(asn1_obj)))

    elif isinstance(asn1_obj, univ.BitString):
        # Handle BitString (for ContextFlags)
        hex_val = asn1_obj.hexValue if hasattr(asn1_obj, 'hexValue') else None
        bits_display = hex_val or asn1_obj.prettyPrint()

        # Collect the names of the named bits that are present and set.
        flag_names = []
        if hasattr(asn1_obj, 'namedValues') and asn1_obj.namedValues:
            flag_names = [
                bit_name
                for bit_name, bit_pos in asn1_obj.namedValues.items()
                if len(asn1_obj) > bit_pos and asn1_obj[bit_pos]
            ]

        if flag_names:
            emit(f"{', '.join(flag_names)} ({bits_display})")
        else:
            emit(bits_display)

    elif isinstance(asn1_obj, univ.Choice):
        # Handle Choice (for NegotiationToken)
        emit_heading()
        chosen_name = asn1_obj.getName()
        chosen_value = asn1_obj.getComponent()
        if chosen_value is not None:
            pretty_print_asn1(chosen_value, indent + 1, chosen_name)

    elif isinstance(asn1_obj, (univ.Sequence, univ.Set)):
        emit_heading()
        if hasattr(asn1_obj, 'componentType') and asn1_obj.componentType:
            # Named components, labelled "[idx] <field name>".
            try:
                for idx, named_type in enumerate(asn1_obj.componentType.namedTypes):
                    component_name = named_type.name
                    try:
                        component = asn1_obj[component_name]
                        # Skip optional fields that carry no value.
                        if component is not None and (
                            not hasattr(component, 'hasValue') or component.hasValue()
                        ):
                            pretty_print_asn1(component, indent + 1, f"[{idx}] {component_name}")
                    except Exception as e:
                        print(f"{prefix} [Error reading {component_name}: {e}]")
            except Exception as e:
                # Fall back to positional access when the schema walk fails.
                print(f"{prefix} [Error reading named types: {e}]")
                _pretty_print_by_position(asn1_obj, indent, prefix)
        else:
            # No schema available: unnamed components.
            _pretty_print_by_position(asn1_obj, indent, prefix)

    elif isinstance(asn1_obj, univ.SequenceOf):
        emit_heading()
        for idx, item in enumerate(asn1_obj):
            pretty_print_asn1(item, indent + 1, f"[{idx}]")

    else:
        # Generic handling: print whatever prettyPrint() gives, if anything.
        if hasattr(asn1_obj, 'hasValue') and asn1_obj.hasValue():
            try:
                emit(asn1_obj.prettyPrint())
            except Exception as e:
                print(f"{prefix}[Error printing value: {e}]")
def generate_self_signed_cert(hostname: str) -> tuple[bytes, bytes, bytes]:
    """Generate an ephemeral self-signed certificate for CredSSP TLS.

    A fresh 2048-bit RSA key is generated on every call and the certificate
    is valid for one day from the moment of creation.

    Args:
        hostname: Hostname used for the certificate CN and the DNS SAN entry

    Returns:
        Tuple of (certificate_pem, private_key_pem, public_key_der), where
        public_key_der is the certificate's public key DER-encoded in
        PKCS#1 format
    """
    # Generate private key
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )

    # Self-signed: subject and issuer are the same name.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, hostname),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "CredSSP Test"),
    ])

    # Sample the clock once so both validity bounds use the same instant.
    valid_from = datetime.datetime.now(datetime.timezone.utc)
    valid_until = valid_from + datetime.timedelta(days=1)

    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(valid_from)
        .not_valid_after(valid_until)
        .add_extension(
            x509.SubjectAlternativeName([
                x509.DNSName(hostname),
            ]),
            critical=False,
        )
        .sign(private_key, hashes.SHA256())
    )

    # Serialize to PEM; the private key is written unencrypted.
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption()
    )

    # Raw public key bytes (DER, PKCS#1). This is the key itself, not a hash.
    public_key = cert.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.PKCS1,
    )

    return cert_pem, key_pem, public_key
| class CredSSPContext: | |
| """Manages CredSSP TLS handshake and GSSAPI U2U authentication.""" | |
def __init__(
    self,
    cert_pem: bytes,
    key_pem: bytes,
    server_public_key: bytes,
    keytab: bytes | None,
    service_tgt: tuple[krb5.Principal, krb5.KeyBlock, bytes] | None,
) -> None:
    """Initialize CredSSP context.

    Sets up the per-connection state, optionally acquires GSSAPI acceptor
    credentials from a keytab, and creates a server-side TLS object that
    reads and writes through in-memory BIOs.

    Args:
        cert_pem: Server certificate in PEM format
        key_pem: Private key in PEM format
        server_public_key: The server's TLS public key
        keytab: Path to keytab file to use for GSSAPI credential acquisition
        service_tgt: The TGT to use for Kerberos U2U authentication
    """
    self.tls_established = False
    self.gssapi_context: gssapi.SecurityContext | None = None
    self.completed = False
    self.server_public_key = server_public_key
    self.kerberos_service_tgt = service_tgt
    self._kerberos_u2u_continuation: bool = False

    # Acquire credentials from keytab without setting KRB5_KTNAME
    # Using GSS_C_NO_NAME (None) allows GSSAPI to select the principal
    # based on the incoming request's target name
    self.gssapi_creds: gssapi.Credentials | None = None
    if keytab:
        self.gssapi_creds = gssapi.Credentials(
            usage='accept',
            store={
                'keytab': keytab,
            },
        )
    # This attribute used to be initialised as "gssapi_cred" but assigned
    # as "gssapi_creds". Keep both spellings defined and equal so either
    # one can be read whether or not a keytab was supplied.
    self.gssapi_cred: gssapi.Credentials | None = self.gssapi_creds

    self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # Write cert and key to temp files (required by ssl module). The files
    # only need to exist while load_cert_chain() reads them and are removed
    # when the with-block exits.
    with tempfile.NamedTemporaryFile(mode='wb', suffix='.pem') as temp_cert, \
            tempfile.NamedTemporaryFile(mode='wb', suffix='.pem') as temp_key:
        temp_cert.write(cert_pem)
        temp_cert.flush()
        temp_key.write(key_pem)
        temp_key.flush()
        self.ssl_context.load_cert_chain(temp_cert.name, temp_key.name)

    # TLS records are exchanged through in-memory BIOs, not a socket.
    self.incoming_bio = ssl.MemoryBIO()
    self.outgoing_bio = ssl.MemoryBIO()

    # Create SSL object
    self.ssl_obj = self.ssl_context.wrap_bio(
        self.incoming_bio,
        self.outgoing_bio,
        server_side=True
    )
def process_tls_token(self, client_token: bytes) -> bytes | None:
    """Feed one client TLS token into the TLS object and collect the reply.

    Until the handshake has completed, each call advances the handshake.
    After that, each call tries to read decrypted application data and,
    when there is some, returns the result of ``_process_ts_request`` for
    it. In all other cases the return value is whatever is pending in the
    outgoing BIO.

    Args:
        client_token: TLS record from client

    Returns:
        TLS response to send to client, or None
    """
    self.incoming_bio.write(client_token)

    try:
        if self.tls_established:
            # Handshake already done: look for application data.
            app_data = self.ssl_obj.read(16384)
            if app_data:
                print(f"\n[TLS] Decrypted application data ({len(app_data)} bytes):")
                print(f" {app_data.hex()}\n")
                # This should be a TSRequest token.
                return self._process_ts_request(app_data)
        else:
            self.ssl_obj.do_handshake()
            self.tls_established = True
            print("[TLS] Handshake complete")
    except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
        # The TLS object needs more I/O; anything it queued is sent below.
        pass
    except Exception as e:
        print(f"[TLS] Error: {e}")
        traceback.print_exc()

    # Hand back whatever the TLS object queued for the client, if anything.
    return self.outgoing_bio.read() if self.outgoing_bio.pending else None
def _process_ts_request(self, token: bytes) -> bytes | None:
    """Decode one CredSSP TSRequest and dispatch each field that is present.

    Fields are handled in this order:
    1. negoTokens  -> ``_process_nego_tokens`` (first token only)
    2. pubKeyAuth  -> ``_process_pubkey_auth`` (with clientNonce if present)
    3. authInfo    -> ``_process_credentials``

    Any exception is printed with its traceback and turned into a ``None``
    return.

    Args:
        token: CredSSP TSRequest

    Returns:
        The result of ``_build_response`` when the negoTokens or pubKeyAuth
        step produced output, otherwise None
    """
    try:
        request, _ = der_decoder.decode(token, asn1Spec=TSRequest())

        print("[Incoming TSRequest]")
        pretty_print_asn1(request)
        print()

        nego_reply = None
        nego_field = request['negoTokens']
        if nego_field.hasValue():
            # Only the first NegoToken in the list is processed.
            nego_reply = self._process_nego_tokens(bytes(nego_field[0]['negoToken']))

        key_auth_reply = None
        key_auth_field = request['pubKeyAuth']
        if key_auth_field.hasValue():
            nonce_field = request['clientNonce']
            client_nonce = bytes(nonce_field) if nonce_field.hasValue() else None
            key_auth_reply = self._process_pubkey_auth(bytes(key_auth_field), client_nonce)

        auth_info_field = request['authInfo']
        if auth_info_field.hasValue():
            self._process_credentials(bytes(auth_info_field))

        if not (nego_reply or key_auth_reply):
            return None
        return self._build_response(nego_reply, key_auth_reply)
    except Exception as e:
        print(f"[TSRequest] Processing error: {e}")
        traceback.print_exc()
        return None
def _process_nego_tokens(self, nego_token: bytes) -> bytes | None:
    """Process negoTokens and return output token.

    Three paths are handled:

    * A GSSAPI context already exists: the token is passed straight to
      its ``step`` method.
    * The previous call flagged a Kerberos U2U continuation: a leading
      0xA1 token is unwrapped through ``_extract_spnego_mech_token``, the
      result must be an InitialContextToken (0x60) and is rewrapped with
      the Kerberos 5 OID before further processing.
    * Otherwise the token is classified by its first byte / prefix
      (0x60 InitialContextToken, 0x6E raw AP-REQ, ``NTLMSSP``) and only
      Kerberos mechanisms are forwarded to ``_process_kerberos_token``.

    Args:
        nego_token: The raw negoToken bytes taken from the TSRequest.

    Returns:
        The token to place in the reply, or None when the token could not
        be processed or no output token was produced.
    """
    print("[CredSSP] Processing negoTokens")

    is_spnego = False
    # The continuation flag is one-shot: capture it and reset it right away.
    kerberos_u2u_continuation = self._kerberos_u2u_continuation
    self._kerberos_u2u_continuation = False
    build_spnego_resp = False

    if self.gssapi_context:
        print("[CredSSP] Continuing existing GSSAPI context with new negoToken")
        output_token = self.gssapi_context.step(nego_token)
    else:
        inner_context_token = None
        if kerberos_u2u_continuation:
            # This is the second call done in a Kerberos U2U exchange. We
            # expect either a SPNEGO NegTokenResp, an AP-REQ wrapped in an
            # InitialContextToken, or an NTLM token (if we sent NTLM mech).
            if nego_token[0] == 0xA1:
                # As we are unwrapping the SPNEGO wrapper before calling
                # GSSAPI we need to rebuild the response that the initiator
                # expects.
                build_spnego_resp = True
                spnego_info = self._extract_spnego_mech_token(nego_token)
                if not spnego_info:
                    return None
                nego_token = spnego_info[0]

            # Check if it's NTLM (fallback from failed U2U)
            if nego_token.startswith(b"NTLMSSP\x00"):
                print("[CredSSP] U2U continuation received NTLM token due to U2U failure, this is not handled (yet)")
                return None

            if nego_token[0] != 0x60:
                print("[CredSSP] Did not find InitialContextToken in U2U AP-REQ payload")
                return None

            # The InitialContextToken uses the Kerberos U2U OID which GSSAPI
            # doesn't understand, we rewrap it with the Kerberos OID so that
            # GSSAPI will process it.
            inner_context_token = extract_initial_context_token(nego_token, context="U2U AP-REQ")
            nego_token = self._build_initial_context_token(KnownOID.KERBEROS_5.value, inner_context_token[1])

        if nego_token[0] == 0x60:
            # InitialContextToken; element 0 is compared against an OID and
            # element 1 is used as the inner token bytes.
            inner_context_token = extract_initial_context_token(nego_token, context="Incoming")
            if inner_context_token[0] == KnownOID.SPNEGO:
                is_spnego = True
                spnego_info = self._extract_spnego_mech_token(inner_context_token[1])
                if not spnego_info:
                    return None
                token_bytes, token_mech = spnego_info
                inner_context_token = extract_initial_context_token(token_bytes, context="SPNEGO mechToken")
            else:
                # Any non-SPNEGO InitialContextToken is treated as Kerberos.
                token_bytes = nego_token
                token_mech = KnownOID.KERBEROS_5.value
        elif nego_token[0] == 0x6E:
            # This is a raw AP-REQ for Kerberos.
            token_mech = KnownOID.KERBEROS_5.value
        elif nego_token.startswith(b"NTLMSSP\x00"):
            print("[CredSSP] Detected NTLM token, not implemented yet")
            return None
        else:
            print("[CredSSP] Unsupported negoToken format, cannot process")
            return None

        if token_mech in [KnownOID.MS_KERBEROS, KnownOID.KERBEROS_5]:
            # The SPNEGO mech is only forwarded when the token arrived
            # inside a SPNEGO wrapper.
            output_token = self._process_kerberos_token(
                nego_token,
                inner_context_token,
                spnego_token_mech=token_mech if is_spnego else None,
            )
        else:
            oid_display = KnownOID.get_name(token_mech)
            print(f"[GSSAPI] Unsupported mechanism OID: {oid_display}, cannot process token")
            output_token = None

    if not output_token:
        return output_token

    if kerberos_u2u_continuation:
        # We need to strip the InitialContextToken GSSAPI will have wrapped
        # the AP-REP in as this is not the first token for the initiator.
        # This also includes the first 2 bytes of the TOK_ID
        inner_context_bytes = extract_initial_context_token(output_token, silent=True)[1]
        tok_id = int.from_bytes(inner_context_bytes[:2], byteorder='big')
        output_token = inner_context_bytes[2:]

    if build_spnego_resp:
        # Done in the U2U AP-REQ response, we need to wrap it in the
        # NegTokenResp value the initiator expects.
        # build_spnego_resp is only set on the U2U continuation path, so
        # tok_id has been assigned by the block above.
        is_spnego = True
        # TOK_ID 0x300 is the KRB-ERROR marker used elsewhere in this class.
        neg_state = NegState.REJECT if tok_id == 0x300 else NegState.ACCEPT_COMPLETED
        output_token = self._build_neg_token_resp(
            neg_state=neg_state,
            response_token=output_token,
        )

    if is_spnego:
        # Print out the SPNEGO response if we are doing SPNEGO, to see the NegTokenResp structure
        self._extract_spnego_mech_token(output_token)

    return output_token
| def _process_pubkey_auth(self, client_pub_key_auth: bytes, client_nonce: bytes | None = None) -> bytes | None: | |
| """Process pubKeyAuth and return server's pubKeyAuth. | |
| CredSSP public key authentication: | |
| - Version 5+ (with nonce): Use magic values + nonce + hash | |
| - Version 2-4 (no nonce): Increment hash by 1 | |
| Args: | |
| client_pub_key_auth: Encrypted public key hash from client | |
| client_nonce: Optional client nonce (CredSSP v5+) | |
| Returns: | |
| Server's encrypted pubKeyAuth response | |
| """ | |
| print("[CredSSP] Processing pubKeyAuth") | |
| if not self.gssapi_context: | |
| print("[CredSSP] ERROR: No GSSAPI context available for pubKeyAuth") | |
| return None | |
| try: | |
| # Decrypt client's pubKeyAuth | |
| unwrapped = self.gssapi_context.unwrap(client_pub_key_auth) | |
| client_hash = unwrapped.message | |
| print(f"[CredSSP] Decrypted client hash ({len(client_hash)} bytes): {client_hash.hex()}") | |
| if client_nonce: | |
| # Newer CredSSP with nonce | |
| print("[CredSSP] Version 5+ mode (with clientNonce)") | |
| # Calculate expected hash: SHA256("CredSSP Client-To-Server Binding Hash\x00" + nonce + public_key) | |
| client_to_server_magic = b"CredSSP Client-To-Server Binding Hash\x00" | |
| expected_data = client_to_server_magic + client_nonce + self.server_public_key | |
| digest = hashes.Hash(hashes.SHA256()) | |
| digest.update(expected_data) | |
| expected_hash = digest.finalize() | |
| # Calculate server response: SHA256("CredSSP Server-To-Client Binding Hash\x00" + nonce + public_key) | |
| server_to_client_magic = b"CredSSP Server-To-Client Binding Hash\x00" | |
| response_data = server_to_client_magic + client_nonce + self.server_public_key | |
| digest = hashes.Hash(hashes.SHA256()) | |
| digest.update(response_data) | |
| response_hash = digest.finalize() | |
| else: | |
| # Older CredSSP without nonce, Client sends public key hash, server increments by 1 | |
| print("[CredSSP] Version 2-4 mode (no nonce)") | |
| expected_hash = self.server_public_key | |
| # Increment hash by 1 (as big-endian integer) | |
| hash_int = int.from_bytes(self.server_public_key, 'big') | |
| response_hash = (hash_int + 1).to_bytes(len(self.server_public_key), 'big') | |
| if expected_hash != client_hash: | |
| print("[CredSSP] WARNING: Client hash does not match expected value!") | |
| print(f" Expected: {expected_hash.hex()}") | |
| print(f" Received: {client_hash.hex()}") | |
| return None | |
| # Encrypt response | |
| wrapped = self.gssapi_context.wrap(response_hash, encrypt=True) | |
| server_pub_key_auth = wrapped.message | |
| print(f"[CredSSP] Server pubKeyAuth ({len(server_pub_key_auth)} bytes): {server_pub_key_auth.hex()}\n") | |
| return server_pub_key_auth | |
| except Exception as e: | |
| print(f"[CredSSP] pubKeyAuth processing error: {e}") | |
| traceback.print_exc() | |
| return None | |
| def _process_credentials(self, auth_info: bytes) -> None: | |
| """Process final credentials stage. | |
| authInfo contains encrypted TSCredentials with domain, username, password. | |
| Args: | |
| auth_info: Encrypted TSCredentials | |
| """ | |
| print("[CredSSP] Processing authInfo") | |
| if not self.gssapi_context: | |
| print("[CredSSP] ERROR: No GSSAPI context available for authInfo") | |
| return | |
| try: | |
| # Decrypt authInfo | |
| unwrapped = self.gssapi_context.unwrap(auth_info) | |
| credentials_bytes = unwrapped.message | |
| print(f"[CredSSP] Decrypted credentials ({len(credentials_bytes)} bytes): {credentials_bytes.hex()}") | |
| # Decode TSCredentials | |
| ts_credentials, _ = der_decoder.decode(credentials_bytes, asn1Spec=TSCredentials()) | |
| print("[TSCredentials]") | |
| pretty_print_asn1(ts_credentials) | |
| print() | |
| cred_type = int(ts_credentials['credType']) | |
| credentials_data = bytes(ts_credentials['credentials']) | |
| if cred_type == 1: | |
| password_creds, _ = der_decoder.decode(credentials_data, asn1Spec=TSPasswordCreds()) | |
| domain = bytes(password_creds['domainName']).decode('utf-16-le') | |
| username = bytes(password_creds['userName']).decode('utf-16-le') | |
| password = bytes(password_creds['password']).decode('utf-16-le') | |
| print("[TSPasswordCreds]") | |
| print(f"[CredSSP] Domain: '{domain}'") | |
| print(f"[CredSSP] Username: '{username}'") | |
| print(f"[CredSSP] Password: '{password}'") | |
| else: | |
| print(f"[CredSSP] Unknown credential type {cred_type}, raw data ({len(credentials_data)} bytes): {credentials_data.hex()}") | |
| self.completed = True | |
| print("[CredSSP] Authentication completed!\n") | |
| except Exception as e: | |
| print(f"[CredSSP] Credentials processing error: {e}") | |
| traceback.print_exc() | |
def _build_response(self, output_token: bytes | None, pub_key_auth: bytes | None) -> bytes:
    """Assemble the reply TSRequest and encrypt it for the client.

    The reply always advertises version 6. negoTokens and pubKeyAuth are
    only populated when a truthy value was supplied.

    Args:
        output_token: Authentication token to place in negoTokens
        pub_key_auth: Encrypted public key response

    Returns:
        The DER encoded TSRequest encrypted with the TLS session
    """
    response = TSRequest()
    response['version'] = 6

    if output_token:
        token_entry = NegoToken()
        token_entry['negoToken'] = output_token

        # negoTokens is the [1] EXPLICIT field of TSRequest.
        context_tag = tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
        token_list = NegoData().subtype(explicitTag=context_tag)
        token_list[0] = token_entry
        response['negoTokens'] = token_list

    if pub_key_auth:
        response['pubKeyAuth'] = pub_key_auth

    der_bytes = der_encoder.encode(response)

    print(f"[Outgoing TSRequest] {len(der_bytes)} bytes")
    pretty_print_asn1(response)
    print()

    return self._encrypt_with_tls(der_bytes)
| def _encrypt_with_tls(self, plaintext: bytes) -> bytes: | |
| """Encrypt data using TLS connection. | |
| Args: | |
| plaintext: Data to encrypt | |
| Returns: | |
| Encrypted TLS record | |
| """ | |
| self.ssl_obj.write(plaintext) | |
| encrypted = self.outgoing_bio.read() | |
| print(f"[TLS] Encrypted {len(plaintext)} bytes -> {len(encrypted)} bytes") | |
| return encrypted | |
def _extract_spnego_mech_token(self, spnego_token: bytes) -> tuple[bytes, str] | None:
    """Decode a SPNEGO NegotiationToken and pull out the inner token.

    The decoded structure is always printed. For a negTokenInit the
    mechToken and the first mechTypes entry are returned; for a
    negTokenResp the responseToken and supportedMech (or "Unknown OID"
    when absent) are returned.

    Returns:
        (token bytes, mechanism OID string), or None when the response
        carries no token or decoding failed.
    """
    try:
        token = der_decoder.decode(spnego_token, asn1Spec=NegotiationToken())[0]

        print("[SPNEGO NegotiationToken]")
        pretty_print_asn1(token)
        print()

        if 'negTokenInit' in token:
            init = token['negTokenInit']
            return bytes(init['mechToken']), str(init['mechTypes'][0])

        resp = token['negTokenResp']
        response_token = resp['responseToken']
        if not response_token.hasValue():
            return None

        mech = resp['supportedMech']
        mech_name = str(mech) if mech.hasValue() else "Unknown OID"
        return bytes(response_token), mech_name

    except Exception as e:
        print(f"[SPNEGO] Decode error: {e}")
        traceback.print_exc()
        return None
| def _process_kerberos_token( | |
| self, | |
| nego_token: bytes, | |
| inner_context: tuple[str, bytes] | None, | |
| spnego_token_mech: str | None = None, | |
| ) -> bytes | None: | |
| """Extract Kerberos message from GSS-API InitialContextToken. | |
| For Kerberos, innerContextToken format: | |
| - 2 bytes: TOK_ID (big-endian) | |
| - Remaining: Kerberos message (ASN.1) | |
| """ | |
| try: | |
| if inner_context: | |
| request_data = inner_context[1][2:] | |
| tok_id = int.from_bytes(inner_context[1][:2], 'big') | |
| else: | |
| # If inner_context isn't set we assume this is a raw AP-REQ token. | |
| request_data = nego_token | |
| tok_id = 0x100 | |
| if tok_id == 0x100: | |
| return self._process_kerberos_ap_req_token(nego_token, request_data) | |
| elif tok_id == 0x400: | |
| # Used with Kerberos User to User that Windows clients use with CredSSP. | |
| return self._process_kerberos_tgt_request_token( | |
| request_data, | |
| spnego_token_mech=spnego_token_mech, | |
| ) | |
| else: | |
| print(f"[Kerberos] Unknown TOK_ID 0x{tok_id:04x}, skipping processing") | |
| return None | |
| except Exception as e: | |
| print(f"[Kerberos] Processing error: {e}") | |
| traceback.print_exc() | |
| return None | |
def _process_kerberos_ap_req_token(self, spnego_token: bytes, request_bytes: bytes) -> bytes | None:
    """Print the AP-REQ and start a GSSAPI acceptor context with it.

    Decoding is for display only; a decode failure is logged and the
    token is still handed to GSSAPI.

    Args:
        spnego_token: The full token passed to the acceptor's ``step``
        request_bytes: The bare AP-REQ used for pretty printing

    Returns:
        The output token produced by the acceptor context
    """
    try:
        decoded = der_decoder.decode(request_bytes, asn1Spec=KerberosApReq())[0]
        print("[Kerberos AP-REQ]")
        pretty_print_asn1(decoded)
        print()
    except Exception as e:
        print(f"[Kerberos AP-REQ] Decode error: {e}")
        traceback.print_exc()

    acceptor = gssapi.SecurityContext(creds=self.gssapi_creds, usage='accept')
    self.gssapi_context = acceptor
    return acceptor.step(spnego_token)
def _process_kerberos_tgt_request_token(
    self,
    tgt_request_bytes: bytes,
    spnego_token_mech: str | None = None,
) -> bytes | None:
    """Process a Kerberos U2U TGT request and build the reply token.

    The request is decoded for display and to learn the requested server
    name and realm. The reply is one of:
        - a NegTokenResp offering NTLM (no service TGT, SPNEGO in use),
        - a KRB-ERROR KRB_AP_ERR_NO_TGT (no service TGT, raw Kerberos),
        - a KERB-TGT-REPLY carrying the service TGT, wrapped in a
          NegTokenResp when SPNEGO is in use.

    Args:
        tgt_request_bytes: DER encoded KERB-TGT-REQUEST
        spnego_token_mech: Mechanism OID when the request arrived inside
            SPNEGO, otherwise None

    Returns:
        The token to send back to the initiator
    """
    # Defaults are set before decoding so that a decode failure, which is
    # only logged, cannot leave these names unbound for the code below.
    server_name_type = 0
    server_names: list[str] = []
    realm = None

    try:
        tgt_req, _ = der_decoder.decode(tgt_request_bytes, asn1Spec=KerbU2UTgtRequest())
        print("[Kerberos U2U TGT Request]")
        pretty_print_asn1(tgt_req)
        print()

        if tgt_req['server-name'].hasValue():
            server_name_type = int(tgt_req['server-name']['name-type'])
            server_names = [str(s) for s in tgt_req['server-name']['name-string']]

        if tgt_req['realm'].hasValue():
            realm = str(tgt_req['realm'])

    except Exception as e:
        print(f"[Kerberos U2U TGT Request] Decode error: {e}")
        traceback.print_exc()

    # Tells the nego token handler to manually handle the incoming U2U
    # AP-REQ. The GSSAPI context won't be able to use it directly as it's
    # either a NegTokenResp not inside an InitialContextToken or an AP-REQ
    # inside an InitialContextToken but with the U2U OID which GSSAPI
    # doesn't support.
    self._kerberos_u2u_continuation = True

    if not self.kerberos_service_tgt:
        if spnego_token_mech:
            print("[Kerberos U2U TGT Request] No service TGT available, but SPNEGO was used, sending back NegTokenResp with NTLM mech to attempt fallback")
            # While this is not implemented it is here to test what Windows does.
            return self._build_neg_token_resp(
                neg_state=NegState.REQUEST_MIC,
                supported_mech=KnownOID.NTLM.value,
            )

        print("[Kerberos U2U TGT Request] No service TGT available, returning KRB_AP_ERR_NO_TGT")
        sname = (server_name_type, server_names or ["UNKNOWN"])
        krb_error_bytes = self._build_krb_error(
            error_code=0x43,  # KRB_AP_ERR_NO_TGT
            realm=realm or "UNKNOWN",
            sname=sname,
            e_text="No TGT available for U2U authentication",
        )
        return self._build_initial_context_token(
            this_mech=KnownOID.KERBEROS_5_U2U.value,
            inner_token=b'\x03\x00' + krb_error_bytes,  # TOK_ID for KRB-ERROR
        )

    # There's probably a better way to do this but this is a POC.
    if len(server_names) > 1:
        # Strip out the service portion (first component)
        names = b"".join(n.encode() for n in server_names[1:])
        our_principal = self.kerberos_service_tgt[0]

        names_match = names == b"".join(our_principal.components)
        realm_match = not realm or our_principal.realm == realm.encode()

        if not (names_match and realm_match):
            # We should probably reject this but just in case send back our TGT
            # and just let the client validate the ticket.
            print(f"[Kerberos U2U TGT Request] WARNING: Server principal in request does not match expected service TGT principal {our_principal.name.decode()}, sending back TGT anyway")

    # GSSAPI does not support U2U exchanges directly, it cannot handle an
    # InitialContextToken with the U2U mech and passing in a TGT-REQ
    # message with the Kerb OID will fail. Instead we build the TGT-REP
    # ourselves with the TGT data for our principal and set the required
    # markers so the subsequent nego token handler will process the
    # incoming AP-REQ in a way that GSSAPI can handle.
    tgt_reply = KerbU2UTgtReply()
    tgt_reply['pvno'] = 5  # Kerberos protocol version
    tgt_reply['msg-type'] = 17  # KRB_TGT_REP
    tgt_reply['ticket'] = self.kerberos_service_tgt[2]
    tgt_reply_bytes = der_encoder.encode(tgt_reply)

    # Format: TOK_ID (0x0401) + KERB-TGT-REPLY
    token_data = self._build_initial_context_token(
        this_mech=KnownOID.KERBEROS_5_U2U.value,
        inner_token=b"\x04\x01" + tgt_reply_bytes,
    )

    if spnego_token_mech:
        token_data = self._build_neg_token_resp(
            neg_state=NegState.ACCEPT_INCOMPLETE,
            response_token=token_data,
            supported_mech=spnego_token_mech,
        )

    return token_data
def _build_krb_error(
    self,
    error_code: int,
    realm: str,
    sname: tuple[int, list[str]] | None = None,
    e_text: str | None = None,
    crealm: str | None = None,
    cname: tuple[int, list[str]] | None = None,
    e_data: bytes | None = None,
    stime: str | None = None,
    susec: int = 0,
    ctime: str | None = None,
    cusec: int | None = None,
) -> bytes:
    """Build a Kerberos KRB-ERROR message.

    Every field is wrapped in its explicit context tag ([0] pvno through
    [12] e-data). Optional fields are only emitted when a value was
    supplied.

    Args:
        error_code: Kerberos error code
        realm: Service realm
        sname: Service principal name as (name-type, name-strings)
        e_text: Optional error description text
        crealm: Optional client realm
        cname: Client principal name as (name-type, name-strings)
        e_data: Optional error-specific data
        stime: Server time (GeneralizedTime format), defaults to current time
        susec: Server microseconds, defaults to 0
        ctime: Optional client time (GeneralizedTime format)
        cusec: Optional client microseconds

    Returns:
        DER-encoded KRB-ERROR bytes
    """
    import time

    def context_tag(number):
        # Explicit, constructed, context-specific tag [number].
        return tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, number)

    def tagged_principal(number, principal):
        # Build a PrincipalName under the given context tag.
        name_type, name_strings = principal
        value = PrincipalName().subtype(explicitTag=context_tag(number))
        value['name-type'] = name_type
        value['name-string'].clear()
        for component in name_strings:
            value['name-string'].append(KerberosString(component))
        return value

    message = KrbError()

    # pvno is always 5 and msg-type is always 30.
    message['pvno'] = univ.Integer(5).subtype(explicitTag=context_tag(0))
    message['msg-type'] = univ.Integer(30).subtype(explicitTag=context_tag(1))

    if ctime:
        message['ctime'] = KerberosTime(ctime).subtype(explicitTag=context_tag(2))

    if cusec is not None:
        message['cusec'] = Microseconds(cusec).subtype(explicitTag=context_tag(3))

    if not stime:
        # Default to the current UTC time in GeneralizedTime form.
        stime = time.strftime("%Y%m%d%H%M%SZ", time.gmtime())
    message['stime'] = KerberosTime(stime).subtype(explicitTag=context_tag(4))

    message['susec'] = Microseconds(susec).subtype(explicitTag=context_tag(5))
    message['error-code'] = univ.Integer(error_code).subtype(explicitTag=context_tag(6))

    if crealm:
        message['crealm'] = KerberosString(crealm).subtype(explicitTag=context_tag(7))

    if cname:
        message['cname'] = tagged_principal(8, cname)

    message['realm'] = KerberosString(realm).subtype(explicitTag=context_tag(9))

    if sname:
        message['sname'] = tagged_principal(10, sname)

    if e_text:
        message['e-text'] = KerberosString(e_text).subtype(explicitTag=context_tag(11))

    if e_data:
        message['e-data'] = univ.OctetString(e_data).subtype(explicitTag=context_tag(12))

    return der_encoder.encode(message)
def _build_initial_context_token(
    self,
    this_mech: str,
    inner_token: bytes,
) -> bytes:
    """Wrap a token in a GSS-API InitialContextToken (APPLICATION 0).

    Layout: 0x60 <DER length> <mechanism OID> <innerContextToken>

    Args:
        this_mech: Dotted mechanism OID string
        inner_token: Bytes placed after the OID

    Returns:
        The encoded InitialContextToken
    """
    encoded_oid = der_encoder.encode(univ.ObjectIdentifier(this_mech))
    body = encoded_oid + inner_token
    size = len(body)

    if size >= 128:
        # Long form: 0x80 | num_octets, followed by the length octets
        size_octets = size.to_bytes((size.bit_length() + 7) // 8, 'big')
        header = bytes([0x80 | len(size_octets)]) + size_octets
    else:
        # Short form: the length fits in a single octet
        header = bytes([size])

    return b"\x60" + header + body
def _build_neg_token_resp(
    self,
    neg_state: int | NegState,
    response_token: bytes | None = None,
    supported_mech: str | None = None,
) -> bytes:
    """Encode a SPNEGO NegotiationToken holding a NegTokenResp.

    Args:
        neg_state: Value for the negState field
        response_token: Optional responseToken, skipped when falsy
        supported_mech: Optional supportedMech OID, skipped when falsy

    Returns:
        DER encoded NegotiationToken
    """
    # negTokenResp is the [1] choice of NegotiationToken.
    resp_tag = tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
    resp = NegTokenResp().subtype(explicitTag=resp_tag)
    resp['negState'] = neg_state

    optional_fields = (
        ('responseToken', response_token),
        ('supportedMech', supported_mech),
    )
    for field, value in optional_fields:
        if value:
            resp[field] = value

    wrapper = NegotiationToken()
    wrapper['negTokenResp'] = resp
    return der_encoder.encode(wrapper)
class CredSSPTestHandler(BaseHTTPRequestHandler):
    """HTTP handler for testing CredSSP authentication.

    Each POST carries one CredSSP token in the ``Authorization`` header
    and is answered with the next token in ``WWW-Authenticate``. State is
    tracked per client ``ip:port`` so the whole exchange has to happen on
    one persistent connection.
    """

    # Use HTTP/1.1 for persistent connections
    protocol_version = "HTTP/1.1"

    # Class-level storage for contexts (shared across all instances)
    contexts: dict[str, "CredSSPContext"] = {}
    done_contexts: set[str] = set()

    def __init__(
        self,
        *args,
        credssp_kwargs: dict[str, object],
        **kwargs,
    ) -> None:
        """Initialize handler with CredSSP configuration.

        Args:
            credssp_kwargs: Options to pass to the CredSSPContext.
        """
        # Must be set before super().__init__ as that call already
        # processes the request.
        self.credssp_kwargs = credssp_kwargs
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        """Override to add custom formatting."""
        print(f"[HTTP] {format % args}")

    def do_POST(self):
        """Handle POST request with CredSSP authentication."""
        # Keep connection alive for multi-round authentication
        self.close_connection = False

        # The connection is reused, so the request body has to be consumed
        # or its bytes would be parsed as the start of the next request.
        self._discard_request_body()

        print("\n" + "="*80)
        conn_id = f"{self.client_address[0]}:{self.client_address[1]}"
        print(f"Request from {conn_id}")
        print("="*80)

        if conn_id in self.done_contexts:
            print("[HTTP] Processing already authenticated context.")
            self._send_empty_response(200)
            return

        auth_header = self.headers.get('Authorization', '')
        if not auth_header:
            self._send_auth_required()
            return

        if not auth_header.startswith('CredSSP '):
            print(f"Authorization: {auth_header}\n")
            self._send_empty_response(400)
            return

        token_b64 = auth_header[8:]
        print(f"Authorization: CredSSP {token_b64}\n")

        # Decode token
        try:
            client_token = base64.b64decode(token_b64)

            # Identify TLS record
            if len(client_token) >= 5:
                record_type = client_token[0]
                record_types = {
                    0x14: "ChangeCipherSpec",
                    0x15: "Alert",
                    0x16: "Handshake",
                    0x17: "Application Data"
                }
                print(f"TLS Record Type: {record_types.get(record_type, f'Unknown ({record_type:#x})')}")

                if record_type == 0x16 and len(client_token) >= 6:
                    hs_type = client_token[5]
                    hs_types = {1: "ClientHello", 2: "ServerHello", 11: "Certificate", 16: "ClientKeyExchange", 20: "Finished"}
                    print(f"Handshake Type: {hs_types.get(hs_type, f'Unknown ({hs_type})')}")

        except Exception as e:
            print(f"Token decode error: {e}")
            self._send_empty_response(400)
            return

        # Process token. Context creation is inside the try so a failure
        # there is also reported as 403.
        try:
            # Get or create context
            if conn_id not in self.contexts:
                self.contexts[conn_id] = CredSSPContext(**self.credssp_kwargs)
            context = self.contexts[conn_id]

            server_token = context.process_tls_token(client_token)
        except Exception as e:
            print(f"Processing error: {e}")
            traceback.print_exc()
            self.contexts.pop(conn_id, None)
            self._send_empty_response(403)
            return

        if context.completed:
            print("Authentication complete!\n")
            self.send_response(200)
            self.contexts.pop(conn_id, None)
            self.done_contexts.add(conn_id)
        else:
            self.send_response(401)

        if server_token:
            response_b64 = base64.b64encode(server_token).decode('ascii')
            print(f"[Outgoing CredSSP Token] {len(server_token)} bytes")
            print(f"WWW-Authenticate: CredSSP {response_b64}\n")
            self.send_header('WWW-Authenticate', f'CredSSP {response_b64}')
        elif not context.completed:
            self.send_header('WWW-Authenticate', 'CredSSP')

        self.send_header('Content-Length', '0')
        self.end_headers()

    def _discard_request_body(self) -> None:
        """Read and drop the request body announced by Content-Length."""
        try:
            remaining = int(self.headers.get('Content-Length', 0))
        except ValueError:
            remaining = 0

        while remaining > 0:
            chunk = self.rfile.read(min(remaining, 65536))
            if not chunk:
                break
            remaining -= len(chunk)

    def _send_empty_response(self, status: int) -> None:
        """Send a bodyless response with an explicit zero Content-Length.

        Without the header a keep-alive client cannot tell where the
        response ends.
        """
        self.send_response(status)
        self.send_header('Content-Length', '0')
        self.end_headers()

    def _send_auth_required(self):
        """Send 401 requesting CredSSP."""
        self.send_response(401)
        self.send_header('WWW-Authenticate', 'CredSSP')
        self.send_header('Content-Length', '0')
        self.end_headers()
class Krb5PasswordPrompter(krb5.Krb5Prompt):
    """Kerberos prompter that asks for the answer on the terminal."""

    def prompt(
        self,
        msg: bytes,
        hidden: bool,
    ) -> bytes:
        """Show the library's prompt via getpass and return the reply.

        The reply is always read with getpass, whatever ``hidden`` says.
        """
        text = f"Prompt when retrieving Kerberos TGT for service principal - {msg.decode()}"
        # Make sure the prompt ends with ": " exactly once.
        suffix = "" if text.endswith(": ") else ": "
        return getpass.getpass(text + suffix).encode()
def get_service_tgt(
    krb5_ctx: krb5.Context,
    principal: str | None = None,
    password: str | None = None,
) -> tuple[krb5.Principal, krb5.KeyBlock, bytes] | None:
    """Find or obtain a TGT for the service principal.

    The default credential cache is searched first for a ``krbtgt``
    entry (belonging to ``principal`` when one is given). If nothing
    matches and a principal was supplied, a new TGT is requested with the
    password, prompting for it when none was passed in.

    Returns:
        (client principal, session key, ticket), or None when nothing was
        cached and no principal was given.
    """
    ccache = krb5.cc_default(krb5_ctx)
    try:
        cached = list(ccache)
    except krb5.Krb5Error:
        # An unreadable or missing cache is treated as empty.
        cached = []

    wanted = principal.encode() if principal else None

    for entry in cached:
        components = entry.server.components
        if not components or components[0] != b"krbtgt":
            continue

        if not wanted or entry.client.name == wanted:
            return (entry.client, entry.keyblock, entry.ticket)

    if not wanted:
        return None

    parsed = krb5.parse_name_flags(krb5_ctx, wanted)
    options = krb5.get_init_creds_opt_alloc(krb5_ctx)
    krb5.get_init_creds_opt_set_canonicalize(options, True)

    if password:
        secret, prompter = password.encode(), None
    else:
        secret, prompter = None, Krb5PasswordPrompter()

    cred = krb5.get_init_creds_password(
        krb5_ctx,
        parsed,
        options,
        password=secret,
        prompter=prompter,
    )

    return (cred.client, cred.keyblock, cred.ticket)
| def main() -> None: | |
| """Run the CredSSP test server.""" | |
| parser = argparse.ArgumentParser( | |
| description='''CredSSP Test Server | |
| This server implements a CredSSP authentication test server that can logs the | |
| incoming CredSSP authentication tokens and performs the typical authentication | |
| steps. It is designed to troubleshoot and analyze client CredSSP authentication | |
| flows, specifically WinRM based clients. | |
| ''', | |
| epilog=''' | |
| Example usage: | |
| %(prog)s \\ | |
| --keytab wsmantest.domain.test.keytab \\ | |
| --hostname wsmantest.domain.test \\ | |
| --port 5985 | |
| Platform requirements: | |
| This server cannot run on Windows as it requires the krb5 and gssapi Python | |
| libraries which are only available on Linux/macOS. | |
| Keytab requirements | |
| For standard Kerberos authentication (what Python CredSSP clients use), the | |
| keytab must contain the service principal keys for the SPN the client is | |
| requesting. Typically this is HTTP/hostname or WSMAN/hostname. It is easy | |
| to just generate a keytab with both of these SPNs to cover both cases. | |
| For Kerberos User to User (U2U) authentication, which Windows clients use, | |
| the keytab is not used. Instead the server needs to have a valid TGT for the | |
| service principal so either call kinit principal@DOMAIN.TEST before starting | |
| the server or specify the --principal and --principal-password arguments. It | |
| is important this principal has the WSMAN/hostname servicePrincipalName | |
| registered in AD. | |
| To set up a Kerberos Principal for testing run the following PowerShell code: | |
| # Create AD user with AES256 encryption enabled and SPNs | |
| $domain = (Get-ADRootDSE).defaultNamingContext -replace 'DC=','' -replace ',', '.' | |
| $user = 'wsmantest' | |
| $password = 'MyPassword123!' | |
| $passwordSecureString = $password | ConvertTo-SecureString -AsPlainText -Force | |
| $userParams = @{ | |
| Name = $user | |
| AccountPassword = $passwordSecureString | |
| Enabled = $true | |
| KerberosEncryptionType = 'AES256' | |
| PasswordNeverExpires = $true | |
| UserPrincipalName = "$user@$domain" | |
| } | |
| New-ADUser @userParams | |
| # Generate keytab with ktpass (run on Windows Domain Controller) | |
| # This is required if using the Standard Kerberos authentication mode and not | |
| # U2U. | |
| $ktPassCommon = @( | |
| "/out", "$user.keytab" | |
| "/mapuser", "$user@$domain" | |
| "-setupn" | |
| "/pass", $password | |
| "-setpass" | |
| "/crypto", "AES256-SHA1" | |
| "/ptype", "KRB5_NT_PRINCIPAL" | |
| "/rawsalt", "$($domain.ToUpper())$user" | |
| ) | |
| ktpass.exe @ktPassCommon /princ "WSMAN/$user@$($domain.ToUpper())" | |
| ktpass.exe @ktPassCommon /princ "WSMAN/$user.$domain@$($domain.ToUpper())" /in "$user.keytab" | |
| ktpass.exe @ktPassCommon /princ "HTTP/$user@$($domain.ToUpper())" /in "$user.keytab" | |
| ktpass.exe @ktPassCommon /princ "HTTP/$user.$domain@$($domain.ToUpper())" /in "$user.keytab" | |
| To start the test server, make sure the keytab generated above has been copied | |
| locally. Use kinit to get the TGT for the service principal and run: | |
| kinit wsmantest@DOMAIN.TEST | |
| %(prog)s --keytab wsmantest.keytab --hostname wsmantest.domain.test \\ | |
| --port 5985 | |
| To test a Windows client connecting using CredSSP to this test server you need | |
| to update the client's host file to point wsmantest, wsmantest.domain.test to | |
| our test server: | |
| # Update Windows client hosts file to point to test server | |
| # Add to C:\\Windows\\System32\\drivers\\etc\\hosts: | |
| 192.168.1.100 wsmantest wsmantest.domain.test | |
| From there you can use this WSMan command to connect to the test server and | |
| trigger the CredSSP authentication flow: | |
| $user = '...' | |
| $pass = '...' | |
| winrm get winrm/config -r:http://wsmantest.domain.test:5985/wsman -auth:CredSSP -username:$user -password:$pass | |
| You may need to configure the WSMan client to allow CredSSP to such a target in | |
| the trusted hosts. | |
| ''', | |
| formatter_class=argparse.RawDescriptionHelpFormatter | |
| ) | |
| parser.add_argument( | |
| '--hostname', | |
| required=True, | |
| help='Server hostname (used in certificate CN)', | |
| ) | |
| parser.add_argument( | |
| '--keytab', | |
| help='Path to Kerberos keytab file containing service principal keys. If not provided, the ' | |
| 'server attempts to use system keytab (/etc/krb5.keytab).', | |
| ) | |
| parser.add_argument( | |
| '--principal', | |
| help='Kerberos service principal name in format SERVICE/hostname@REALM ' | |
| '(e.g., WSMAN/wsmantest.domain.test@DOMAIN.TEST). ' | |
| 'Used for U2U authentication to obtain service TGT.', | |
| ) | |
| parser.add_argument( | |
| '--principal-password', | |
| help='Password for service principal. If not provided, will attempt to acquire TGT from ' | |
| 'client keytab, credential cache, or prompt interactively if needed for U2U.', | |
| ) | |
| parser.add_argument( | |
| '--port', | |
| type=int, | |
| default=5985, | |
| help='Port to listen on (default: 5985)', | |
| ) | |
| args = parser.parse_args() | |
| cert_pem, key_pem, server_public_key = generate_self_signed_cert(args.hostname) | |
| krb5_ctx = krb5.init_context() | |
| unique_kt_name = f"MEMORY:unique_{os.urandom(8).hex()}".encode() | |
| krb5_keytab = krb5.kt_resolve(krb5_ctx, unique_kt_name) | |
| # service_tgt = get_service_tgt(krb5_ctx, args.principal, password="YourSecurePassword123!") | |
| principal = args.principal | |
| service_tgt = get_service_tgt(krb5_ctx, principal, password=args.principal_password) | |
| if service_tgt: | |
| principal = service_tgt[0].name.decode() | |
| krb5.kt_add_entry(krb5_ctx, krb5_keytab, service_tgt[0], 0, 0, service_tgt[1]) | |
| # We also copy any keytab entries in our explicit keytab path or the system | |
| # default keytab into our in-memory keytab so they can be used. | |
| if args.keytab: | |
| kt_path = pathlib.Path(args.keytab).resolve() | |
| system_kt = krb5.kt_resolve(krb5_ctx, str(kt_path).encode()) | |
| else: | |
| system_kt = krb5.kt_default(krb5_ctx) | |
| keytab_path = system_kt.name.decode() | |
| try: | |
| system_kt_entries = list(system_kt) | |
| except krb5.Krb5Error: | |
| system_kt_entries = [] | |
| print("="*80) | |
| print("CredSSP Test Server") | |
| print("="*80) | |
| print(f"Hostname: '{args.hostname}'") | |
| print(f"Port: {args.port}") | |
| if principal: | |
| print(f"U2U Principal: '{principal}'") | |
| else: | |
| print("U2U Principal: None (Kerberos U2U authentication will not work without a valid service TGT)") | |
| print(f"Keytab: '{keytab_path}'") | |
| if system_kt_entries: | |
| for entry in system_kt_entries: | |
| print(f" {str(entry)}") | |
| krb5.kt_add_entry(krb5_ctx, krb5_keytab, entry.principal, entry.kvno, entry.timestamp, entry.key) | |
| else: | |
| print(" No entries found in keytab, normal Kerberos authentication will not work.") | |
| print("="*80) | |
| print(f"Listening on 0.0.0.0:{args.port} - Program '{sys.executable}' - PID {os.getpid()}\n") | |
| httpd = HTTPServer(('0.0.0.0', args.port), lambda *handler_args, **handler_kwargs: CredSSPTestHandler( | |
| *handler_args, | |
| credssp_kwargs={ | |
| 'cert_pem': cert_pem, | |
| 'key_pem': key_pem, | |
| 'server_public_key': server_public_key, | |
| 'keytab': unique_kt_name if len(list(krb5_keytab)) else None, | |
| 'service_tgt': service_tgt, | |
| }, | |
| **handler_kwargs | |
| )) | |
| try: | |
| httpd.serve_forever() | |
| except KeyboardInterrupt: | |
| print("\n\nShutting down...") | |
# Script entry point: start the CredSSP test server only when this file is
# executed directly (e.g. via `uv run --script`), never as a side effect of
# importing it from another module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment