Skip to content

Instantly share code, notes, and snippets.

@kubosuke
Last active September 9, 2024 05:24
Show Gist options
  • Select an option

  • Save kubosuke/c8bbc83367d1084ae26e07bc2487e0d5 to your computer and use it in GitHub Desktop.

Select an option

Save kubosuke/c8bbc83367d1084ae26e07bc2487e0d5 to your computer and use it in GitHub Desktop.
Delete stale metrics from pushgateway
from datetime import datetime, timedelta
import re
from typing import List
import requests
# Age threshold in days: is_metrics_stale() reports a push timestamp as stale
# once it is older than this.
RETENTION_PERIOD_DAYS = 7
# Endpoint that is fetched to list the metrics; it is also the prefix of the
# per-group URLs that the script sends DELETE requests to.
PUSHGATEWAY_METRICS_URL = "https://example.com/metrics"
def list_push_time_seconds_list(session: requests.Session) -> List[str]:
    """Fetch the metrics page and return its ``push_time_seconds`` lines.

    Args:
        session: HTTP session used to issue the GET request against
            ``PUSHGATEWAY_METRICS_URL``.

    Returns:
        Every line of the response body that starts with
        ``push_time_seconds``, in the order it appears in the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    res = session.get(PUSHGATEWAY_METRICS_URL)
    res.raise_for_status()
    # Build a real list (not a lazy ``filter`` object) so the result matches
    # the declared return type and can be iterated more than once.
    return [
        line
        for line in res.text.splitlines()
        if line.startswith("push_time_seconds")
    ]
def is_metrics_stale(timestamp: float) -> bool:
    """Tell whether *timestamp* lies further back than the retention window.

    Args:
        timestamp: POSIX timestamp in seconds.

    Returns:
        ``True`` when the moment described by *timestamp* is more than
        ``RETENTION_PERIOD_DAYS`` days before the current local time.
    """
    pushed_at = datetime.fromtimestamp(timestamp)
    retention = timedelta(days=RETENTION_PERIOD_DAYS)
    oldest_allowed = datetime.now() - retention
    return oldest_allowed > pushed_at
def create_url_from_prom_metrics(prom_metrics: str) -> str:
    """Build the Pushgateway group URL for one exposition-format sample.

    The labels of the sample are taken as the grouping key of the push group.
    ``job`` always comes first in the path; every other label with a
    non-empty value follows in the order it appears in the sample.

    Args:
        prom_metrics: Metric name plus label set, e.g.
            ``push_time_seconds{instance="a",job="b"}``.

    Returns:
        ``PUSHGATEWAY_METRICS_URL`` followed by ``/job/<job>`` and one
        ``/<label>/<value>`` pair per remaining non-empty label.

    Raises:
        KeyError: If the sample carries no ``job`` label.
    """
    # Local imports: only this function needs them.
    import base64
    from urllib.parse import quote

    # Match name="value" pairs directly instead of splitting on "," and "=",
    # so values that contain ",", "=", "}" or escaped quotes parse correctly.
    label_pattern = r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"'
    # Escape sequences used inside label values by the text exposition format.
    escapes = {"\\\\": "\\", '\\"': '"', "\\n": "\n"}

    def unescape(raw):
        return re.sub(r"\\.", lambda m: escapes.get(m.group(0), m.group(0)), raw)

    def path_segment(name, value):
        if "/" in value:
            # A "/" cannot be carried in a plain path component; use the
            # Pushgateway "<label>@base64/<url-safe base64>" form instead.
            encoded = base64.urlsafe_b64encode(value.encode("utf-8")).decode("ascii")
            return f"{name}@base64/{encoded}"
        return f"{name}/{quote(value, safe=':')}"

    # Only look at the text after the opening brace (the label set).
    _, _, label_text = prom_metrics.partition("{")
    labels = {
        name: unescape(raw) for name, raw in re.findall(label_pattern, label_text)
    }

    segments = [path_segment("job", labels.pop("job"))]
    # Empty values (e.g. instance="" on groups pushed without an instance)
    # are left out rather than producing a dangling "/instance/" component.
    segments.extend(
        path_segment(name, value) for name, value in labels.items() if value
    )
    return f"{PUSHGATEWAY_METRICS_URL}/{'/'.join(segments)}"
if __name__ == "__main__":
    # Walk every push_time_seconds sample and delete the push groups whose
    # last push is older than the retention period.
    with requests.Session() as session:
        for push_time_seconds in list_push_time_seconds_list(session=session):
            # The sample value is the last whitespace-separated field; split
            # from the right so label values containing spaces stay intact.
            push_time_info, push_time_epoch = push_time_seconds.rsplit(maxsplit=1)
            # float() parses scientific notation such as "1.7e+09" directly,
            # whatever the exponent is. Fresh groups are skipped, not deleted.
            if not is_metrics_stale(float(push_time_epoch)):
                continue
            url = create_url_from_prom_metrics(push_time_info)
            res = session.delete(url)
            # Abort on the first failed deletion instead of continuing silently.
            res.raise_for_status()
            print(f"DELETE {url}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment