Last active
March 23, 2023 13:40
-
-
Save InBrewJ/4c3441fc216a0c55d67d33a155691a19 to your computer and use it in GitHub Desktop.
An evening's noodling into a generic pipeline runner in JS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ETLite.js is a cool name, eh?
// A slight bastardisation of the 'Chain of Responsibility' pattern
// Inspired by:
// https://github.com/InBrewJ/ts-for-dsa-minimal/blob/master/typescript-tasks/003-fizzbuzz-chain-responsibility/003-fizzbuzz-chain-responsibility.ts
// In a sense, is this a 'Tree of Responsibility'?
// It's kind of like a DAG. It IS a DAG?
// We essentially want a tree of stages
// At the end of the pipeline, dump a report
// Does the top level stage keep track of how many stages there are? Maybe? Is this important?
// Or is the top level stage the Integrator?
/**
 * Serialise a value to compact JSON for log and report lines.
 *
 * `Map` and `Set` instances are expanded to arrays (entries / values): plain
 * `JSON.stringify` renders both as `{}`, which would hide their contents in
 * any report line that prints them.
 *
 * @param {*} obj - Value to serialise.
 * @returns {string|undefined} JSON text, or `undefined` when `obj` itself is
 *   not serialisable (e.g. `undefined`), exactly as `JSON.stringify` behaves.
 */
const pprint = (obj) => {
  // The replacer is applied recursively, so nested collections are expanded too.
  const expandCollections = (_key, value) => {
    if (value instanceof Map) {
      return Array.from(value.entries());
    }
    if (value instanceof Set) {
      return Array.from(value.values());
    }
    return value;
  };
  return JSON.stringify(obj, expandCollections);
};
/**
 * Handler that always fails: logs the input it was given, then throws.
 *
 * @param {*} input - Value received from the previous stage; only logged.
 * @throws {Error} Always, with message "You throw me right round".
 */
const throwsError = (input) => {
  const serialisedInput = pprint(input);
  console.log(`throwsError :: input => ${serialisedInput}`);
  throw new Error("You throw me right round");
};
/**
 * Synchronous stage handler that produces the canned `juicyData` rows and the
 * numeric-code lookup table used by the following stage.
 *
 * Each canned row is shallow-merged with `locationsInput`, so keys present in
 * `locationsInput` override same-named keys of the row.
 *
 * @param {object} locationsInput - Output of the previous stage; its own
 *   enumerable properties are copied onto every row.
 * @returns {{juicyData: object[], lookupTable: Map<number, string>}}
 */
const createLocationsLookupTable = (locationsInput) => {
  const serialisedInput = pprint(locationsInput);
  console.log(
    `createLocationsLookupTable :: sharedInput => ${serialisedInput} (something v synchronous)`
  );

  const seedRows = [{ dataOne: 1, filterMe: "bye" }, { dataOne: 1 }];
  const juicyData = seedRows.map((row) => ({ ...row, ...locationsInput }));

  const lookupTable = new Map();
  lookupTable.set(1, "sunny");
  lookupTable.set(2, "raining");

  return { juicyData, lookupTable };
};
/**
 * Simulated network fetch: resolves with two canned records after a delay.
 *
 * The function is not marked `async` because it already returns the Promise
 * it constructs; wrapping it again would add nothing.
 *
 * @param {number} [delayMs=1000] - Milliseconds to wait before resolving.
 *   Defaults to the original fixed one-second delay.
 * @returns {Promise<object[]>} Resolves with the canned records.
 */
const asyncNetworkCall = (delayMs = 1000) =>
  new Promise((resolve) => {
    setTimeout(() => {
      console.log("resolving inFunctionAsync...");
      resolve([{ dataTwo: 2 }, { dataTwo: 2, meFilter: "hey" }]);
    }, delayMs);
  });
/**
 * Build the "get locations" stage handler.
 *
 * @param {*} connection - Connection handle; accepted but not used by the
 *   current implementation.
 * @returns {function(*): {shared: string}} Handler that logs its input and
 *   returns a fixed `{ shared: "sharedKey" }` object.
 */
const getLocations = (connection) => {
  const fetchLocations = (input) => {
    const serialisedInput = pprint(input);
    console.log(`getLocations :: input ${serialisedInput}`);
    return { shared: "sharedKey" };
  };
  return fetchLocations;
};
// do this concurrently with getDataOne (or at least try) | |
/**
 * Build the "get charge points" stage handler.
 *
 * The handler awaits `asyncNetworkCall()`, prepends its records to
 * `juicyData`, and replaces each item's `dataOne` / `dataTwo` with the value
 * mapped in `lookupTable` when one exists. Items lacking either key still get
 * that key (set to `undefined`), as in the original implementation.
 *
 * @param {*} connection - Connection handle; accepted but not used by the
 *   current implementation.
 * @returns {function({juicyData: object[], lookupTable: Map}): Promise<object[]>}
 *   Async handler resolving with the merged and translated records.
 *   The returned promise rejects with the underlying error if the fetch or the
 *   merge fails, so the calling stage can report the failure instead of
 *   passing `undefined` downstream.
 */
const getChargePoints =
  (connection) =>
  async ({ juicyData, lookupTable }) => {
    console.log(`getChargePoints :: juicyData => ${pprint(juicyData)}`);
    console.log(`getChargePoints :: lookupTable => ${Array.from(lookupTable)}`);

    // `??` rather than `||`: a mapped value that happens to be falsy
    // (0, "") must not be thrown away in favour of the raw value.
    const translate = (value) => lookupTable.get(value) ?? value;

    try {
      const fetchedRecords = await asyncNetworkCall();
      return [...fetchedRecords, ...juicyData].map((item) => ({
        ...item,
        dataOne: translate(item.dataOne),
        dataTwo: translate(item.dataTwo),
      }));
    } catch (error) {
      console.log("NOPE in getChargePoints");
      console.log(error);
      // Rethrow: swallowing here would hand `undefined` to the next stage.
      throw error;
    }
  };
/**
 * Build the "push updates" stage handler.
 *
 * @param {*} connection - Connection handle; accepted but not used by the
 *   current implementation.
 * @returns {function(*): Promise<Array<{pushResponse: string}>>} Async handler
 *   that logs the payload, waits two seconds, and resolves with a canned
 *   acknowledgement.
 */
const pushUpdates = (connection) => async (input) => {
  console.log(`pushUpdates :: pushing ${pprint(input)} to XDX`);

  // Stand-in for the round trip to the remote system.
  await new Promise((done) => {
    setTimeout(done, 2000);
  });

  console.log("resolving pushUpdates...");
  return [{ pushResponse: "pushed, well done!" }];
};
/**
 * One link in the pipeline chain.
 *
 * A stage owns a handler function and an optional next stage. `execute` runs
 * the handler, reports the outcome through the injected reporter, and then
 * hands the handler's result to the next stage.
 */
class AbstractStage {
  #nextHandler = undefined;
  // Default handler passes its input straight through (with a warning) so a
  // stage without an explicit handler does not break the chain.
  #handlerFn = (e) => {
    // This is perhaps a bad idea...
    console.log(`WARNING -> handler for ${this.#name} is passthrough default!`);
    return e;
  };
  #name = "";
  #reporter = undefined;

  /**
   * @param {string} name - Label used in report lines.
   * @param {function(string): void} reporter - Sink for report lines.
   */
  constructor(name, reporter) {
    this.#name = name;
    this.#reporter = reporter;
  }

  /**
   * Replace the stage's handler.
   * @param {function(*): *} handlerFn - Sync or async; receives the previous
   *   stage's result.
   */
  setHandler(handlerFn) {
    this.#handlerFn = handlerFn;
  }

  /**
   * Link the stage that runs after this one.
   * @param {AbstractStage} handler - The next stage.
   * @returns {AbstractStage} The stage passed in, so calls can be chained.
   */
  setNext(handler) {
    this.#nextHandler = handler;
    return handler;
  }

  /**
   * Run this stage and then everything downstream of it.
   *
   * The returned promise settles only after the downstream stages have
   * finished, so awaiting the first stage awaits the whole chain. A handler
   * failure (synchronous throw or rejection) is reported and stops the chain
   * at this stage; it does not reject the returned promise.
   *
   * @param {*} thing - Input for the handler.
   * @returns {Promise<void>}
   * @throws {Error} (as a rejection) if the handler was explicitly set to
   *   `undefined`.
   */
  async execute(thing) {
    if (this.#handlerFn === undefined) {
      throw new Error("You need to set handlerFn!");
    }

    let result;
    try {
      // The handler may be sync or async; `await` normalises both and, being
      // inside the try, also captures a synchronous throw.
      result = await this.#handlerFn(thing);
      this.#reporter(
        `${this.#name} stage has finished with response => ${pprint(result)}`
      );
    } catch (e) {
      this.#reporter(`${this.#name} errored out! :: ${e?.message ?? e}`);
      return;
    }

    if (this.#nextHandler !== undefined) {
      // Awaited (not fired and forgotten) so callers can tell when the
      // pipeline has actually terminated.
      await this.#nextHandler.execute(result);
    }
  }
}
// Effectively nominal types | |
/** General-purpose stage; adds nothing to AbstractStage beyond its own type name. */
class Stage extends AbstractStage {}

/** Stage type used for handlers that fetch data; behaviour is inherited unchanged. */
class FetchStage extends AbstractStage {}

/** Stage type used for handlers that push data; behaviour is inherited unchanged. */
class PushStage extends AbstractStage {}
// #transformerFn is always synchronous? | |
/**
 * Stage that applies a transformer function to its input before the regular
 * AbstractStage handling runs on the transformed value.
 *
 * The transformer is called without `await`, so a promise returned by it is
 * handed to the inherited `execute` as-is.
 */
class Transformer extends AbstractStage {
  #transformerFn = undefined;

  /**
   * @param {function(*): *} transformer - Function applied to the stage input.
   */
  setTransformer(transformer) {
    this.#transformerFn = transformer;
  }

  /**
   * Transform the input, then run the inherited stage logic on the result.
   *
   * @param {*} thing - Input from the previous stage.
   * @returns {Promise<void>}
   * @throws {Error} (as a rejection) if no transformer has been set, mirroring
   *   the handler guard in AbstractStage instead of failing with an opaque
   *   "is not a function" TypeError.
   */
  async execute(thing) {
    if (this.#transformerFn === undefined) {
      throw new Error("You need to set transformerFn!");
    }
    const transformedResult = this.#transformerFn(thing);
    await super.execute(transformedResult);
  }
}
/** Predicate: true when the item's `meFilter` property is `undefined`. */
const byMeFilter = ({ meFilter }) => meFilter === undefined;
/** Predicate: true when the item's `filterMe` property is `undefined`. */
const byFilterMe = ({ filterMe }) => filterMe === undefined;
/**
 * Wires the individual stages into a linear chain and runs it.
 *
 * Every stage receives the same reporter, which appends to a private report
 * list; a final stage appended by `build()` prints that list.
 */
class Pipeline {
  // made of many AsyncStage and SyncStage?
  // reports on the progress of the stage at the end
  #pipeline = undefined;
  #report = [];
  #reporter = (msg) => {
    this.#report.push(msg);
  };

  /**
   * Construct the stages, link them in order, append the reporting stage and
   * remember the first stage as the pipeline entry point.
   */
  build() {
    const getLocationsStage = new FetchStage("getLocations", this.#reporter);
    getLocationsStage.setHandler(
      // getLocations = handler with HTTP (or something else?) connection
      // note that an HTTP connection may have pagination (case by case)
      getLocations("locations_connection_object")
    );

    const createLocationsLookupTableStage = new Stage(
      "createLocationsLookupTable",
      this.#reporter
    );
    createLocationsLookupTableStage.setHandler(createLocationsLookupTable);

    const getChargepointsStage = new FetchStage(
      "getChargepointsStage",
      this.#reporter
    );
    getChargepointsStage.setHandler(
      // getChargePoints = handler with HTTP (or something else?) connection
      // note that an HTTP connection may have pagination (case by case)
      getChargePoints("chargepoints_connection_object")
    );

    // Constructed but currently not wired in: its entry in `stages` below is
    // commented out.
    const filterWithinIntervalStage = new Transformer(
      "filterWithinIntervalStage",
      this.#reporter
    );
    // bit weird that you have to set the handler on a Transformer, eh?
    // filterWithinIntervalStage.setHandler((e) => e);
    filterWithinIntervalStage.setTransformer((e) => {
      console.log("Running filterWithinIntervalTransformer");
      return e.filter(byFilterMe).filter(byMeFilter);
    });

    const pushUpdatesStage = new PushStage("pushUpdatesStage", this.#reporter);
    pushUpdatesStage.setHandler(pushUpdates("xdx_connection_object"));

    const stages = [
      getLocationsStage,
      createLocationsLookupTableStage,
      getChargepointsStage,
      // filterWithinIntervalStage,
      pushUpdatesStage,
    ];
    const [firstStage, ...rest] = stages;

    const final = new Stage("Final stage, report results", this.#reporter);
    final.setHandler(() => {
      const stars = "*".repeat(30);
      const topReportString = `${stars} PIPELINE REPORT ${stars}`;
      console.log(topReportString);
      console.log(this.#report.join("\n"));
      console.log("*".repeat(topReportString.length));
    });

    // setNext returns the stage it was given, so the reduce walks down the
    // chain and ends on the last real stage, to which `final` is attached.
    rest
      .reduce(
        (pipelineInProgress, stage) => pipelineInProgress.setNext(stage),
        firstStage
      )
      .setNext(final);
    this.#pipeline = firstStage;

    // alternative pipeline construction, simpler to read but
    // less parametric...
    // getLocationsStage
    //   .setNext(createLocationsLookupTableStage)
    //   .setNext(getChargepointsStage)
    //   .setNext(filterWithinIntervalStage)
    //   .setNext(pushUpdatesStage)
    //   .setNext(final);
    // How do we know when the pipeline has terminated?
    // this.#pipeline = getLocationsStage;
  }

  /**
   * Execute the pipeline from its first stage, with no initial input.
   *
   * @returns {Promise<void>}
   * @throws {Error} (as a rejection) if `build()` has not been called yet.
   */
  async run() {
    if (this.#pipeline === undefined) {
      throw new Error("Pipeline has not been built - call build() before run()");
    }
    await this.#pipeline.execute();
  }
}
/// main | |
// Entry point: build the pipeline and run it.
// The run is awaited and the IIFE has a rejection handler so that a failure
// is logged here instead of surfacing as an unhandled promise rejection.
(async () => {
  const myFirstIntegrator = new Pipeline();
  myFirstIntegrator.build();
  await myFirstIntegrator.run();
})().catch((error) => {
  console.error("Pipeline run failed:", error);
});
// possible syntax for DAG creation:
// presumably a DAG needs some sort of cycle detection to validate DAGness
// graph algo help?
// - traverse graph (DFS?)
// - add vertices to <array>
// - if any duplicates appear in <array> (binary search, nlogn), graph has cycles - not a DAG?
//   (careful: a revisit alone is not a cycle - a diamond-shaped DAG reaches the same node
//   twice; a cycle is an edge back to a node still on the current DFS path)
// - does a vertex / node need some hash to record 'uniqueness'?
// each node can have N children (at same level in tree)
/**
 * Sketch of a tree-shaped pipeline node: one parent stage plus any number of
 * child stages. `run` is not implemented yet.
 */
class PipelineStage {
  // parent must be set first, because it runs first
  #parent = undefined;
  // queue? or is this a .setNext list Pipeline thing?
  #children = [];

  /**
   * Placeholder; currently does nothing and resolves with `undefined`.
   * Intended behaviour, per the original notes:
   * - return result from #parent
   * - if there are children, pass the result into the first stage of each
   *   child pipeline
   * - if there are no children, return the result
   */
  async run() {}

  /**
   * Register a stage: the first stage added becomes the parent, every later
   * one is appended to the children.
   *
   * @param {*} pipelineStage - Stage to register.
   */
  addStage(pipelineStage) {
    const hasParent = this.#parent !== undefined;
    if (hasParent) {
      this.#children.push(pipelineStage);
    } else {
      this.#parent = pipelineStage;
    }
  }
}
// Shape sketch for a DAG description: one shared parent stage whose output
// feeds the first stage of every child pipeline.
//
// The stages are given as placeholder NAMES (strings). No stage objects with
// these names are defined in this file, and referring to them as bare
// identifiers throws a ReferenceError as soon as the module is evaluated.
const dag0 = [
  {
    parent: "sharedFirstStage",
    // outputs of 'parent' are available to first stage of all 'children' pipelines
    children: [
      ["stage1x", "stage2x", "stage3x"], // pipeline stage with only children?
      ["stage1y", "stage2y", "stage3y"], // pipeline stage with only children?
    ],
  },
];
// const dag00 = [ | |
// { | |
// rootStage: sharedFirstStage, | |
// // outputs of 'parent' are available to first stage of all 'children' pipelines | |
// subPipelines: [ | |
// [stage1x, stage2x, stage3x], | |
// [stage1y, stage2y, stage3y], | |
// ], | |
// }, | |
// ]; | |
// const dag = { | |
// sharedFirstStage: { | |
// pipeline1: [stage1, stage2, stage3], | |
// pipeline2: [stage1, stage2, stage3], | |
// pipeline3: [stage1, stage2, stage3], | |
// }, | |
// }; | |
// const dag2 = { | |
// sharedFirstStage: { | |
// pipeline1: [ | |
// { | |
// sharedStage1: { | |
// pipelineSub1: [stageSub1, stageSub2, stageSub3], | |
// }, | |
// }, | |
// stage2, | |
// stage3, | |
// ], | |
// }, | |
// }; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment