<!-- livebook:{"persist_outputs":true} -->

# Getting Lazy with Dataflow Graphs in Elixir

## Intro

What do Tensorflow, Apache Airflow, Rule Engines, and Excel have in common?

Under the hood they all use DAGs to model data-flow dependencies of the program. Using graphs to model programs is great because you can modify the program at *runtime*. Lets talk about doing this in Elixir for great good.

Graph data structures have many use cases in computing. We'll focus on using Directed Acyclic Graphs (DAGs) for forward chaining dataflow models.

This notebook illustrates how to use dataflow graphs in Elixir tackle tricky problems like expert systems / rule engines, dynamic data pipelines and more.

## What are Graphs?

![graph png](https://miro.medium.com/max/908/1*2s-my83GACoXqzP5uIH9Aw.png)

[A graph](https://en.wikipedia.org/wiki/Graph_(abstract_data_type)) is

> A graph data structure consists of a finite (and possibly mutable) set of vertices (also called nodes or points), together with a set of unordered pairs of these vertices for an undirected graph or a set of ordered pairs for a directed graph. These pairs are known as edges (also called links or lines), and for a directed graph are also known as edges but also sometimes arrows or arcs.

### TLDR;

A graph is circles and lines. Nodes and connections. Vertices and edges.

## What about DAGs then?

DAG stands for [*Directed Acyclic Graph*](https://en.wikipedia.org/wiki/Directed_acyclic_graph)

i.e. Circles with lines that have an arrow between them.

The "acyclic" means no "cycles" i.e. there cannot be a "loop back" where an arrow takes you back up the graph where you've already been.

![](https://upload.wikimedia.org/wikipedia/commons/f/fe/Tred-G.svg)

## DAGs / Dataflow Graphs are everywhere in computing

* [Apache Airflow and similar ETL / Pipeline builder tools are DAGs](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html)
  * [Dagster](https://docs.dagster.io/concepts/ops-jobs-graphs/ops)
  * [Luigi](https://luigi.readthedocs.io/en/stable/workflows.html)
  * [Argo](https://argoproj.github.io/argo-workflows/core-concepts/)
* [Apache BEAM and most related high throughput data processing frameworks are DAGs](https://beam.apache.org/documentation/basics/#pipeline)
  * [Flink](https://nightlies.apache.org/flink/flink-docs-release-1.8/concepts/programming-model.html) is a leading data processing framework that implements the Apache BEAM standard.
  * Apache Spark is also dataflow dags
  * [GCP Dataflow](https://cloud.google.com/dataflow)
  * AWS Data Pipeline
* [Tensorflow is graphs](https://www.tensorflow.org/guide/intro_to_graphs)
* Serverless / Workflow systems are DAGs
  * https://blog.acolyer.org/2020/02/07/cloudburst/
  * AWS Lambda
  * Google Cloud Functions
  * [Azure Functions](https://www.microsoft.com/en-us/research/uploads/prod/2021/10/DF-Semantics-Final.pdf)
* Rule & Workflow engines are DAGs
  * [RETE Algorithm](https://en.wikipedia.org/wiki/Rete_algorithm)
  * [Drools](https://www.drools.org/)
  * [Redhat Decision Manager](https://www.redhat.com/en/technologies/jboss-middleware/decision-manager)
  * [Camunda](https://camunda.com/)
* [CPU scheduling is DAGs!](https://www.eidos.ic.i.u-tokyo.ac.jp/~huynh/files/hpdc16_poster.pdf)

## Bringing the program into the data

The common use case for DAGs in dataflow models is to represent parts of your program as *data* where
it can be transorfmed and composed at runtime.

There are other ways we can represent a program at runtime like with a list of functions.

The advantage of modeling the program as data is it allows for deferred execution. The caller can choose how and when to execute the function(s).

#### Representation of program as data

"Using a datastructure to hold a program/instructions for later use"

```elixir
funs = [
  fn num -> num + 1 end,
  fn num -> num * 2 end,
  fn num -> num - 1 end
]
```

#### Evaluation

"Using that program-as-data to do stuff at runtime"

```elixir
Enum.map(funs, & &1.(2))
```

```elixir
funs
|> Enum.map(& &1.(2))
|> Enum.sum()
```

## Deps/Setup

```elixir
Mix.install([
  {:kino, "~> 0.5.2"},
  {:libgraph, github: "bitwalker/libgraph", branch: "main"},
  {:req, "~> 0.2.0"},
  {:jason, "~> 1.1.0"},
  {:floki, "~> 0.32.0"}
])
```

### Visualizations: Graph -> Mermaid

```elixir
defmodule Kino.Mermaid do
  use Kino.JS

  def new(%Graph{} = graph) do
    graph
    |> to_mermaid()
    |> new()
  end

  def new(graph) do
    Kino.JS.new(__MODULE__, graph)
  end

  def to_mermaid(%Graph{} = graph) do
    Enum.reduce(
      Graph.edges(graph),
      "graph TD;",
      fn
        %{v1: v1, v2: v2}, acc ->
          acc <>
            "#{vertex_id(v1)}([#{vertex_name(v1)}])-->#{vertex_id(v2)}([#{vertex_name(v2)}]);"
      end
    )
  end

  defp vertex_name(:root), do: :root
  defp vertex_name(%{work: fun}), do: "step_#{fun_name(fun)}"
  defp vertex_name(%{check: fun}), do: "cond_#{fun_name(fun)}"
  defp vertex_name(fun) when is_function(fun), do: "fun_#{fun_name(fun)}-#{:erlang.phash2(fun)}"
  defp vertex_name(vertex) when is_atom(vertex), do: to_string(vertex)

  defp vertex_name(otherwise) do
    try do
      to_string(otherwise)
    catch
      _any -> :erlang.phash2(otherwise)
    end
  end

  defp vertex_id(thing) when is_atom(thing), do: to_string(thing)
  defp vertex_id(thing), do: :erlang.phash2(thing)

  defp fun_name(fun), do: Function.info(fun, :name) |> elem(1)

  asset "main.js" do
    """
    import "https://cdn.jsdelivr.net/npm/mermaid@8.13.3/dist/mermaid.min.js";

    mermaid.initialize({ startOnLoad: false });

    export function init(ctx, graph) {
      mermaid.render("graph1", graph, (svgSource, bindListeners) => {
        ctx.root.innerHTML = svgSource;
        bindListeners && bindListeners(ctx.root);
      });
    }
    """
  end
end
```

## Making a DAG

```elixir
g =
  Graph.new(type: :directed)
  |> Graph.add_edges([
    {:a, :b},
    {:a, :c},
    {:b, :d}
  ])

g
|> Kino.Mermaid.new()
```

## Making a dataflow graph of lambdas

If functions are just data and graphs are data structures - what if we put functions in our graphs?

```elixir
fun_1 = fn num -> num * 2 end
fun_2 = fn num -> num + 2 end
fun_3 = fn num -> num * 42 end
fun_4 = fn num -> num + 42 end

fun_dag =
  Graph.new(type: :directed)
  |> Graph.add_edges([
    {:root, fun_1},
    {fun_1, fun_2},
    {fun_1, fun_3},
    {fun_2, fun_4}
  ])

fun_dag
|> Kino.Mermaid.new()
```

## Evaluation of a DAG

Now that we've got lambda functions in our dag - how to we evaluate it?

1. start at the top and run that function with the input to get a new output
2. find the next step(s) by traveling down the arrows (i.e. the `out_neighbors`)
3. feed the output from the previous step into the next steps
4. Profit

TLDR; "Travel down the arrows"

```elixir
fun_dag_input = Kino.Input.number("Num")
```

```elixir
input = Kino.Input.read(fun_dag_input)
```

### Finding steps off of the root node

```elixir
neighbors_of_root = Graph.out_neighbors(fun_dag, :root)
```

### Building lazy "runnables" by pairing the function and its input to run with

```elixir
first_runnables = Enum.map(neighbors_of_root, fn fun -> {fun, input} end)
```

### Executing our runnables

```elixir
first_result =
  first_runnables
  |> Enum.map(fn {fun, input} -> fun.(input) end)
  |> List.first()
```

### Executing the next layer

```elixir
fun_from_first_runnable = first_runnables |> List.first() |> elem(0)

neighbors_of_first_runnable =
  fun_dag
  |> Graph.out_neighbors(fun_from_first_runnable)
  |> Enum.map(fn fun -> {fun, first_result} end)
  |> Enum.map(fn {fun, input} ->
    fun.(input)
  end)
```

### Executing the next layer in parallel

![](https://i.redd.it/s0lqz1g2w7k81.jpg)

```elixir
neighbors_of_first_runnable =
  fun_dag
  |> Graph.out_neighbors(fun_from_first_runnable)
  |> Task.async_stream(fn fun
    fun.(first_result)
  end)
  |> Enum.map(fn {:ok, result} -> result end)
```

### Changing our DAG

```elixir
fun_dag
|> Kino.Mermaid.new()
```

```elixir
new_fun_dag = Graph.add_edge(fun_dag, fun_from_first_runnable, fn _num -> 42 end)

new_fun_dag
|> Kino.Mermaid.new()
```

## What Else?

What other features would a DAG / Pipeline buider tool like this want?

### Joins

Steps with more than 1 parent dependency.

### Rules

Steps with constraints controlling when to execute.

### State

Accumulating data to act on or control behavior.

## Joins

```elixir
join_example =
  Graph.new()
  |> Graph.add_edges([
    {:root, :step_a},
    {:root, :step_b},
    {:step_a, :step_c},
    {:step_b, :step_c}
  ])

join_example
|> Kino.Mermaid.new()
```

Here we can only execute `step_c` when both `step_a` and `step_b` have resolved.

This means we have to keep intermediate results somewhere in memory so we can grab the result of either a or b
and use both to invoke step c.

## Rules

A rule is a step with a constraint on when to execute it.

A function is kind of like a rule where `fn :potato -> 42 end` will only return 42 
when given `:potato`.

The usual verbiage is the `lhs` and `rhs` or `condition` and `reaction`.

<!-- livebook:{"break_markdown":true} -->

> Rules engines are the OG AI solving business problems since the 60's.

<!-- livebook:{"break_markdown":true} -->

Rules are common abstractions for the representation of expert knowledge.

They're handy because they're easily composed, evaluated, stored, and organized.

### Resources to check out for rule based systems

* [RETE](https://en.wikipedia.org/wiki/Rete_algorithm)
* [LEAPS](https://dl.acm.org/doi/book/10.5555/899216)
* [Patrick Winston's MIT AI lectures](https://www.youtube.com/watch?v=TjZBTDzGeGg)
* [RETEX - Elixir RETE implementation](https://github.com/lorenzosinisi/retex)

## Compiling and Evaluating Rules

Say we want to make a graph with our example rule from before:

<!-- livebook:{"force_markdown":true} -->

```elixir
fn :potato -> 42 end
```

We know we need to evaluate the `lhs` before the `rhs` and only evaluate the `rhs` if the `lhs` returns `true`.

We can do this by breaking out our one rule into the two pieces:

<!-- livebook:{"force_markdown":true} -->

```elixir
lhs = fn :potato -> true end
rhs = fn _any -> 42 end
```

Now we need to get these into our DAG and since the `lhs` has to run first - we'll make sure the arrow goes `lhs -> rhs`.

```elixir
potato_rule =
  Graph.new()
  |> Graph.add_edges([
    {:is_potato?, 42}
  ])

potato_rule
|> Kino.Mermaid.new()
```

## Composition of Rules

```elixir
composed_rule =
  Graph.new()
  |> Graph.add_edges([
    {:root, :is_potato?},
    {:is_potato?, :boil_em_mash_em},
    {:root, :is_ham?},
    {:is_ham?, :it_is_delicious},
    {:root, :is_beet?},
    {:is_beet?, :good_in_salad}
  ])

composed_rule
|> Kino.Mermaid.new()
```

Now we could give our our Graph of Rules some inputs and get results like

```
> :ham
:it_is_delicious
> :potato
boil_em_mash_em
> :beet
:good_in_salad
```

## Conditional Expansion

For Elixir and Erlang function calls - we're scoped to that module, function, and arity
when evaluating. This has advantages because it means we can evaluate patterns top to bottom
executing the RHS of whichever pattern matches first.

In these cases - outside of optimizing your pattern matches you don't really need or want conditional expansion.

However in the case of the evaluation of *many* rules where the conditions might
overlap - its often useful to expand the expressions within the LHS into smaller 
conditions bound together in the graph.

<!-- livebook:{"break_markdown":true} -->

Here's an example of the initial approach of expanding just the LHS and RHS.

<!-- livebook:{"force_markdown":true} -->

```elixir
lhs = fn input when (input == :potato and input != :ham) or input == "potato" -> true end
rhs = fn _any -> f2 end
```

But if we also bring in another rule such as

<!-- livebook:{"force_markdown":true} -->

```elixir
other_rule = fn :potato -> 42 end
```

We might be matching against `:potato` twice for the same input and do unecessary work.

So if we're composing many rules with like conditions for the same execution context
we can do this by expanding each individual expression as its own condition.

<!-- livebook:{"break_markdown":true} -->

<!-- livebook:{"force_markdown":true} -->

```elixir
fn input
   when (input == :potato and input != :ham) or
          input == "potato" ->
  42
end

# full expansion
lhs_1 = fn input when input == :potato -> true end
lhs_1 = fn input when input != :ham -> true end
lhs_or = fn input == "potato" -> true end
```

```elixir
expanded_rule =
  Graph.new()
  |> Graph.add_edges([
    {:root, :lhs_1},
    {:root, :lhs_2},
    {:root, :other_path},
    {:lhs_1, :conjunction_2_and_1},
    {:lhs_2, :conjunction_2_and_1},
    {:conjunction_2_and_1, :rhs},
    {:other_path, :rhs}
  ])

expanded_rule
|> Kino.Mermaid.new()
```

## State Machines

Joining ongoing expressions is stateful - sort of.

To ensure our step dependencies are resolved we have to hold its dependents in memory
and wait until the final step can be executed.

A DAG of Lambda functions are easy to evaluate because they're **stateless** meaning
our scheduler can blindly parallel map over steps keeping only the 1 step in context.

<!-- livebook:{"break_markdown":true} -->

<!-- livebook:{"force_markdown":true} -->

```elixir
potato_lock =
  Dagger.state_machine(
    init: %{code: "potato", state: :locked, contents: "ham"},
    reducer: fn
      :lock, state ->
        %{state | state: :locked}

      {:unlock, input_code}, %{code: code, state: :locked} = state when input_code == code ->
        %{state | state: :unlocked}

      _input_code, %{state: :unlocked} ->
        state
    end,
    reactors: [
      fn %{state: :unlocked, contents: contents} -> contents end,
      fn %{state: :locked} -> {:error, :locked} end
    ]
  )
```

```elixir
lock_state_machine =
  Graph.new()
  |> Graph.add_edges([
    {:root, :arity_check_1},
    {:arity_check_1, :is_lock_command?},
    {:arity_check_1, :is_unlock_command?},
    {:arity_check_1, :is_code_correct?},
    {:is_lock_command?, :lock_conjunction},
    {:is_unlock_command?, :lock_conjunction},
    {:is_code_correct?, :lock_conjunction},
    {:lock_conjunction, :lock_reducer},
    {:root, :reactor_lhs_clause_1_matched?},
    {:root, :reactor_lhs_clause_2_matched?},
    {:reactor_lhs_clause_1_matched?, :reactor_rhs_1},
    {:reactor_lhs_clause_2_matched?, :reactor_rhs_2}
  ])

Kino.Mermaid.new(lock_state_machine)
```

## DIY Text Processing

```elixir
defmodule TextProcessing do
  def tokenize(text) do
    text
    |> String.downcase()
    |> String.split(~R/[^[:alnum:]\-]/u, trim: true)
  end

  def count_words(list_of_words) do
    list_of_words
    |> Enum.reduce(Map.new(), fn word, map ->
      Map.update(map, word, 1, &(&1 + 1))
    end)
  end

  def count_uniques(word_count) do
    Enum.count(word_count)
  end

  def first_word(list_of_words) do
    List.first(list_of_words)
  end

  def last_word(list_of_words) do
    List.last(list_of_words)
  end
end
```

```elixir
text = "boil em mash em stick em in a stew. Po Tay Toes."

text
|> TextProcessing.tokenize()
|> TextProcessing.count_words()
|> TextProcessing.count_uniques()
```

```elixir
text
|> TextProcessing.tokenize()
|> TextProcessing.count_words()
```

```elixir
text
|> TextProcessing.tokenize()
|> TextProcessing.first_word()
```

```elixir
text
|> TextProcessing.tokenize()
|> TextProcessing.last_word()
```

We tokenized 3 times and counted words twice!

What if we made a `Pipeline` module so we can pipeline while we pipeline?

## Pipe all the things

```elixir
defmodule Fact do
  defstruct ~w(
    value
    runnable
  )a
end

defmodule Step do
  defstruct ~w(
    work
  )a

  def new(work), do: %__MODULE__{work: work}

  def run(%__MODULE__{work: work} = step, %Fact{value: input} = input_fact) do
    %Fact{value: apply(work, [input]), runnable: {step, input_fact}}
  end

  def run(%__MODULE__{work: work} = step, input) do
    %Fact{value: apply(work, [input]), runnable: {step, %Fact{value: input}}}
  end
end

defmodule Rule do
  defstruct ~w(
    lhs
    rhs
  )a

  def new(lhs, rhs) do
    %__MODULE__{lhs: lhs, rhs: rhs}
  end

  def check(%__MODULE__{} = rule, input) do
    apply(rule.lhs, [input])
  end

  def run(%__MODULE__{} = rule, input) do
    if check(rule, input) do
      apply(rule.rhs, input)
    end
  end

  def to_pipeline(%__MODULE__{} = rule) do
    Pipeline.new()
    |> Pipeline.add_step(Condition.new(rule.lhs), Step.new(rule.rhs))
  end
end

defmodule Condition do
  defstruct ~w(
    check
  )a

  def new(check), do: %__MODULE__{check: check}

  def run(%Condition{} = condition, %Fact{} = fact) do
    %Fact{value: run(condition.check, fact.value)}
  end

  def run(condition, input) do
    with true <- apply(condition.check, [input]) do
      true
    else
      _otherise ->
        false
    end
  end
end

defmodule Pipeline do
  defstruct ~w(
    flow
    facts
  )a

  def new() do
    %__MODULE__{
      flow: Graph.new(type: :directed) |> Graph.add_vertex(:root),
      facts: []
    }
  end

  def run({%Step{} = step, %Fact{} = fact} = _runnable) do
    Step.run(step, fact)
  end

  def add_step(%__MODULE__{flow: flow} = pipeline, step) do
    %__MODULE__{pipeline | flow: Graph.add_edge(flow, :root, step)}
  end

  def add_step(%__MODULE__{} = pipeline, fun) when is_function(fun) do
    add_step(pipeline, Step.new(fun))
  end

  def add_step(%__MODULE__{flow: flow} = pipeline, parent_step, child_step) do
    if Graph.has_vertex?(flow, parent_step) do
      %__MODULE__{pipeline | flow: Graph.add_edge(flow, parent_step, child_step)}
    else
      pipeline
      |> add_step(parent_step)
      |> add_step(parent_step, child_step)
    end
  end

  @doc """
  Merges the second pipeline into the first.
  """
  def merge(%__MODULE__{} = pipeline_1, %__MODULE__{} = pipeline_2) do
    new_flow =
      Enum.reduce(Graph.edges(pipeline_2.flow), pipeline_1.flow, fn edge, flow ->
        Graph.add_edge(flow, edge)
      end)

    %__MODULE__{pipeline_1 | flow: new_flow}
  end

  def next_steps(%__MODULE__{flow: flow}, step) do
    Graph.out_neighbors(flow, step)
  end

  def next_runnables(
        %__MODULE__{} = pipeline,
        %Fact{runnable: {previous_step, _previous_input}} = fact
      ) do
    pipeline
    |> next_steps(previous_step)
    |> Enum.map(fn step ->
      {step, fact}
    end)
  end

  @doc """
  Returns a list of runnables (`work, input` pairs).
  """
  def next_runnables(%__MODULE__{} = pipeline, some_input) do
    pipeline
    |> next_steps(:root)
    |> Enum.map(fn step ->
      {step, %Fact{value: some_input}}
    end)
  end
end
```

```elixir
tokenize_step = Step.new(&TextProcessing.tokenize/1)
count_words_step = Step.new(&TextProcessing.count_words/1)
count_uniques_step = Step.new(&TextProcessing.count_uniques/1)
first_word_step = Step.new(&TextProcessing.first_word/1)
last_word_step = Step.new(&TextProcessing.last_word/1)

text_processing_pipeline =
  Pipeline.new()
  |> Pipeline.add_step(tokenize_step)
  |> Pipeline.add_step(tokenize_step, count_words_step)
  |> Pipeline.add_step(tokenize_step, first_word_step)
  |> Pipeline.add_step(tokenize_step, last_word_step)
  |> Pipeline.add_step(count_words_step, count_uniques_step)

text_processing_pipeline.flow
|> Kino.Mermaid.new()
```

```elixir
next_runnables = Pipeline.next_runnables(text_processing_pipeline, text)
```

```elixir
results_1 =
  next_runnables
  |> Enum.map(fn runnable ->
    Pipeline.run(runnable)
  end)
```

```elixir
next_runnables =
  results_1
  |> Enum.flat_map(fn %Fact{} = fact ->
    Pipeline.next_runnables(text_processing_pipeline, fact)
  end)

results_2 =
  next_runnables
  |> Enum.map(&Pipeline.run/1)
```

## Resources

### Videos

* For AI, rule based systems, knowledge representation and more: [The late, great Patrick Winston's MIT AI lectures](https://www.youtube.com/watch?v=TjZBTDzGeGg&list=PLUl4u3cNGP63gFHB6xb-kVBiQHYe_4hSi)
* For understanding programming paradigms, concurrency, dataflow, backwards chaining, and more: [Peter Van Roy's Youtube Channel](https://www.youtube.com/user/PeterVanRoy/videos)

### Papers

* [Cloudburst: Stateful Functions-as-a-Service](https://arxiv.org/abs/2001.04592)
* [The Dataflow Model](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43864.pdf)
* [A distributed execution engine supporting
  data-dependent control flow](https://www.cl.cam.ac.uk/~ey204/pubs/PhD/Derek.pdf)
* [Forward Chaining with State Monad](https://link.springer.com/chapter/10.1007/978-3-319-39384-1_38)
* [Durable Functions: Semantics for Stateful Serverless](https://www.microsoft.com/en-us/research/uploads/prod/2021/10/DF-Semantics-Final.pdf)

### Cool Projects

* https://eigr.io/
* https://cloudstate.io/
* https://vaxine.io/