Created
December 25, 2018 15:17
-
-
Save linjunpop/3cf6275c8823310f7c33539753d9500a to your computer and use it in GitHub Desktop.
Building A continues data processing system
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Archiver.Fetcher do | |
use GenStage | |
def start_link(args) do | |
GenStage.start_link(__MODULE__, args, name: __MODULE__) | |
end | |
def init(state), do: {:producer, state} | |
def handle_demand(demand, state) do | |
items = Database.get_items() | |
{:noreply, items, state} | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Archiver.Uploader do | |
use GenStage | |
def start_link(args) do | |
GenStage.start_link(__MODULE__, args, name: __MODULE__) | |
end | |
def init(_state) do | |
{:consumer, :the_state_does_not_matter, subscribe_to: [Archiver.Fetcher]} | |
end | |
def handle_events(items, _from, _state) do | |
items | |
|> Enum.map(&Task.start_link(Dropbox, :upload, [&1])) | |
{:noreply, [], :the_state_does_not_matter} | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment