Created October 19, 2023 04:52
Save rajrao/4667fc8f94dcdeebb773f28c69515e4a to your computer and use it in GitHub Desktop.
Exponential Backoff with Jitter as used by Airflow Sensors
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Based on Apache Airflow's sensor poke-interval backoff:
# https://github.com/apache/airflow/blob/9ac742885ffb83c15f7e3dc910b0cf9df073407a/airflow/sensors/base.py#L251C13-L251C13
import hashlib
from datetime import datetime, timedelta
from time import sleep

# Base wait between pokes, in seconds; also the fixed interval reported
# when exponential backoff is switched off.
poke_interval = 5
# Overall budget, in seconds, before the demo reports a timeout.
timeout = 60
# Choose between jittered exponential backoff and the fixed poke_interval.
exponential_backoff = True

# Wall-clock start of the run; it is also fed into the jitter hash.
started_at = datetime.now()
# NOTE(review): the purpose of this 1 s pause isn't evident here; its effect
# is that the first run_duration() reading is roughly one second.
sleep(1)
def run_duration() -> float:
    """Return how many seconds have passed since ``started_at``.

    Uses the wall clock (``datetime.now()``) rather than ``time.monotonic``:
    in Airflow's reschedule mode the start time is read back from the
    database, so a process-local monotonic clock cannot be used.
    """
    elapsed = datetime.now() - started_at
    return elapsed.total_seconds()
def _next_poke_interval(try_number: int, elapsed: float) -> float:
    """Return the jittered exponential-backoff delay, in seconds, for a try.

    The base delay is ``poke_interval * 2 ** (try_number - 2)`` truncated to
    an int and floored at 1. A deterministic jitter of ``[0, base)`` seconds
    is added, derived from a SHA-1 of ``started_at`` and ``try_number``. The
    result is then capped so the next poke never lands past ``timeout``.

    :param try_number: zero-based index of the current poke.
    :param elapsed: seconds the sensor has already been running.
    """
    # int() truncates, so small poke intervals on early tries can yield 0
    # (e.g. poke_interval=3, try 0 -> int(0.75) == 0). That would make
    # ``run_hash % min_backoff`` below raise ZeroDivisionError.
    min_backoff = max(int(poke_interval * (2 ** (try_number - 2))), 1)
    run_hash = int(
        hashlib.sha1(f"{started_at}#{try_number}".encode()).hexdigest(),
        16,
    )
    # Jittered delay in [min_backoff, 2 * min_backoff - 1].
    modded_hash = min_backoff + run_hash % min_backoff
    # Keep the delay below the largest duration a timedelta can represent;
    # 2 ** (try_number - 2) grows without bound over many tries.
    delay_backoff_in_seconds = min(modded_hash, timedelta.max.total_seconds() - 1)
    # Never schedule the next poke beyond the overall timeout.
    return min(timeout - int(elapsed), delay_backoff_in_seconds)


for try_number in range(100):
    # Read the clock once per try so the timeout check, the interval cap and
    # the printed duration all refer to the same instant.
    elapsed = run_duration()
    if elapsed > timeout:
        print("timeout")
        break
    new_interval = _next_poke_interval(try_number, elapsed)
    # The wait a real sensor would use before its next poke.
    sleep_time = new_interval if exponential_backoff else poke_interval
    print("try: %s run duration: %s backoff interval is %s normal interval is %s"
          % (try_number, elapsed, new_interval, poke_interval))
    # NOTE(review): the demo advances only 1 s per try instead of sleeping
    # ``sleep_time``, presumably so many tries print quickly; confirm intent.
    sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.