- Sheet of semantics for various methods under the hyper and race paradigms
- Prototyping work by lizmat
All these names are provisional. A role is extracted containing various of the methods in `Seq`. Working name: `Seqqy` (after `Setty`, `Baggy`), but we hope to end up with something better. For now, the structure is more important than the naming. The following methods currently in `Seq` will move to this role:
- `Array`
- `List`
- `Slip`
- `Str`
- `Stringy`
- `AT-POS`
- `EXISTS-POS`
- `eager`
- `fmt`
- `gist`
- `invert` (why is this in `Seq`, not `Any`?)
- `new-consumed`
This role will also do `PositionalBindFailover`, which provides:

- `cache`
- `list`
`HyperSeq` is a type representing an operation in a hyper pipeline. It does `Seqqy`, and is used to build up a chain of operations that should operate under the hyper paradigm.
`RaceSeq` is a type representing an operation in a race pipeline. It does `Seqqy`, and is used to build up a chain of operations that should operate under the race paradigm.
Like `Seq`, both `HyperSeq` and `RaceSeq` should be immutable types (so, we want a chain-style construction rather than the mutable `@!actions` approach that has been suggested).
As with `Seq`, no work happens until something either obtains a sequential iterator or sinks the chain (we may implement the latter in terms of `.iterator.sink-all` anyway).
For `hyper`/`race` pipelines terminated with a sequential iteration, the rate of that iteration is used as a back-pressure mechanism. The sequential iteration determines when we start new batches of work, and we will never send off new batches for parallel processing when there are more than `$degree` outstanding completed batches.
For example, if the degree is 4, then we would immediately start producing 4 batches and start workers to process them. The consumer starts eating the first, so we schedule another batch to be produced and worked on. The other batches complete in the meantime, and then the 5th. If we have a slow consumer, it may still be processing the results of the first batch. There are now 4 that it has not even started eating, so we don't start any more parallel work. When the consumer then starts to eat the second batch, we are down to 4 outstanding completed batches, and currently have no work ongoing, so we schedule 4 more batches to be worked on. These produce results. If the consumer were still on the second batch after this, we'd now be up to 6 outstanding batches, and would not schedule any more work until the consumer has eaten 2 more, so we're back down to 4 outstanding batches. This means that, memory-wise, we have a limit of 2 * `$degree` batches in memory at a time as a result of the parallel pipeline.
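The scheduling rule above can be checked with a small model. The following is a hypothetical Python simulation (the `simulate` function and its one-completion-per-tick worker model are inventions for illustration, not Rakudo code); it applies the rule "no new batches while more than `$degree` completed batches are outstanding" and confirms the 2 * `$degree` memory bound.

```python
def simulate(total_batches, degree, consume_every):
    """Step-driven toy model; returns the peak number of batches in memory."""
    in_flight = 0      # batches currently being produced/processed by workers
    completed = 0      # completed batches the consumer has not yet eaten
    produced = 0       # batches handed off to workers so far
    peak = 0
    step = 1
    while produced < total_batches or in_flight or completed:
        if in_flight:                      # one worker finishes per step
            in_flight -= 1
            completed += 1
        if completed and step % consume_every == 0:
            completed -= 1                 # the (possibly slow) consumer eats one
        # The back-pressure rule: only schedule new work while at most
        # `degree` completed batches are waiting, with at most `degree` workers.
        while (produced < total_batches and in_flight < degree
               and completed <= degree):
            in_flight += 1
            produced += 1
        peak = max(peak, in_flight + completed)
        step += 1
    return peak
```

With a slow consumer, the peak never exceeds twice the degree, matching the memory limit stated above.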
A work batch represents a batch of items that will be processed by a worker. For now we keep this as a Rakudo internal type.
```
class Rakudo::Internals::HyperWorkBatch {
    # The items in the batch.
    has IterationBuffer $.items;

    # Sequence number of the batch, starting from zero.
    has int $.sequence-number;

    # Is this the first batch that was produced at the last fork point, or the last batch
    # that the fork point will produce?
    has Bool $.first;
    has Bool $.last;

    # Any extra state that an operation wishes to convey as a result of processing the batch.
    has Mu $.extra;

    # A callback set by the coordinator so it can take action if needed when the batch has
    # been consumed.
    has &.consumed-callback is rw;
    method mark-consumed(--> Nil) {
        .() with &!consumed-callback;
    }
}
```
There are a number of repeating patterns that show up across operations that we want to write parallel implementations of. These will be captured by roles, which will be implemented concretely by operations. Note that these are intended to be lower level than the higher-level operations users perform; some user-level operations will produce multiple work stages.

Work stages form a fork/join graph, which in the simplest cases will be a single fork, some work on batches, and a join, but may in more complex cases have a number of join/fork points. Examples will be given after each of the types of work stage has been introduced.
Work stages form a linked list, with each stage pointing to the one before it. This is contained in a role.
```
role Rakudo::Internals::HyperWorkStage {
    has Rakudo::Internals::HyperWorkStage $.source;
}
```
This is a stage that wants to work at an individual batch level. It works in-place on the `Rakudo::Internals::HyperWorkBatch` instance that it is passed.
```
role Rakudo::Internals::HyperProcessor does Rakudo::Internals::HyperWorkStage {
    method process-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
}
```
There are no operation-agnostic implementations of this role.
This is a stage that takes a sequence of things and breaks them up into batches. A pipeline must start with one of these.
```
role Rakudo::Internals::HyperBatcher does Rakudo::Internals::HyperWorkStage {
    has $!sequence = 0;

    method next-sequence-number() {
        $!sequence++
    }

    method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) { ... }
}
```
Many sequences will start with a `Rakudo::Internals::HyperBatcher::Iterator`, which makes batches out of an iterator. This would look something like:
```
class Rakudo::Internals::HyperBatcher::Iterator does Rakudo::Internals::HyperBatcher {
    my constant NO_LOOKAHEAD = Mu.new;
    has Iterator $.iterator is required;
    has $!lookahead = NO_LOOKAHEAD;

    method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) {
        my IterationBuffer $items .= new;
        my Bool $first;
        my Bool $last = False;
        if $!lookahead === NO_LOOKAHEAD {
            $first = True;
            if $!iterator.push-exactly($batch-size, $items) === IterationEnd {
                $last = True;
            }
            else {
                $!lookahead = $!iterator.pull-one;
                $last = True if $!lookahead === IterationEnd;
            }
        }
        else {
            $first = False;
            $items.push($!lookahead);
            if $!iterator.push-exactly($batch-size - 1, $items) === IterationEnd {
                $last = True;
            }
            else {
                $!lookahead = $!iterator.pull-one;
                $last = True if $!lookahead === IterationEnd;
            }
        }
        my $sequence-number = self.next-sequence-number();
        return Rakudo::Internals::HyperWorkBatch.new(:$sequence-number, :$items, :$first, :$last);
    }
}
```
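The one-item lookahead trick above can be illustrated outside of Raku. This is a hypothetical Python generator (names are inventions, not the Rakudo source); the point is that pulling one item ahead lets the batcher mark the final non-empty batch as last, even when the input length divides evenly by the batch size.

```python
_END = object()   # sentinel standing in for IterationEnd

def batches(source, batch_size):
    """Yield (sequence-number, items, first, last) tuples from any iterable."""
    it = iter(source)
    seq = 0
    first = True
    lookahead = next(it, _END)
    while lookahead is not _END:
        items = [lookahead]
        while len(items) < batch_size:
            nxt = next(it, _END)
            if nxt is _END:
                break
            items.append(nxt)
        # Pull one item ahead: if nothing is there, this batch is the last one.
        lookahead = next(it, _END) if len(items) == batch_size else _END
        yield seq, items, first, lookahead is _END
        seq += 1
        first = False
```

With 8 items and a batch size of 4, the second batch is correctly flagged as last rather than a spurious empty third batch being produced.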
A hyper-batcher that has not yet produced its last batch, but that isn't ready to produce another one, can return a `Rakudo::Internals::HyperWorkBatch` type object.
This is a stage that takes a sequence of batches and joins them together. When no more batches will be delivered, `batches-completed` will be called. (Note that this is distinct from the `last` property on an individual batch; the worker working on the last batch may complete ahead of other workers, and so the batch marked last by the producer may well not be the last one that `accept-batch` will be called with.)
```
role Rakudo::Internals::HyperJoiner does Rakudo::Internals::HyperWorkStage {
    method accept-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
    method batches-completed(--> Nil) { ... }
}
```
A common implementation is one placed at the end of a `race` pipeline that will be consumed in a sequential iteration. For that reason, it also implements the `Iterator` API.
```
class Rakudo::Internals::RaceToIterator does Rakudo::Internals::HyperJoiner does Iterator {
    # Note we can use a ConcurrentQueue REPR object rather than a Channel, for optimization.
    has Channel $.batches .= new;

    method accept-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
        $!batches.send($batch);
    }
    method batches-completed(--> Nil) {
        $!batches.send(Mu);
    }

    has IterationBuffer $!current-items;
    method pull-one() {
        until $!current-items { # Handles empty batches
            my $batch = $!batches.receive // return IterationEnd;
            $batch.mark-consumed();
            $!current-items = $batch.items;
        }
        $!current-items.shift
    }
}
```
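As a rough model of the above, here is a hypothetical Python analogue (not the Rakudo code) using `queue.Queue` in place of the `Channel`, with `None` standing in for the `Mu` end sentinel. Batches are drained in completion order, which is exactly the race semantics.

```python
import queue

class RaceToIterator:
    """Completion-order joiner that is also a Python iterator."""
    def __init__(self):
        self.batches = queue.Queue()   # stands in for the Channel
        self.current = []
    def accept_batch(self, items):     # workers call this, in any order
        self.batches.put(list(items))
    def batches_completed(self):       # coordinator: no more batches coming
        self.batches.put(None)
    def __iter__(self):
        return self
    def __next__(self):
        while not self.current:        # the loop handles empty batches
            batch = self.batches.get()
            if batch is None:
                raise StopIteration
            self.current = batch
        return self.current.pop(0)
```

Note how an empty batch is simply skipped by the inner loop, mirroring the `until $!current-items` loop in the Raku sketch.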
A `SequencedHyperJoiner` is just like `HyperJoiner`, except `accept-batch` will be called with the batches in their sequence order.
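The reordering such a sequenced joiner needs can be sketched as a reorder buffer. This is a hypothetical Python model (the `SequencedBuffer` name is invented): batches arriving out of completion order are held back until every lower sequence number has been delivered.

```python
class SequencedBuffer:
    """Reorder buffer: release batches strictly in sequence-number order."""
    def __init__(self):
        self.next_seq = 0    # the sequence number we must deliver next
        self.pending = {}    # batches that arrived ahead of their turn
    def arrive(self, seq, batch):
        """Record an arrival; return the batches now deliverable, in order."""
        self.pending[seq] = batch
        ready = []
        while self.next_seq in self.pending:
            ready.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return ready
```

A batch that arrives early returns nothing; once the missing predecessor arrives, both are released together in order.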
A `HyperSeq` and a `RaceSeq` contain the current head of the work stage chain, together with a configuration object that is obtained from the prior one.
```
my class HyperConfiguration {
    has int $.batch;
    has Int $.degree;
}

class HyperSeq {
    has HyperConfiguration $.configuration;
    has Rakudo::Internals::HyperWorkStage $!work-stage-head;
    method !work-stage-head() { $!work-stage-head }
    ...
}

class RaceSeq {
    has HyperConfiguration $.configuration;
    has Rakudo::Internals::HyperWorkStage $!work-stage-head;
    method !work-stage-head() { $!work-stage-head }
    ...
}
```
A pipeline is composed by passing the tail end of it to `Rakudo::Internals::HyperCoordinator.start`. This starts processing, and is in charge of coordinating the operation, including launching workers.
```
class RaceSeq {
    method iterator() {
        my $iter = Rakudo::Internals::RaceToIterator.new(source => $!work-stage-head);
        Rakudo::Internals::HyperCoordinator.start($iter);
        $iter
    }
}
```
Operations would be composed by implementing various stages. For example:
- Both hyper and race `map` on single items would be implemented as a `HyperProcessor`.
- A `sort` with a projection would be implemented by a `HyperProcessor` that does the projection of each item in the batch, putting it into the `extra` slot, together with something doing both `HyperJoiner` and `HyperBatchProducer`. This would only be able to start producing batches once it has received all input batches.
- Hyper `unique` would be implemented by a `HyperProcessor` that does the `unique` on each batch, followed by a `SequencedHyperJoiner` that does a final `unique` run over the items, maintaining the state between them. For race `unique` it's just a `HyperJoiner`. (There may be a way to share out the central "already seen" view to the workers also.)
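The hyper `unique` split described in the last bullet can be modeled as follows; this is a hypothetical Python sketch, not the planned Rakudo code. Each worker dedupes within its own batch with no shared state, and the in-order join does a final pass with a running seen-set.

```python
def process_batch(items):
    """The HyperProcessor part: dedupe within one batch, no shared state."""
    seen = set()
    return [x for x in items if not (x in seen or seen.add(x))]

def join_in_order(batches):
    """The SequencedHyperJoiner part: final dedupe with a running seen-set."""
    seen = set()
    for batch in batches:          # batches assumed already in sequence order
        yield from (x for x in batch if not (x in seen or seen.add(x)))
```

The per-batch step shrinks the data in parallel; the sequential join then only has to resolve duplicates that span batch boundaries.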
The coordinator manages the overall operation. It is responsible for:
- Producing batches when we need them, and making sure we're only ever producing one at a time from a given `BatchProducer`.
- Making sure that we only call `accept-batch` on a given joiner one batch at a time, and only call `batches-completed` when we won't call `accept-batch` again.
- Making sure that a sequence of `BatchProcessor`s is executed in a single worker for a given batch, for better locality of reference.
- When an object is both a batch joiner and a batch producer, making sure that we give it a chance to produce a batch each time after we've fed it a batch.
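The locality point in the third responsibility can be illustrated with a hypothetical Python sketch (using `ThreadPoolExecutor` as a stand-in for the worker pool): a run of consecutive processors is submitted as one task, so a given batch is handled start-to-finish by a single worker rather than bouncing between threads after each stage.

```python
from concurrent.futures import ThreadPoolExecutor

def work_on(batch, processors):
    """Run the whole processor chain on one batch inside one worker task."""
    for p in processors:
        batch = p(batch)
    return batch

# Two toy stages standing in for BatchProcessors.
double = lambda b: [x * 2 for x in b]
inc = lambda b: [x + 1 for x in b]

with ThreadPoolExecutor(max_workers=4) as pool:
    # One submitted task per batch carries it through *both* stages.
    futures = [pool.submit(work_on, chunk, [double, inc])
               for chunk in ([1, 2], [3, 4])]
    results = [f.result() for f in futures]
```

The alternative (one task per stage per batch) would force each batch through the scheduler between stages, losing cache locality for no extra parallelism.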
What about Sequency?