Skip to content

Instantly share code, notes, and snippets.

@typenoob
Last active October 30, 2025 03:31
Show Gist options
  • Select an option

  • Save typenoob/7598e5b6639191ef9af7fea874f1c694 to your computer and use it in GitHub Desktop.

Select an option

Save typenoob/7598e5b6639191ef9af7fea874f1c694 to your computer and use it in GitHub Desktop.
华为交换机TLS连接限制
- sal(签名算法列表,signature algorithms)中需包含 RSA+SHA256
- cipher 列表中需至少包含 AES256-SHA256、AES256-SHA、AES128-SHA 其中之一
- max_sal=18
@typenoob
Copy link
Copy Markdown
Author

typenoob commented Oct 30, 2025

from dataclasses import dataclass
import os
import re
import requests
from typing import Optional, Dict, Any
from urllib.parse import parse_qs
import ssl
import urllib.request
import requests
from urllib3.util.ssl_ import create_urllib3_context
import warnings

# Silence the warning whose message starts with "Unverified HTTPS request";
# the session below deliberately talks to devices without certificate checks.
warnings.filterwarnings('ignore', message='Unverified HTTPS request')

# Shared exception *instances* used as sentinel values: HWClient.init returns
# (does not raise) DriverNotMatchError / DriverAuthError to report the outcome.
# DriverSkipError is only referenced from commented-out code in the visible source.
DriverSkipError = Exception("Driver skip error")
DriverNotMatchError = Exception("Driver not match error")
DriverAuthError = Exception("Driver auth error")


class CustomSSLContext(requests.adapters.HTTPAdapter):
    """Transport adapter that forces a fixed TLS configuration on every pool.

    The context built here restricts the handshake to TLS 1.2, a fixed
    signature-algorithm list and three legacy AES cipher suites, and disables
    certificate and hostname verification.
    """

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager with the restricted SSL context injected.

        All positional and keyword arguments are forwarded unchanged to
        ``HTTPAdapter.init_poolmanager``; only ``ssl_context`` is overridden.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

        # Signature algorithms the server may use during the handshake.
        # NOTE(review): set_server_sigalgs only exists on recent Python
        # releases -- confirm the interpreter version used in deployment.
        context.set_server_sigalgs(
            "RSA+SHA384:RSA+SHA512:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:DSA+SHA256:RSA+SHA256")
        context.set_ciphers("AES256-SHA256:AES256-SHA:AES128-SHA")

        # Pin the protocol to exactly TLS 1.2. The upper bound must be set via
        # `maximum_version`; assigning `max_version` only creates an unused
        # attribute and leaves newer protocol versions enabled.
        context.minimum_version = ssl.TLSVersion.TLSv1_2
        context.maximum_version = ssl.TLSVersion.TLSv1_2

        # check_hostname has to be turned off before verify_mode can be
        # lowered to CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        kwargs['ssl_context'] = context
        return super().init_poolmanager(*args, **kwargs)


@dataclass
class DeviceConfig:
    """Credentials and URL/payload templates used by HWClient."""

    # Login credentials substituted into `payload`.
    username: str
    password: str
    # Form-body template for the login request; formatted with (username, password).
    payload: str
    # URL templates; each is formatted with the device IP address.
    login_uri: str
    config_uri: str
    test_uri: str


class HWClient:
    """Client for the HTTPS management interface of a Huawei switch.

    Credentials are read from the ``USERNAME`` and ``PASSWORD`` environment
    variables when the client is constructed. Typical use::

        with HWClient() as client:
            if client.init({"ip": "..."}) is None:
                client.get_host_name()

    The instance can be used as a context manager; leaving the ``with`` block
    closes the underlying HTTP session.
    """

    # Timeout (seconds) applied to every request that does not pass its own.
    DEFAULT_TIMEOUT = 10

    # The device returns a fresh token inside the body of config responses.
    _TOKEN_RE = re.compile(r'Token=([A-Za-z0-9]+)')

    # Body template for running a CLI "display" command through config.cgi.
    # The embedded newlines and indentation are part of the payload.
    _RPC_TEMPLATE = (
        'MessageID={message_id}&<rpc message-id="{message_id}" '
        'xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">\n'
        '        <edit-config operation="merge">\n'
        '        <target><running/></target><error-option>stop-on-error</error-option>\n'
        '        <config><featurename istop="true" type="cli">'
        '<display>{command}</display></featurename></config>\n'
        '        </edit-config>\n'
        '        </rpc>]]>]]>'
    )

    def __init__(self):
        """Set up default configuration and open the HTTP session."""
        self.name = "HUAWEI"
        self.ip = None
        self.token = None
        self.session = None
        self.config = DeviceConfig(
            username=os.getenv("USERNAME", ""),
            password=os.getenv("PASSWORD", ""),
            payload="UserName={}&Password={}&Edition=0",
            login_uri="https://{}/login.cgi?_=0.5615007049300567",
            config_uri="https://{}/config.cgi?_=0.05967055919149211",
            test_uri="https://{}/simple/style/default/image/a.png"
        )
        self._init_session()

    def _init_session(self):
        """Create the requests session with the custom TLS adapter mounted."""
        session = requests.Session()
        session.mount('https://', CustomSSLContext())
        session.verify = False  # certificate verification is intentionally off
        session.headers.update({
            "User-Agent": "Mozilla/5.0",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
        })

        # requests has no session-wide timeout setting, so wrap `request` and
        # fill in a default; an explicit `timeout=` from the caller still wins.
        send = session.request

        def request_with_default_timeout(method, url, **kwargs):
            kwargs.setdefault("timeout", self.DEFAULT_TIMEOUT)
            return send(method, url, **kwargs)

        session.request = request_with_default_timeout
        self.session = session

    def init(self, args: Dict[str, Any]) -> Optional[Exception]:
        """Probe and authenticate against the device at ``args["ip"]``.

        Returns:
            ``None`` on success, ``DriverNotMatchError`` when the connection
            probe fails, or ``DriverAuthError`` when no token was obtained.
            The error objects are returned, not raised.
        """
        self.ip = args["ip"]

        # TODO: the original driver skipped devices already recorded as
        # successful in a `_log` module; that lookup is not implemented here.

        if self.test_connection() is not None:
            return DriverNotMatchError

        self.auth()
        if not self.token:
            print(f"密码错误,交换机ip:{self.ip}")
            return DriverAuthError

        return None

    def destroy(self):
        """Close the HTTP session, if one is open."""
        if self.session:
            self.session.close()
            self.session = None

    def auth(self):
        """Log in and store the session token in ``self.token``.

        ``self.token`` is left as ``None`` when the request fails, the status
        is not 200, or the response body carries no ``Token`` field.
        """
        self.token = None
        url = self.config.login_uri.format(self.ip)
        # NOTE(review): credentials are inserted without URL-encoding, so
        # characters such as '&' or '=' would corrupt the form body. Left
        # unchanged because the encoding the device expects is not visible here.
        payload = self.config.payload.format(self.config.username, self.config.password)

        try:
            response = self.session.post(
                url,
                data=payload,
                headers={
                    "Content-Type": "application/x-www-form-urlencoded; text/xml; charset=UTF-8"
                }
            )
        except requests.exceptions.RequestException as e:
            print(f"认证请求失败: {e}")
            return

        if response.status_code != 200:
            print(f"认证失败,HTTP状态码: {response.status_code}")
            return

        # The login response body is parsed as a query string.
        token = parse_qs(response.text).get("Token", [None])[0]
        if not token:
            # A 200 without a token is not a successful login.
            print("认证失败,响应中未包含Token")
            return

        self.token = token
        print(f"认证成功,Token: {self.token}")

    def test_connection(self) -> Optional[Exception]:
        """Fetch a static asset to check that the device answers as expected.

        Returns:
            ``None`` when the asset is served with HTTP 200, otherwise an
            ``Exception`` instance describing the failure (returned, not raised).
        """
        url = self.config.test_uri.format(self.ip)

        try:
            # Use a shorter timeout than the default for the probe.
            response = self.session.get(url, timeout=3)
        except requests.exceptions.Timeout:
            print("连接测试超时")
            return Exception("connection timeout")
        except requests.exceptions.RequestException as e:
            print(f"连接测试失败: {e}")
            return Exception("connection failed")

        print(f"连接测试成功,HTTP状态码: {response.status_code}")
        if response.status_code == 200:
            return None
        return Exception("driver not match")

    def _run_display_command(self, message_id: int, command: str, label: str) -> Optional[str]:
        """Run one CLI display command and return the extracted name.

        Args:
            message_id: value used for both ``MessageID`` and ``message-id``.
            command: text placed inside the ``<display>`` element.
            label: noun used in the printed diagnostics.

        Returns:
            The second word of the last response line, or ``None`` when not
            authenticated, the request fails, or the status is not 200.

        Raises:
            ValueError: if the response body cannot be parsed.
        """
        if not self.token:
            print(f"未认证,无法获取{label}")
            return None

        url = self.config.config_uri.format(self.ip)
        payload = self._RPC_TEMPLATE.format(message_id=message_id, command=command)

        try:
            response = self.session.post(
                url,
                data=payload,
                headers={
                    "Content-Type": "application/x-www-form-urlencoded; text/xml; charset=UTF-8",
                    "Token": self.token
                },
                cookies={"UserName": self.config.username}
            )
        except requests.exceptions.RequestException as e:
            print(f"获取{label}失败: {e}")
            return None

        if response.status_code != 200:
            print(f"获取{label}失败,HTTP状态码: {response.status_code}")
            return None

        # Pick up the refreshed token for the next request, when present.
        token_match = self._TOKEN_RE.search(response.text)
        if token_match:
            self.token = token_match.group(1)

        return self.extract_name_from_response(response.text)

    def get_host_name(self) -> Optional[str]:
        """Return the configured sysname, or ``None`` on failure."""
        return self._run_display_command(
            292, "current-configuration | include sysname", "主机名")

    def get_device_name(self) -> Optional[str]:
        """Return the device model reported by the version output, or ``None`` on failure."""
        return self._run_display_command(
            293, "version | include Switch", "设备名")

    def get_name(self) -> str:
        """Return the driver name."""
        return self.name

    @staticmethod
    def extract_name_from_response(response_text: str) -> str:
        """Return the second whitespace-separated word of the last line.

        Raises:
            ValueError: if the response is empty or whitespace only, or if the
                last line has fewer than two words.
        """
        stripped = response_text.strip()
        # Check emptiness on the text itself: splitting an empty string still
        # yields one (empty) element, so a list-emptiness test never fires.
        if not stripped:
            raise ValueError("提取主机名或设备类型失败!响应内容为空")

        last_line = stripped.rsplit('\n', 1)[-1]
        words = last_line.split()
        if len(words) < 2:
            raise ValueError("提取主机名或设备类型失败!最后一行格式不正确")
        return words[1]

    def __enter__(self):
        """Enter the context manager and return this client."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the session when leaving the ``with`` block."""
        self.destroy()


# Factory hook, kept in case a registration mechanism is needed.
def register_client():
    """Build and return a new HWClient instance."""
    client = HWClient()
    return client


if __name__ == "__main__":
    # Demo run: inject sample credentials into the environment before the
    # client reads them in its constructor.
    os.environ.update({'USERNAME': 'admin', 'PASSWORD': 'P@ssw5rd'})

    # The context manager closes the session when the block exits.
    with HWClient() as client:
        result = client.init({"ip": "192.168.128.19"})
        if result is not None:
            print(f"初始化失败: {result}")
        else:
            print("初始化成功")
            host_name = client.get_host_name()
            device_name = client.get_device_name()
            print(f"主机名: {host_name}")
            print(f"设备名: {device_name}")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment