Skip to content

Instantly share code, notes, and snippets.

@rajrao
Created October 19, 2023 04:52
Show Gist options
  • Save rajrao/4667fc8f94dcdeebb773f28c69515e4a to your computer and use it in GitHub Desktop.
Save rajrao/4667fc8f94dcdeebb773f28c69515e4a to your computer and use it in GitHub Desktop.
Exponential Backoff with Jitter as used by Airflow Sensors
# based on https://github.com/apache/airflow/blob/9ac742885ffb83c15f7e3dc910b0cf9df073407a/airflow/sensors/base.py#L251C13-L251C13
import hashlib
from datetime import datetime, timedelta
from time import sleep
poke_interval = 5 # Time in seconds that the job should wait in between each tries
timeout = 60 # Time, in seconds before the task times out and fails.
started_at = datetime.now()
sleep(1)
exponential_backoff = True
def run_duration() -> float:
# If we are in reschedule mode, then we have to compute diff
# based on the time in a DB, so can't use time.monotonic
return (datetime.now() - started_at).total_seconds()
for try_number in range(100):
if run_duration() > timeout:
print("timeout")
break
min_backoff = int(poke_interval * (2 ** (try_number - 2)))
run_hash = int(
hashlib.sha1(f"{started_at}#{try_number}".encode()).hexdigest(),
16,
)
modded_hash = min_backoff + run_hash % min_backoff
delay_backoff_in_seconds = min(modded_hash, timedelta.max.total_seconds() - 1)
new_interval = min(timeout - int(run_duration()), delay_backoff_in_seconds)
sleep_time = new_interval if exponential_backoff else poke_interval
print("try: %s run duration: %s backoff interval is %s normal interval is %s"
% (try_number, run_duration(), new_interval, poke_interval))
sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment