-
-
Save jeanmonet/6757a3f38a15621d33b13179416eb8db to your computer and use it in GitHub Desktop.
Find runs of consecutive items in a numpy array.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Python 3.9 | |
from __future__ import annotations | |
import numpy as np | |
def find_runs(array: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]: | |
""" | |
Find runs of consecutive items in an array that are equal to each other. | |
Ie. find equal elements that are adjacent to each other. | |
https://gist.github.com/alimanfoo/c5977e87111abe8127453b21204c1065 | |
""" | |
# ensure array | |
array = np.asanyarray(array) | |
if array.ndim != 1: | |
raise ValueError('only 1D array supported') | |
size = array.shape[0] | |
# handle empty array | |
if size == 0: | |
return np.array([]), np.array([]), np.array([]) | |
else: | |
# find run starts | |
loc_run_start = np.empty(size, dtype=bool) | |
loc_run_start[0] = True # The first element is necessarily the start of a run | |
# Key: two consecutive elements not equal ==> True = 1 | |
# Store result in loc_run_start where first item already set to True | |
np.not_equal(array[:-1], array[1:], out=loc_run_start[1:]) | |
# Key: index of each element in loc_run_start that is not equal to the next element | |
# ^-- This means a new run starts at each such point. | |
# If "False" means element is equal to previous element, thus part of a run | |
run_starts = np.nonzero(loc_run_start)[0] | |
# find run values | |
run_values = array[loc_run_start] | |
# find run lengths | |
run_lengths = np.diff( # Diff of two consecutive elements (indices) | |
# ^-- Calculate the n-th discrete difference along the given axis. | |
np.append(run_starts, size)) # Appends the last element index (index of x[-1]) | |
return run_values, run_starts, run_lengths | |
def find_runs_of_item(array: np.ndarray, item) -> tuple[np.ndarray, np.ndarray]: | |
""" | |
Find start positions and lengths of consecutive occurrences of a given element, inside an array. | |
>>> x = np.array([9, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 0, 1, 2, 3, 4, 5, 9, 9, 9, 9]) | |
>>> run_starts, run_lengths = find_runs_of_item(x, 9) | |
>>> # (array([ 0, 11, 20]), array([2, 3, 4])) | |
>>> find_runs_of_item(x, 0) | |
>>> # (array([ 2, 14]), array([1, 1])) | |
>>> find_runs_of_item(x, 100) | |
>>> # (array([], dtype=int64), array([], dtype=int64)) | |
""" | |
item_indexes = np.where(array == item)[0] | |
if item_indexes.size == 0: | |
# Empty result | |
return item_indexes, item_indexes | |
consecutive_diffs = np.diff(item_indexes) | |
run_interrupts = np.where(consecutive_diffs > 1)[0] + 1 | |
run_interrupts = np.insert(run_interrupts, | |
[0, len(run_interrupts)], # where to insert | |
[0, len(item_indexes)]) # what to insert | |
run_starts = item_indexes[run_interrupts[:-1]] | |
run_lengths = np.diff(run_interrupts) | |
return run_starts, run_lengths | |
def find_run_of_item_with_min_length(array: np.ndarray, item, min_run_len: int = 2) -> tuple[np.ndarray, np.ndarray]: | |
""" | |
Find runs of a minimum length of a certain item inside a given array. | |
>>> x = np.array([9, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 0, 1, 2, 3, 4, 5, 9, 9, 9, 9]) | |
>>> find_run_of_item_with_min_length(array, item=9, min_run_len=3) | |
>>> # (array([11, 20]), array([3, 4]))* | |
>>> find_run_of_item_with_min_length(array, 9, 6) # item not found or no run with such length | |
>>> # (array([], dtype=int64), array([], dtype=int64)) | |
""" | |
run_starts, run_lengths = find_runs_of_item(array, item) | |
ix = np.where(run_lengths >= min_run_len)[0] | |
return run_starts[ix], run_lengths[ix] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment