Created
January 21, 2024 05:23
-
-
Save skrawcz/2f30866f9506d910b2d8a05a34bff6b9 to your computer and use it in GitHub Desktop.
Example using parallelism with Hamilton
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# functions.py - declare and link your transformations as functions.... | |
import pandas as pd | |
from hamilton.htypes import Parallelizable, Collect | |
def motor(motor_list: list[int]) -> Parallelizable[int]:
    """Yield each motor id from ``motor_list``, one at a time.

    The ``Parallelizable`` return annotation marks this as the fan-out
    point of the dataflow: nodes downstream of ``motor`` are evaluated
    once per yielded id (see Hamilton's dynamic-execution docs).

    Args:
        motor_list: Ids of the motors to check.

    Yields:
        Every id in ``motor_list``, in order.
    """
    yield from motor_list
def _is_motor_on(motor: int) -> bool:
    """Return ``True`` when ``motor`` is an even id.

    Placeholder for a real on/off check. The leading underscore presumably
    keeps this helper out of the Hamilton graph -- confirm against the
    Hamilton docs if that matters.
    """
    return motor % 2 == 0
def motor_status(motor: int) -> dict:
    """Build the status record for a single motor.

    Args:
        motor: Id of the motor being checked.

    Returns:
        A dict with ``"motor_id"`` (the id passed in) and ``"is_on"``
        (the result of ``_is_motor_on`` for that id).
    """
    return {
        "motor_id": motor,
        "is_on": _is_motor_on(motor),
    }
def aggregate_statuses(motor_status: Collect[dict]) -> list[dict]:
    """Gather the per-motor status records into one list.

    The ``Collect`` annotation marks this as the fan-in point for the
    ``motor`` fan-out: it receives every ``motor_status`` result.

    Args:
        motor_status: Iterable of status dicts, one per motor.

    Returns:
        The same records materialized as a ``list``.
    """
    return list(motor_status)
def on_motor(aggregate_statuses: list[dict]) -> Parallelizable[int]:
    """Yield the ids of the motors that are on.

    This is a second fan-out: each yielded id feeds the status-check nodes.

    NOTE(review): an earlier variant took ``motor_status: Collect[dict]``
    directly instead of going through ``aggregate_statuses``. Presumably a
    single node cannot both collect and re-parallelize -- confirm against
    the Hamilton docs before collapsing the two steps.

    Args:
        aggregate_statuses: Status dicts carrying ``"motor_id"`` and
            ``"is_on"`` keys.

    Yields:
        ``"motor_id"`` of every record whose ``"is_on"`` value is truthy.
    """
    for motor_dict in aggregate_statuses:
        if motor_dict["is_on"]:
            yield motor_dict["motor_id"]
def status_check_1(on_motor: int) -> float:
    """Run the first (placeholder) status check for a running motor.

    Args:
        on_motor: Id of a motor that is on.

    Returns:
        ``2.3 * on_motor`` -- a stand-in value for a real measurement.
    """
    return 2.3 * on_motor
def status_check_2(on_motor: int, status_check_1: float) -> str:
    """Run the second (placeholder) status check for a running motor.

    Args:
        on_motor: Id of a motor that is on.
        status_check_1: Result of the first status check for that motor.

    Returns:
        A message string that embeds both inputs.
    """
    return f"some result based on {on_motor} and {status_check_1}"
def status_result(on_motor: int, status_check_1: float, status_check_2: str) -> dict:
    """Bundle a motor id and its two check results into one record.

    The dict is spelled out explicitly instead of returning ``locals()``,
    so adding a local variable later cannot silently add a key.

    Args:
        on_motor: Id of a motor that is on.
        status_check_1: Result of the first status check.
        status_check_2: Result of the second status check.

    Returns:
        A dict keyed by the three parameter names, in parameter order.
    """
    return {
        "on_motor": on_motor,
        "status_check_1": status_check_1,
        "status_check_2": status_check_2,
    }
def on_motor_statuses(status_result: Collect[dict]) -> pd.DataFrame:
    """Collect the per-motor result records into a single DataFrame.

    The ``Collect`` annotation marks this as the fan-in point for the
    ``on_motor`` fan-out.

    Args:
        status_result: Iterable of result dicts, one per running motor.

    Returns:
        A DataFrame with one row per record and one column per dict key.
    """
    return pd.DataFrame(status_result)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# And run them!
import functions
from hamilton import base
from hamilton import driver
from hamilton.execution import executors

# Build a driver over the functions module. Dynamic execution is switched on
# because the graph uses Parallelizable/Collect nodes.
dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_modules(functions)
    # Optional: uncomment to choose the task executor explicitly.
    # .with_remote_executor(executors.SynchronousLocalTaskExecutor())
    .with_adapters(base.PandasDataFrameResult())
    .build()
)

# Request the final node; `motor_list` is the only external input.
result = dr.execute(
    ['on_motor_statuses'],
    inputs={'motor_list': [1, 2, 3, 4, 5]},
)
print(result)

# Render the dataflow graph (top-to-bottom layout, no legend, PNG format).
dr.display_all_functions(
    "graph.dot", {"format": "png"}, orient="TB", show_legend=False
)
Only the even-numbered motors pass the `_is_motor_on` check (`motor % 2 == 0`), so only motors 2 and 4 get through:
   on_motor  status_check_1                  status_check_2
0         2             4.6  some result based on 2 and 4.6
1         4             9.2  some result based on 4 and 9.2
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is what the graph would look like.