Skip to content

Instantly share code, notes, and snippets.

@ma-ric
Last active September 16, 2015 14:57

Revisions

  1. ma-ric revised this gist Sep 16, 2015. 1 changed file with 0 additions and 22 deletions.
    22 changes: 0 additions & 22 deletions reload_util.py
    Original file line number Diff line number Diff line change
    @@ -46,28 +46,6 @@ def reload_module(module):
    importlib.reload(module)


    # def sub_packages(package):
    # for item_name in dir(package):
    # if not item_name.startswith('__'):
    # item = getattr(package, item_name)
    # if issubpackage(package, item):
    # print(item.__name__)
    # yield item
    #
    #
    # def reload_package(package):
    # modul_list = []
    # package_stack = [package]
    # while len(package_stack) > 0:
    # package = package_stack.pop()
    # modul_list.append(package)
    # for sp in sub_packages(package):
    # package_stack.append(sp)
    #
    # for m in reversed(modul_list):
    # reload_module(m)
    #

    def is_sub_module(module0, module1):
    assert is_module(module0)
    assert is_module(module1)
  2. ma-ric created this gist Sep 16, 2015.
    224 changes: 224 additions & 0 deletions reload_util.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,224 @@
    # The MIT License (MIT)
    #
    # Copyright (c) 2015 ma-ric
    #
    # Permission is hereby granted, free of charge, to any person obtaining a copy
    # of this software and associated documentation files (the "Software"), to deal
    # in the Software without restriction, including without limitation the rights
    # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    # copies of the Software, and to permit persons to whom the Software is
    # furnished to do so, subject to the following conditions:
    #
    # The above copyright notice and this permission notice shall be included in
    # all copies or substantial portions of the Software.
    #
    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    # THE SOFTWARE.


    """Reload package and contained subpackages and modules"""

    __author__ = 'ma-ric'

    import types
    import importlib
    import sys
    from collections import deque


    def is_module(item):
    return isinstance(item, types.ModuleType)


    def is_package(item):
    return (is_module(item) and
    (item.__package__ == item.__name__))


    def reload_module(module):
    assert is_module(module)
    print("reload module:", module.__name__)
    importlib.reload(module)


    # def sub_packages(package):
    # for item_name in dir(package):
    # if not item_name.startswith('__'):
    # item = getattr(package, item_name)
    # if issubpackage(package, item):
    # print(item.__name__)
    # yield item
    #
    #
    # def reload_package(package):
    # modul_list = []
    # package_stack = [package]
    # while len(package_stack) > 0:
    # package = package_stack.pop()
    # modul_list.append(package)
    # for sp in sub_packages(package):
    # package_stack.append(sp)
    #
    # for m in reversed(modul_list):
    # reload_module(m)
    #

    def is_sub_module(module0, module1):
    assert is_module(module0)
    assert is_module(module1)
    p0 = module0.__package__
    p1 = module1.__package__
    if p1:
    return p1.startswith(p0)
    return False


    def get_module(item):
    if is_module(item):
    return item
    else:
    module_name = getattr(item, '__module__', None)
    return sys.modules[module_name] if module_name else None


    def print_modules(modules, *title):
    args = list(title)
    args.append([m.__name__ for m in modules])
    print(*args)


    def module_key(module):
    return module.__name__


    class PackageReload:
    def __init__(self, package):
    assert is_package(package)
    self.package = package
    self.used_modules = dict()
    self.module_usage = dict()

    def used_package_modules(self, module):
    modules = set()
    for item_name in dir(module):
    if not item_name.startswith('__'):
    item = getattr(module, item_name)
    m = get_module(item)
    # if not m:
    # print("NOT in a module", item)
    if m and m is not module and is_sub_module(self.package, m):
    modules.add(m)
    return modules

    def register_used_package_modules(self, parent_module):
    key = module_key(parent_module)
    um = self.used_package_modules(parent_module)
    # print_modules(um, "register_used_package_module", key)
    self.used_modules[key] = um
    return um

    def is_registered(self, module):
    assert is_module(module)
    um = self.used_modules.get(module_key(module))
    return um is not None

    def collect_dependence_data(self):
    # print("collect_dependence_data")
    um = self.register_used_package_modules(self.package)
    unvisited_sub_modules = deque(um)
    # print_modules(unvisited_sub_modules, "unvisited_sub_modules")

    while unvisited_sub_modules:
    parent_module = unvisited_sub_modules.pop()
    # print("parent_module", parent_module)
    if not self.is_registered(parent_module):
    um = self.register_used_package_modules(parent_module)
    new_sub_modules = [m for m in um if is_sub_module(parent_module, m)]
    # print_modules(new_sub_modules, "new_sub_modules")
    unvisited_sub_modules.extend(new_sub_modules)
    # else:
    # print("skip, parent_module already registered")
    # print_modules(unvisited_sub_modules, "unvisited_sub_modules")

    # print("collect_dependence_data finished")
    # print_modules(unvisited_sub_modules, "unvisited_sub_modules")
    self.print_used_modules()

    def print_used_modules(self):
    print("used_modules")
    for m, used_modules in self.used_modules.items():
    print("\t", m)
    for um in used_modules:
    print("\t\t", um.__name__)
    print("-"*50)

    def index_module_usage(self):
    for module_user, used_modules in self.used_modules.items():
    # to make sure that non-used modules has an empty entry in module_usage
    self.module_usage.setdefault(module_user, set())

    # fill module_uage
    for um in used_modules:
    key = module_key(um)
    module_users = self.module_usage.setdefault(key, set())
    module_users.add(module_user)
    self.print_module_usage()

    def print_module_usage(self):
    print("module_usage")
    for m, module_users in self.module_usage.items():
    print("\t", m)
    for mu in module_users:
    print("\t\t", mu)
    print("-"*50)

    def get_reload_order(self):
    self.collect_dependence_data()
    self.index_module_usage()

    reload_order = deque()
    independent = deque()
    not_reloaded_dep = dict()

    for m, dep in self.used_modules.items():
    dep_set = set((module_key(d) for d in dep))
    # print_modules(nrd, "nrd", m)
    not_reloaded_dep[m] = dep_set
    if not dep_set:
    independent.append(m)

    # print("independent", independent)
    while independent:
    reload_order.extend(independent)
    new_independent = deque()
    for m in independent:
    module_users = self.module_usage[m]
    for u in module_users:
    dep_set = not_reloaded_dep[u]
    dep_set.remove(m)
    if not dep_set:
    new_independent.append(u)
    independent = new_independent
    # print("independent", independent)

    # print("reload_order", reload_order)
    return reload_order

    def reload(self):
    reload_order = self.get_reload_order()
    for module_name in reload_order:
    m = sys.modules[module_name]
    reload_module(m)


    def reload_package(package):
    pr = PackageReload(package)
    pr.reload()


    __all__ = [reload_module, reload_package]