Last active
August 11, 2023 13:14
-
-
Save gmalmquist/f9df224881b503bd03a3 to your computer and use it in GitHub Desktop.
Command-line stream editing using python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
from __future__ import print_function | |
from collections import defaultdict | |
import datetime | |
import io | |
import json | |
import locale | |
import os | |
import re | |
import sys | |
from textwrap import dedent | |
date = datetime.datetime.strptime | |
def union(lists): | |
ls = [] | |
for sublist in lists: | |
ls.extend(sublist) | |
return ls | |
def intersection(*args): | |
result = None | |
for arg in args: | |
if result is None: | |
result = set(arg) | |
else: | |
result = result.intersection(set(arg)) | |
return result | |
def underindent(lines): | |
def ui(line): | |
l = len(line) | |
line = line.lstrip() | |
return '{}{}'.format('_'*(l - len(line)), line) | |
return map(ui, lines) | |
def t2sort(lines, pos=-1, df='%b %d, %Y %I:%M %p'): | |
def fixlines(lines): | |
cline = '' | |
for line in lines: | |
if re.search('[0-9]+:[0-9]+ [ap]m*', line.strip()): | |
if len(cline) > 0: cline += ' ' | |
yield cline + line | |
cline = '' | |
else: | |
if len(cline) > 0: cline += ' ' | |
cline += line | |
if cline: | |
yield cline | |
lines = fixlines(lines) | |
lines = [l.strip() for l in lines if re.search('[0-9]+ [ap]m*$', l.strip())] | |
lines = [l for l in lines if l] | |
lines = sorted(lines, key=lambda l: date(l.split('\t')[pos], df)) | |
return lines | |
def after(s, p, right=False): | |
if p not in s: | |
return s | |
try: | |
i = s.rfind(p) if right else s.find(p) | |
return s[i+len(p):] | |
except: # Assume is list. | |
for i,e in enumerate(s): | |
if e == p: | |
break | |
else: | |
i = -1 | |
if i >= 0: | |
return s[i+1:] | |
return s | |
def before(s, p, right=False): | |
if p not in s: | |
return s | |
try: | |
i = s.rfind(p) if right else s.find(p) | |
if i < 0: i = len(s) | |
return s[:i] | |
except: # Assume is list. | |
for i,e in enumerate(s): | |
if e == p: | |
return s[:i] | |
return s | |
def aftf(p): | |
return lambda s: after(s, p) | |
def beff(p): | |
return lambda s: before(s, p) | |
def strip(s): | |
return s.strip() | |
def indent(s, spaces=4): | |
return ' '*4+s | |
def block_indent(s, spaces=4): | |
return '\n'.join([indent(line, spaces) for line in dedent(s).split('\n')]) | |
def numbered(path): | |
h = open(path, 'r') | |
lines = h.readlines() | |
h.close() | |
return ''.join(['%s:%00d: %s' % (path, i+1, s) for (i,s) in enumerate(lines)]) | |
def abs_listdir(path): | |
if not os.path.exists(path) or not os.path.isdir(path): | |
return [] | |
return [os.path.join(path, name) for name in os.listdir(path)] | |
def call_me_maybe(value, old_value): | |
if not value: | |
return value | |
if callable(value): | |
return value(old_value) | |
if isinstance(value, tuple) and all(callable(v) for v in value): | |
for c in value: | |
old_value = c(old_value) | |
return old_value | |
return value | |
def counts(ls): | |
cts = defaultdict(int) | |
for item in ls: | |
cts[item] += 1 | |
return ['{}: {}'.format(key, value) for key, value in sorted(cts.items(), key=lambda x: -x[1])] | |
def unique(ls): | |
seen = set() | |
results = [] | |
for item in ls: | |
if item not in seen: | |
seen.add(item) | |
results.append(item) | |
return results | |
def hist(key=None): | |
if key is None: | |
key = lambda x: x | |
def hist(ls): | |
cts = defaultdict(int) | |
for item in ls: | |
cts[key(item)] += 1 | |
max_len = max(len('key'), max(map(len, map(str, cts.keys())))) | |
max_val_len = max(len('counts'), max(map(len, map(str, cts.values())))) | |
wall_vertical = '║' | |
wall_horizontal = '═' | |
wall_both = '╬' | |
col_fmt = '{v} {col:<' + str(max_len) + '}' | |
val_fmt = '{val:<' + str(max_val_len) + '} {v}' | |
rows = [(col_fmt + ' {v} ' + val_fmt).format(col=k, v=wall_vertical, val=val) for k, val in cts.items()] | |
return [ | |
'╔' + (wall_horizontal*(max_len + 2)) + '╦' + (wall_horizontal*(max_val_len + 2)) + '╗', | |
('%s %s %s' % (col_fmt, wall_vertical, val_fmt)).format(col='key', v=wall_vertical, val='counts'), | |
'{}{}{}{}{}'.format('╠', wall_horizontal*(2+max_len), wall_both, wall_horizontal*(2+len('counts')), '╣'), | |
] + rows + [ | |
'╚' + (wall_horizontal*(max_len + 2)) + '╩' + (wall_horizontal*(max_val_len + 2)) + '╝', | |
] | |
return hist | |
def alphabet(i): | |
try: | |
i = int(i) | |
except: | |
return '' | |
result = [] | |
while i > 0: | |
result.append((i-1) % 26) | |
i = int((i-1) / 26) | |
return ''.join(chr(ord('A') + x) for x in reversed(result)) | |
def reindex_from(x): | |
return lambda lines: reindex(lines, x) | |
def reindex(lines, start_index=0): | |
pattern = '@Index[(].*?[)]' | |
index = start_index - 1 | |
for line in lines: | |
if not re.search(pattern, line): | |
yield line | |
continue | |
index += 1 | |
yield re.sub(pattern, '@Index(%s)' % index, line) | |
def hrnum(x): | |
locale.setlocale(locale.LC_ALL, 'en_US') | |
return locale.format('%f', float(str(x).strip()), grouping=True) | |
def main(): | |
lines = [] | |
while '-help' not in sys.argv[1:] and '-h' not in sys.argv[1:]: | |
try: | |
line = raw_input() | |
except: break | |
if line is None: break | |
while '\n' in line: | |
n = line.find('\n') | |
lines.append(line[:n]) | |
line = line[n+1:] | |
lines.append(line) | |
flags = set() | |
args = [] | |
for a in sys.argv[1:]: | |
if re.match('^[-][a-zA-Z]+$', a) is not None: | |
flags.update(a[1:]) | |
else: | |
args.append(a) | |
# Useful Constants | |
NL = '\n' | |
TB = '\t' | |
CN = ':' | |
BS = '\\' | |
FS = '/' | |
SR = '*' | |
SP = ' ' | |
SQ = "'" | |
DQ = '"' | |
SC = ';' | |
CC = ':' | |
if 'h' in flags or 'help' in flags: | |
print('Flags:') | |
print(' -s: Convert all lines to single NL-joined string s, and process it.') | |
print(' -f: Return lines filtered by predicate over variable s.') | |
print(' -g: Like -s, but override the lines variable instead and print out NL.join(lines)') | |
print(' : default behavior is to run command once per line, with lines[i] = s.') | |
return | |
command = 's = ' + ' '.join(args) | |
if 's' in flags: | |
s = '\n'.join(lines) | |
old_s = s | |
exec(command) | |
s = call_me_maybe(s, old_s) | |
print(s) | |
elif 'f' in flags: | |
nlines = [] | |
for i,s in enumerate(lines): | |
line = s | |
exec(command) | |
s = call_me_maybe(s, line) | |
if s: nlines.append(line) | |
print('\n'.join(nlines)) | |
elif 'g' in flags: | |
s = '\n'.join(lines) | |
old_lines = lines | |
command = 'lines = ' + ' '.join(args) | |
exec(command) | |
lines = call_me_maybe(lines, old_lines) | |
s = '\n'.join(lines) | |
print(s) | |
else: | |
for i,s in enumerate(lines): | |
old_s = s | |
exec(command) | |
s = call_me_maybe(s, old_s) | |
lines[i] = str(s) | |
print('\n'.join(lines)) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment