# Get the file name without extension: import os print os.path.splitext("path_to_file")[0] #Convert String to unicode: "".join(["\\x%02x" % ord(i) for i in my_code]) #Get File Directory from file path: "/".join(file_name.split("/")[0:-1]) # Get File Name from file path file_name.split("/")[-1].split(".")[0] #Python Logging: import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Python Argument Parsing import argparse parser = argparse.ArgumentParser(description='PyFlow: Debugger') parser.add_argument('-f', '--file', help='Input File for debugging', required=True) parser.add_argument('-e', help="Run Flask", default=False, action="store_true") args = vars(parser.parse_args()) # Check if file exists: os.path.isfile(fname) # Sorting - Descending order a= [1,5,3,6,8,3,4,5,7] a.sort(reverse=True) # Check common items between two lists from collections import Counter A = [2,4,4,5,6,7,8,9] B = [1,3,4,5,4,3,5,6] print(len(list((Counter(A) & Counter(B)).elements()))) # Modify each element in a list list_mod = [x.__add__(1) for x in list_mod] # Open webpage in default browser import webbrowser webbrowser.open(url) # Function Overloading - Single Dispatch from functools import singledispatch class abc(object): @singledispatch def __init__(self, arg, verbose=False): if verbose: print("Let me just say,", end=" ") print(arg) pass @__init__.register(int) def _(self, arg, verbose=False): if verbose: print("Strength in numbers, eh?", end=" ") print(arg) @__init__.register(list) def _(self, arg, verbose=False): if verbose: print("Enumerate this:") for i, elem in enumerate(arg): print(i, elem) abc('abc', verbose=True) # Adding two sets a = {0,1,1,1,2,3} b = {1,2,3,4} print(a | b) # Jinja 2 Template Render from jinja2 import Template template = Template('Hello {{ name }}!') print(template.render(name='John Doe')) # Removing None values from a dictionary filtered = {k: v for k, v in original.items() if v is not None} original.clear() original.update(filtered) # Accessing Dictionary d = { 'k1': 'v1', 'k2': 'v2' } # Accessing keys for k in d: print(k) # Accessing values for v in d.values(): print(v) # Accessing key with values for k, v in d.items(): print(k, v) # Preserver json ordering in json loads import json json.load(filename, object_pairs_hook=collections.OrderDict) # Execute a file in python exec(open('./filename.py').read()) # Concatenate list import itertools lists = [['hello'], ['world', 'foo', 'bar']] combined = list(itertools.chain.from_iterable(lists)) # combined = [item for sublist in lists for item in sublist] # Padding function a += [''] * (N - len(a)) # Singleton Design Patter class Singleton(type): _instances = {} def __call__(cls, *args, **kwargs): if cls not in cls._instances: cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) return cls._instances[cls] class Logger(metaclass=Singleton): pass # Ordered Dictionary from collections import OrderedDict d = OrderedDict([('b',2), ('a', 1)]) # Unit Test import unittest from primes import is_prime class PrimesTestCase(unittest.TestCase): """Tests for `primes.py`.""" def test_is_five_prime(self): """Is five successfully determined to be prime?""" self.assertTrue(is_prime(5)) if __name__ == '__main__': unittest.main() # DocTest def square(x): """Return the square of x. >>> square(2) 4 >>> square(-2) 4 """ return x * x if __name__ == '__main__': import doctest doctest.testmod() # Shape of a Listception import numpy as np a = np.array([[1,2,3],[1,2,3]]) len(a.shape) # ZIP sample = [(2, 9), (2, 9), (8, 9), (10, 9), (23, 26), (1, 9), (43, 44)] first,snd = zip(*sample) print first,snd (2, 2, 8, 10, 23, 1, 43) (9, 9, 9, 9, 26, 9, 44) # Two Lists from list comprehension rr,tt = zip(*[(i*10, i*12) for i in xrange(4)]) # Generate Combinations import itertools a = [[1,2,3],[4,5,6],[7,8,9,10]] list(itertools.product(*a)) # Python Curses - Static Placement with Dynamic display import time import curses stdscr = curses.initscr() stdscr.addstr(0, 0, "Hello") stdscr.refresh() time.sleep(1) stdscr.addstr(0, 0, "World! (with curses)") stdscr.refresh() # Python blink string def blink(char): print(char, end = '\r') time.sleep(0.5) print(' ' * 50, end = '\r') time.sleep(0.5) # Store shell command output to a variable import subprocess output = subprocess.check_output("cat /etc/services", shell=True) # Simple Threading from threading import Thread class MyThread(Thread): def __init__(self): ''' Constructor. ''' Thread.__init__(self) def run(self): pass thread = MyThread() thread.start() thread.join() # Convert Bytes to string in python3 str(bytes_string,'utf-8') # Python check if directory exists import os print(os.path.isdir("/home/el")) print(os.path.exists("/home/el/myfile.txt")) # Custom Thread with stop # http://stackoverflow.com/questions/16262132/how-terminate-python-thread-without-checking-flag-continuously import threading class My_Thread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.process = None def run(self): print "Starting " + self.name cmd = [ "bash", 'process.sh'] self.process = p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) for line in iter(p.stdout.readline, b''): print ("-- " + line.rstrip()) print "Exiting " + self.name def stop(self): print "Trying to stop thread " if self.process is not None: self.process.terminate() self.process = None thr = My_Thread() thr.start() time.sleep(30) thr.stop() thr.join() # Global Package variables #a.py print(foo) #b.py import __builtin__ __builtin__.foo = 1 import a # Redirect STDOUT (print statements) to a file import sys sys.stdout = open(r'stdout.txt', 'w') # Thread Pool Executor in Python import concurrent.futures from time import sleep job_list = [3, 2, 1, 4, 5, 20] job_result = [] def run(id, sl): sleep(sl) print('Job ', id, ' completed') return id**2 with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor: for job_id, job in enumerate(job_list): job_result.append(executor.submit(run, job_id, job)) print([i.result() for i in job_result]) # Dict Comprehension d = {n: n**2 for n in range(5)} # Append Dict to another Dictionary x = OrderedDict([(1, 'a'), (2, 'b')]) y = OrderedDict([(3, 'a'), (4, 'b')]) x = OrderedDict(**x, **y) # Get biggest number float('inf') # Get smallest number float('-inf') # Simple Python Multi Processing from multiprocessing import Process, Pipe def fn(arg . . .): pass p = Process(target=fn, args=(arg ...)) p.start() p.join() #Find Slope slope = lambda a, b: (float(a[1]) - float(b[1])) / (float(a[0]) - float(b[0])) # Create One-hot vectors: a = np.array([1, 0, 3]) b = np.zeros((3, 4)) b[np.arange(3), a] = 1 # setInterval Function import threading def set_interval(func, sec): def func_wrapper(): set_interval(func, sec) func() t = threading.Timer(sec, func_wrapper) t.start() return t # Patch a function in python import types class Foo(object): def call_patched_bar(self, objfn_args:dict): self.bar(**objfn_args) pass def bar(target,x): print("x=",x) print("called from", target) def patch_me(target): target.bar = types.MethodType(bar,target) # Patch original class patch_me(Foo) Foo().call_patched_bar(objfn_args={'x':6}) # Patch object of a class f = Foo() patch_me(f) f.call_patched_bar(objfn_args={'x':6}) # Timing an expression import timeit class Foo(object): @staticmethod def bar(): pass f = Foo() # Which is a good design? # a. Foo.bar() # b. f.bar() exp1 = timeit.repeat('Foo.bar()', 'from __main__ import Foo', repeat=500, number=100000) exp2 = timeit.repeat('f.bar()', 'from __main__ import f', repeat=500, number=100000) print('min execution time for a single instance:') print(min(exp1)) print(min(exp2)) print('max execution time for a single instance:') print(max(exp1)) print(max(exp2)) print('total execution time:') print(sum(exp1)) print(sum(exp2)) # With block example class Foo(object): def __init__(self): print('Init called') def sample_method(self): print('Sample metod called') def __enter__(self): print('Enter called') return self def __exit__(self, exc_type, exc_val, exc_tb): print('Exit called') def __del__(self): print('Del called') print('Basic Object creation:\n') f = Foo() del f print('\n\nWith Block object creation:\n') with Foo() as f: pass # Custom Iterator class Bar(object): def __init__(self): self.idx = 0 self.data = range(4) def __iter__(self): return self def __next__(self): self.idx += 1 try: return self.data[self.idx-1] except IndexError: self.idx = 0 raise StopIteration # Done iterating. next = __next__ # python2.x compatibility. # Typehint a lambda function from typing import Callable is_even: Callable[[int], bool] = lambda x: (x % 2 == 0) # Merge List of List x = [[1,2,3], [4,5,6]] merged_list = sum(x, []) # Function Overloading using Lambda Fn def overload(*functions): return lambda *args, **kwargs: functions[len(args)](*args, **kwargs) function_to_override = overload( None, # For self argument lambda self, param1: function_to_override_one_param(param1), lambda self, param1, param2: function_to_override_one_param(param1, param2) ) """ Async Function Setup """ import threading class AsyncCall(object): def __init__(self, fn, callback=None): self.callable = fn self.callback = callback self.result = None def __call__(self, *args, **kwargs): self.Thread = threading.Thread(target=self.run, name=self.callable.__name__, args=args, kwargs=kwargs) self.Thread.start() return self def wait(self, timeout=None): self.Thread.join(timeout) if self.Thread.isAlive(): raise TimeoutError() else: return self.result def run(self, *args, **kwargs): self.result = self.callable(*args, **kwargs) if self.callback: self.callback(self.result) class AsyncMethod(object): def __init__(self, fn, callback=None): self.callable = fn self.callback = callback def __call__(self, *args, **kwargs): return AsyncCall(self.callable, self.callback)(*args, **kwargs) def async(f=None, callback=None): """ | **@author:** Prathyush SP | | Custom Exception Decorator. :param f: Function :param callback: Callaback Function """ if f is None: return partial(async, f=f, callback=callback) @wraps(f) def wrapped(*args, **kwargs): return AsyncMethod(f, callback)(*args, **kwargs) return wrapped @async def fnc(): for i in range(10): print(i) time.sleep(2) print('abcd') fnc() print('def') """" Async Function Setup End """" # Enum of Enum's import enum class Foo(enum.Enum): var_a = enum.auto() class Bar(enum.Enum): var_b = enum.auto() def __getattr__(self, item): if item != '_value_': return getattr(self.value, item) raise AttributeError # Raise only singe exception from multiple hierarchy try: 1/0 except Exception as e: raise ValueError('Value Error') from None # Module Path - Place this in base __init__.py MODULE_PATH = os.path.dirname(os.path.abspath(__file__)) # Recurring Default Dictionary in Python class RecurringDefaultDict(dict): """Implementation of perl's autovivification feature.""" def __getitem__(self, item): try: return dict.__getitem__(self, item) except KeyError: value = self[item] = type(self)() return value x = RecurringDefaultDict() x[1][2][3][4]= 25 # Check if -ve no in python list any(n < 0 for n in any_list) # Disable import for a function from contextlib import contextmanager @contextmanager def custom_metric(): import tensorflow t = tensorflow.metrics delattr(tensorflow, "metrics") yield None tensorflow.metrics = t with custom_metric(): import tensorflow as tf try: print(tf.metrics) except AttributeError: print("Cannot import metrics inside custom metric") import tensorflow as tf print(tf.metrics) # Test if a function is printing the required log import logging from testfixtures import LogCapture logger = logging.getLogger('') with LogCapture() as logs: # my awesome code logger.error('My code logged an error') assert 'My code logged an error' in str(logs) # Simplest form of batching def batch(iterable, n=1): l = len(iterable) for ndx in range(0, l, n): yield iterable[ndx:min(ndx + n, l)] for x in batch(list(range(0, 10)), 3): print(x) # Find all the imported modules from modulefinder import ModuleFinder finder = ModuleFinder() finder.run_script("./main.py") for name, mod in finder.modules.items(): print(name) # Thread Safe Writer class SafeWriter: def __init__(self, *args): self.filewriter = open(*args) self.queue = Queue() self.finished = False Thread(name="SafeWriter", target=self.internal_writer).start() def write(self, data): self.queue.put(data) def internal_writer(self): while not self.finished: try: data = self.queue.get(True, 1) except Empty: continue self.filewriter.write(data) self.queue.task_done() def close(self): self.queue.join() self.finished = True self.filewriter.close() # Timeout Whileloop import time timeout = 10 # [seconds] timeout_start = time.time() while time.time() < timeout_start + timeout: pass