Last active
September 9, 2024 05:24
Revisions
-
kubosuke revised this gist
Dec 7, 2023 . 1 changed file with 21 additions and 28 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,41 +1,34 @@ from datetime import datetime, timedelta import os import requests from prometheus_client.parser import text_string_to_metric_families PUSHGATEWAY_METRICS_URL = "https://xxx.com/metrics" PUSH_TIME_METRICS_NAME = "push_time_seconds" def is_metrics_stale(timestamp: float, retention_period_days: int = 7) -> bool: """Return True if the metrics is stale. Args: timestamp (float): epoch timestamp in prom format. ex: 1701841866.00605802 Returns: bool: True if the stale than RETENTION_PERIOD_DAYS(days). """ return datetime.fromtimestamp(timestamp) < datetime.now() - timedelta(days=retention_period_days) if __name__ == "__main__": with requests.session() as session: metrics_list_response = session.get(PUSHGATEWAY_METRICS_URL) metrics_list_response.raise_for_status() for family in text_string_to_metric_families(metrics_list_response.text): for sample in family.samples: if sample.name.startswith(PUSH_TIME_METRICS_NAME): if is_metrics_stale(sample.value): url = f"{PUSHGATEWAY_METRICS_URL}/job/{sample.labels['job']}/instance/{sample.labels['instance']}" res = session.delete(url) res.raise_for_status() print(f"DELETE {url}") -
kubosuke revised this gist
Dec 6, 2023 . 1 changed file with 3 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,10 +1,11 @@ from datetime import datetime, timedelta import os import re from typing import List import requests RETENTION_PERIOD_DAYS = 7 PUSHGATEWAY_METRICS_URL = "https://xx.com/metrics" def list_push_time_seconds_list(session: requests.Session) -> List[str]: @@ -33,7 +34,7 @@ def create_url_from_prom_metrics(prom_metrics: str) -> str: with requests.session() as session: for push_time_seconds in list_push_time_seconds_list(session=session): push_time_info, push_time_epoch = push_time_seconds.split() if is_metrics_stale(float(str(push_time_epoch).removesuffix("e+09")) * pow(10, 9)): url = create_url_from_prom_metrics(push_time_info) res = session.delete(url) res.raise_for_status() -
kubosuke created this gist
Dec 6, 2023 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,40 @@ from datetime import datetime, timedelta import re from typing import List import requests RETENTION_PERIOD_DAYS = 7 PUSHGATEWAY_METRICS_URL = "https://example.com/metrics" def list_push_time_seconds_list(session: requests.Session) -> List[str]: res = session.get(PUSHGATEWAY_METRICS_URL) res.raise_for_status() return filter(lambda s: s.startswith("push_time_seconds"), res.text.splitlines()) def is_metrics_stale(timestamp: float) -> bool: return datetime.fromtimestamp(timestamp) < datetime.now() - timedelta(days=RETENTION_PERIOD_DAYS) def create_url_from_prom_metrics(prom_metrics: str) -> str: pattern = r"\{(.*?)\}" match = re.search(pattern, prom_metrics) metrics_info_map = {} if match: for info in match.group(1).split(","): key, value = info.split("=") value = value.removeprefix('"').removesuffix('"') metrics_info_map.update({key: value}) return f"{PUSHGATEWAY_METRICS_URL}/job/{metrics_info_map['job']}/instance/{metrics_info_map['instance']}" if __name__ == "__main__": with requests.session() as session: for push_time_seconds in list_push_time_seconds_list(session=session): push_time_info, push_time_epoch = push_time_seconds.split() if is_metrics_stale(float(str(push_time_epoch).removesuffix("e+09")) * pow(10, 9)) or True: url = create_url_from_prom_metrics(push_time_info) res = session.delete(url) res.raise_for_status() print(f"DELETE {url}")