Skip to content

Instantly share code, notes, and snippets.

@alanyee
Last active September 20, 2024 20:21

Revisions

  1. alanyee revised this gist Jan 7, 2021. 1 changed file with 1 addition and 2 deletions.
    3 changes: 1 addition & 2 deletions tail.py
    Original file line number Diff line number Diff line change
    @@ -9,8 +9,7 @@

    SLEEP = 5

    def progress(profile_name, group_name, instance_id):
    """Main function here"""
    def main(profile_name, group_name, instance_id):
    client = boto3.session.Session(profile_name=profile_name).client('logs')
    # this can be `boto3.client(logs)` instead of using `session`
    filter_logs_events_kwargs = {
  2. alanyee renamed this gist Jan 7, 2021. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  3. alanyee created this gist Jan 7, 2021.
    81 changes: 81 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,81 @@
    """Using boto3, instead of awscli, to tail logs. Based on https://github.com/aws/aws-cli/blob/v2/awscli/customizations/logs/tail.py"""

    from collections import defaultdict
    import sys
    import time

    import boto3
    from botocore.exceptions import ClientError, ParamValidationError

    SLEEP = 5

    def progress(profile_name, group_name, instance_id):
    """Main function here"""
    client = boto3.session.Session(profile_name=profile_name).client('logs')
    # this can be `boto3.client(logs)` instead of using `session`
    filter_logs_events_kwargs = {
    "logGroupName": group_name,
    "logStreamNames": # list,
    "interleaved": True,
    "startTime": int(time.time()) * 1000
    }
    return _filter_log_events(client, filter_logs_events_kwargs)


    def _filter_log_events(client, filter_logs_events_kwargs):
    try:
    for event in _do_filter_log_events(client, filter_logs_events_kwargs):
    yield event
    except KeyboardInterrupt:
    # The only way to exit from the --follow is to Ctrl-C. So
    # we should exit the iterator rather than having the
    # KeyboardInterrupt propogate to the rest of the command.
    return

    def _get_latest_events_and_timestamp(event_ids_per_timestamp):
    if event_ids_per_timestamp:
    # Keep only ids of the events with the newest timestamp
    newest_timestamp = max(event_ids_per_timestamp.keys())
    event_ids_per_timestamp = defaultdict(
    set, {newest_timestamp: event_ids_per_timestamp[newest_timestamp]}
    )
    return event_ids_per_timestamp

    def _reset_filter_log_events_params(fle_kwargs, event_ids_per_timestamp):
    # Remove nextToken and update startTime for the next request
    # with the timestamp of the newest event
    if event_ids_per_timestamp:
    fle_kwargs['startTime'] = max(
    event_ids_per_timestamp.keys()
    )
    fle_kwargs.pop('nextToken', None)

    def _do_filter_log_events(client, filter_logs_events_kwargs):
    event_ids_per_timestamp = defaultdict(set)
    while True:
    try:
    response = client.filter_log_events(
    **filter_logs_events_kwargs)
    except (ClientError, ParamValidationError):
    sys.exit("Invalid MFA one time pass code")

    for event in response['events']:
    # For the case where we've hit the last page, we will be
    # reusing the newest timestamp of the received events to keep polling.
    # This means it is possible that duplicate log events with same timestamp
    # are returned back which we do not want to yield again.
    # We only want to yield log events that we have not seen.
    if event['eventId'] not in event_ids_per_timestamp[event['timestamp']]:
    event_ids_per_timestamp[event['timestamp']].add(event['eventId'])
    yield event
    event_ids_per_timestamp = _get_latest_events_and_timestamp(
    event_ids_per_timestamp
    )
    if 'nextToken' in response:
    filter_logs_events_kwargs['nextToken'] = response['nextToken']
    else:
    _reset_filter_log_events_params(
    filter_logs_events_kwargs,
    event_ids_per_timestamp
    )
    time.sleep(SLEEP)