Created
February 13, 2019 13:55
-
-
Save tonywok/8eef917bc100b3f49f94e3802a8b0895 to your computer and use it in GitHub Desktop.
dag-spike
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "tsort"
require 'bundler/inline'

# Inline Gemfile evaluated by bundler/inline when the script starts.
# NOTE(review): nothing in the visible script calls into pry or pry-byebug;
# presumably they are kept for ad-hoc debugging sessions -- confirm before removing.
gemfile do
  source 'https://rubygems.org'
  gem 'pry'
  gem 'pry-byebug'
end
# Base class for a unit of work tracked by the DAG.
#
# Subclasses are expected to override #display_name; the completion
# helpers defined here give every operation a working #done / #done? /
# #finish! so that #to_s never hits an undefined method on the base class.
class Operation
  attr_reader :display_name, :done

  # @return [Object] the raw completion flag (truthy once finished)
  def done?
    done
  end

  # Marks the operation as completed.
  #
  # @return [true]
  def finish!
    @done = true
  end

  # @return [String] debug representation: class, display name and done flag
  def to_s
    "<##{self.class} #{display_name} #{done}>"
  end
end
# Operation that builds a single table.
class BuildTable < Operation
  attr_reader :table, :done

  # @param table [#name] table to build; its name doubles as the display name
  # @param done [Boolean] initial completion state
  def initialize(table, done)
    @table = table
    @done = done
  end

  # @return [Object] the name reported by the wrapped table
  def display_name
    table.name
  end

  # @return [Boolean] the current completion flag
  def done?
    done
  end

  # Marks the table build as completed.
  #
  # @return [true]
  def finish!
    @done = true
  end
end
# Operation that builds a single function.
class BuildFunction < Operation
  # display_name is defined explicitly below, so it is intentionally not
  # listed here (listing it only produced a "method redefined" warning).
  attr_reader :function, :done

  # @param function [#name] function to build; its name doubles as the display name
  # @param done [Boolean] initial completion state
  def initialize(function, done)
    @function = function
    @done = done
  end

  # @return [Object] the name reported by the wrapped function
  def display_name
    function.name
  end

  # @return [Boolean] the current completion flag
  def done?
    done
  end

  # Marks the function build as completed.
  #
  # @return [true]
  def finish!
    @done = true
  end
end
# A named table together with the (hard-coded) names of the tables it
# depends on.
class Table
  attr_reader :name

  # @param name [String] table name
  def initialize(name)
    @name = name
  end

  # Names of the tables that must exist before this one.
  #
  # Only "C" (needs "B") and "D" (needs "C") have dependencies; every other
  # name has none. A fresh array is returned on each call.
  #
  # @return [Array<String>]
  def dependencies
    case name
    when "C" then ["B"]
    when "D" then ["C"]
    else []
    end
  end
end
# A named function; carries nothing but its name.
class Function
  attr_reader :name

  # @param name [String] function name
  def initialize(name)
    @name = name
  end
end
# Dependency graph of build operations, ordered with TSort.
#
# Every node is identified by a `[operation_class, name]` pair. `@deps` maps
# a node key to the keys it depends on and `@operations` maps a node key to
# the operation object itself.
#
# NOTE(review): TSort also walks dependency keys that were never registered
# through build_table!/build_function!. `operation` returns nil for such
# keys, so #operations can contain nil and #next_group would call `done?`
# on nil -- confirm how unregistered dependencies are meant to be handled.
class Dag
  include TSort

  attr_reader :deps, :concurrency

  # Creates a DAG and, when a block is given, yields it for population.
  #
  # @param kwargs [Hash] forwarded to #initialize
  # @yieldparam dag [Dag] the freshly created graph
  # @return [Dag]
  def self.build(**kwargs)
    dag = new(**kwargs)
    yield dag if block_given?
    dag
  end

  # @param concurrency [Integer] maximum number of operations per #next_group
  def initialize(concurrency: 1)
    # The fallback for unknown keys is a single shared array, so it is frozen:
    # nobody can corrupt it through the public `deps` reader. A default block
    # that stored a new array would add keys while TSort iterates the hash.
    @deps = Hash.new([].freeze)
    @operations = {}
    @concurrency = concurrency
  end

  # Registers a table build whose dependencies come from Table#dependencies.
  #
  # @param table_name [String]
  # @param state [Boolean] initial done flag of the operation
  # @return [BuildTable] the registered operation
  def build_table!(table_name, state = false)
    table = Table.new(table_name)
    key = [BuildTable, table_name]
    @deps[key] = table.dependencies.map { |name| [BuildTable, name] }
    @operations[key] = BuildTable.new(table, state)
  end

  # Registers a function build; functions have no dependencies.
  #
  # @param function_name [String]
  # @param state [Boolean] initial done flag of the operation
  # @return [BuildFunction] the registered operation
  def build_function!(function_name, state = false)
    key = [BuildFunction, function_name]
    @deps[key] = []
    @operations[key] = BuildFunction.new(Function.new(function_name), state)
  end

  # @return [Array] every operation in topological order (dependencies first)
  def operations
    tsort.map { |key| operation(key) }
  end

  # @return [Array] up to `concurrency` operations that are not done yet and
  #   whose dependencies all report done
  def next_group
    workable.first(concurrency)
  end

  # Marks the operation stored under +key+ as finished.
  #
  # @param key [Array] `[operation_class, name]` pair
  def finish!(key)
    operation(key).finish!
  end

  private

  # Lazily filters the topological order down to runnable operations.
  # Rebuilt on every call so operations registered after an earlier
  # #next_group are picked up instead of being hidden by a cached ordering.
  def workable
    tsort.lazy
         .map { |key| operation(key) }
         .select { |op| !op.done? && dependent_operations(op).all?(&:done?) }
  end

  # Operations that +op+ depends on (despite the name, not its dependents).
  def dependent_operations(op)
    @deps[key(op)].map { |dep_key| operation(dep_key) }
  end

  def operation(key)
    @operations[key]
  end

  def key(operation)
    [operation.class, operation.display_name]
  end

  # TSort hook: yields the dependency keys of +key+.
  def tsort_each_child(key, &b)
    @deps[key].each(&b)
  end

  # TSort hook: yields every registered node key.
  def tsort_each_node(&b)
    @deps.each_key(&b)
  end
end
# Demo: register two functions and six tables, then print every operation in
# dependency order. The block parameter is named `graph` so it no longer
# shadows the outer `dag` local.
dag = Dag.build(concurrency: 1) do |graph|
  %w[berp derp].each { |name| graph.build_function!(name) }
  %w[D C A 1 2 B].each { |name| graph.build_table!(name) }
end

puts dag.operations
# dag = Dag.build_from_task(task)
# tasks = dag.next_group.map do |operation|
#   build.tasks.pending.find_by!(kind: operation.kind, name: operation.display_name)
# end
# tasks.update_all(:status => "enqueued")
# tasks.each { |task| EtlTaskJob.perform_later(task) }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment