Skip to content

Instantly share code, notes, and snippets.

@Paulius-Maruska
Last active March 3, 2016 09:02
Show Gist options
  • Save Paulius-Maruska/552d6521eb710a4ec4c8 to your computer and use it in GitHub Desktop.
Save Paulius-Maruska/552d6521eb710a4ec4c8 to your computer and use it in GitHub Desktop.
smart mediator experiment
.idea
.venv
__pycache__
*.pyi

Mediator test

Experiment #1

Idea

A mediator object that can register new events/functions on the fly (by using the mediator as a decorator), so that those functions (or objects) immediately become available as if they were methods of the mediator object.

Conclusion

Python 3 is a powerful tool - this works in code, although awkward imports are needed, so that the functions get the chance to register themselves with the mediator object.

Experiment #2

Idea

Have PyCharm show the registered functions as methods of the mediator in auto-completions, mouse-overs etc.

Conclusion

The dynamically registered methods do not show up in PyCharm, even when the checkbox "Collect run-time types information for code insight" is checked in File > Settings > Build, Execution, Deployment > Python Debugger.

Idea #2

PyCharm supports type hinting with Python stub files, as described in PEP 484. So the idea is to generate such a stub file for PyCharm and see whether PyCharm is able to pick it up and understand it.

Conclusion

PyCharm EAP kind of supports it, but it also gets thoroughly confused. In some files syntax highlighting stops working; in other files everything works fine.

TL;DR - It's complicated.

PyCharm seems to fully support it, although the fact that CONTROL+LCLICK on a mediator method takes you to the stub file instead of the actual function is somewhat annoying.

System Information

System

$ uname -srvmo
Linux 3.19.0-51-generic #58~14.04.1-Ubuntu SMP Fri Feb 26 22:02:58 UTC 2016 x86_64 GNU/Linux

PyCharm EAP

PyCharm PY-145.61.29
Build #PY-145.61, built on February 26, 2016
JRE: 1.8.0_76-release-b18 amd64
JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o

PyCharm

PyCharm 5.0.4
Build #PY-143.1919, built on January 28, 2016
JRE: 1.7.0_95-b00 amd64
JVM: OpenJDK 64-Bit Server VM by Oracle Corporation
from mediator import mediator
@mediator
def min_numbers(*args):
    """Return the smallest of the given numbers.

    :param args: numbers to compare.
    :return: the smallest number; ``min`` raises ``ValueError`` when no
        numbers are given.
    """
    smallest = min(args)
    return smallest
@mediator
def max_numbers(*args):
    """Return the biggest of the given numbers.

    :param args: numbers to compare.
    :return: the biggest number; ``max`` raises ``ValueError`` when no
        numbers are given.
    """
    biggest = max(args)
    return biggest
@mediator
def sum_numbers(*args):
    """Return the sum of the given numbers.

    :param args: numbers to add up.
    :return: their sum (``0`` when no numbers are given).
    """
    total = sum(args)
    return total
@mediator
def avg_numbers(*args):
    """Return the arithmetic mean of the given numbers.

    The sum and the count are obtained through the mediator
    (``sum_numbers`` and ``len_numbers``), so both must be registered
    before this function is called.

    :param args: numbers to average.
    :return: the mean, or ``0`` when the reported count is zero.
    """
    total = mediator.sum_numbers(*args)
    count = mediator.len_numbers(*args)
    # Guard against division by zero for an empty argument list.
    return 0 if count == 0 else total / count
@mediator
def group1_fun(n: str, i: (int, float), *args, **kwargs) -> tuple:
    """Echo the received arguments back as a tuple.

    :param n: some text.
    :param i: some number.
    :param args: any further positional arguments.
    :param kwargs: any further keyword arguments.
    :return: the tuple ``(n, i, args, kwargs)``.
    """
    received = (n, i, args, kwargs)
    return received
@mediator
def group1_gun(n: str, i: (int, float), *args, **kwargs) -> tuple:
    """Forward the call to ``group2_fun`` through the mediator.

    :param n: some text.
    :param i: some number.
    :param args: any further positional arguments.
    :param kwargs: any further keyword arguments.
    :return: whatever ``mediator.group2_fun`` returns for the same arguments.
    """
    delegate = mediator.group2_fun
    return delegate(n, i, *args, **kwargs)
from mediator import mediator
@mediator
def len_numbers(*args):
    """Return how many numbers were given.

    :param args: numbers to count.
    :return: the number of positional arguments.
    """
    count = len(args)
    return count
@mediator
def analyse_numbers(*args):
    """Collect min, max, sum, count and average of the given numbers.

    Every statistic is computed by the correspondingly named function
    looked up on the mediator, in the order listed below.

    :param args: numbers to analyse.
    :return: the tuple ``(min, max, sum, len, avg)``.
    """
    steps = (
        "min_numbers",
        "max_numbers",
        "sum_numbers",
        "len_numbers",
        "avg_numbers",
    )
    return tuple(getattr(mediator, step)(*args) for step in steps)
@mediator
def group2_fun(n: str, i: (int, float), *args, **kwargs) -> tuple:
    """Echo the received arguments back as a tuple.

    :param n: some text.
    :param i: some number.
    :param args: any further positional arguments.
    :param kwargs: any further keyword arguments.
    :return: the tuple ``(n, i, args, kwargs)``.
    """
    received = (n, i, args, kwargs)
    return received
@mediator
def group2_gun(n: str, i: (int, float), *args, **kwargs) -> tuple:
    """Forward the call to ``group1_fun`` through the mediator.

    :param n: some text.
    :param i: some number.
    :param args: any further positional arguments.
    :param kwargs: any further keyword arguments.
    :return: whatever ``mediator.group1_fun`` returns for the same arguments.
    """
    delegate = mediator.group1_fun
    return delegate(n, i, *args, **kwargs)
import logging
import sys
import time
import timeit
from mediator import mediator
from stubber import Stubber
import group1, group2 # this import is needed to register all functions
def setup_logging():
    """Attach a stdout handler to the root logger and let every level through.

    Records are rendered as ``[ HH:MM:SS.mmm    LEVEL ] message``.  The
    millisecond part is zero-padded to three digits so that, for example,
    5 ms is shown as ``.005`` rather than ``.5``.

    Each call adds one more handler to the root logger.
    """
    formatter = logging.Formatter(
        fmt='[ %(asctime)s.%(msecs)03d %(levelname)8s ] %(message)s',
        datefmt='%H:%M:%S',
    )
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    handler.setLevel(logging.NOTSET)

    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.NOTSET)
def fib(n: int) -> tuple:
    """Return the first ``n`` Fibonacci numbers, starting ``1, 1, 2, ...``.

    Computed iteratively, so large ``n`` neither exhausts the recursion
    limit nor rebuilds the tuple on every step.

    :param n: how many numbers to produce.
    :return: a tuple of ``n`` numbers; empty when ``n`` is zero or negative.
    """
    sequence = []
    current, following = 1, 1
    for _ in range(n):  # an empty range covers n <= 0
        sequence.append(current)
        current, following = following, current + following
    return tuple(sequence)
def _benchmark():
    """Time ``sum_numbers`` called through the mediator and called directly.

    Call logging is switched off while timing and the previous setting is
    restored afterwards, even if one of the timing runs raises.

    :return: ``(runs, seconds_via_mediator, seconds_direct)``.
    """
    runs = 1000000
    # Rendered as a literal argument list, e.g. "(1, 1, 2, ...)".
    call_args = repr(fib(15))
    # Executed by timeit in its own namespace, hence the explicit imports.
    setup = "from mediator import mediator\nimport group1, group2\nmediator.log_calls(False)\n"

    previous = mediator.log_calls(False)
    try:
        w = timeit.timeit(
            "mediator.sum_numbers%s" % call_args,
            setup,
            number=runs,
        )
        wo = timeit.timeit(
            "group1.sum_numbers%s" % call_args,
            setup,
            number=runs,
        )
    finally:
        mediator.log_calls(previous)
    return runs, w, wo
def benchmark():
    """Run the benchmark and format its three figures as a text report.

    :return: a multi-line string with the run count and both timings.
    """
    template = 'runs: %s\nw dec: %s\nw/o dec: %s\n'
    figures = _benchmark()
    return template % tuple(repr(figure) for figure in figures)
def _analyse():
    """Run ``analyse_numbers`` via the mediator on the first 15 Fibonacci numbers.

    :return: whatever ``mediator.analyse_numbers`` returns.
    """
    return mediator.analyse_numbers(*fib(15))
def analyse():
    """Format the result of :func:`_analyse` as a one-line report.

    :return: the report text, terminated by a newline.
    """
    rendered = repr(_analyse())
    return 'analyse result: %s\n' % rendered
def group_calls():
    """Call the four group functions via the mediator and check their results.

    All four calls are made first, in the order listed, with the same
    arguments; each result is then compared against the expected echo tuple.

    :return: ``'group calls: True'`` when every result matches, otherwise
        ``'group calls: False'``.
    """
    names = ("group1_fun", "group2_fun", "group1_gun", "group2_gun")
    results = [getattr(mediator, name)("str", 1, 2, 3, a="foo") for name in names]
    expected = ("str", 1, (2, 3), {'a': 'foo'})
    verdict = all([result == expected for result in results])
    return 'group calls: %s' % repr(verdict)
def mediator_repr():
    """Generate the stub text for the mediator object.

    :return: the text produced by ``Stubber(mediator).stub`` plus a newline.
    """
    return '%s\n' % Stubber(mediator).stub
def main():
    """Run every experiment step and print or store its output.

    Each entry of ``to_call`` pairs a step with an optional file name:
    with no file name the step's text is printed, otherwise it is written
    to that file.  The stub file is written as UTF-8 so that its content
    does not depend on the platform's default encoding.

    :return: ``0``, used as the process exit status.
    """
    setup_logging()
    repr_filename = 'mediator.pyi'
    to_call = (
        (benchmark, None),
        (analyse, None),
        (group_calls, None),
        (mediator_repr, repr_filename),
    )
    for tc, fn in to_call:
        output = tc()
        if fn is None:
            print('%s\n' % output)
        else:
            with open(fn, "w", encoding="utf-8") as f:
                f.write(output)
        # NOTE(review): short pause after every step; presumably to keep the
        # steps' output visually separated - confirm whether still needed.
        time.sleep(0.1)
    return 0
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
import collections
import functools
import inspect
import itertools
import logging
# Module-level logger; the call-logging wrapper built in Mediator.__call__
# writes its "Calling"/"Returning" messages here.
log = logging.getLogger(__name__)
class Mediator:
    """Callable registry that exposes registered functions as its attributes.

    Using an instance as a decorator stores a logging wrapper around the
    decorated function on the instance, under the function's own name, so
    it can afterwards be called as ``instance.<name>(...)``.  The decorator
    hands back the original, unwrapped function.
    """

    # Class-level default; every instance sets its own value in __init__.
    __log_calls = True

    def __init__(self):
        self.__log_calls = True
        # name -> logging wrapper, so __str__ can report how many are registered.
        self.__functions = {}

    def log_calls(self, val: bool) -> bool:
        """Switch call logging on or off.

        :param val: the new setting.
        :return: the setting that was in effect before the call.
        """
        old, self.__log_calls = self.__log_calls, val
        return old

    def __str__(self):
        return "Mediator <%i functions registered>" % len(self.__functions)

    def __call__(self, function):
        """Register ``function`` under its ``__name__``.

        Registering another function with the same name replaces the
        earlier one.

        :param function: the callable to register.
        :return: ``function`` itself, unwrapped.
        """
        def log_decorator(f):
            @functools.wraps(f)
            def inner(*args, **kwargs):
                sa = ''
                if self.__log_calls:
                    # Render the arguments the way they would appear in source.
                    gargs = ("%s" % repr(a) for a in args)
                    gkwargs = ("%s=%s" % (k, repr(v)) for k, v in kwargs.items())
                    sa = ", ".join(itertools.chain(gargs, gkwargs))
                    log.info("Calling `%s(%s)`", f.__name__, sa)
                result = f(*args, **kwargs)
                if self.__log_calls:
                    log.info("Returning %s from `%s(%s)`", repr(result), f.__name__, sa)
                return result
            return inner

        name = function.__name__
        wrapper = log_decorator(function)
        self.__functions[name] = wrapper
        setattr(self, name, wrapper)
        return function
# Shared instance; other modules obtain it with ``from mediator import mediator``
# and use it as a decorator to register their functions.
mediator = Mediator()
runs: 1000000
w dec: 1.027644331999909
w/o dec: 0.5370932949999769
[ 10:13:29.350 INFO ] Calling `analyse_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Calling `min_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Returning 1 from `min_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Calling `max_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Returning 610 from `max_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Calling `sum_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Returning 1596 from `sum_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Calling `len_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Returning 15 from `len_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Calling `avg_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Calling `sum_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Returning 1596 from `sum_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.351 INFO ] Calling `len_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.352 INFO ] Returning 15 from `len_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.352 INFO ] Returning 106.4 from `avg_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
[ 10:13:29.352 INFO ] Returning (1, 610, 1596, 15, 106.4) from `analyse_numbers(1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)`
analyse result: (1, 610, 1596, 15, 106.4)
[ 10:13:29.452 INFO ] Calling `group1_fun('str', 1, 2, 3, a='foo')`
[ 10:13:29.452 INFO ] Returning ('str', 1, (2, 3), {'a': 'foo'}) from `group1_fun('str', 1, 2, 3, a='foo')`
[ 10:13:29.452 INFO ] Calling `group2_fun('str', 1, 2, 3, a='foo')`
[ 10:13:29.452 INFO ] Returning ('str', 1, (2, 3), {'a': 'foo'}) from `group2_fun('str', 1, 2, 3, a='foo')`
[ 10:13:29.452 INFO ] Calling `group1_gun('str', 1, 2, 3, a='foo')`
[ 10:13:29.453 INFO ] Calling `group2_fun('str', 1, 2, 3, a='foo')`
[ 10:13:29.453 INFO ] Returning ('str', 1, (2, 3), {'a': 'foo'}) from `group2_fun('str', 1, 2, 3, a='foo')`
[ 10:13:29.453 INFO ] Returning ('str', 1, (2, 3), {'a': 'foo'}) from `group1_gun('str', 1, 2, 3, a='foo')`
[ 10:13:29.453 INFO ] Calling `group2_gun('str', 1, 2, 3, a='foo')`
[ 10:13:29.453 INFO ] Calling `group1_fun('str', 1, 2, 3, a='foo')`
[ 10:13:29.453 INFO ] Returning ('str', 1, (2, 3), {'a': 'foo'}) from `group1_fun('str', 1, 2, 3, a='foo')`
[ 10:13:29.453 INFO ] Returning ('str', 1, (2, 3), {'a': 'foo'}) from `group2_gun('str', 1, 2, 3, a='foo')`
group calls: True
#!/bin/sh
# Run the experiment inside the project's virtualenv and keep its output.
. .venv/bin/activate
# Redirect everything main.py writes to stdout into ./output ...
python main.py > output
# ... and then show the captured text.
cat output
import contextlib
import inspect
import re
class Stubber:
    """Build stub-file source text (a class with ``pass`` bodies) for an object.

    The text is assembled line by line in ``result`` the first time ``stub``
    is read: a ``class`` header named after the object's class, the object's
    docstring, and one ``def`` per method or function found on the object.
    """

    def __init__(self, obj: object):
        self.obj = obj
        self.result = []  # generated lines, without trailing newlines
        self.indent = 0  # current nesting level; one level is four spaces
        # Rewrites "<class 'X'>" fragments in a rendered signature to plain "X".
        self.inspect_fix = re.compile(r"<class '([^']+)'>")

    def get_indent(self) -> str:
        """Return the whitespace prefix for the current nesting level."""
        return ' ' * 4 * self.indent

    def add_line(self, line: str = ''):
        """Append ``line`` at the current indent; empty lines stay empty."""
        if line != '':
            line = self.get_indent() + line
        self.result.append(line)

    def type_name(self, typ: type) -> str:
        """Return the name to use for ``typ`` in the ``class`` header."""
        s = typ.__qualname__
        # NOTE(review): inspect.isbuiltin() reports built-in functions and
        # methods, not classes, so this branch looks unreachable for the
        # class objects passed in by _class() - confirm the intent.
        if inspect.isbuiltin(typ):
            s = '{module_name}.{type_name}'.format(module_name=typ.__module__, type_name=typ.__qualname__)
        return s

    def signature(self, obj: object, ensure_self=True) -> str:
        """Render the call signature of ``obj`` as source text.

        :param obj: a callable accepted by ``inspect.signature``.
        :param ensure_self: when true, prepend a ``self`` parameter unless
            the signature already has one.
        :return: the signature text, e.g. ``(self, x: int) -> str``.
        """
        signature = inspect.signature(obj)
        if ensure_self and signature.parameters.get('self') is None:
            param_self = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
            params = [param_self] + list(signature.parameters.values())
            signature = signature.replace(parameters=params)
        sig = str(signature)
        sig = self.inspect_fix.sub(r"\1", sig)
        return sig

    @contextlib.contextmanager
    def indent_ctx(self):
        """Indent one level deeper for the duration of the ``with`` block."""
        self.indent += 1
        try:
            yield
        finally:
            # Restore the level even when the block raises, so a failure
            # cannot leave later lines over-indented.
            self.indent -= 1

    def _docstring(self, obj: object):
        """Emit the docstring of ``obj``, if any, one level deeper."""
        doc = obj.__doc__
        if doc is not None:
            doc = inspect.cleandoc(doc)
            # The closing quotes always go on a line of their own.
            doc = doc.rstrip() + '\n'
            doc = '"""{doc}"""'.format(doc=doc)
            with self.indent_ctx():
                for line in doc.splitlines():
                    self.add_line(line)

    def _pass(self):
        """Emit a ``pass`` statement one level deeper."""
        with self.indent_ctx():
            self.add_line("pass")

    def _function(self, obj: object):
        """Emit a blank line followed by a ``def`` stub for ``obj``."""
        with self.indent_ctx():
            name = obj.__name__
            signature = self.signature(obj)
            self.add_line()
            self.add_line("def {name}{signature}:".format(name=name, signature=signature))
            self._docstring(obj)
            self._pass()

    def _method(self, obj: object):
        """Emit a stub for a bound method; rendered exactly like a function."""
        self._function(obj)

    def _members(self):
        """Emit a stub for every method and plain function found on the object."""
        for _, member in inspect.getmembers(self.obj):
            if inspect.ismethod(member):
                self._method(member)
            elif inspect.isfunction(member):
                self._function(member)

    def _class(self):
        """Emit the class header, the object's docstring and its members."""
        self.add_line('class {class_name}:'.format(class_name=self.type_name(self.obj.__class__)))
        self._docstring(self.obj)
        self._members()
        self.add_line()

    @property
    def stub(self) -> str:
        """The generated stub text; built on first access and then reused."""
        if not self.result:
            self._class()
        return '\n'.join(self.result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment