Created
May 5, 2021 16:20
-
-
Save lauralorenz/ecedeea5e9c7f58edbb3bba7c8d00ce0 to your computer and use it in GitHub Desktop.
Prefect on Data Science flows
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tempfile | |
import prefect | |
from prefect.storage import S3 | |
from prefect import task, Flow, Parameter | |
from prefect.engine.serializers import Serializer | |
from prefect.engine.results import S3Result | |
from yellowbrick.datasets import load_mushroom | |
from sklearn.pipeline import Pipeline | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from yellowbrick.classifier import ClassificationReport | |
from sklearn.metrics import f1_score | |
from sklearn.pipeline import Pipeline | |
from sklearn.svm import LinearSVC, NuSVC, SVC | |
from sklearn.neighbors import KNeighborsClassifier | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression, SGDClassifier | |
from sklearn.ensemble import BaggingClassifier, ExtraTreesClassifier, RandomForestClassifier | |
# Markdown template for the per-estimator artifact: a heading with the
# estimator name, the report image served from the images bucket, and the
# location of the stored model.
MARKDOWN = (
    "# {estimator_name}\n"
    '<img src="https://visualizer-demo-images.s3.amazonaws.com/{image}">\n'
    "## Model Location: {model_location}\n"
)
class NoOpSerializer(Serializer):
    """Pass-through `Serializer`: values are handed back exactly as received."""

    def serialize(self, value):
        """Return ``value`` untouched instead of encoding it."""
        return value

    def deserialize(self, value):
        """Return ``value`` untouched instead of decoding it."""
        return value
@task(result=S3Result(bucket="visualizer-demo-models", location="{flow_run_name}/{task_run_id}"))
def visualize_model(estimator, **kwargs):
    """
    Fit one estimator on the mushroom dataset and publish its classification report.

    The estimator is placed behind a ``OneHotEncoder`` in a pipeline, which a
    yellowbrick ``ClassificationReport`` fits and scores on the full dataset.
    The rendered PNG is written to the ``visualizer-demo-images`` bucket and a
    markdown artifact linking to it (and to the model location) is created.

    Args:
        estimator: classifier used as the final pipeline step.
        **kwargs: extra keyword arguments forwarded to ``ClassificationReport``.

    Returns:
        The pipeline handed to the visualizer (presumably fitted in place by
        ``visualizer.fit`` -- confirm against yellowbrick). The task's
        ``S3Result`` stores it under ``{flow_run_name}/{task_run_id}``.
    """
    X, y = load_mushroom()
    y = LabelEncoder().fit_transform(y)

    model = Pipeline([
        ('one_hot_encoder', OneHotEncoder()),
        ('estimator', estimator),
    ])

    # Instantiate the classification model and visualizer
    visualizer = ClassificationReport(
        model, classes=['edible', 'poisonous'],
        cmap="YlGn", size=(600, 360), **kwargs
    )
    visualizer.fit(X, y)
    visualizer.score(X, y)

    estimator_name = estimator.__class__.__name__

    # Render into a temporary file and read the bytes back. The context
    # manager closes (and thereby removes) the file even if rendering fails,
    # so mapped runs do not leak file handles or temp files.
    with tempfile.NamedTemporaryFile(suffix=".png") as tmp:
        visualizer.show(outpath=tmp, clear_figure=True)
        tmp.seek(0)
        image_bytes = tmp.read()

    # NoOpSerializer keeps the PNG bytes as-is when they are written to S3.
    visualizer_result = S3Result(
        bucket="visualizer-demo-images",
        location="{flow_run_name}/" + estimator_name + ".png",
        serializer=NoOpSerializer(),
    )
    result = visualizer_result.write(image_bytes, **prefect.context)

    prefect.artifacts.create_markdown(MARKDOWN.format(
        estimator_name=estimator_name,
        image=result.location,
        model_location="{flow_run_name}/{task_run_id}".format(**prefect.context),
    ))
    return model
@task
def map_model_parameters(**kwargs):
    """
    Build the list of candidate estimators from the flow's parameter values.

    Reads ``svc_gamma``, ``nusvc_gamma`` and ``sgdclassifier_maxiter`` from
    ``kwargs`` (a missing key raises ``KeyError``) and returns ten
    scikit-learn classifiers in a fixed order.
    """
    svc_gamma = kwargs['svc_gamma']
    nusvc_gamma = kwargs['nusvc_gamma']
    sgd_max_iter = kwargs['sgdclassifier_maxiter']

    support_vector_models = [
        SVC(gamma=svc_gamma),
        NuSVC(gamma=nusvc_gamma),
        LinearSVC(),
    ]
    linear_and_neighbor_models = [
        SGDClassifier(max_iter=sgd_max_iter, tol=1e-3),
        KNeighborsClassifier(),
        LogisticRegression(solver='lbfgs'),
        LogisticRegressionCV(cv=3),
    ]
    ensemble_models = [
        BaggingClassifier(),
        ExtraTreesClassifier(n_estimators=300),
        RandomForestClassifier(n_estimators=300),
    ]
    return support_vector_models + linear_and_neighbor_models + ensemble_models
# Flow stored as a script in S3; the tasks below are wired up inside its context.
with Flow(
    "compute-estimator-flow",
    storage=S3(
        bucket="visualizer-demo-flows",
        stored_as_script=True,
        local_script_path="./flow.py",
    ),
) as flow:
    # Hyperparameters exposed as flow parameters
    svc_gamma = Parameter('svc_gamma', default='auto')
    nusvc_gamma = Parameter('nusvc_gamma', default='auto')
    sgdclassifier_maxiter = Parameter('sgdclassifier max_iter', default=100)

    # Build the estimator list once, then map visualize_model over it.
    model_choices = map_model_parameters(
        svc_gamma=svc_gamma,
        nusvc_gamma=nusvc_gamma,
        sgdclassifier_maxiter=sgdclassifier_maxiter,
    )
    visualizers = visualize_model.map(model_choices)
from prefect.run_configs import ECSRun | |
# Run the flow on ECS: cluster, networking, task role and image are fixed here.
flow.run_config = ECSRun(
    run_task_kwargs=dict(
        cluster="prefect-demo-cluster",
        networkConfiguration=dict(
            awsvpcConfiguration=dict(
                assignPublicIp='ENABLED',
                subnets=['subnet-7410175c'],
                securityGroups=[],
            ),
        ),
    ),
    task_role_arn="arn:aws:iam::{{your project here}}:role/prefect-demo-fargate-task-role",
    image='public.ecr.aws/i0y3l0j3/visualizer-demo-images:latest',
)

# Register only when executed as a script, not when the module is imported.
if __name__ == "__main__":
    flow.register("data-science-flows")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tempfile | |
import prefect | |
from prefect import task, Flow, Parameter | |
from prefect.engine.serializers import Serializer | |
from prefect.engine.results import S3Result | |
from yellowbrick.datasets import load_mushroom | |
from sklearn.pipeline import Pipeline | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from yellowbrick.classifier import ClassificationReport | |
from sklearn.metrics import f1_score | |
from sklearn.pipeline import Pipeline | |
from sklearn.svm import LinearSVC, NuSVC, SVC | |
from sklearn.neighbors import KNeighborsClassifier | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression, SGDClassifier | |
from sklearn.ensemble import BaggingClassifier, ExtraTreesClassifier, RandomForestClassifier | |
# Markdown template for the per-estimator artifact: a heading with the
# estimator name followed by the report image served from the images bucket.
MARKDOWN = (
    "# {estimator_name}\n"
    '<img src="https://visualizer-demo-images.s3.amazonaws.com/{image}">\n'
)
class NoOpSerializer(Serializer):
    """Pass-through `Serializer`: values are handed back exactly as received."""

    def serialize(self, value):
        """Return ``value`` untouched instead of encoding it."""
        return value

    def deserialize(self, value):
        """Return ``value`` untouched instead of decoding it."""
        return value
@task
def visualize_model(estimator, **kwargs):
    """
    Fit one estimator on the mushroom dataset and publish its classification report.

    The estimator is placed behind a ``OneHotEncoder`` in a pipeline, which a
    yellowbrick ``ClassificationReport`` fits and scores on the full dataset.
    The rendered PNG is written to the ``visualizer-demo-images`` bucket and a
    markdown artifact linking to it is created.

    Args:
        estimator: classifier used as the final pipeline step.
        **kwargs: extra keyword arguments forwarded to ``ClassificationReport``.

    Returns:
        None; the task is run for its side effects only.
    """
    X, y = load_mushroom()
    y = LabelEncoder().fit_transform(y)

    model = Pipeline([
        ('one_hot_encoder', OneHotEncoder()),
        ('estimator', estimator),
    ])

    # Instantiate the classification model and visualizer
    visualizer = ClassificationReport(
        model, classes=['edible', 'poisonous'],
        cmap="YlGn", size=(600, 360), **kwargs
    )
    visualizer.fit(X, y)
    visualizer.score(X, y)

    estimator_name = estimator.__class__.__name__

    # Save the visualization using the Result API. The context manager closes
    # (and thereby removes) the temporary file even if rendering fails, so
    # mapped runs do not leak file handles or temp files.
    with tempfile.NamedTemporaryFile(suffix=".png") as tmp:
        visualizer.show(outpath=tmp, clear_figure=True)
        tmp.seek(0)
        image_bytes = tmp.read()

    # NoOpSerializer keeps the PNG bytes as-is when they are written to S3.
    visualizer_result = S3Result(
        bucket="visualizer-demo-images",
        location="{flow_run_name}/" + estimator_name + ".png",
        serializer=NoOpSerializer(),
    )
    result = visualizer_result.write(image_bytes, **prefect.context)

    # Save the report for this task run with the Artifacts API
    prefect.artifacts.create_markdown(MARKDOWN.format(
        estimator_name=estimator_name,
        image=result.location,
    ))
f = Flow("estimator-flow")
with f:
    # Fixed set of candidate estimators; visualize_model is mapped over it.
    model_choices = [
        SVC(gamma='auto'),
        NuSVC(gamma='auto'),
        LinearSVC(),
        SGDClassifier(max_iter=100, tol=1e-3),
        KNeighborsClassifier(),
        LogisticRegression(solver='lbfgs'),
        LogisticRegressionCV(cv=3),
        BaggingClassifier(),
        ExtraTreesClassifier(n_estimators=300),
        RandomForestClassifier(n_estimators=300),
    ]
    visualizers = visualize_model.map(model_choices)

# Registration runs at module level, i.e. whenever this file is executed or imported.
f.register("data-science-flows")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tempfile | |
import prefect | |
from prefect import task, Flow, Parameter | |
from prefect.engine.serializers import Serializer | |
from prefect.engine.results import S3Result | |
from yellowbrick.datasets import load_mushroom | |
from sklearn.pipeline import Pipeline | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from sklearn.metrics import f1_score | |
from sklearn.pipeline import Pipeline | |
from yellowbrick.classifier import ClassificationReport | |
from sklearn.svm import LinearSVC, NuSVC, SVC | |
from sklearn.neighbors import KNeighborsClassifier | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression, SGDClassifier | |
from sklearn.ensemble import BaggingClassifier, ExtraTreesClassifier, RandomForestClassifier | |
# Markdown template for the per-estimator artifact: a heading with the
# estimator name followed by the report image served from the images bucket.
MARKDOWN = (
    "# {estimator_name}\n"
    '<img src="https://visualizer-demo-images.s3.amazonaws.com/{image}">\n'
)
class NoOpSerializer(Serializer):
    """Pass-through `Serializer`: values are handed back exactly as received."""

    def serialize(self, value):
        """Return ``value`` untouched instead of encoding it."""
        return value

    def deserialize(self, value):
        """Return ``value`` untouched instead of decoding it."""
        return value
@task
def visualize_model(estimator, **kwargs):
    """
    Fit one estimator on the mushroom dataset and publish its classification report.

    The estimator is placed behind a ``OneHotEncoder`` in a pipeline, which a
    yellowbrick ``ClassificationReport`` fits and scores on the full dataset.
    The rendered PNG is written to the ``visualizer-demo-images`` bucket and a
    markdown artifact linking to it is created.

    Args:
        estimator: classifier used as the final pipeline step.
        **kwargs: extra keyword arguments forwarded to ``ClassificationReport``.

    Returns:
        The pipeline handed to the visualizer (presumably fitted in place by
        ``visualizer.fit`` -- confirm against yellowbrick).
    """
    X, y = load_mushroom()
    y = LabelEncoder().fit_transform(y)

    model = Pipeline([
        ('one_hot_encoder', OneHotEncoder()),
        ('estimator', estimator),
    ])

    # Instantiate the classification model and visualizer
    visualizer = ClassificationReport(
        model, classes=['edible', 'poisonous'],
        cmap="YlGn", size=(600, 360), **kwargs
    )
    visualizer.fit(X, y)
    visualizer.score(X, y)

    estimator_name = estimator.__class__.__name__

    # Save the visualization using the Result API. The context manager closes
    # (and thereby removes) the temporary file even if rendering fails, so
    # mapped runs do not leak file handles or temp files.
    with tempfile.NamedTemporaryFile(suffix=".png") as tmp:
        visualizer.show(outpath=tmp, clear_figure=True)
        tmp.seek(0)
        image_bytes = tmp.read()

    # NoOpSerializer keeps the PNG bytes as-is when they are written to S3.
    visualizer_result = S3Result(
        bucket="visualizer-demo-images",
        location="{flow_run_name}/" + estimator_name + ".png",
        serializer=NoOpSerializer(),
    )
    result = visualizer_result.write(image_bytes, **prefect.context)

    # Save the report for this task run with the Artifacts API
    prefect.artifacts.create_markdown(MARKDOWN.format(
        estimator_name=estimator_name,
        image=result.location,
    ))
    return model
@task
def map_model_parameters(**kwargs):
    """
    Build the list of candidate estimators from the flow's parameter values.

    Reads ``svc_gamma``, ``nusvc_gamma`` and ``sgdclassifier_maxiter`` from
    ``kwargs`` (a missing key raises ``KeyError``) and returns ten
    scikit-learn classifiers in a fixed order.
    """
    svc_gamma = kwargs['svc_gamma']
    nusvc_gamma = kwargs['nusvc_gamma']
    sgd_max_iter = kwargs['sgdclassifier_maxiter']

    support_vector_models = [
        SVC(gamma=svc_gamma),
        NuSVC(gamma=nusvc_gamma),
        LinearSVC(),
    ]
    linear_and_neighbor_models = [
        SGDClassifier(max_iter=sgd_max_iter, tol=1e-3),
        KNeighborsClassifier(),
        LogisticRegression(solver='lbfgs'),
        LogisticRegressionCV(cv=3),
    ]
    ensemble_models = [
        BaggingClassifier(),
        ExtraTreesClassifier(n_estimators=300),
        RandomForestClassifier(n_estimators=300),
    ]
    return support_vector_models + linear_and_neighbor_models + ensemble_models
f = Flow("estimator-flow-parameterized")
with f:
    # Hyperparameters exposed as flow parameters
    svc_gamma = Parameter('svc_gamma', default='auto')
    nusvc_gamma = Parameter('nusvc_gamma', default='auto')
    sgdclassifier_maxiter = Parameter('sgdclassifier max_iter', default=100)

    # Build the estimator list once, then map visualize_model over it.
    model_choices = map_model_parameters(
        svc_gamma=svc_gamma,
        nusvc_gamma=nusvc_gamma,
        sgdclassifier_maxiter=sgdclassifier_maxiter,
    )
    visualizers = visualize_model.map(model_choices)

# Registration runs at module level, i.e. whenever this file is executed or imported.
f.register("data-science-flows")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tempfile | |
import prefect | |
from prefect import task, Flow, Parameter | |
from prefect.engine.serializers import Serializer | |
from prefect.engine.results import S3Result | |
from yellowbrick.datasets import load_mushroom | |
from sklearn.pipeline import Pipeline | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from yellowbrick.classifier import ClassificationReport | |
from sklearn.metrics import f1_score | |
from sklearn.pipeline import Pipeline | |
from sklearn.svm import LinearSVC, NuSVC, SVC | |
from sklearn.neighbors import KNeighborsClassifier | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression, SGDClassifier | |
from sklearn.ensemble import BaggingClassifier, ExtraTreesClassifier, RandomForestClassifier | |
# Markdown template for the per-estimator artifact: a heading with the
# estimator name, the report image served from the images bucket, and the
# location of the stored model.
MARKDOWN = (
    "# {estimator_name}\n"
    '<img src="https://visualizer-demo-images.s3.amazonaws.com/{image}">\n'
    "## Model Location: {model_location}\n"
)
class NoOpSerializer(Serializer):
    """Pass-through `Serializer`: values are handed back exactly as received."""

    def serialize(self, value):
        """Return ``value`` untouched instead of encoding it."""
        return value

    def deserialize(self, value):
        """Return ``value`` untouched instead of decoding it."""
        return value
@task(result=S3Result(bucket="visualizer-demo-models", location="{flow_run_name}/{task_run_id}"))
def visualize_model(estimator, **kwargs):
    """
    Fit one estimator on the mushroom dataset and publish its classification report.

    The estimator is placed behind a ``OneHotEncoder`` in a pipeline, which a
    yellowbrick ``ClassificationReport`` fits and scores on the full dataset.
    The rendered PNG is written to the ``visualizer-demo-images`` bucket and a
    markdown artifact linking to it (and to the model location) is created.

    Args:
        estimator: classifier used as the final pipeline step.
        **kwargs: extra keyword arguments forwarded to ``ClassificationReport``.

    Returns:
        The pipeline handed to the visualizer (presumably fitted in place by
        ``visualizer.fit`` -- confirm against yellowbrick). The task's
        ``S3Result`` stores it under ``{flow_run_name}/{task_run_id}``.
    """
    X, y = load_mushroom()
    y = LabelEncoder().fit_transform(y)

    model = Pipeline([
        ('one_hot_encoder', OneHotEncoder()),
        ('estimator', estimator),
    ])

    # Instantiate the classification model and visualizer
    visualizer = ClassificationReport(
        model, classes=['edible', 'poisonous'],
        cmap="YlGn", size=(600, 360), **kwargs
    )
    visualizer.fit(X, y)
    visualizer.score(X, y)

    estimator_name = estimator.__class__.__name__

    # Save the visualization using the Result API. The context manager closes
    # (and thereby removes) the temporary file even if rendering fails, so
    # mapped runs do not leak file handles or temp files.
    with tempfile.NamedTemporaryFile(suffix=".png") as tmp:
        visualizer.show(outpath=tmp, clear_figure=True)
        tmp.seek(0)
        image_bytes = tmp.read()

    # NoOpSerializer keeps the PNG bytes as-is when they are written to S3.
    visualizer_result = S3Result(
        bucket="visualizer-demo-images",
        location="{flow_run_name}/" + estimator_name + ".png",
        serializer=NoOpSerializer(),
    )
    result = visualizer_result.write(image_bytes, **prefect.context)

    # Save the report for this task run with the Artifacts API
    prefect.artifacts.create_markdown(MARKDOWN.format(
        estimator_name=estimator_name,
        image=result.location,
        model_location="{flow_run_name}/{task_run_id}".format(**prefect.context),
    ))
    return model
@task
def map_model_parameters(**kwargs):
    """
    Build the list of candidate estimators from the flow's parameter values.

    Reads ``svc_gamma``, ``nusvc_gamma`` and ``sgdclassifier_maxiter`` from
    ``kwargs`` (a missing key raises ``KeyError``) and returns ten
    scikit-learn classifiers in a fixed order.
    """
    svc_gamma = kwargs['svc_gamma']
    nusvc_gamma = kwargs['nusvc_gamma']
    sgd_max_iter = kwargs['sgdclassifier_maxiter']

    support_vector_models = [
        SVC(gamma=svc_gamma),
        NuSVC(gamma=nusvc_gamma),
        LinearSVC(),
    ]
    linear_and_neighbor_models = [
        SGDClassifier(max_iter=sgd_max_iter, tol=1e-3),
        KNeighborsClassifier(),
        LogisticRegression(solver='lbfgs'),
        LogisticRegressionCV(cv=3),
    ]
    ensemble_models = [
        BaggingClassifier(),
        ExtraTreesClassifier(n_estimators=300),
        RandomForestClassifier(n_estimators=300),
    ]
    return support_vector_models + linear_and_neighbor_models + ensemble_models
f = Flow("estimator-flow-model-storage")
with f:
    # Hyperparameters exposed as flow parameters
    svc_gamma = Parameter('svc_gamma', default='auto')
    nusvc_gamma = Parameter('nusvc_gamma', default='auto')
    sgdclassifier_maxiter = Parameter('sgdclassifier max_iter', default=100)

    # Build the estimator list once, then map visualize_model over it.
    model_choices = map_model_parameters(
        svc_gamma=svc_gamma,
        nusvc_gamma=nusvc_gamma,
        sgdclassifier_maxiter=sgdclassifier_maxiter,
    )
    visualizers = visualize_model.map(model_choices)

# Registration runs at module level, i.e. whenever this file is executed or imported.
f.register("data-science-flows")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import prefect | |
from prefect import task, Flow, Parameter | |
from prefect.engine.results import PrefectResult, S3Result | |
import pandas as pd | |
from yellowbrick.datasets import load_mushroom | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
# Markdown template for the prediction artifact: the predicted label as a
# heading, followed by the three input feature values.
MARKDOWN = (
    "# {prediction_h}\n"
    "Shape: {shape}\n"
    "Surface: {surface}\n"
    "Color: {color}\n"
)
@task(result=PrefectResult())
def predict(model_location, shape, surface, color):
    """
    Classify a single mushroom with a previously stored model.

    The model is read from the ``visualizer-demo-models`` bucket, applied to a
    one-row DataFrame built from the three feature values, and the numeric
    prediction is mapped back to its label. A markdown artifact summarising
    the inputs and the prediction is created.

    Args:
        model_location: location of the stored model within the bucket.
        shape: value for the ``shape`` column.
        surface: value for the ``surface`` column.
        color: value for the ``color`` column.

    Returns:
        The predicted label, decoded with a ``LabelEncoder`` fitted on the
        mushroom dataset's targets.
    """
    # Only the targets are needed, to map the numeric prediction back to its
    # label. The previous OneHotEncoder fit over the features was never used
    # and has been dropped.
    # TODO: persist the label encoder with the model instead of refitting it
    _, y = load_mushroom()
    label_encoder = LabelEncoder().fit(y)

    # NOTE(review): no serializer is passed, so Prefect's default applies
    # (presumably pickle-based -- confirm). Since model_location comes from a
    # flow parameter, only point this at trusted objects.
    stored_model = S3Result(bucket="visualizer-demo-models")
    model = stored_model.read(location=model_location).value

    test_df = pd.DataFrame(
        data=[[shape, surface, color]],
        columns=['shape', 'surface', 'color'],
    )
    prediction = model.predict(test_df)
    prediction_h = label_encoder.inverse_transform(prediction)[0]

    prefect.artifacts.create_markdown(MARKDOWN.format(
        prediction_h=prediction_h,
        shape=shape,
        surface=surface,
        color=color,
    ))
    return prediction_h
f = Flow("predict-flow")
with f:
    # Location of the stored model plus the three feature values to classify.
    model_location = Parameter(
        'model_location',
        default='mysterious-skink/07fbf311-e0fa-40b8-a221-2e4d4342d30e',
    )
    shape = Parameter('shape', default='convex')
    surface = Parameter('surface', default='smooth')
    color = Parameter('color', default='yellow')
    predict(model_location, shape, surface, color)

# Example inputs:
#   poisonous: convex, scaly, yellow
#   edible: convex, smooth, yellow

# Registration runs at module level, i.e. whenever this file is executed or imported.
f.register("data-science-flows")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment