Created
May 20, 2020 18:48
-
-
Save eonil/bb3fc5efd4dd1b619e92ff975ba4b780 to your computer and use it in GitHub Desktop.
How to write an imperative I/O control program with GCD (a spawned thread) and Combine.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Combine | |
/// Runs a small imperative request/response protocol on a background GCD queue.
///
/// Callers push `Int` messages into `control` and observe results on `report`.
/// The worker blocks on `Channel.read()` for each input it needs, so the
/// protocol is written as straight-line code.
///
/// Protocol as implemented here:
/// 1. The first message must be `111`; anything else fails `report` with
///    `TaskError.unexpectedInput`.
/// 2. The second message selects a command:
///    - `222`: reads 100 further values `x`, emitting `x * 2` and `x * 10 + 1` for each.
///    - `333`: expects `444`, emits `999` and `888`, then expects `555` and emits `111`.
///    - anything else: no output.
/// 3. `report` completes with `.finished` after a successful command, or with
///    `.failure` if a read throws or an unexpected value is received.
///
/// NOTE(review): the previous version switched on the same value it had just
/// guarded to be `111`, so the `222`/`333` branches were unreachable and the
/// `444`/`555` guards could never pass. Reading a fresh value per step is the
/// assumed intent — confirm against the expected protocol.
///
/// The worker closure holds both channels strongly and is not cancelled when
/// the `Task` is deallocated; send a completion on `control` to unblock a
/// pending read and stop the worker.
final class Task<I,O> {
    let control = PassthroughSubject<Int,Error>()
    let report = PassthroughSubject<Int,Error>()

    init() {
        let input = Channel(control)
        let output = Channel(report)
        DispatchQueue.global().async { [input, output] in
            do {
                /// Here we can easily write programs with arbitrary complexity.
                let greeting = try input.read()
                guard greeting == 111 else { throw TaskError.unexpectedInput(greeting) }

                // Each protocol step consumes its own message from the channel.
                let command = try input.read()
                switch command {
                case 222:
                    for _ in 0..<100 {
                        let x = try input.read()
                        output.send(x * 2)
                        output.send(x * 10 + 1)
                    }
                case 333:
                    let first = try input.read()
                    guard first == 444 else { throw TaskError.unexpectedInput(first) }
                    output.send(999)
                    output.send(888)
                    let second = try input.read()
                    guard second == 555 else { throw TaskError.unexpectedInput(second) }
                    output.send(111)
                default:
                    break
                }
                // Every successful path terminates the report stream exactly once.
                output.send(completion: .finished)
            }
            catch {
                output.send(completion: .failure(error))
            }
        }
    }

    deinit {
    }
}
/// Errors thrown by the `Task` worker while interpreting its input stream.
enum TaskError<I>: Error {
    /// An input value arrived that the current protocol step does not accept.
    /// The associated value is the offending input.
    case unexpectedInput(I)
}
/// Bridges a Combine `PassthroughSubject` to a blocking, pull-style API.
///
/// Values and the completion published by the subject are delivered on the
/// private serial queue `gcdq` and buffered; `read()` blocks the calling
/// thread until one is available. `send` forwards straight to the subject.
///
/// All mutable state is touched only on `gcdq`: the sink runs there via
/// `receive(on:)`, and `read()` enters it with `gcdq.sync`. `read()` must
/// therefore not be called from `gcdq` itself.
final class Channel<T> {
    let gcdq = DispatchQueue(label: "Channel")
    /// Counts buffered events (values plus the completion) not yet consumed by `read()`.
    let sema = DispatchSemaphore(value: 0)
    let pass: PassthroughSubject<T,Error>
    var pipes = [AnyCancellable]()
    /// Most recently received value while unread values remain; `nil` once the buffer is drained.
    var latestValue: T?
    /// Completion received from the subject, if any.
    var latestCompletion: Subscribers.Completion<Error>?
    /// Received-but-unread values in arrival order. A single slot would lose
    /// values whenever the producer sends faster than the consumer reads.
    private var pending = [T]()

    /// Subscribes to `x` and starts buffering everything it publishes.
    /// - Parameter x: The subject to read from and send through.
    init(_ x:PassthroughSubject<T,Error>) {
        pass = x
        // Weak captures: the subscription is stored in `pipes`, which this object owns.
        pass.receive(on: gcdq).sink(
            receiveCompletion: { [weak self] m in self?.processCompletion(m) },
            receiveValue: { [weak self] m in self?.process(m) })
        .store(in: &pipes)
    }

    /// Records the completion and wakes one waiting reader. Runs on `gcdq`.
    private func processCompletion(_ m:Subscribers.Completion<Error>) {
        latestCompletion = m
        sema.signal()
    }

    /// Buffers a value and wakes one waiting reader. Runs on `gcdq`.
    private func process(_ m:T) {
        pending.append(m)
        latestValue = m
        sema.signal()
    }

    /// Blocks until the next value or the completion is available.
    ///
    /// Values are returned in the order they were published. Once every
    /// buffered value has been consumed and the subject has completed, this
    /// call (and every later call) throws instead of blocking.
    ///
    /// - Returns: The oldest unread value.
    /// - Throws: `ChannelCompletionError.complete` wrapping the subject's
    ///   completion, or `ChannelCompletionError.cancel` if woken with neither
    ///   a value nor a completion.
    func read() throws -> T {
        sema.wait()
        return try gcdq.sync { () throws -> T in
            if !pending.isEmpty {
                let next = pending.removeFirst()
                if pending.isEmpty {
                    latestValue = nil
                }
                return next
            }
            if let done = latestCompletion {
                // Keep the completion sticky: re-arm the semaphore so later
                // reads also throw rather than waiting forever.
                sema.signal()
                throw ChannelCompletionError.complete(done)
            }
            throw ChannelCompletionError.cancel
        }
    }

    /// Publishes `m` on the underlying subject.
    func send(_ m:T) {
        pass.send(m)
    }

    /// Publishes the completion `x` on the underlying subject.
    func send(completion x:Subscribers.Completion<Error>) {
        pass.send(completion: x)
    }
}
/// Errors thrown by `Channel.read()` when no value can be returned.
enum ChannelCompletionError: Error {
    /// The underlying subject completed; carries the completion it sent.
    case complete(Subscribers.Completion<Error>)
    /// The reader was woken with neither a value nor a completion available.
    case cancel
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment