Created
June 26, 2023 15:57
-
-
Save VaslD/a32b2ab63f0afcfd5760c3855989c7a2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Network | |
public typealias TCPClient = TCPConnection | |
/// TCP 节点(客户端)连接 | |
/// | |
/// 由于 TCP 是全双工通讯,服务端也需要作为节点建立连接。 | |
public final class TCPConnection: Sendable { | |
let queue: DispatchQueue | |
let connection: NWConnection | |
public init(_ connection: NWConnection) { | |
let pointer = Unmanaged.passUnretained(connection).toOpaque() | |
self.queue = DispatchQueue(label: "NWConnection (\(pointer))") | |
self.connection = connection | |
} | |
public init?(host: String, port: Int) { | |
guard let unsigned = UInt16(exactly: port), let port = NWEndpoint.Port(rawValue: unsigned) else { | |
return nil | |
} | |
self.queue = DispatchQueue(label: "NWConnection (\(host):\(port))") | |
self.connection = NWConnection(host: NWEndpoint.Host(host), port: port, using: .tcp) | |
} | |
public var state: NWConnection.State { | |
self.connection.state | |
} | |
public var peer: NWEndpoint { | |
self.connection.endpoint | |
} | |
public var localEndpoint: NWEndpoint? { | |
self.connection.currentPath?.localEndpoint | |
} | |
public var remoteEndpoint: NWEndpoint? { | |
self.connection.currentPath?.remoteEndpoint | |
} | |
/// 建立连接 | |
/// | |
/// 此方法需要 `await`,将在连接成功或异常后返回。当连接正在建立时,继续(并发)调用此方法将立即导致 `CancellationError` | |
/// 错误。在已建立的连接上调用此方法无效。 | |
public func connect() async throws { | |
guard self.connection.stateUpdateHandler == nil else { | |
throw CancellationError() | |
} | |
guard self.connection.state != .ready else { | |
return | |
} | |
return try await withUnsafeThrowingContinuation { continuation in | |
self.connection.stateUpdateHandler = { | |
switch $0 { | |
case .ready: | |
self.connection.stateUpdateHandler = nil | |
continuation.resume() | |
case let .failed(error): | |
self.connection.stateUpdateHandler = nil | |
continuation.resume(throwing: error) | |
case .cancelled: | |
self.connection.stateUpdateHandler = nil | |
continuation.resume(throwing: CancellationError()) | |
case let .waiting(error): | |
self.connection.stateUpdateHandler = nil | |
self.connection.cancel() | |
continuation.resume(throwing: error) | |
default: | |
break | |
} | |
} | |
self.connection.start(queue: self.queue) | |
} | |
} | |
/// 断开连接 | |
public func disconnect() { | |
self.connection.cancel() | |
} | |
/// 发送一条消息 | |
public func send(_ data: some DataProtocol) async throws { | |
try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation<Void, Error>) in | |
self.connection.send(content: data, completion: .contentProcessed { | |
if let error = $0 { | |
continuation.resume(throwing: error) | |
return | |
} | |
continuation.resume() | |
}) | |
} | |
} | |
/// 发送终止消息 | |
/// | |
/// 如需断开连接,提前调用此方法非必需;但一旦调用此方法,连接不可用于发送其他消息。 | |
public func finish() async throws { | |
try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation<Void, Error>) in | |
self.connection.send(content: nil, contentContext: .finalMessage, completion: .contentProcessed { | |
if let error = $0 { | |
continuation.resume(throwing: error) | |
return | |
} | |
continuation.resume() | |
}) | |
} | |
} | |
/// 获取接收到消息的异步流 | |
/// | |
/// ```swift | |
/// for try await message in connection.receive() { | |
/// // Read message… | |
/// } | |
/// ``` | |
/// | |
/// 如果接收过程中连接层出现异常,异步流将被错误中断,通常抛出 | |
/// `NWError`。取决于错误原因,重复调用此方法可能能够从异常中恢复、继续接收消息,也可能会立即再次出错。建议查询 | |
/// ``state`` 和关联错误以决定代码如何继续。 | |
public func receive() -> AsyncThrowingStream<Data, Error> { | |
AsyncThrowingStream { | |
try Task.checkCancellation() | |
return try await withUnsafeThrowingContinuation { continuation in | |
self.connection.receive(minimumIncompleteLength: 0, maximumLength: .max) { | |
_ = $1 | |
if let error = $3 { | |
continuation.resume(throwing: error) | |
return | |
} | |
if $2 { | |
self.connection.cancel() | |
} | |
continuation.resume(returning: $0) | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment