Skip to content

Instantly share code, notes, and snippets.

@kubosuke
Last active September 9, 2024 05:24
  • Select an option

Select an option

Revisions

  1. kubosuke revised this gist Dec 7, 2023. 1 changed file with 21 additions and 28 deletions.
    49 changes: 21 additions & 28 deletions delete-stale-metrics-from-pushgateway.py
    Original file line number Diff line number Diff line change
    @@ -1,41 +1,34 @@
    from datetime import datetime, timedelta
    import os
    import re
    from typing import List
    import requests
    from prometheus_client.parser import text_string_to_metric_families

    RETENTION_PERIOD_DAYS = 7
    PUSHGATEWAY_METRICS_URL = "https://xx.com/metrics"

    PUSHGATEWAY_METRICS_URL = "https://xxx.com/metrics"
    PUSH_TIME_METRICS_NAME = "push_time_seconds"

    def list_push_time_seconds_list(session: requests.Session) -> List[str]:
    res = session.get(PUSHGATEWAY_METRICS_URL)
    res.raise_for_status()
    return filter(lambda s: s.startswith("push_time_seconds"), res.text.splitlines())

    def is_metrics_stale(timestamp: float, retention_period_days: int = 7) -> bool:
    """Return True if the metrics is stale.
    def is_metrics_stale(timestamp: float) -> bool:
    return datetime.fromtimestamp(timestamp) < datetime.now() - timedelta(days=RETENTION_PERIOD_DAYS)
    Args:
    timestamp (float): epoch timestamp in prom format. ex: 1701841866.00605802

    def create_url_from_prom_metrics(prom_metrics: str) -> str:
    pattern = r"\{(.*?)\}"
    match = re.search(pattern, prom_metrics)
    metrics_info_map = {}
    if match:
    for info in match.group(1).split(","):
    key, value = info.split("=")
    value = value.removeprefix('"').removesuffix('"')
    metrics_info_map.update({key: value})
    return f"{PUSHGATEWAY_METRICS_URL}/job/{metrics_info_map['job']}/instance/{metrics_info_map['instance']}"
    Returns:
    bool: True if the stale than RETENTION_PERIOD_DAYS(days).
    """
    return datetime.fromtimestamp(timestamp) < datetime.now() - timedelta(days=retention_period_days)


    if __name__ == "__main__":
    with requests.session() as session:
    for push_time_seconds in list_push_time_seconds_list(session=session):
    push_time_info, push_time_epoch = push_time_seconds.split()
    if is_metrics_stale(float(str(push_time_epoch).removesuffix("e+09")) * pow(10, 9)):
    url = create_url_from_prom_metrics(push_time_info)
    res = session.delete(url)
    res.raise_for_status()
    print(f"DELETE {url}")
    metrics_list_response = session.get(PUSHGATEWAY_METRICS_URL)
    metrics_list_response.raise_for_status()
    for family in text_string_to_metric_families(metrics_list_response.text):
    for sample in family.samples:
    if sample.name.startswith(PUSH_TIME_METRICS_NAME):
    if is_metrics_stale(sample.value):
    url = f"{PUSHGATEWAY_METRICS_URL}/job/{sample.labels['job']}/instance/{sample.labels['instance']}"
    res = session.delete(url)
    res.raise_for_status()
    print(f"DELETE {url}")
  2. kubosuke revised this gist Dec 6, 2023. 1 changed file with 3 additions and 2 deletions.
    5 changes: 3 additions & 2 deletions delete-stale-metrics-from-pushgateway.py
    Original file line number Diff line number Diff line change
    @@ -1,10 +1,11 @@
    from datetime import datetime, timedelta
    import os
    import re
    from typing import List
    import requests

    RETENTION_PERIOD_DAYS = 7
    PUSHGATEWAY_METRICS_URL = "https://example.com/metrics"
    PUSHGATEWAY_METRICS_URL = "https://xx.com/metrics"


    def list_push_time_seconds_list(session: requests.Session) -> List[str]:
    @@ -33,7 +34,7 @@ def create_url_from_prom_metrics(prom_metrics: str) -> str:
    with requests.session() as session:
    for push_time_seconds in list_push_time_seconds_list(session=session):
    push_time_info, push_time_epoch = push_time_seconds.split()
    if is_metrics_stale(float(str(push_time_epoch).removesuffix("e+09")) * pow(10, 9)) or True:
    if is_metrics_stale(float(str(push_time_epoch).removesuffix("e+09")) * pow(10, 9)):
    url = create_url_from_prom_metrics(push_time_info)
    res = session.delete(url)
    res.raise_for_status()
  3. kubosuke created this gist Dec 6, 2023.
    40 changes: 40 additions & 0 deletions delete-stale-metrics-from-pushgateway.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,40 @@
    from datetime import datetime, timedelta
    import re
    from typing import List
    import requests

    RETENTION_PERIOD_DAYS = 7
    PUSHGATEWAY_METRICS_URL = "https://example.com/metrics"


    def list_push_time_seconds_list(session: requests.Session) -> List[str]:
    res = session.get(PUSHGATEWAY_METRICS_URL)
    res.raise_for_status()
    return filter(lambda s: s.startswith("push_time_seconds"), res.text.splitlines())


    def is_metrics_stale(timestamp: float) -> bool:
    return datetime.fromtimestamp(timestamp) < datetime.now() - timedelta(days=RETENTION_PERIOD_DAYS)


    def create_url_from_prom_metrics(prom_metrics: str) -> str:
    pattern = r"\{(.*?)\}"
    match = re.search(pattern, prom_metrics)
    metrics_info_map = {}
    if match:
    for info in match.group(1).split(","):
    key, value = info.split("=")
    value = value.removeprefix('"').removesuffix('"')
    metrics_info_map.update({key: value})
    return f"{PUSHGATEWAY_METRICS_URL}/job/{metrics_info_map['job']}/instance/{metrics_info_map['instance']}"


    if __name__ == "__main__":
    with requests.session() as session:
    for push_time_seconds in list_push_time_seconds_list(session=session):
    push_time_info, push_time_epoch = push_time_seconds.split()
    if is_metrics_stale(float(str(push_time_epoch).removesuffix("e+09")) * pow(10, 9)) or True:
    url = create_url_from_prom_metrics(push_time_info)
    res = session.delete(url)
    res.raise_for_status()
    print(f"DELETE {url}")