Last active
June 4, 2024 15:21
-
-
Save buehler/97da456110c696f6ecbbabf0b70229ed to your computer and use it in GitHub Desktop.
Generic Dart Isolate Worker, allows spawning a new isolate in dart and compute _anything_ in futures or streams that can be transmitted to isolates. Works with closures instead of messages.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'dart:isolate'; | |
class IsolateWorker { | |
IsolateWorker._(this._receivePort, this._sendPort); | |
final ReceivePort _receivePort; | |
final SendPort _sendPort; | |
bool _isDisposed = false; | |
static Future<IsolateWorker> create({String? debugName}) async { | |
final receivePort = ReceivePort(); | |
await Isolate.spawn( | |
_workerEntry, | |
receivePort.sendPort, | |
debugName: debugName, | |
); | |
return IsolateWorker._(receivePort, await receivePort.first as SendPort); | |
} | |
static void _workerEntry(SendPort sendPort) { | |
final receivePort = ReceivePort(); | |
sendPort.send(receivePort.sendPort); | |
late final StreamSubscription sub; | |
sub = receivePort.listen((message) async { | |
switch (message) { | |
case _ComputeCall(:final sendPort, :final computation): | |
sendPort.send(await computation()); | |
break; | |
case _StreamComputeCall(:final sendPort, :final computation): | |
await for (final value in computation()) { | |
sendPort.send(value); | |
} | |
sendPort.send(_StreamDone()); | |
break; | |
default: | |
sub.cancel(); | |
receivePort.close(); | |
} | |
}); | |
} | |
Future<R> compute<R>(FutureOr<R> Function() computation) async { | |
if (_isDisposed) { | |
throw StateError('Worker is already disposed'); | |
} | |
final sendPort = ReceivePort(); | |
_sendPort.send(_ComputeCall(sendPort.sendPort, computation)); | |
return await sendPort.first as R; | |
} | |
Stream<R> computeStream<R>(Stream<R> Function() computation) async* { | |
if (_isDisposed) { | |
throw StateError('Worker is already disposed'); | |
} | |
final sendPort = ReceivePort(); | |
_sendPort.send(_StreamComputeCall(sendPort.sendPort, computation)); | |
await for (final value in sendPort) { | |
if (value is _StreamDone) { | |
break; | |
} | |
yield value as R; | |
} | |
} | |
void dispose() { | |
try { | |
_isDisposed = true; | |
_sendPort.send(null); | |
_receivePort.close(); | |
} catch (e) { | |
print('Failed to dispose worker (possible double-dispose?): $e'); | |
} | |
} | |
} | |
class _ComputeCall<R> { | |
_ComputeCall(this.sendPort, this.computation); | |
final SendPort sendPort; | |
final FutureOr<R> Function() computation; | |
} | |
class _StreamComputeCall<R> { | |
_StreamComputeCall(this.sendPort, this.computation); | |
final SendPort sendPort; | |
final Stream<R> Function() computation; | |
} | |
class _StreamDone {} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment