Skip to content

Instantly share code, notes, and snippets.

@kubosuke
Last active September 9, 2024 05:24
Show Gist options
  • Select an option

  • Save kubosuke/c8bbc83367d1084ae26e07bc2487e0d5 to your computer and use it in GitHub Desktop.

Select an option

Save kubosuke/c8bbc83367d1084ae26e07bc2487e0d5 to your computer and use it in GitHub Desktop.
Delete stale metrics from pushgateway
import base64
import os
import re
from datetime import datetime, timedelta
from typing import List, Optional
from urllib.parse import quote

import requests
# Number of days after the last push beyond which a push group is treated as stale.
RETENTION_PERIOD_DAYS = 7
# Metrics endpoint: fetched with GET to read push times and used as the prefix of DELETE URLs.
# NOTE(review): "xx.com" looks like a placeholder host — confirm the real address before running.
PUSHGATEWAY_METRICS_URL = "https://xx.com/metrics"
def list_push_time_seconds_list(session: requests.Session, timeout: float = 30.0) -> List[str]:
    """Fetch the metrics page and return its ``push_time_seconds`` sample lines.

    Args:
        session: HTTP session used to issue the GET request.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive endpoint cannot hang the script forever.

    Returns:
        Every line of the response body that is a ``push_time_seconds``
        sample, in the order it appears in the response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    res = session.get(PUSHGATEWAY_METRICS_URL, timeout=timeout)
    res.raise_for_status()
    # Match the metric name followed by "{" or a space so that a metric whose
    # name merely starts with "push_time_seconds" is not picked up.
    # A real list is returned (not a one-shot ``filter`` iterator) to honour
    # the declared return type and allow repeated iteration.
    return [
        line
        for line in res.text.splitlines()
        if line.startswith(("push_time_seconds{", "push_time_seconds "))
    ]
def is_metrics_stale(
    timestamp: float,
    retention_days: Optional[float] = None,
    now: Optional[float] = None,
) -> bool:
    """Return True when a push timestamp is older than the retention period.

    Args:
        timestamp: Time of the push as Unix epoch seconds.
        retention_days: Retention window in days. Defaults to
            ``RETENTION_PERIOD_DAYS`` when omitted.
        now: Reference "current time" as Unix epoch seconds. Defaults to the
            system clock when omitted.

    Returns:
        True if ``timestamp`` lies strictly before ``now`` minus the
        retention window, otherwise False.
    """
    if retention_days is None:
        retention_days = RETENTION_PERIOD_DAYS
    if now is None:
        now = datetime.now().timestamp()
    # Compare raw epoch seconds instead of naive local datetimes: this is not
    # skewed by DST transitions and cannot raise for out-of-range timestamps.
    cutoff = now - timedelta(days=retention_days).total_seconds()
    return timestamp < cutoff
# One `name="value"` label pair. The value may contain commas, equals signs,
# braces and backslash-escaped characters, so it cannot be split on "," / "=".
_LABEL_PAIR_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"')


def _unescape_label_value(raw: str) -> str:
    """Undo text-format escaping (``\\\\``, ``\\"``, ``\\n``) in a label value."""
    return re.sub(r"\\(.)", lambda m: "\n" if m.group(1) == "n" else m.group(1), raw)


def _grouping_key_segment(name: str, value: str) -> str:
    """Render one label as the ``name/value`` part of a grouping-key URL path."""
    if "/" in value:
        # A slash cannot appear in a path component even when percent-encoded,
        # so such values use the "@base64" form (padding may be omitted).
        encoded = base64.urlsafe_b64encode(value.encode("utf-8")).decode("ascii")
        return f"{name}@base64/{encoded.rstrip('=')}"
    return f"{name}/{quote(value, safe='')}"


def create_url_from_prom_metrics(prom_metrics: str, base_url: Optional[str] = None) -> str:
    """Build the grouping-key URL for the series described by a sample's labels.

    The label set between the outermost ``{`` and ``}`` of ``prom_metrics`` is
    parsed, and every non-empty label becomes a ``/name/value`` path segment
    appended to the base URL, with ``job`` always first.

    Args:
        prom_metrics: Series part of a sample line, e.g.
            ``push_time_seconds{instance="host1",job="backup"}``.
        base_url: URL prefix to append the grouping key to. Defaults to
            ``PUSHGATEWAY_METRICS_URL`` when omitted.

    Returns:
        ``<base_url>/job/<job>[/<label>/<value>...]``.

    Raises:
        ValueError: If no non-empty ``job`` label is present.
    """
    if base_url is None:
        base_url = PUSHGATEWAY_METRICS_URL

    # Use the outermost braces so a "}" inside a label value does not cut the
    # label set short.
    start = prom_metrics.find("{")
    end = prom_metrics.rfind("}")
    label_text = prom_metrics[start + 1:end] if 0 <= start < end else ""

    labels = {
        name: _unescape_label_value(raw)
        for name, raw in _LABEL_PAIR_RE.findall(label_text)
    }

    job = labels.pop("job", "")
    if not job:
        raise ValueError(f"no job label found in {prom_metrics!r}")

    segments = [_grouping_key_segment("job", job)]
    # Keep every remaining label, not only "instance": the whole label set
    # identifies the group, so dropping one would address a different group.
    # Empty values are skipped.
    # NOTE(review): this assumes an empty label (e.g. instance="") means the
    # label is absent from the grouping key — confirm against the Pushgateway
    # version in use.
    segments.extend(
        _grouping_key_segment(name, value) for name, value in labels.items() if value
    )
    return f"{base_url}/{'/'.join(segments)}"
# Seconds to wait for each DELETE request before giving up.
DELETE_TIMEOUT_SECONDS = 30.0


def main() -> None:
    """Delete every push group whose last push is older than the retention period.

    Reads the ``push_time_seconds`` sample lines, and for each one judged
    stale issues an HTTP DELETE against the URL derived from its labels,
    printing the URL after a successful delete.

    Raises:
        requests.HTTPError: If listing or any delete returns an error status.
    """
    with requests.Session() as session:
        for sample_line in list_push_time_seconds_list(session=session):
            # Split once from the right: the value is the last field, while
            # the series part may contain whitespace inside label values.
            series, raw_value = sample_line.rsplit(maxsplit=1)
            # float() understands scientific notation ("1.7e+09") directly, so
            # no manual exponent stripping is needed and any exponent works.
            if not is_metrics_stale(float(raw_value)):
                continue
            url = create_url_from_prom_metrics(series)
            res = session.delete(url, timeout=DELETE_TIMEOUT_SECONDS)
            res.raise_for_status()
            print(f"DELETE {url}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment