<!-- livebook:{"persist_outputs":true} -->

# Getting Lazy with Dataflow Graphs in Elixir

## Intro

What do Tensorflow, Apache Airflow, Rule Engines, and Excel have in common?

Under the hood they all use DAGs to model the data-flow dependencies of the program.

Using graphs to model programs is great because you can modify the program at *runtime*. Let's talk about doing this in Elixir for great good.

Graph data structures have many use cases in computing. We'll focus on using Directed Acyclic Graphs (DAGs) for forward chaining dataflow models.

This notebook illustrates how to use dataflow graphs in Elixir to tackle tricky problems like expert systems / rule engines, dynamic data pipelines and more.

## What are Graphs?

[A graph](https://en.wikipedia.org/wiki/Graph_(abstract_data_type)) is:

> A graph data structure consists of a finite (and possibly mutable) set of vertices (also called nodes or points), together with a set of unordered pairs of these vertices for an undirected graph or a set of ordered pairs for a directed graph. These pairs are known as edges (also called links or lines), and for a directed graph are also known as edges but also sometimes arrows or arcs.

### TLDR;

A graph is circles and lines. Nodes and connections. Vertices and edges.

## What about DAGs then?

DAG stands for [*Directed Acyclic Graph*](https://en.wikipedia.org/wiki/Directed_acyclic_graph), i.e. circles connected by lines that have an arrow on them.

The "acyclic" means no "cycles", i.e. there cannot be a "loop back" where an arrow takes you back up the graph to where you've already been.
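As a quick illustration of "directed" and "acyclic", here is a minimal sketch using Erlang's built-in `:digraph` module. This is a stand-in chosen because it ships with the standard library and can reject cycles for us; the rest of this notebook uses `libgraph` instead. The vertex names are arbitrary.

```elixir
# Erlang's :digraph (stdlib) can enforce acyclicity when asked to.
dag = :digraph.new([:acyclic])

# Vertices must exist before edges can connect them.
Enum.each([:a, :b, :c, :d], fn vertex -> :digraph.add_vertex(dag, vertex) end)

# Arrows point "down" the graph: a -> b, a -> c, b -> d.
:digraph.add_edge(dag, :a, :b)
:digraph.add_edge(dag, :a, :c)
:digraph.add_edge(dag, :b, :d)

# An arrow from d back up to a would close a loop, so the edge is rejected
# with an error tuple instead of being added.
cycle_attempt = :digraph.add_edge(dag, :d, :a)

# The out-neighbours of a vertex are the vertices its arrows point to.
children_of_a = Enum.sort(:digraph.out_neighbours(dag, :a))
still_acyclic = :digraph_utils.is_acyclic(dag)
```

Here `cycle_attempt` is an `{:error, {:bad_edge, path}}` tuple, which is the "no loop back" rule in action.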
## DAGs / Dataflow Graphs are everywhere in computing

* [Apache Airflow and similar ETL / Pipeline builder tools are DAGs](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html)
  * [Dagster](https://docs.dagster.io/concepts/ops-jobs-graphs/ops)
  * [Luigi](https://luigi.readthedocs.io/en/stable/workflows.html)
  * [Argo](https://argoproj.github.io/argo-workflows/core-concepts/)
* [Apache BEAM and most related high throughput data processing frameworks are DAGs](https://beam.apache.org/documentation/basics/#pipeline)
  * [Flink](https://nightlies.apache.org/flink/flink-docs-release-1.8/concepts/programming-model.html) is a leading data processing framework that can run Apache BEAM pipelines.
  * Apache Spark also uses dataflow DAGs
  * [GCP Dataflow](https://cloud.google.com/dataflow)
  * AWS Data Pipeline
* [Tensorflow is graphs](https://www.tensorflow.org/guide/intro_to_graphs)
* Serverless / Workflow systems are DAGs
  * https://blog.acolyer.org/2020/02/07/cloudburst/
  * AWS Lambda
  * Google Cloud Functions
  * [Azure Functions](https://www.microsoft.com/en-us/research/uploads/prod/2021/10/DF-Semantics-Final.pdf)
* Rule & Workflow engines are DAGs
  * [RETE Algorithm](https://en.wikipedia.org/wiki/Rete_algorithm)
  * [Drools](https://www.drools.org/)
  * [Redhat Decision Manager](https://www.redhat.com/en/technologies/jboss-middleware/decision-manager)
  * [Camunda](https://camunda.com/)
* [CPU scheduling is DAGs!](https://www.eidos.ic.i.u-tokyo.ac.jp/~huynh/files/hpdc16_poster.pdf)

## Bringing the program into the data

The common use case for DAGs in dataflow models is to represent parts of your program as *data* where it can be transformed and composed at runtime.

There are other ways we can represent a program at runtime, like with a list of functions.

The advantage of modeling the program as data is that it allows for deferred execution. The caller can choose how and when to execute the function(s).
#### Representation of program as data

"Using a data structure to hold a program/instructions for later use"

```elixir
funs = [
  fn num -> num + 1 end,
  fn num -> num * 2 end,
  fn num -> num - 1 end
]
```

#### Evaluation

"Using that program-as-data to do stuff at runtime"

```elixir
Enum.map(funs, & &1.(2))
```

```elixir
funs
|> Enum.map(& &1.(2))
|> Enum.sum()
```

## Deps/Setup

```elixir
Mix.install([
  {:kino, "~> 0.5.2"},
  {:libgraph, github: "bitwalker/libgraph", branch: "main"},
  {:req, "~> 0.2.0"},
  {:jason, "~> 1.1.0"},
  {:floki, "~> 0.32.0"}
])
```

### Visualizations: Graph -> Mermaid

```elixir
defmodule Kino.Mermaid do
  use Kino.JS

  def new(%Graph{} = graph) do
    graph
    |> to_mermaid()
    |> new()
  end

  def new(graph) do
    Kino.JS.new(__MODULE__, graph)
  end

  def to_mermaid(%Graph{} = graph) do
    Enum.reduce(
      Graph.edges(graph),
      "graph TD;",
      fn %{v1: v1, v2: v2}, acc ->
        acc <>
          "#{vertex_id(v1)}([#{vertex_name(v1)}])-->#{vertex_id(v2)}([#{vertex_name(v2)}]);"
      end
    )
  end

  defp vertex_name(:root), do: :root
  defp vertex_name(%{work: fun}), do: "step_#{fun_name(fun)}"
  defp vertex_name(%{check: fun}), do: "cond_#{fun_name(fun)}"

  defp vertex_name(fun) when is_function(fun),
    do: "fun_#{fun_name(fun)}-#{:erlang.phash2(fun)}"

  defp vertex_name(vertex) when is_atom(vertex), do: to_string(vertex)

  defp vertex_name(otherwise) do
    try do
      to_string(otherwise)
    rescue
      # to_string/1 raises for terms without a String.Chars implementation,
      # so fall back to a hash of the term.
      _any -> :erlang.phash2(otherwise)
    end
  end

  defp vertex_id(thing) when is_atom(thing), do: to_string(thing)
  defp vertex_id(thing), do: :erlang.phash2(thing)

  defp fun_name(fun), do: Function.info(fun, :name) |> elem(1)

  asset "main.js" do
    """
    import "https://cdn.jsdelivr.net/npm/mermaid@8.13.3/dist/mermaid.min.js";

    mermaid.initialize({ startOnLoad: false });

    export function init(ctx, graph) {
      mermaid.render("graph1", graph, (svgSource, bindListeners) => {
        ctx.root.innerHTML = svgSource;
        bindListeners && bindListeners(ctx.root);
      });
    }
    """
  end
end
```

## Making a DAG

```elixir
g =
  Graph.new(type: :directed)
  |> Graph.add_edges([
    {:a, :b},
    {:a, :c},
    {:b, :d}
  ])

g |> Kino.Mermaid.new()
```

## Making a dataflow graph of lambdas

If functions are just data and graphs are data structures - what if we put functions in our graphs?

```elixir
fun_1 = fn num -> num * 2 end
fun_2 = fn num -> num + 2 end
fun_3 = fn num -> num * 42 end
fun_4 = fn num -> num + 42 end

fun_dag =
  Graph.new(type: :directed)
  |> Graph.add_edges([
    {:root, fun_1},
    {fun_1, fun_2},
    {fun_1, fun_3},
    {fun_2, fun_4}
  ])

fun_dag |> Kino.Mermaid.new()
```

## Evaluation of a DAG

Now that we've got lambda functions in our DAG - how do we evaluate it?

1. Start at the top and run that function with the input to get a new output
2. Find the next step(s) by traveling down the arrows (i.e. the `out_neighbors`)
3. Feed the output from the previous step into the next steps
4. Profit

TLDR; "Travel down the arrows"

```elixir
fun_dag_input = Kino.Input.number("Num")
```

```elixir
input = Kino.Input.read(fun_dag_input)
```

### Finding steps off of the root node

```elixir
neighbors_of_root = Graph.out_neighbors(fun_dag, :root)
```

### Building lazy "runnables" by pairing the function and its input to run with

```elixir
first_runnables = Enum.map(neighbors_of_root, fn fun -> {fun, input} end)
```

### Executing our runnables

```elixir
first_result =
  first_runnables
  |> Enum.map(fn {fun, input} -> fun.(input) end)
  |> List.first()
```

### Executing the next layer

```elixir
fun_from_first_runnable =
  first_runnables
  |> List.first()
  |> elem(0)

neighbors_of_first_runnable =
  fun_dag
  |> Graph.out_neighbors(fun_from_first_runnable)
  |> Enum.map(fn fun -> {fun, first_result} end)
  |> Enum.map(fn {fun, input} -> fun.(input) end)
```

### Executing the next layer in parallel

```elixir
neighbors_of_first_runnable =
  fun_dag
  |> Graph.out_neighbors(fun_from_first_runnable)
  |> Task.async_stream(fn fun -> fun.(first_result) end)
  |> Enum.map(fn {:ok, result} -> result end)
```

### Changing our DAG

```elixir
fun_dag |> Kino.Mermaid.new()
```

```elixir
new_fun_dag = Graph.add_edge(fun_dag,
  fun_from_first_runnable, fn _num -> 42 end)

new_fun_dag |> Kino.Mermaid.new()
```

## What Else?

What other features would a DAG / Pipeline builder tool like this want?

### Joins

Steps with more than 1 parent dependency.

### Rules

Steps with constraints controlling when to execute.

### State

Accumulating data to act on or control behavior.

## Joins

```elixir
join_example =
  Graph.new()
  |> Graph.add_edges([
    {:root, :step_a},
    {:root, :step_b},
    {:step_a, :step_c},
    {:step_b, :step_c}
  ])

join_example |> Kino.Mermaid.new()
```

Here we can only execute `step_c` when both `step_a` and `step_b` have resolved.

This means we have to keep intermediate results somewhere in memory, so we can hold on to whichever of a or b finishes first and then use both results to invoke step c.

## Rules

A rule is a step with a constraint on when to execute it.

A function is kind of like a rule where `fn :potato -> 42 end` will only return 42 when given `:potato`.

The usual verbiage is the `lhs` and `rhs` or `condition` and `reaction`.

<!-- livebook:{"break_markdown":true} -->

> Rules engines are the OG AI solving business problems since the 60's.

<!-- livebook:{"break_markdown":true} -->

Rules are common abstractions for the representation of expert knowledge. They're handy because they're easily composed, evaluated, stored, and organized.

### Resources to check out for rule based systems

* [RETE](https://en.wikipedia.org/wiki/Rete_algorithm)
* [LEAPS](https://dl.acm.org/doi/book/10.5555/899216)
* [Patrick Winston's MIT AI lectures](https://www.youtube.com/watch?v=TjZBTDzGeGg)
* [RETEX - Elixir RETE implementation](https://github.com/lorenzosinisi/retex)

## Compiling and Evaluating Rules

Say we want to make a graph with our example rule from before:

<!-- livebook:{"force_markdown":true} -->

```elixir
fn :potato -> 42 end
```

We know we need to evaluate the `lhs` before the `rhs` and only evaluate the `rhs` if the `lhs` returns `true`.
We can do this by breaking out our one rule into the two pieces:

<!-- livebook:{"force_markdown":true} -->

```elixir
lhs = fn :potato -> true end
rhs = fn _any -> 42 end
```

Now we need to get these into our DAG and since the `lhs` has to run first - we'll make sure the arrow goes `lhs -> rhs`.

```elixir
potato_rule =
  Graph.new()
  |> Graph.add_edges([
    {:is_potato?, 42}
  ])

potato_rule |> Kino.Mermaid.new()
```

## Composition of Rules

```elixir
composed_rule =
  Graph.new()
  |> Graph.add_edges([
    {:root, :is_potato?},
    {:is_potato?, :boil_em_mash_em},
    {:root, :is_ham?},
    {:is_ham?, :it_is_delicious},
    {:root, :is_beet?},
    {:is_beet?, :good_in_salad}
  ])

composed_rule |> Kino.Mermaid.new()
```

Now we could give our Graph of Rules some inputs and get results like

```
> :ham
:it_is_delicious

> :potato
:boil_em_mash_em

> :beet
:good_in_salad
```

## Conditional Expansion

For Elixir and Erlang function calls - we're scoped to that module, function, and arity when evaluating. This has advantages because it means we can evaluate patterns top to bottom, executing the RHS of whichever pattern matches first. In these cases - outside of optimizing your pattern matches you don't really need or want conditional expansion.

However in the case of the evaluation of *many* rules where the conditions might overlap - it's often useful to expand the expressions within the LHS into smaller conditions bound together in the graph.

<!-- livebook:{"break_markdown":true} -->

Here's an example of the initial approach of expanding just the LHS and RHS.

<!-- livebook:{"force_markdown":true} -->

```elixir
lhs = fn input when (input == :potato and input != :ham) or input == "potato" -> true end
rhs = fn _any -> 42 end
```

But if we also bring in another rule such as

<!-- livebook:{"force_markdown":true} -->

```elixir
other_rule = fn :potato -> 42 end
```

We might be matching against `:potato` twice for the same input and doing unnecessary work.
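To make the duplicated work concrete, here is a minimal sketch that counts how often the `:potato` comparison runs. The `potato_check` helper, the `Agent` counter, and the two rule functions are hypothetical stand-ins for illustration; they are not part of any rule engine API.

```elixir
# Hypothetical counter recording how often the shared condition is evaluated.
{:ok, counter} = Agent.start_link(fn -> 0 end)

# The shared condition, instrumented to record each evaluation.
potato_check = fn input ->
  Agent.update(counter, &(&1 + 1))
  input == :potato
end

# Two independent rules that both test for :potato before reacting.
rule_1 = fn input -> if potato_check.(input) and input != :ham, do: 42 end
rule_2 = fn input -> if potato_check.(input), do: 42 end

naive_results = Enum.map([rule_1, rule_2], fn rule -> rule.(:potato) end)
naive_evaluations = Agent.get(counter, & &1)

# Sharing the condition: evaluate it once, then hand the outcome to both reactions.
Agent.update(counter, fn _count -> 0 end)
matched = potato_check.(:potato)
reactions = [fn -> 42 end, fn -> 42 end]
shared_results = Enum.map(reactions, fn reaction -> if matched, do: reaction.() end)
shared_evaluations = Agent.get(counter, & &1)
```

Both approaches produce the same results, but the naive one evaluates the condition once per rule while the shared one evaluates it once per input.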
So if we're composing many rules with similar conditions for the same execution context, we can avoid this by expanding each individual expression as its own condition.

<!-- livebook:{"break_markdown":true} -->

<!-- livebook:{"force_markdown":true} -->

```elixir
fn input when (input == :potato and input != :ham) or input == "potato" -> 42 end

# full expansion

lhs_1 = fn input when input == :potato -> true end
lhs_2 = fn input when input != :ham -> true end
lhs_or = fn input when input == "potato" -> true end
```

```elixir
expanded_rule =
  Graph.new()
  |> Graph.add_edges([
    {:root, :lhs_1},
    {:root, :lhs_2},
    {:root, :other_path},
    {:lhs_1, :conjunction_2_and_1},
    {:lhs_2, :conjunction_2_and_1},
    {:conjunction_2_and_1, :rhs},
    {:other_path, :rhs}
  ])

expanded_rule |> Kino.Mermaid.new()
```

## State Machines

Joining ongoing expressions is stateful - sort of. To ensure a step's dependencies are resolved we have to hold their results in memory and wait until the final step can be executed.

A DAG of lambda functions is easy to evaluate because the functions are **stateless**, meaning our scheduler can blindly parallel map over steps keeping only the 1 step in context.
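The bookkeeping a join needs can be sketched with a plain map of intermediate results. This is a minimal illustration under assumptions, not this notebook's eventual implementation: the `parents` and `work` maps and the `ready` and `run_step` helpers are hypothetical names, and the step names simply mirror the `join_example` graph from the Joins section.

```elixir
# Hypothetical join: :step_c depends on both :step_a and :step_b.
parents = %{step_a: [:root], step_b: [:root], step_c: [:step_a, :step_b]}

# Each step receives a list with one result per parent, in the order listed above.
work = %{
  step_a: fn [input] -> input + 1 end,
  step_b: fn [input] -> input * 2 end,
  step_c: fn [a, b] -> a + b end
}

# A step is ready once every one of its parents has a result held in memory.
ready = fn step, results ->
  parents |> Map.fetch!(step) |> Enum.all?(&Map.has_key?(results, &1))
end

# Running a step gathers its parents' results and stores its own output.
run_step = fn step, results ->
  inputs = parents |> Map.fetch!(step) |> Enum.map(&Map.fetch!(results, &1))
  fun = Map.fetch!(work, step)
  Map.put(results, step, fun.(inputs))
end

results = %{root: 10}
results = run_step.(:step_a, results)
# Only :step_a has resolved so far, so the join has to wait.
ready_after_a = ready.(:step_c, results)
results = run_step.(:step_b, results)
# Both parents have resolved, so the join can run.
ready_after_b = ready.(:step_c, results)
results = run_step.(:step_c, results)
```

The `results` map is the state: without it there is nowhere to park the output of `step_a` while `step_b` is still pending.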
<!-- livebook:{"break_markdown":true} -->

<!-- livebook:{"force_markdown":true} -->

```elixir
potato_lock =
  Dagger.state_machine(
    init: %{code: "potato", state: :locked, contents: "ham"},
    reducer: fn
      :lock, state ->
        %{state | state: :locked}

      {:unlock, input_code}, %{code: code, state: :locked} = state when input_code == code ->
        %{state | state: :unlocked}

      _input_code, %{state: :unlocked} ->
        state
    end,
    reactors: [
      fn %{state: :unlocked, contents: contents} -> contents end,
      fn %{state: :locked} -> {:error, :locked} end
    ]
  )
```

```elixir
lock_state_machine =
  Graph.new()
  |> Graph.add_edges([
    {:root, :arity_check_1},
    {:arity_check_1, :is_lock_command?},
    {:arity_check_1, :is_unlock_command?},
    {:arity_check_1, :is_code_correct?},
    {:is_lock_command?, :lock_conjunction},
    {:is_unlock_command?, :lock_conjunction},
    {:is_code_correct?, :lock_conjunction},
    {:lock_conjunction, :lock_reducer},
    {:root, :reactor_lhs_clause_1_matched?},
    {:root, :reactor_lhs_clause_2_matched?},
    {:reactor_lhs_clause_1_matched?, :reactor_rhs_1},
    {:reactor_lhs_clause_2_matched?, :reactor_rhs_2}
  ])

Kino.Mermaid.new(lock_state_machine)
```

## DIY Text Processing

```elixir
defmodule TextProcessing do
  def tokenize(text) do
    text
    |> String.downcase()
    |> String.split(~R/[^[:alnum:]\-]/u, trim: true)
  end

  def count_words(list_of_words) do
    list_of_words
    |> Enum.reduce(Map.new(), fn word, map ->
      Map.update(map, word, 1, &(&1 + 1))
    end)
  end

  def count_uniques(word_count) do
    Enum.count(word_count)
  end

  def first_word(list_of_words) do
    List.first(list_of_words)
  end

  def last_word(list_of_words) do
    List.last(list_of_words)
  end
end
```

```elixir
text = "boil em mash em stick em in a stew. Po Tay Toes."
text
|> TextProcessing.tokenize()
|> TextProcessing.count_words()
|> TextProcessing.count_uniques()
```

```elixir
text
|> TextProcessing.tokenize()
|> TextProcessing.count_words()
```

```elixir
text
|> TextProcessing.tokenize()
|> TextProcessing.first_word()
```

```elixir
text
|> TextProcessing.tokenize()
|> TextProcessing.last_word()
```

We tokenized 4 times and counted words twice!

What if we made a `Pipeline` module so we can pipeline while we pipeline?

## Pipe all the things

```elixir
defmodule Fact do
  defstruct ~w(
    value
    runnable
  )a
end

defmodule Step do
  defstruct ~w(
    work
  )a

  def new(work), do: %__MODULE__{work: work}

  def run(%__MODULE__{work: work} = step, %Fact{value: input} = input_fact) do
    %Fact{value: apply(work, [input]), runnable: {step, input_fact}}
  end

  def run(%__MODULE__{work: work} = step, input) do
    %Fact{value: apply(work, [input]), runnable: {step, %Fact{value: input}}}
  end
end

defmodule Rule do
  defstruct ~w(
    lhs
    rhs
  )a

  def new(lhs, rhs) do
    %__MODULE__{lhs: lhs, rhs: rhs}
  end

  def check(%__MODULE__{} = rule, input) do
    apply(rule.lhs, [input])
  end

  def run(%__MODULE__{} = rule, input) do
    if check(rule, input) do
      # apply/2 expects a list of arguments
      apply(rule.rhs, [input])
    end
  end

  def to_pipeline(%__MODULE__{} = rule) do
    Pipeline.new()
    |> Pipeline.add_step(Condition.new(rule.lhs), Step.new(rule.rhs))
  end
end

defmodule Condition do
  defstruct ~w(
    check
  )a

  def new(check), do: %__MODULE__{check: check}

  def run(%Condition{} = condition, %Fact{} = fact) do
    # Unwrap the fact and delegate to the raw-input clause below
    %Fact{value: run(condition, fact.value)}
  end

  def run(condition, input) do
    with true <- apply(condition.check, [input]) do
      true
    else
      _otherwise ->
        false
    end
  end
end

defmodule Pipeline do
  defstruct ~w(
    flow
    facts
  )a

  def new() do
    %__MODULE__{
      flow: Graph.new(type: :directed) |> Graph.add_vertex(:root),
      facts: []
    }
  end

  def run({%Step{} = step, %Fact{} = fact} = _runnable) do
    Step.run(step, fact)
  end

  # The function clause must come first: the generic clause below matches any term.
  def add_step(%__MODULE__{} = pipeline, fun) when is_function(fun) do
    add_step(pipeline, Step.new(fun))
  end

  def add_step(%__MODULE__{flow: flow} = pipeline, step) do
    %__MODULE__{pipeline | flow: Graph.add_edge(flow, :root, step)}
  end

  def add_step(%__MODULE__{flow: flow} = pipeline, parent_step, child_step) do
    if Graph.has_vertex?(flow, parent_step) do
      %__MODULE__{pipeline | flow: Graph.add_edge(flow, parent_step, child_step)}
    else
      pipeline
      |> add_step(parent_step)
      |> add_step(parent_step, child_step)
    end
  end

  @doc """
  Merges the second pipeline into the first.
  """
  def merge(%__MODULE__{} = pipeline_1, %__MODULE__{} = pipeline_2) do
    new_flow =
      Enum.reduce(Graph.edges(pipeline_2.flow), pipeline_1.flow, fn edge, flow ->
        Graph.add_edge(flow, edge)
      end)

    %__MODULE__{pipeline_1 | flow: new_flow}
  end

  def next_steps(%__MODULE__{flow: flow}, step) do
    Graph.out_neighbors(flow, step)
  end

  @doc """
  Returns a list of runnables (`work, input` pairs).
  """
  def next_runnables(
        %__MODULE__{} = pipeline,
        %Fact{runnable: {previous_step, _previous_input}} = fact
      ) do
    pipeline
    |> next_steps(previous_step)
    |> Enum.map(fn step -> {step, fact} end)
  end

  def next_runnables(%__MODULE__{} = pipeline, some_input) do
    pipeline
    |> next_steps(:root)
    |> Enum.map(fn step -> {step, %Fact{value: some_input}} end)
  end
end
```

```elixir
tokenize_step = Step.new(&TextProcessing.tokenize/1)
count_words_step = Step.new(&TextProcessing.count_words/1)
count_uniques_step = Step.new(&TextProcessing.count_uniques/1)
first_word_step = Step.new(&TextProcessing.first_word/1)
last_word_step = Step.new(&TextProcessing.last_word/1)

text_processing_pipeline =
  Pipeline.new()
  |> Pipeline.add_step(tokenize_step)
  |> Pipeline.add_step(tokenize_step, count_words_step)
  |> Pipeline.add_step(tokenize_step, first_word_step)
  |> Pipeline.add_step(tokenize_step, last_word_step)
  |> Pipeline.add_step(count_words_step, count_uniques_step)

text_processing_pipeline.flow |> Kino.Mermaid.new()
```

```elixir
next_runnables = Pipeline.next_runnables(text_processing_pipeline, text)
```

```elixir
results_1 =
  next_runnables
  |> Enum.map(fn runnable -> Pipeline.run(runnable) end)
```

```elixir
next_runnables =
  results_1
  |> Enum.flat_map(fn %Fact{} = fact ->
    Pipeline.next_runnables(text_processing_pipeline, fact)
  end)

results_2 =
  next_runnables
  |> Enum.map(&Pipeline.run/1)
```

## Resources

### Videos

* For AI, rule based systems, knowledge representation and more: [The late, great Patrick Winston's MIT AI lectures](https://www.youtube.com/watch?v=TjZBTDzGeGg&list=PLUl4u3cNGP63gFHB6xb-kVBiQHYe_4hSi)
* For understanding programming paradigms, concurrency, dataflow, backwards chaining, and more: [Peter Van Roy's Youtube Channel](https://www.youtube.com/user/PeterVanRoy/videos)

### Papers

* [Cloudburst: Stateful Functions-as-a-Service](https://arxiv.org/abs/2001.04592)
* [The Dataflow Model](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43864.pdf)
* [A distributed execution engine supporting data-dependent control flow](https://www.cl.cam.ac.uk/~ey204/pubs/PhD/Derek.pdf)
* [Forward Chaining with State Monad](https://link.springer.com/chapter/10.1007/978-3-319-39384-1_38)
* [Durable Functions: Semantics for Stateful Serverless](https://www.microsoft.com/en-us/research/uploads/prod/2021/10/DF-Semantics-Final.pdf)

### Cool Projects

* https://eigr.io/
* https://cloudstate.io/
* https://vaxine.io/