Last active
March 18, 2018 23:22
-
-
Save Trevoke/dfa6d886f12d85d7d1d54103d4a808b5 to your computer and use it in GitHub Desktop.
producer_consumer does not modify events
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage: mix run lib/producer_consumer.exs | |
# | |
# Hit Ctrl+C twice to stop it. | |
# | |
# This is a base example where a producer A emits items, | |
# which are amplified by a producer consumer B and printed | |
# by consumer C. | |
defmodule A do | |
use GenStage | |
def init(counter) do | |
{:producer, counter} | |
end | |
def handle_demand(demand, counter) when demand > 0 do | |
# If the counter is 3 and we ask for 2 items, we will | |
# emit the items 3 and 4, and set the state to 5. | |
events = Enum.to_list(counter..counter+demand-1) | |
# events = Enum.take(events, 5) | |
IO.puts "\n\nA ------------------------------" | |
<> "\ndemand: #{demand}, counter: #{counter}" | |
# <> "\nevents: #{inspect(events, limit: :infinity)}" | |
<> "\n[#{List.first(events)} .. #{List.last(events)}], length: #{length(events)}" | |
{:noreply, events, counter + demand} | |
end | |
end | |
defmodule B do | |
use GenStage | |
def init(number) do | |
{:producer_consumer, number} | |
end | |
def handle_events(events, _from, number) do | |
# If we receive [0, 1, 2], this will transform | |
# it into [0, 1, 2, 1, 2, 3, 2, 3, 4]. | |
# new_events = | |
# for event <- events, | |
# entry <- event..event+number, | |
# do: entry | |
new_events = events | |
tab = " " | |
IO.puts "\n\n#{tab}B ------------------------------" | |
# <> "\n#{tab}number: #{number}" | |
# <> "\nevents: #{inspect(events, limit: :infinity)}" | |
<> "\n#{tab}[#{List.first(events)} .. #{List.last(events)}], length: #{length(events)}" | |
# <> "\nnew_events: #{inspect(new_events, limit: :infinity)}" | |
<> "\n#{tab}[#{List.first(new_events)} .. #{List.last(new_events)}], length: #{length(new_events)}" | |
{:noreply, new_events, number} | |
end | |
end | |
defmodule C do | |
use GenStage | |
def init(:ok) do | |
{:consumer, :the_state_does_not_matter} | |
end | |
def handle_events(events, _from, state) do | |
tab = " " | |
IO.puts "\n\n#{tab}#{tab}C 1------------------------------" | |
# Wait | |
:timer.sleep(5000) # 10000 is 10 seconds | |
IO.puts "\n\n#{tab}#{tab}C 2------------------------------" | |
<> "\n#{tab}#{tab}state: #{state}" | |
# <> "\nevents: #{inspect(events, limit: :infinity)}" | |
<> "\n#{tab}#{tab}[#{List.first(events)} .. #{List.last(events)}], length: #{length(events)}" | |
# We are a consumer, so we would never emit items. | |
{:noreply, [], state} | |
end | |
end | |
{:ok, a} = GenStage.start_link(A, 0) # starting from zero | |
{:ok, b} = GenStage.start_link(B, 2) # expand by 2 | |
{:ok, c} = GenStage.start_link(C, :ok) # state does not matter | |
IO.puts " B 400-700 C 300-500" | |
GenStage.sync_subscribe(b, to: a, min_demand: 400, max_demand: 700) | |
GenStage.sync_subscribe(c, to: b, min_demand: 300, max_demand: 500) # max_demand cannot be more than 700 | |
Process.sleep(:infinity) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment