It's pretty easy to get going with Prefect. But when jobs fail, nothing notifies you by default.
This is the easiest way to set up a Slack notification system for failed and crashed flow runs (or any other state you care about).
- Go to https://api.slack.com/apps
- Create a new app -> From a Manifest -> update the name to "Prefect Workflow Failures" (or whatever you want)
- In your app settings, click on "Incoming Webhooks"
- Turn it on, then click "Add New Webhook to Workspace" at the bottom
- Pick the channel this webhook will post to: whatever makes sense for you, or a new Slack channel. Once you do, you should see the Webhook URLs section populate
- Copy that URL and head over to Prefect. From the left nav, click Settings -> Blocks, and then the + at the top to add a new block
- Search for Slack Webhook (not Slack Credentials). Add a block name and drop in the webhook URL you copied
- I set the notify type to failure, but that's optional
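
Before writing any Prefect code, it can be worth confirming that the webhook itself posts to the channel you picked. The sketch below uses only the Python standard library and relies on Slack incoming webhooks accepting a JSON body with a `text` field. The function names and the `SLACK_WEBHOOK_URL` environment variable are hypothetical, chosen for illustration.

```python
import json
import os
import urllib.request


def build_slack_request(webhook_url: str, text: str) -> urllib.request.Request:
    """Build a POST request carrying a plain-text Slack message."""
    payload = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_test_message(webhook_url: str, text: str) -> int:
    """Post the message and return the HTTP status code.

    urlopen raises urllib.error.HTTPError if Slack rejects the request.
    """
    request = build_slack_request(webhook_url, text)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


# Example (performs a real HTTP request, so it is left commented out).
# SLACK_WEBHOOK_URL is a hypothetical variable holding the URL you copied:
# send_test_message(os.environ["SLACK_WEBHOOK_URL"], "Test from Prefect setup")
```

If the test message shows up in the channel, any later problem is more likely on the Prefect side (block name, hook wiring) than on the Slack side.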
Now, in your Prefect code, add a new module. I called it `notify`, but you can call it anything, or put the function in some utility module you already have.
> [!NOTE]
> If your flow is async, this notify function must be async. And if it's sync, the function must be sync. I'll show both, since they are similar.


    from prefect import Flow
    from prefect.blocks.notifications import SlackWebhook
    from prefect.server.schemas.core import FlowRun
    from prefect.server.schemas.states import State
    from prefect.settings import PREFECT_UI_URL


    async def notify_slack(flow: Flow, flow_run: FlowRun, state: State):
        # Fill in the name of the Prefect Slack Webhook block you just made.
        slack_webhook_block = await SlackWebhook.aload("prefect-slack-block-name")

        # We need to type: ignore because of some Prefect async issues.
        # For more details: https://github.com/PrefectHQ/prefect/issues/15008
        await slack_webhook_block.notify(  # type: ignore
            (
                f"Your job {flow_run.name} entered {state.name} "
                f"with message:\n\n{state.message}\n\n"
                f"See <{PREFECT_UI_URL.value()}/flow-runs/"
                f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
                f"Tags: {flow_run.tags}\n\n"
                f"Scheduled start: {flow_run.expected_start_time}"
            )
        )

If you have a sync flow (i.e., your main flow entrypoint is defined with `def` instead of `async def`), it's the same imports, but you'll add this function:

    def notify_slack(flow: Flow, flow_run: FlowRun, state: State):
        # Fill in the name of the Prefect Slack Webhook block you just made.
        slack_webhook_block = SlackWebhook.load("prefect-flow-failures")

        slack_webhook_block.notify(
            (
                f"Your job {flow_run.name} entered {state.name} "
                f"with message:\n\n{state.message}\n\n"
                f"See <{PREFECT_UI_URL.value()}/flow-runs/"
                f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
                f"Tags: {flow_run.tags}\n\n"
                f"Scheduled start: {flow_run.expected_start_time}"
            )
        )

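
Both versions build the same text, so one option is to pull the formatting into a plain function that either hook can call. This is only a sketch: `build_flow_run_message` is a hypothetical helper name, and it assumes the UI URL and the state message may be unset in some environments, in which case those parts are skipped. It has no Prefect imports, which also makes it easy to check without running a flow.

```python
from typing import Iterable, Optional


def build_flow_run_message(
    flow_run_name: str,
    state_name: str,
    state_message: Optional[str],
    ui_url: Optional[str],
    flow_run_id: str,
    tags: Iterable[str],
    expected_start_time: object,
) -> str:
    """Format the Slack text for a flow run that entered a given state."""
    parts = [f"Your job {flow_run_name} entered {state_name}"]
    if state_message:
        # Only mention a message when the state actually carries one.
        parts[0] += f" with message:\n\n{state_message}"
    if ui_url:
        # Slack renders <url|label> as a link with custom text.
        link = f"<{ui_url}/flow-runs/flow-run/{flow_run_id}|the flow run in the UI>"
        parts.append(f"See {link}")
    parts.append(f"Tags: {', '.join(tags) or 'none'}")
    parts.append(f"Scheduled start: {expected_start_time}")
    return "\n\n".join(parts)
```

Inside either hook, the call would then look something like `build_flow_run_message(flow_run.name, state.name, state.message, PREFECT_UI_URL.value(), str(flow_run.id), flow_run.tags, flow_run.expected_start_time)`, with the result passed to `slack_webhook_block.notify(...)`.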
Whichever way you define your notify function, you add it to your flow with the same syntax:

    from prefect import flow
    from notify import notify_slack  # the module you created above

    @flow(on_failure=[notify_slack], on_crashed=[notify_slack], on_cancellation=[notify_slack])
    def main():
        ...

You can have it run on `failure`, `crash`, `cancellation`, `running`, or `completion`. For similar reasons as listed above, you might see a Pylance/mypy error on that line if your function is async, which you can safely `type: ignore`.
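
If you attach the same hook to many flows, repeating the keyword arguments gets noisy. One possible way to tidy that up is a small helper that builds the keyword arguments once. This is a sketch, not part of Prefect: `hook_kwargs` and `FLOW_HOOK_PARAMS` are hypothetical names, and the tuple lists the hook parameters of the `@flow` decorator that correspond to the states mentioned above.

```python
from typing import Callable, Dict, Iterable, List

# Hook parameters of Prefect's @flow decorator for the states listed above.
FLOW_HOOK_PARAMS = (
    "on_completion",
    "on_failure",
    "on_cancellation",
    "on_crashed",
    "on_running",
)


def hook_kwargs(
    hook: Callable,
    params: Iterable[str] = ("on_failure", "on_crashed", "on_cancellation"),
) -> Dict[str, List[Callable]]:
    """Return keyword arguments that attach one hook to several states."""
    kwargs: Dict[str, List[Callable]] = {}
    for param in params:
        if param not in FLOW_HOOK_PARAMS:
            # Catch typos such as "on_crash" before they reach the decorator.
            raise ValueError(f"Unknown flow hook parameter: {param}")
        kwargs[param] = [hook]
    return kwargs
```

With that in place, the decorator could be written as `@flow(**hook_kwargs(notify_slack))`, and adding `on_running` for one noisy flow becomes a one-argument change.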
That's it! Now you'll get Slack notifications on flow failures, crashes, and cancellations.
