Skip to content

Instantly share code, notes, and snippets.

@jborean93
Created March 18, 2026 10:32
Show Gist options
  • Select an option

  • Save jborean93/d999edb6b51ac9bd54040b9e7e230eb9 to your computer and use it in GitHub Desktop.

Select an option

Save jborean93/d999edb6b51ac9bd54040b9e7e230eb9 to your computer and use it in GitHub Desktop.
CredSSP Test Server
#!/usr/bin/env -S uv run --script
# Copyright: (c) 2026, Jordan Borean (@jborean93) <jborean93@gmail.com>
# MIT License (see LICENSE or https://opensource.org/licenses/MIT)
# /// script
# dependencies = [
# "gssapi",
# "krb5",
# "cryptography",
# "pyasn1",
# ]
# ///
from __future__ import annotations
import argparse
import base64
import datetime
import enum
import getpass
import os
import pathlib
import ssl
import sys
import tempfile
import traceback
from http.server import HTTPServer, BaseHTTPRequestHandler
import krb5
import gssapi
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from pyasn1.codec.der import decoder as der_decoder, encoder as der_encoder
from pyasn1.type import univ, namedtype, namedval, tag, char, useful
class KnownOID(str, enum.Enum):
"""Known OIDs for security mechanisms."""
SPNEGO = "1.3.6.1.5.5.2"
KERBEROS_5 = "1.2.840.113554.1.2.2"
KERBEROS_5_U2U = "1.2.840.113554.1.2.2.3"
MS_KERBEROS = "1.2.840.48018.1.2.2"
NTLM = "1.3.6.1.4.1.311.2.2.10"
NEGOEX = "1.3.6.1.4.1.311.2.2.30"
IAKERB = "1.3.6.1.5.2.5"
@classmethod
def get_name(cls, oid_str: str) -> str:
"""Get the friendly name for an OID."""
try:
oid_enum = cls(oid_str)
friendly_name = oid_enum.name.replace('_', ' ').title()
return f"{friendly_name} ({oid_str})"
except ValueError:
return oid_str
class KerberosPrincipalNameType(enum.IntEnum):
"""Kerberos Principal Name Types."""
UNKNOWN = 0
NT_PRINCIPAL = 1
NT_SRV_INST = 2
NT_SRV_HST = 3
NT_SRV_XHST = 4
NT_UID = 5
NT_X500_PRINCIPAL = 6
NT_SMTP_NAME = 7
NT_ENTERPRISE_PRINCIPAL = 10
@classmethod
def get_name(cls, type_id: int) -> str:
"""Get the friendly name for a principal name type."""
try:
friendly_name = cls(type_id).name
except ValueError:
friendly_name = "Unknown"
return f"{friendly_name} ({type_id})"
class NegState(enum.IntEnum):
"""SPNEGO NegTokenResp negState values."""
ACCEPT_COMPLETED = 0
ACCEPT_INCOMPLETE = 1
REJECT = 2
REQUEST_MIC = 3
@classmethod
def get_name(cls, state: int) -> str:
"""Get the friendly name for a negState value."""
try:
friendly_name = cls(state).name.replace('_', '-').lower()
except ValueError:
friendly_name = "unknown"
return f"{friendly_name} ({state})"
class MechType(univ.ObjectIdentifier):
"""Mechanism type OID.
ASN.1 Definition:
MechType ::= OBJECT IDENTIFIER
"""
pass
class MechTypeList(univ.SequenceOf):
"""List of mechanism OIDs.
ASN.1 Definition:
MechTypeList ::= SEQUENCE OF MechType
"""
componentType = MechType()
class ContextFlags(univ.BitString):
"""Context flags for SPNEGO.
ASN.1 Definition:
ContextFlags ::= BIT STRING {
delegFlag (0),
mutualFlag (1),
replayFlag (2),
sequenceFlag (3),
anonFlag (4),
confFlag (5),
integFlag (6)
} (SIZE (32))
"""
namedValues = namedval.NamedValues(
('delegFlag', 0),
('mutualFlag', 1),
('replayFlag', 2),
('sequenceFlag', 3),
('anonFlag', 4),
('confFlag', 5),
('integFlag', 6)
)
class APOptions(univ.BitString):
"""APOptions for AP-REQ
ASN.1 Definition:
APOptions ::= KerberosFlags
-- reserved(0),
-- use-session-key(1),
-- mutual-required(2)
"""
namedValues = namedval.NamedValues(
('reserved', 0),
('use-session-key', 1),
('mutual-required', 2)
)
class NegTokenInit(univ.Sequence):
"""SPNEGO NegTokenInit structure.
ASN.1 Definition:
NegTokenInit ::= SEQUENCE {
mechTypes [0] MechTypeList,
reqFlags [1] ContextFlags OPTIONAL,
mechToken [2] OCTET STRING OPTIONAL,
mechListMIC [3] OCTET STRING OPTIONAL,
...
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'mechTypes',
MechTypeList().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.OptionalNamedType(
'reqFlags',
ContextFlags().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.OptionalNamedType(
'mechToken',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
),
namedtype.OptionalNamedType(
'mechListMIC',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
)
)
class NegTokenResp(univ.Sequence):
"""SPNEGO NegTokenResp structure.
ASN.1 Definition:
NegTokenResp ::= SEQUENCE {
negState [0] ENUMERATED {
accept-completed (0),
accept-incomplete (1),
reject (2),
request-mic (3)
} OPTIONAL,
supportedMech [1] MechType OPTIONAL,
responseToken [2] OCTET STRING OPTIONAL,
mechListMIC [3] OCTET STRING OPTIONAL,
...
}
"""
componentType = namedtype.NamedTypes(
namedtype.OptionalNamedType(
'negState',
univ.Enumerated().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.OptionalNamedType(
'supportedMech',
MechType().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.OptionalNamedType(
'responseToken',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
),
namedtype.OptionalNamedType(
'mechListMIC',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
)
)
class NegotiationToken(univ.Choice):
"""SPNEGO NegotiationToken.
ASN.1 Definition:
NegotiationToken ::= CHOICE {
negTokenInit [0] NegTokenInit,
negTokenResp [1] NegTokenResp
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'negTokenInit',
NegTokenInit().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.NamedType(
'negTokenResp',
NegTokenResp().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
)
)
class KerberosString(char.GeneralString):
"""Kerberos String type.
ASN.1 Definition:
KerberosString ::= GeneralString (IA5String)
"""
pass
class PrincipalName(univ.Sequence):
"""Kerberos PrincipalName.
ASN.1 Definition:
PrincipalName ::= SEQUENCE {
name-type [0] Int32,
name-string [1] SEQUENCE OF KerberosString
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'name-type',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.NamedType(
'name-string',
univ.SequenceOf(componentType=KerberosString()).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
)
)
class KerbU2UTgtRequest(univ.Sequence):
"""Kerberos U2U TGT Request.
ASN.1 Definition:
KERB-TGT-REQUEST ::= SEQUENCE {
pvno [0] INTEGER,
msg-type [1] INTEGER,
server-name [2] PrincipalName OPTIONAL,
realm [3] Realm OPTIONAL
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'pvno',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.NamedType(
'msg-type',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.OptionalNamedType(
'server-name',
PrincipalName().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
),
namedtype.OptionalNamedType(
'realm',
KerberosString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
)
)
class KerbU2UTgtReply(univ.Sequence):
"""Kerberos U2U TGT Reply.
ASN.1 Definition:
KERB-TGT-REPLY ::= SEQUENCE {
pvno [0] INTEGER,
msg-type [1] INTEGER,
ticket [2] Ticket
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'pvno',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.NamedType(
'msg-type',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.NamedType(
'ticket',
univ.Any().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
)
)
class KerberosTime(useful.GeneralizedTime):
"""Kerberos Time type.
ASN.1 Definition:
KerberosTime ::= GeneralizedTime -- with no fractional seconds
"""
pass
class Microseconds(univ.Integer):
"""Kerberos Microseconds type.
ASN.1 Definition:
Microseconds ::= INTEGER (0..999999)
"""
pass
class KrbError(univ.Sequence):
"""Kerberos Error structure.
ASN.1 Definition:
KRB-ERROR ::= [APPLICATION 30] SEQUENCE {
pvno [0] INTEGER (5),
msg-type [1] INTEGER (30),
ctime [2] KerberosTime OPTIONAL,
cusec [3] Microseconds OPTIONAL,
stime [4] KerberosTime,
susec [5] Microseconds,
error-code [6] Int32,
crealm [7] Realm OPTIONAL,
cname [8] PrincipalName OPTIONAL,
realm [9] Realm -- service realm --,
sname [10] PrincipalName -- service name --,
e-text [11] KerberosString OPTIONAL,
e-data [12] OCTET STRING OPTIONAL
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'pvno',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.NamedType(
'msg-type',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.OptionalNamedType(
'ctime',
KerberosTime().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
),
namedtype.OptionalNamedType(
'cusec',
Microseconds().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
),
namedtype.NamedType(
'stime',
KerberosTime().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 4)
)
),
namedtype.NamedType(
'susec',
Microseconds().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 5)
)
),
namedtype.NamedType(
'error-code',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 6)
)
),
namedtype.OptionalNamedType(
'crealm',
KerberosString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 7)
)
),
namedtype.OptionalNamedType(
'cname',
PrincipalName().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 8)
)
),
namedtype.NamedType(
'realm',
KerberosString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 9)
)
),
namedtype.NamedType(
'sname',
PrincipalName().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 10)
)
),
namedtype.OptionalNamedType(
'e-text',
KerberosString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 11)
)
),
namedtype.OptionalNamedType(
'e-data',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 12)
)
)
)
tagSet = univ.Sequence.tagSet.tagExplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 30)
)
class EncryptedData(univ.Sequence):
"""Kerberos EncryptedData structure.
ASN.1 Definition:
EncryptedData ::= SEQUENCE {
etype [0] Int32,
kvno [1] UInt32 OPTIONAL,
cipher [2] OCTET STRING
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'etype',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.OptionalNamedType(
'kvno',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.NamedType(
'cipher',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
)
)
class Ticket(univ.Sequence):
"""Kerberos Ticket structure.
ASN.1 Definition:
Ticket ::= [APPLICATION 1] SEQUENCE {
tkt-vno [0] INTEGER (5),
realm [1] Realm,
sname [2] PrincipalName,
enc-part [3] EncryptedData
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'tkt-vno',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.NamedType(
'realm',
KerberosString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.NamedType(
'sname',
PrincipalName().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
),
namedtype.NamedType(
'enc-part',
EncryptedData().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
)
)
tagSet = univ.Sequence.tagSet.tagExplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 1)
)
class KerberosApReq(univ.Sequence):
"""Kerberos AP-REQ structure (inner SEQUENCE, decode after stripping APPLICATION 14).
ASN.1 Definition:
AP-REQ ::= [APPLICATION 14] SEQUENCE {
pvno [0] INTEGER (5),
msg-type [1] INTEGER (14),
ap-options [2] APOptions,
ticket [3] Ticket,
authenticator [4] EncryptedData
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'pvno',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.NamedType(
'msg-type',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.NamedType(
'ap-options',
APOptions().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
),
namedtype.NamedType(
'ticket',
Ticket().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
),
namedtype.NamedType(
'authenticator',
EncryptedData().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 4)
)
)
)
tagSet = univ.Sequence.tagSet.tagExplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatConstructed, 14)
)
class NegoToken(univ.Sequence):
"""NegoToken structure.
ASN.1 Definition:
NegoToken ::= SEQUENCE {
negoToken [0] OCTET STRING
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'negoToken',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
)
)
class NegoData(univ.SequenceOf):
"""NegoData structure.
ASN.1 Definition:
NegoData ::= SEQUENCE OF NegoToken
"""
componentType = NegoToken()
class TSCredentials(univ.Sequence):
"""TSCredentials structure (inside authInfo after decryption).
ASN.1 Definition:
TSCredentials ::= SEQUENCE {
credType [0] INTEGER,
credentials [1] OCTET STRING
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'credType',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.NamedType(
'credentials',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
)
)
class TSPasswordCreds(univ.Sequence):
"""TSPasswordCreds structure (credType 1).
ASN.1 Definition:
TSPasswordCreds ::= SEQUENCE {
domainName [0] OCTET STRING,
userName [1] OCTET STRING,
password [2] OCTET STRING
}
"""
componentType = namedtype.NamedTypes(
namedtype.NamedType(
'domainName',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.NamedType(
'userName',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.NamedType(
'password',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
)
)
class TSRequest(univ.Sequence):
"""TSRequest structure for CredSSP.
ASN.1 Definition:
TSRequest ::= SEQUENCE {
version [0] INTEGER,
negoTokens [1] NegoData OPTIONAL,
authInfo [2] OCTET STRING OPTIONAL,
pubKeyAuth [3] OCTET STRING OPTIONAL,
errorCode [4] INTEGER OPTIONAL,
clientNonce [5] OCTET STRING OPTIONAL
}
"""
componentType = namedtype.NamedTypes(
namedtype.OptionalNamedType(
'version',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
),
namedtype.OptionalNamedType(
'negoTokens',
NegoData().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
),
namedtype.OptionalNamedType(
'authInfo',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
),
namedtype.OptionalNamedType(
'pubKeyAuth',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
),
namedtype.OptionalNamedType(
'errorCode',
univ.Integer().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 4)
)
),
namedtype.OptionalNamedType(
'clientNonce',
univ.OctetString().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 5)
)
)
)
def extract_initial_context_token(
token: bytes,
context: str = "",
silent: bool = False
) -> tuple[str, bytes]:
"""Extract and display InitialContextToken with thisMech OID and inner bytes.
Args:
token: InitialContextToken bytes
context: Context label prefix for display (e.g., "Incoming", "Outgoing")
silent: If True, suppress printing
Returns:
Tuple of (thisMech OID string, inner bytes)
"""
if context:
header = f"[{context} InitialContextToken]"
else:
header = "[InitialContextToken]"
offset = 1
if token[offset] & 0x80:
len_octets = token[offset] & 0x7F
offset += 1 + len_octets
else:
offset += 1
if token[offset] != 0x06:
raise ValueError(f"{header} Expected OID tag, got 0x{token[offset]:02x}")
oid_len = token[offset + 1]
oid_bytes = token[offset:offset + 2 + oid_len]
oid_obj, _ = der_decoder.decode(oid_bytes)
oid_str = str(oid_obj)
offset += 2 + oid_len
inner_bytes = token[offset:]
if not silent:
print(header)
print(f" thisMech: {KnownOID.get_name(oid_str)}")
print(f" innerContextToken: {inner_bytes.hex()}")
print()
return oid_str, inner_bytes
def pretty_print_asn1(asn1_obj, indent=0, name=""):
"""Pretty print ASN.1 structure showing only field names and values.
Args:
asn1_obj: ASN.1 object to print
indent: Current indentation level
name: Name/label for this object
"""
prefix = " " * indent
# Handle different types
if isinstance(asn1_obj, univ.Enumerated):
# Handle Enumerated (for NegTokenResp.negState)
enum_val = int(asn1_obj)
if name.endswith(" negState"):
enum_name = NegState.get_name(enum_val)
else:
enum_name = str(enum_val)
if name:
print(f"{prefix}{name}: {enum_name}")
else:
print(f"{prefix}{enum_name})")
elif isinstance(asn1_obj, (univ.Integer, univ.Boolean)):
raw_int_val = int(asn1_obj)
if name:
if name == '[0] name-type':
raw_int_val = KerberosPrincipalNameType.get_name(raw_int_val)
print(f"{prefix}{name}: {raw_int_val}")
else:
print(f"{prefix}{raw_int_val}")
elif isinstance(asn1_obj, char.GeneralString):
str_val = bytes(asn1_obj).decode('utf-8')
if name:
print(f"{prefix}{name}: {str_val}")
else:
print(f"{prefix}{str_val}")
elif isinstance(asn1_obj, univ.OctetString):
data = bytes(asn1_obj)
if name:
print(f"{prefix}{name}: {data.hex()}")
else:
print(f"{prefix}{data.hex()}")
elif isinstance(asn1_obj, univ.ObjectIdentifier):
oid_str = str(asn1_obj)
oid_display = KnownOID.get_name(oid_str)
if name:
print(f"{prefix}{name}: {oid_display}")
else:
print(f"{prefix}{oid_display}")
elif isinstance(asn1_obj, univ.BitString):
# Handle BitString (for ContextFlags)
bits = asn1_obj
# Get hex representation
hex_val = bits.hexValue if hasattr(bits, 'hexValue') else None
# Try to decode named bits if ContextFlags
flag_names = []
if hasattr(bits, 'namedValues') and bits.namedValues:
# Get the bit positions that are set
for bit_name, bit_pos in bits.namedValues.items():
if len(bits) > bit_pos and bits[bit_pos]:
flag_names.append(bit_name)
if name:
if flag_names:
print(f"{prefix}{name}: {', '.join(flag_names)} ({hex_val or bits.prettyPrint()})")
else:
print(f"{prefix}{name}: {hex_val or bits.prettyPrint()}")
else:
if flag_names:
print(f"{prefix}{', '.join(flag_names)} ({hex_val or bits.prettyPrint()})")
else:
print(f"{prefix}{hex_val or bits.prettyPrint()}")
elif isinstance(asn1_obj, univ.Choice):
# Handle Choice (for NegotiationToken)
if name:
print(f"{prefix}{name}:")
# Get the chosen component
chosen_name = asn1_obj.getName()
chosen_value = asn1_obj.getComponent()
if chosen_value is not None:
pretty_print_asn1(chosen_value, indent + 1, chosen_name)
elif isinstance(asn1_obj, (univ.Sequence, univ.Set)):
# Print structure name if provided
if name:
print(f"{prefix}{name}:")
# Print components
if hasattr(asn1_obj, 'componentType') and asn1_obj.componentType:
# Named components
try:
for idx, named_type in enumerate(asn1_obj.componentType.namedTypes):
component_name = named_type.name
try:
component = asn1_obj[component_name]
if component is not None and (not hasattr(component, 'hasValue') or component.hasValue()):
pretty_print_asn1(component, indent + 1, f"[{idx}] {component_name}")
except Exception as e:
print(f"{prefix} [Error reading {component_name}: {e}]")
except Exception as e:
# Fallback to unnamed
print(f"{prefix} [Error reading named types: {e}]")
for idx in range(len(asn1_obj)):
try:
component = asn1_obj[idx]
if component is not None:
pretty_print_asn1(component, indent + 1, f"[{idx}]")
except Exception as e:
print(f"{prefix} [Error reading component {idx}: {e}]")
else:
# Unnamed components
for idx in range(len(asn1_obj)):
try:
component = asn1_obj[idx]
if component is not None:
pretty_print_asn1(component, indent + 1, f"[{idx}]")
except Exception as e:
print(f"{prefix} [Error reading component {idx}: {e}]")
elif isinstance(asn1_obj, univ.SequenceOf):
if name:
print(f"{prefix}{name}:")
for idx, item in enumerate(asn1_obj):
pretty_print_asn1(item, indent + 1, f"[{idx}]")
else:
# Generic handling
if hasattr(asn1_obj, 'hasValue') and asn1_obj.hasValue():
try:
if name:
print(f"{prefix}{name}: {asn1_obj.prettyPrint()}")
else:
print(f"{prefix}{asn1_obj.prettyPrint()}")
except Exception as e:
print(f"{prefix}[Error printing value: {e}]")
def generate_self_signed_cert(hostname: str) -> tuple[bytes, bytes, bytes]:
"""Generate an ephemeral self-signed certificate for CredSSP TLS.
Args:
hostname: Hostname to use in certificate CN
Returns:
Tuple of (certificate_pem, private_key_pem)
"""
# Generate private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Build certificate
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "CredSSP Test"),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.now(datetime.timezone.utc)
).not_valid_after(
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1)
).add_extension(
x509.SubjectAlternativeName([
x509.DNSName(hostname),
]),
critical=False,
).sign(private_key, hashes.SHA256())
# Serialize to PEM
cert_pem = cert.public_bytes(serialization.Encoding.PEM)
key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
# Calculate public key hash for CredSSP
public_key = cert.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.PKCS1,
)
return cert_pem, key_pem, public_key
class CredSSPContext:
"""Manages CredSSP TLS handshake and GSSAPI U2U authentication."""
def __init__(
self,
cert_pem: bytes,
key_pem: bytes,
server_public_key: bytes,
keytab: bytes | None,
service_tgt: tuple[krb5.Principal, krb5.KeyBlock, bytes] | None,
) -> None:
"""Initialize CredSSP context.
Args:
cert_pem: Server certificate in PEM format
key_pem: Private key in PEM format
server_public_key: The server's TLS public key
keytab: Path to keytab file to use for GSSAPI credential acquisition
service_tgt: The TGT to use for Kerberos U2U authentication
"""
self.tls_established = False
self.gssapi_context: gssapi.SecurityContext | None = None
self.completed = False
self.server_public_key = server_public_key
self.kerberos_service_tgt = service_tgt
self._kerberos_u2u_continuation: bool = False
# Acquire credentials from keytab without setting KRB5_KTNAME
# Using GSS_C_NO_NAME (None) allows GSSAPI to select the principal
# based on the incoming request's target name
self.gssapi_cred: gssapi.Credentials | None = None
if keytab:
self.gssapi_creds = gssapi.Credentials(
usage='accept',
store={
'keytab': keytab,
},
)
# Create SSL context with memory BIO
self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# Write cert and key to temp files (required by ssl module)
with tempfile.NamedTemporaryFile(mode='wb',suffix='.pem') as temp_cert:
with tempfile.NamedTemporaryFile(mode='wb', suffix='.pem') as temp_key:
temp_cert.write(cert_pem)
temp_cert.flush()
temp_key.write(key_pem)
temp_key.flush()
self.ssl_context.load_cert_chain(temp_cert.name, temp_key.name)
# Create memory BIO
self.incoming_bio = ssl.MemoryBIO()
self.outgoing_bio = ssl.MemoryBIO()
# Create SSL object
self.ssl_obj = self.ssl_context.wrap_bio(
self.incoming_bio,
self.outgoing_bio,
server_side=True
)
def process_tls_token(self, client_token: bytes) -> bytes | None:
"""Process a TLS token from the client.
Args:
client_token: TLS record from client
Returns:
TLS response to send to client, or None
"""
# Write client data to incoming BIO
self.incoming_bio.write(client_token)
# Try to do handshake or read data
try:
if not self.tls_established:
self.ssl_obj.do_handshake()
self.tls_established = True
print("[TLS] Handshake complete")
else:
# TLS is established, try to read application data
app_data = self.ssl_obj.read(16384)
if app_data:
print(f"\n[TLS] Decrypted application data ({len(app_data)} bytes):")
print(f" {app_data.hex()}\n")
# This should be a TSRequest token.
return self._process_ts_request(app_data)
except ssl.SSLWantReadError:
pass
except ssl.SSLWantWriteError:
pass
except Exception as e:
print(f"[TLS] Error: {e}")
traceback.print_exc()
# Read any data from outgoing BIO
if self.outgoing_bio.pending:
response = self.outgoing_bio.read()
return response
return None
def _process_ts_request(self, token: bytes) -> bytes | None:
"""Process CredSSP message (after TLS).
Handles three stages:
1. Authentication (negoTokens)
2. Public key validation (pubKeyAuth)
3. Credentials (authInfo)
Args:
token: CredSSP TSRequest
Returns:
Encrypted response token to send back through TLS
"""
try:
ts_request, _ = der_decoder.decode(token, asn1Spec=TSRequest())
print("[Incoming TSRequest]")
pretty_print_asn1(ts_request)
print()
output_token = None
pub_key_auth = None
if ts_request['negoTokens'].hasValue():
output_token = self._process_nego_tokens(bytes(ts_request['negoTokens'][0]['negoToken']))
if ts_request['pubKeyAuth'].hasValue():
client_nonce = bytes(ts_request['clientNonce']) if ts_request['clientNonce'].hasValue() else None
pub_key_auth = self._process_pubkey_auth(bytes(ts_request['pubKeyAuth']), client_nonce)
if ts_request['authInfo'].hasValue():
self._process_credentials(bytes(ts_request['authInfo']))
if output_token or pub_key_auth:
return self._build_response(output_token, pub_key_auth)
return None
except Exception as e:
print(f"[TSRequest] Processing error: {e}")
traceback.print_exc()
return None
def _process_nego_tokens(self, nego_token: bytes) -> bytes | None:
"""Process negoTokens and return output token."""
print("[CredSSP] Processing negoTokens")
is_spnego = False
kerberos_u2u_continuation = self._kerberos_u2u_continuation
self._kerberos_u2u_continuation = False
build_spnego_resp = False
if self.gssapi_context:
print("[CredSSP] Continuing existing GSSAPI context with new negoToken")
output_token = self.gssapi_context.step(nego_token)
else:
inner_context_token = None
if kerberos_u2u_continuation:
# This is the second call done in a Kerberos U2U exchange. We
# expect either a SPNEGO NegTokenResp, an AP-REQ wrapped in an
# InitialContextToken, or an NTLM token (if we sent NTLM mech).
if nego_token[0] == 0xA1:
# As we are unwrapping the SPNEGO wrapper before calling
# GSSAPI we need to rebuild the response that the initiator
# expects.
build_spnego_resp = True
spnego_info = self._extract_spnego_mech_token(nego_token)
if not spnego_info:
return None
nego_token = spnego_info[0]
# Check if it's NTLM (fallback from failed U2U)
if nego_token.startswith(b"NTLMSSP\x00"):
print("[CredSSP] U2U continuation received NTLM token due to U2U failure, this is not handled (yet)")
return None
if nego_token[0] != 0x60:
print("[CredSSP] Did not find InitialContextToken in U2U AP-REQ payload")
return None
# The InitialContextToken uses the Kerberos U2U OID which GSSAPI
# doesn't understand, we rewrap it with the Kerberos OID so that
# GSSAPI will process it.
inner_context_token = extract_initial_context_token(nego_token, context="U2U AP-REQ")
nego_token = self._build_initial_context_token(KnownOID.KERBEROS_5.value, inner_context_token[1])
if nego_token[0] == 0x60:
inner_context_token = extract_initial_context_token(nego_token, context="Incoming")
if inner_context_token[0] == KnownOID.SPNEGO:
is_spnego = True
spnego_info = self._extract_spnego_mech_token(inner_context_token[1])
if not spnego_info:
return None
token_bytes, token_mech = spnego_info
inner_context_token = extract_initial_context_token(token_bytes, context="SPNEGO mechToken")
else:
token_bytes = nego_token
token_mech = KnownOID.KERBEROS_5.value
elif nego_token[0] == 0x6E:
# This is a raw AP-REQ for Kerberos.
token_mech = KnownOID.KERBEROS_5.value
elif nego_token.startswith(b"NTLMSSP\x00"):
print("[CredSSP] Detected NTLM token, not implemented yet")
return None
else:
print("[CredSSP] Unsupported negoToken format, cannot process")
return None
if token_mech in [KnownOID.MS_KERBEROS, KnownOID.KERBEROS_5]:
output_token = self._process_kerberos_token(
nego_token,
inner_context_token,
spnego_token_mech=token_mech if is_spnego else None,
)
else:
oid_display = KnownOID.get_name(token_mech)
print(f"[GSSAPI] Unsupported mechanism OID: {oid_display}, cannot process token")
output_token = None
if not output_token:
return output_token
if kerberos_u2u_continuation:
# We need to strip the InitialContextToken GSSAPI will have wrapped
# the AP-REP in as this is not the first token for the initiator.
# This also includes the first 2 bytes of the TOK_ID
inner_context_bytes = extract_initial_context_token(output_token, silent=True)[1]
tok_id = int.from_bytes(inner_context_bytes[:2], byteorder='big')
output_token = inner_context_bytes[2:]
if build_spnego_resp:
# Done in the U2U AP-REQ response, we need to wrap it in the
# NegTokenResp value the initiator expects.
is_spnego = True
neg_state = NegState.REJECT if tok_id == 0x300 else NegState.ACCEPT_COMPLETED
output_token = self._build_neg_token_resp(
neg_state=neg_state,
response_token=output_token,
)
if is_spnego:
# Print out the SPNEGO response if we are doing SPNEGO, to see the NegTokenResp structure
self._extract_spnego_mech_token(output_token)
return output_token
def _process_pubkey_auth(self, client_pub_key_auth: bytes, client_nonce: bytes | None = None) -> bytes | None:
"""Process pubKeyAuth and return server's pubKeyAuth.
CredSSP public key authentication:
- Version 5+ (with nonce): Use magic values + nonce + hash
- Version 2-4 (no nonce): Increment hash by 1
Args:
client_pub_key_auth: Encrypted public key hash from client
client_nonce: Optional client nonce (CredSSP v5+)
Returns:
Server's encrypted pubKeyAuth response
"""
print("[CredSSP] Processing pubKeyAuth")
if not self.gssapi_context:
print("[CredSSP] ERROR: No GSSAPI context available for pubKeyAuth")
return None
try:
# Decrypt client's pubKeyAuth
unwrapped = self.gssapi_context.unwrap(client_pub_key_auth)
client_hash = unwrapped.message
print(f"[CredSSP] Decrypted client hash ({len(client_hash)} bytes): {client_hash.hex()}")
if client_nonce:
# Newer CredSSP with nonce
print("[CredSSP] Version 5+ mode (with clientNonce)")
# Calculate expected hash: SHA256("CredSSP Client-To-Server Binding Hash\x00" + nonce + public_key)
client_to_server_magic = b"CredSSP Client-To-Server Binding Hash\x00"
expected_data = client_to_server_magic + client_nonce + self.server_public_key
digest = hashes.Hash(hashes.SHA256())
digest.update(expected_data)
expected_hash = digest.finalize()
# Calculate server response: SHA256("CredSSP Server-To-Client Binding Hash\x00" + nonce + public_key)
server_to_client_magic = b"CredSSP Server-To-Client Binding Hash\x00"
response_data = server_to_client_magic + client_nonce + self.server_public_key
digest = hashes.Hash(hashes.SHA256())
digest.update(response_data)
response_hash = digest.finalize()
else:
# Older CredSSP without nonce, Client sends public key hash, server increments by 1
print("[CredSSP] Version 2-4 mode (no nonce)")
expected_hash = self.server_public_key
# Increment hash by 1 (as big-endian integer)
hash_int = int.from_bytes(self.server_public_key, 'big')
response_hash = (hash_int + 1).to_bytes(len(self.server_public_key), 'big')
if expected_hash != client_hash:
print("[CredSSP] WARNING: Client hash does not match expected value!")
print(f" Expected: {expected_hash.hex()}")
print(f" Received: {client_hash.hex()}")
return None
# Encrypt response
wrapped = self.gssapi_context.wrap(response_hash, encrypt=True)
server_pub_key_auth = wrapped.message
print(f"[CredSSP] Server pubKeyAuth ({len(server_pub_key_auth)} bytes): {server_pub_key_auth.hex()}\n")
return server_pub_key_auth
except Exception as e:
print(f"[CredSSP] pubKeyAuth processing error: {e}")
traceback.print_exc()
return None
def _process_credentials(self, auth_info: bytes) -> None:
"""Process final credentials stage.
authInfo contains encrypted TSCredentials with domain, username, password.
Args:
auth_info: Encrypted TSCredentials
"""
print("[CredSSP] Processing authInfo")
if not self.gssapi_context:
print("[CredSSP] ERROR: No GSSAPI context available for authInfo")
return
try:
# Decrypt authInfo
unwrapped = self.gssapi_context.unwrap(auth_info)
credentials_bytes = unwrapped.message
print(f"[CredSSP] Decrypted credentials ({len(credentials_bytes)} bytes): {credentials_bytes.hex()}")
# Decode TSCredentials
ts_credentials, _ = der_decoder.decode(credentials_bytes, asn1Spec=TSCredentials())
print("[TSCredentials]")
pretty_print_asn1(ts_credentials)
print()
cred_type = int(ts_credentials['credType'])
credentials_data = bytes(ts_credentials['credentials'])
if cred_type == 1:
password_creds, _ = der_decoder.decode(credentials_data, asn1Spec=TSPasswordCreds())
domain = bytes(password_creds['domainName']).decode('utf-16-le')
username = bytes(password_creds['userName']).decode('utf-16-le')
password = bytes(password_creds['password']).decode('utf-16-le')
print("[TSPasswordCreds]")
print(f"[CredSSP] Domain: '{domain}'")
print(f"[CredSSP] Username: '{username}'")
print(f"[CredSSP] Password: '{password}'")
else:
print(f"[CredSSP] Unknown credential type {cred_type}, raw data ({len(credentials_data)} bytes): {credentials_data.hex()}")
self.completed = True
print("[CredSSP] Authentication completed!\n")
except Exception as e:
print(f"[CredSSP] Credentials processing error: {e}")
traceback.print_exc()
def _build_response(self, output_token: bytes | None, pub_key_auth: bytes | None) -> bytes:
"""Build TSRequest response with optional negoTokens and pubKeyAuth."""
ts_response = TSRequest()
ts_response['version'] = 6
if output_token:
nego_token_elem = NegoToken()
nego_token_elem['negoToken'] = output_token
nego_data = NegoData().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
nego_data[0] = nego_token_elem
ts_response['negoTokens'] = nego_data
if pub_key_auth:
ts_response['pubKeyAuth'] = pub_key_auth
encoded = der_encoder.encode(ts_response)
print(f"[Outgoing TSRequest] {len(encoded)} bytes")
pretty_print_asn1(ts_response)
print()
encrypted_response = self._encrypt_with_tls(encoded)
return encrypted_response
def _encrypt_with_tls(self, plaintext: bytes) -> bytes:
"""Encrypt data using TLS connection.
Args:
plaintext: Data to encrypt
Returns:
Encrypted TLS record
"""
self.ssl_obj.write(plaintext)
encrypted = self.outgoing_bio.read()
print(f"[TLS] Encrypted {len(plaintext)} bytes -> {len(encrypted)} bytes")
return encrypted
def _extract_spnego_mech_token(self, spnego_token: bytes) -> tuple[bytes, str] | None:
"""Extract SPNEGO mechanism token from InitialContextToken.
Uses SPNEGOInitialContextToken schema for proper decoding.
"""
try:
negotiate_token, _ = der_decoder.decode(spnego_token, asn1Spec=NegotiationToken())
print("[SPNEGO NegotiationToken]")
pretty_print_asn1(negotiate_token)
print()
if 'negTokenInit' in negotiate_token:
mech_token = negotiate_token['negTokenInit']['mechToken']
mech_types = negotiate_token['negTokenInit']['mechTypes']
return bytes(mech_token), str(mech_types[0])
else:
mech_token = negotiate_token['negTokenResp']['responseToken']
if mech_token.hasValue():
inner_mech_oid = negotiate_token['negTokenResp']['supportedMech']
return bytes(mech_token), str(inner_mech_oid) if inner_mech_oid.hasValue() else "Unknown OID"
else:
return None
except Exception as e:
print(f"[SPNEGO] Decode error: {e}")
traceback.print_exc()
return None
def _process_kerberos_token(
self,
nego_token: bytes,
inner_context: tuple[str, bytes] | None,
spnego_token_mech: str | None = None,
) -> bytes | None:
"""Extract Kerberos message from GSS-API InitialContextToken.
For Kerberos, innerContextToken format:
- 2 bytes: TOK_ID (big-endian)
- Remaining: Kerberos message (ASN.1)
"""
try:
if inner_context:
request_data = inner_context[1][2:]
tok_id = int.from_bytes(inner_context[1][:2], 'big')
else:
# If inner_context isn't set we assume this is a raw AP-REQ token.
request_data = nego_token
tok_id = 0x100
if tok_id == 0x100:
return self._process_kerberos_ap_req_token(nego_token, request_data)
elif tok_id == 0x400:
# Used with Kerberos User to User that Windows clients use with CredSSP.
return self._process_kerberos_tgt_request_token(
request_data,
spnego_token_mech=spnego_token_mech,
)
else:
print(f"[Kerberos] Unknown TOK_ID 0x{tok_id:04x}, skipping processing")
return None
except Exception as e:
print(f"[Kerberos] Processing error: {e}")
traceback.print_exc()
return None
def _process_kerberos_ap_req_token(self, spnego_token: bytes, request_bytes: bytes) -> bytes | None:
"""Process Kerberos AP-REQ message."""
try:
ap_req, _ = der_decoder.decode(request_bytes, asn1Spec=KerberosApReq())
print("[Kerberos AP-REQ]")
pretty_print_asn1(ap_req)
print()
except Exception as e:
print(f"[Kerberos AP-REQ] Decode error: {e}")
traceback.print_exc()
creds = self.gssapi_creds
self.gssapi_context = gssapi.SecurityContext(creds=creds, usage='accept')
return self.gssapi_context.step(spnego_token)
def _process_kerberos_tgt_request_token(
self,
tgt_request_bytes: bytes,
spnego_token_mech: str | None = None,
) -> bytes | None:
"""Process Kerberos TGT Request token using gssapi library."""
try:
tgt_req, _ = der_decoder.decode(tgt_request_bytes, asn1Spec=KerbU2UTgtRequest())
print("[Kerberos U2U TGT Request]")
pretty_print_asn1(tgt_req)
print()
server_name_type = 0
server_names = []
if tgt_req['server-name'].hasValue():
server_name_type = int(tgt_req['server-name']['name-type'])
server_names = [str(s) for s in tgt_req['server-name']['name-string']]
realm = None
if tgt_req['realm'].hasValue():
realm = str(tgt_req['realm'])
except Exception as e:
print(f"[Kerberos U2U TGT Request] Decode error: {e}")
traceback.print_exc()
# Tells the nego token handler to manually handle the incoming U2U
# AP-REQ. The GSSAPI context won't be able to use it directly as it's
# either a NegTokenResp not inside an InitialContextToken or an AP-REQ
# inside an InitialContextToken but with the U2U OID which GSSAPI
# doesn't support.
self._kerberos_u2u_continuation = True
if not self.kerberos_service_tgt:
if spnego_token_mech:
print("[Kerberos U2U TGT Request] No service TGT available, but SPNEGO was used, sending back NegTokenResp with NTLM mech to attempt fallback")
# While this is not implemented it is here to test what Windows does.
return self._build_neg_token_resp(
neg_state=NegState.REQUEST_MIC,
supported_mech=KnownOID.NTLM.value,
)
print("[Kerberos U2U TGT Request] No service TGT available, returning KRB_AP_ERR_NO_TGT")
sname = (server_name_type, server_names or ["UNKNOWN"])
krb_error_bytes = self._build_krb_error(
error_code=0x43, # KRB_AP_ERR_NO_TGT
realm=realm or "UNKNOWN",
sname=sname,
e_text="No TGT available for U2U authentication",
)
return self._build_initial_context_token(
this_mech=KnownOID.KERBEROS_5_U2U.value,
inner_token=b'\x03\x00' + krb_error_bytes, # TOK_ID for KRB-ERROR
)
# There's probably a better way to do this but this is a POC.
if len(server_names) > 1:
# Strip out the service portion (first component)
names = b"".join(n.encode() for n in server_names[1:])
our_principal = self.kerberos_service_tgt[0]
is_match = True
if names != b"".join(our_principal.components):
is_match = False
elif realm and our_principal.realm != realm.encode():
is_match = False
if not is_match:
# We should probably reject this but just in case send back our TGT
# and just let the client validate the ticket.
print(f"[Kerberos U2U TGT Request] WARNING: Server principal in request does not match expected service TGT principal {our_principal.name.decode()}, sending back TGT anyway")
# GSSAPI does not support U2U exchanges directly, it cannot handle an
# InitialContextToken with the U2U mech and passing in a TGT-REQ
# message with the Kerb OID will fail. Instead we build the TGT-REP
# outselves with the TGT data for our principal and set the required
# markers so the subsequent nego token handler will process the
# incoming AP-REQ in a way that GSSAPI can handle.
tgt_reply = KerbU2UTgtReply()
tgt_reply['pvno'] = 5 # Kerberos protocol version
tgt_reply['msg-type'] = 17 # KRB_TGT_REP
tgt_reply['ticket'] = self.kerberos_service_tgt[2]
tgt_reply_bytes = der_encoder.encode(tgt_reply)
# Format: TOK_ID (0x0401) + KERB-TGT-REPLY
token_data = self._build_initial_context_token(
this_mech=KnownOID.KERBEROS_5_U2U.value,
inner_token=b"\x04\x01" + tgt_reply_bytes,
)
if spnego_token_mech:
token_data = self._build_neg_token_resp(
neg_state=NegState.ACCEPT_INCOMPLETE,
response_token=token_data,
supported_mech=spnego_token_mech,
)
return token_data
def _build_krb_error(
self,
error_code: int,
realm: str,
sname: tuple[int, list[str]] | None = None,
e_text: str | None = None,
crealm: str | None = None,
cname: tuple[int, list[str]] | None = None,
e_data: bytes | None = None,
stime: str | None = None,
susec: int = 0,
ctime: str | None = None,
cusec: int | None = None,
) -> bytes:
"""Build a Kerberos KRB-ERROR message.
Args:
error_code: Kerberos error code
realm: Service realm
sname: Service principal name as (name-type, name-strings)
e_text: Optional error description text
crealm: Optional client realm
cname: Client principal name as (name-type, name-strings)
e_data: Optional error-specific data
stime: Server time (GeneralizedTime format), defaults to current time
susec: Server microseconds, defaults to 0
ctime: Optional client time (GeneralizedTime format)
cusec: Optional client microseconds
Returns:
DER-encoded KRB-ERROR bytes
"""
import time
krb_error = KrbError()
tagged_pvno = univ.Integer(5).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
krb_error['pvno'] = tagged_pvno
tagged_msg_type = univ.Integer(30).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
krb_error['msg-type'] = tagged_msg_type
if ctime:
tagged_ctime = KerberosTime(ctime).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
krb_error['ctime'] = tagged_ctime
if cusec is not None:
tagged_cusec = Microseconds(cusec).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
krb_error['cusec'] = tagged_cusec
if not stime:
now = time.gmtime()
stime = time.strftime("%Y%m%d%H%M%SZ", now)
tagged_stime = KerberosTime(stime).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 4)
)
krb_error['stime'] = tagged_stime
tagged_susec = Microseconds(susec).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 5)
)
krb_error['susec'] = tagged_susec
tagged_error_code = univ.Integer(error_code).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 6)
)
krb_error['error-code'] = tagged_error_code
if crealm:
tagged_crealm = KerberosString(crealm).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 7)
)
krb_error['crealm'] = tagged_crealm
if cname:
# Build PrincipalName with tag [8] for cname
name_type, name_strings = cname
tagged_cname = PrincipalName().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 8)
)
tagged_cname['name-type'] = name_type
tagged_cname['name-string'].clear()
for name_str in name_strings:
tagged_cname['name-string'].append(KerberosString(name_str))
krb_error['cname'] = tagged_cname
tagged_realm = KerberosString(realm).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 9)
)
krb_error['realm'] = tagged_realm
if sname:
# Build PrincipalName with tag [10] for sname
name_type, name_strings = sname
tagged_sname = PrincipalName().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 10)
)
tagged_sname['name-type'] = name_type
tagged_sname['name-string'].clear()
for name_str in name_strings:
tagged_sname['name-string'].append(KerberosString(name_str))
krb_error['sname'] = tagged_sname
if e_text:
tagged_etext = KerberosString(e_text).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 11)
)
krb_error['e-text'] = tagged_etext
if e_data:
tagged_edata = univ.OctetString(e_data).subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 12)
)
krb_error['e-data'] = tagged_edata
return der_encoder.encode(krb_error)
def _build_initial_context_token(
self,
this_mech: str,
inner_token: bytes,
) -> bytes:
# Wrap in InitialContextToken (APPLICATION 0)
# Structure: 0x60 <length> <OID> <innerContextToken>
oid_bytes = der_encoder.encode(univ.ObjectIdentifier(this_mech))
content_length = len(oid_bytes) + len(inner_token)
if content_length < 128:
length_bytes = bytes([content_length])
else:
# Long form: 0x80 | num_octets, followed by length octets
length_octets = content_length.to_bytes((content_length.bit_length() + 7) // 8, 'big')
length_bytes = bytes([0x80 | len(length_octets)]) + length_octets
return b"\x60" + length_bytes + oid_bytes + inner_token
def _build_neg_token_resp(
self,
neg_state: int | NegState,
response_token: bytes | None = None,
supported_mech: str | None = None,
) -> bytes:
neg_token_resp = NegTokenResp().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
neg_token_resp['negState'] = neg_state
if response_token:
neg_token_resp['responseToken'] = response_token
if supported_mech:
neg_token_resp['supportedMech'] = supported_mech
negotiation_token = NegotiationToken()
negotiation_token['negTokenResp'] = neg_token_resp
return der_encoder.encode(negotiation_token)
class CredSSPTestHandler(BaseHTTPRequestHandler):
"""HTTP handler for testing CredSSP authentication."""
# Use HTTP/1.1 for persistent connections
protocol_version = "HTTP/1.1"
# Class-level storage for contexts (shared across all instances)
contexts: dict[str, CredSSPContext] = {}
done_contexts: set[str] = set()
def __init__(
self,
*args,
credssp_kwargs: dict[str, object],
**kwargs,
) -> None:
"""Initialize handler with CredSSP configuration.
Args:
credssp_kwargs: Options to pass to the CredSSPContext.
"""
self.credssp_kwargs = credssp_kwargs
super().__init__(*args, **kwargs)
def log_message(self, format, *args):
"""Override to add custom formatting."""
print(f"[HTTP] {format % args}")
def do_POST(self):
"""Handle POST request with CredSSP authentication."""
# Keep connection alive for multi-round authentication
self.close_connection = False
print("\n" + "="*80)
conn_id = f"{self.client_address[0]}:{self.client_address[1]}"
print(f"Request from {conn_id}")
print("="*80)
if conn_id in self.done_contexts:
print("[HTTP] Processing already authenticated context.")
self.send_response(200)
self.end_headers()
return
auth_header = self.headers.get('Authorization', '')
if auth_header.startswith('CredSSP '):
token_b64 = auth_header[8:]
print(f"Authorization: CredSSP {token_b64}\n")
else:
if auth_header:
print(f"Authorization: {auth_header}\n")
if not auth_header:
self._send_auth_required()
return
if not auth_header.startswith('CredSSP '):
self.send_response(400)
self.end_headers()
return
# Decode token
try:
client_token = base64.b64decode(token_b64)
# Identify TLS record
if len(client_token) >= 5:
record_type = client_token[0]
record_types = {
0x14: "ChangeCipherSpec",
0x15: "Alert",
0x16: "Handshake",
0x17: "Application Data"
}
print(f"TLS Record Type: {record_types.get(record_type, f'Unknown ({record_type:#x})')}")
if record_type == 0x16 and len(client_token) >= 6:
hs_type = client_token[5]
hs_types = {1: "ClientHello", 2: "ServerHello", 11: "Certificate", 16: "ClientKeyExchange", 20: "Finished"}
print(f"Handshake Type: {hs_types.get(hs_type, f'Unknown ({hs_type})')}")
except Exception as e:
print(f"Token decode error: {e}")
self.send_response(400)
self.end_headers()
return
# Get or create context
if conn_id not in self.contexts:
self.contexts[conn_id] = CredSSPContext(**self.credssp_kwargs)
context = self.contexts[conn_id]
# Process token
try:
server_token = context.process_tls_token(client_token)
if context.completed:
print("Authentication complete!\n")
self.send_response(200)
del self.contexts[conn_id]
self.done_contexts.add(conn_id)
else:
self.send_response(401)
if server_token:
response_b64 = base64.b64encode(server_token).decode('ascii')
print(f"[Outgoing CredSSP Token] {len(server_token)} bytes")
print(f"WWW-Authenticate: CredSSP {response_b64}\n")
self.send_header('WWW-Authenticate', f'CredSSP {response_b64}')
elif not context.completed:
self.send_header('WWW-Authenticate', 'CredSSP')
self.send_header('Content-Length', '0')
self.end_headers()
except Exception as e:
print(f"Processing error: {e}")
traceback.print_exc()
self.send_response(403)
self.end_headers()
if conn_id in self.contexts:
del self.contexts[conn_id]
def _send_auth_required(self):
"""Send 401 requesting CredSSP."""
self.send_response(401)
self.send_header('WWW-Authenticate', 'CredSSP')
self.send_header('Content-Length', '0')
self.end_headers()
class Krb5PasswordPrompter(krb5.Krb5Prompt):
def prompt(
self,
msg: bytes,
hidden: bool,
) -> bytes:
prompt_str = f"Prompt when retrieving Kerberos TGT for service principal - {msg.decode()}"
if not prompt_str.endswith(": "):
prompt_str += ": "
return getpass.getpass(prompt_str).encode()
def get_service_tgt(
krb5_ctx: krb5.Context,
principal: str | None = None,
password: str | None = None,
) -> tuple[krb5.Principal, krb5.KeyBlock, bytes] | None:
"""Attempts to get a TGT for the specified principal either from the cache or through password authentication."""
default_ccache = krb5.cc_default(krb5_ctx)
try:
ccache_entries = list(default_ccache)
except krb5.Krb5Error:
ccache_entries = []
b_principal = principal.encode() if principal else None
matched_cred: tuple[krb5.Principal, krb5.KeyBlock, bytes] | None = None
for entry in ccache_entries:
server_names = entry.server.components
if not (server_names and server_names[0] == b"krbtgt"):
continue
if not b_principal or (b_principal and entry.client.name == b_principal):
matched_cred = (entry.client, entry.keyblock, entry.ticket)
break
if matched_cred:
return matched_cred
if not b_principal:
return None
krb5_principal = krb5.parse_name_flags(krb5_ctx, b_principal)
init_opt = krb5.get_init_creds_opt_alloc(krb5_ctx)
krb5.get_init_creds_opt_set_canonicalize(init_opt, True)
cred = krb5.get_init_creds_password(
krb5_ctx,
krb5_principal,
init_opt,
password=password.encode() if password else None,
prompter=None if password else Krb5PasswordPrompter(),
)
return (cred.client, cred.keyblock, cred.ticket)
def main() -> None:
"""Run the CredSSP test server."""
parser = argparse.ArgumentParser(
description='''CredSSP Test Server
This server implements a CredSSP authentication test server that can logs the
incoming CredSSP authentication tokens and performs the typical authentication
steps. It is designed to troubleshoot and analyze client CredSSP authentication
flows, specifically WinRM based clients.
''',
epilog='''
Example usage:
%(prog)s \\
--keytab wsmantest.domain.test.keytab \\
--hostname wsmantest.domain.test \\
--port 5985
Platform requirements:
This server cannot run on Windows as it requires the krb5 and gssapi Python
libraries which are only available on Linux/macOS.
Keytab requirements
For standard Kerberos authentication (what Python CredSSP clients use), the
keytab must contain the service principal keys for the SPN the client is
requesting. Typically this is HTTP/hostname or WSMAN/hostname. It is easy
to just generate a keytab with both of these SPNs to cover both cases.
For Kerberos User to User (U2U) authentication, which Windows clients use,
the keytab is not used. Instead the server needs to have a valid TGT for the
service principal so either call kinit principal@DOMAIN.TEST before starting
the server or specify the --principal and --principal-password arguments. It
is important this principal has the WSMAN/hostname servicePrincipalName
registered in AD.
To set up a Kerberos Principal for testing run the following PowerShell code:
# Create AD user with AES256 encryption enabled and SPNs
$domain = (Get-ADRootDSE).defaultNamingContext -replace 'DC=','' -replace ',', '.'
$user = 'wsmantest'
$password = 'MyPassword123!'
$passwordSecureString = $password | ConvertTo-SecureString -AsPlainText -Force
$userParams = @{
Name = $user
AccountPassword = $passwordSecureString
Enabled = $true
KerberosEncryptionType = 'AES256'
PasswordNeverExpires = $true
UserPrincipalName = "$user@$domain"
}
New-ADUser @userParams
# Generate keytab with ktpass (run on Windows Domain Controller)
# This is required if using the Standard Kerberos authentication mode and not
# U2U.
$ktPassCommon = @(
"/out", "$user.keytab"
"/mapuser", "$user@$domain"
"-setupn"
"/pass", $password
"-setpass"
"/crypto", "AES256-SHA1"
"/ptype", "KRB5_NT_PRINCIPAL"
"/rawsalt", "$($domain.ToUpper())$user"
)
ktpass.exe @ktPassCommon /princ "WSMAN/$user@$($domain.ToUpper())"
ktpass.exe @ktPassCommon /princ "WSMAN/$user.$domain@$($domain.ToUpper())" /in "$user.keytab"
ktpass.exe @ktPassCommon /princ "HTTP/$user@$($domain.ToUpper())" /in "$user.keytab"
ktpass.exe @ktPassCommon /princ "HTTP/$user.$domain@$($domain.ToUpper())" /in "$user.keytab"
To start the test server, make sure the keytab generated above has been copied
locally. Use kinit to get the TGT for the service principal and run:
kinit wsmantest@DOMAIN.TEST
%(prog)s --keytab wsmantest.keytab --hostname wsmantest.domain.test \\
--port 5985
To test a Windows client connecting using CredSSP to this test server you need
to update the client's host file to point wsmantest, wsmantest.domain.test to
our test server:
# Update Windows client hosts file to point to test server
# Add to C:\\Windows\\System32\\drivers\\etc\\hosts:
192.168.1.100 wsmantest wsmantest.domain.test
From there you can use this WSMan command to connect to the test server and
trigger the CredSSP authentication flow:
$user = '...'
$pass = '...'
winrm get winrm/config -r:http://wsmantest.domain.test:5985/wsman -auth:CredSSP -username:$user -password:$pass
You may need to configure the WSMan client to allow CredSSP to such a target in
the trusted hosts.
''',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'--hostname',
required=True,
help='Server hostname (used in certificate CN)',
)
parser.add_argument(
'--keytab',
help='Path to Kerberos keytab file containing service principal keys. If not provided, the '
'server attempts to use system keytab (/etc/krb5.keytab).',
)
parser.add_argument(
'--principal',
help='Kerberos service principal name in format SERVICE/hostname@REALM '
'(e.g., WSMAN/wsmantest.domain.test@DOMAIN.TEST). '
'Used for U2U authentication to obtain service TGT.',
)
parser.add_argument(
'--principal-password',
help='Password for service principal. If not provided, will attempt to acquire TGT from '
'client keytab, credential cache, or prompt interactively if needed for U2U.',
)
parser.add_argument(
'--port',
type=int,
default=5985,
help='Port to listen on (default: 5985)',
)
args = parser.parse_args()
cert_pem, key_pem, server_public_key = generate_self_signed_cert(args.hostname)
krb5_ctx = krb5.init_context()
unique_kt_name = f"MEMORY:unique_{os.urandom(8).hex()}".encode()
krb5_keytab = krb5.kt_resolve(krb5_ctx, unique_kt_name)
# service_tgt = get_service_tgt(krb5_ctx, args.principal, password="YourSecurePassword123!")
principal = args.principal
service_tgt = get_service_tgt(krb5_ctx, principal, password=args.principal_password)
if service_tgt:
principal = service_tgt[0].name.decode()
krb5.kt_add_entry(krb5_ctx, krb5_keytab, service_tgt[0], 0, 0, service_tgt[1])
# We also copy any keytab entries in our explicit keytab path or the system
# default keytab into our in-memory keytab so they can be used.
if args.keytab:
kt_path = pathlib.Path(args.keytab).resolve()
system_kt = krb5.kt_resolve(krb5_ctx, str(kt_path).encode())
else:
system_kt = krb5.kt_default(krb5_ctx)
keytab_path = system_kt.name.decode()
try:
system_kt_entries = list(system_kt)
except krb5.Krb5Error:
system_kt_entries = []
print("="*80)
print("CredSSP Test Server")
print("="*80)
print(f"Hostname: '{args.hostname}'")
print(f"Port: {args.port}")
if principal:
print(f"U2U Principal: '{principal}'")
else:
print("U2U Principal: None (Kerberos U2U authentication will not work without a valid service TGT)")
print(f"Keytab: '{keytab_path}'")
if system_kt_entries:
for entry in system_kt_entries:
print(f" {str(entry)}")
krb5.kt_add_entry(krb5_ctx, krb5_keytab, entry.principal, entry.kvno, entry.timestamp, entry.key)
else:
print(" No entries found in keytab, normal Kerberos authentication will not work.")
print("="*80)
print(f"Listening on 0.0.0.0:{args.port} - Program '{sys.executable}' - PID {os.getpid()}\n")
httpd = HTTPServer(('0.0.0.0', args.port), lambda *handler_args, **handler_kwargs: CredSSPTestHandler(
*handler_args,
credssp_kwargs={
'cert_pem': cert_pem,
'key_pem': key_pem,
'server_public_key': server_public_key,
'keytab': unique_kt_name if len(list(krb5_keytab)) else None,
'service_tgt': service_tgt,
},
**handler_kwargs
))
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\n\nShutting down...")
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment