Created
December 17, 2021 23:49
-
-
Save jlinoff/d8bc23bea99f3660a4abb48d625a5b05 to your computer and use it in GitHub Desktop.
generic tools for local github repository analysis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
list all repos. | |
''' | |
import datetime | |
import argparse | |
import getpass | |
import inspect | |
import os | |
import re | |
import sys | |
import time | |
from typing import TextIO, Tuple, Optional | |
import github # pylint: disable=unused-import) | |
from github import Github | |
import dateutil | |
import dateutil.parser | |
# Set the NOWARN environment variable to a non-zero integer to silence warn().
NOWARN = bool(int(os.getenv('NOWARN', '0')))
# Tool version; reported by the -V/--version option and in the help epilog.
__version__ = '0.1.0'
def info(msg: str, ofp: TextIO = sys.stderr, end='\n', level=1):
    '''Emit an informational message tagged with the caller's line number.

    Args:
        msg: The message
        ofp: The output file.
        end: The message terminator.
        level: The level of the caller in the stack.
    '''
    # The stack lookup must stay in this function so that `level` keeps
    # referring to the same frame depth.
    caller = inspect.stack()[level]
    text = f'INFO:{caller.lineno}: {msg}'
    print(text, end=end, file=ofp)
def warn(msg: str, ofp: TextIO = sys.stderr, end='\n', level=1):
    '''Emit a magenta warning tagged with the caller's line number.

    Nothing is written when the module-level NOWARN flag is set.

    Args:
        msg: The message
        ofp: The output file.
        end: The message terminator.
        level: The level of the caller in the stack.
    '''
    if NOWARN:
        return
    magenta, reset = '\x1b[35m', '\x1b[0m'
    print(magenta, file=ofp, end='')
    caller_line = inspect.stack()[level].lineno
    print(f'WARNING:{caller_line}: {msg}', file=ofp, end=end)
    print(reset, file=ofp, end='')
def err(msg: str, ofp: TextIO = sys.stderr, end='\n', level=1):
    '''Emit a red error message tagged with the caller's line number.

    Args:
        msg: The message
        ofp: The output file.
        end: The message terminator.
        level: The level of the caller in the stack.
    '''
    caller_line = inspect.stack()[level].lineno
    red, reset = '\x1b[31m', '\x1b[0m'
    print(red, file=ofp, end='')
    print(f'ERROR:{caller_line}: {msg}', file=ofp, end=end)
    print(reset, file=ofp, end='')
def getopts() -> argparse.Namespace:
    '''
    Parse the command line options.

    Returns:
        The parsed options. "long" and "verbose" are counters that
        default to 0 so callers can compare them numerically.
    '''
    def gettext(string):
        '''
        Convert to upper case to make things consistent.
        '''
        lookup = {
            'usage: ': 'USAGE:',
            'positional arguments': 'POSITIONAL ARGUMENTS',
            'optional arguments': 'OPTIONAL ARGUMENTS',
            'show this help message and exit': 'Show this help message and exit.\n ',
        }
        return lookup.get(string, string)

    argparse._ = gettext  # to capitalize help headers
    base = os.path.basename(sys.argv[0])
    usage = f'\n {base} [OPTIONS]'
    desc = 'DESCRIPTION:{0}'.format('\n '.join(__doc__.split('\n')))
    # The epilog is an f-string, so the version must be interpolated by
    # name: a positional "{1}" would be evaluated as the integer 1.
    epilog = f'''
EXAMPLES:
# Example 1: help
$ {base} -h
# Example 2: list all eng repos - it will prompt for the token
$ {base} -U jlinoff https://git.eng.esentire.com/api/v3 '^eng/'
# Example 3: long list of all eng repos - it will prompt for the token
$ {base} -l -U jlinoff https://git.eng.esentire.com/api/v3 '^eng/'
# Example 4: verbose long list of all eng repos - it will prompt for the token
$ {base} -vl -U jlinoff https://git.eng.esentire.com/api/v3 '^eng/'
# Example 5: list all repos created in October 2021
$ {base} -vl -q 'created:2021-10-01..2021-10-31' -U jlinoff https://git.eng.esentire.com/api/v3 '^eng/'
# Example 6: list all repos created in October 2021 in the "eng" org
$ {base} -vl -q 'org:eng created:2021-10-01..2021-10-31' -U jlinoff https://git.eng.esentire.com/api/v3
# Example 7. include pull requests
$ {base} -v -lll -q 'org:eng' -U jlinoff https://git.eng.esentire.com/api/v3
# Example 8. include pull requests and branches
$ {base} -v -llll -q 'org:eng' -U jlinoff https://git.eng.esentire.com/api/v3
# Example 9. include pull requests, even the closed ones and use color
$ {base} -v --all-prs --colorize -lll -q 'org:eng' -U jlinoff https://git.eng.esentire.com/api/v3
VERSION
{base} {__version__}
LICENSE
Copyright (c) 2018 by Joe Linoff
MIT Open Source
'''
    afc = argparse.RawTextHelpFormatter
    parser = argparse.ArgumentParser(formatter_class=afc,
                                     description=desc[:-2],
                                     usage=usage,
                                     epilog=epilog.rstrip() + '\n ')
    parser.add_argument('--all-prs',
                        action='store_true',
                        help='''\
Report all of the PRs not just the ones that are open.
This could produce a lot of output.
''')
    parser.add_argument('-c', '--colorize',
                        action='store_true',
                        help='''\
Colorize the output to make it a bit more readable.
''')
    # default=0: without it the count is None when -l is absent and the
    # numeric comparisons made on opts.long raise TypeError.
    parser.add_argument('-l', '--long',
                        action='count',
                        default=0,
                        help='''\
list more details. Can be specified multiple times for more details.
Each additional level of detail slows things down.
1. date created (created_at)
2. date updated (updated_at)
3. date pushed (pushed_at)
4. size in 1KB units (1MB = 1000)
5. active state: ('archived', 'active')
6. visibility: ('public', 'private')
7. number of open issues
8. number of branches (-ll)
9. number of pull requests (-ll)
10. repo_id
11. full_name
12. description (-ll)
13. URL (-ll)
if you specify -lll, detailed information
about pull requests will be included.
1. type identifier: "P:"
2. days open
3. pull request id
4. number of commits
5. state
6. draft state
7. created date
8. closed date
9. merged date
10. number of individual reviewers
11. number of team reviwers
12. user
13. title
14. URL
if you specify -llll, detailed information
about branches will be included.
1. type identifier: "B:"
2. branch name
3. protected or unprotected
4. commit SHA
''')
    parser.add_argument('-q', '--query',
                        action='store',
                        default='size:>=0',
                        help='''\
gituhb api listing query string sent as
and argument to the search_repositories
interface.
For mode details see:
https://docs.github.com/en/search-github/searching-on-github/searching-for-repositories
Default: "%(default)s"
If you want a specific organization try: "org:ORG".
''')
    parser.add_argument('-T', '--token',
                        action='store',
                        help='''\
The access token.
It will prompt if not specified.
''')
    parser.add_argument('-U', '--username',
                        action='store',
                        help='''\
The username.
''')
    parser.add_argument('-v', '--verbose',
                        action='count',
                        default=0,
                        help='''\
Increase the level of verbosity.
''')
    parser.add_argument('-V', '--version',
                        action='version',
                        version=f'%(prog)s version {__version__}',
                        help='''\
Show program's version number and exit.
''')
    parser.add_argument('URL',
                        nargs=1,
                        help='''\
The repository.
''')
    parser.add_argument('MATCH',
                        nargs='?',
                        default='.',
                        help='''\
Only list repos that match.
''')
    opts = parser.parse_args()
    return opts
def check_username(username: str, gob: github.Github):
    '''Probe the connection by looking up the user; exit with status 1 on failure.'''
    try:
        # Only the side effect of the lookup matters; the result is discarded.
        gob.get_user(username)
    except Exception as exc:  # pylint: disable=broad-except
        err(f'connection failed: {exc}')
        sys.exit(1)  # pylint does not know that err() exits
def get_username_and_token(opts: argparse.ArgumentParser) -> Tuple[str, str]:
    '''Return (username, token), prompting for whichever was not given on the command line.'''
    # `or` short-circuits, so a prompt only appears when the option is empty.
    username = opts.username or input('username?')
    token = opts.token or getpass.getpass('token? ')
    return username, token
def collect(opts: argparse.ArgumentParser) -> Tuple[int, str, re.Pattern, list]:
    '''Collect the repository listings.

    Args:
        opts: The command line options.

    Returns:
        A tuple (maxn, url, mex, repos) where maxn is the longest
        full_name among the repositories that match, url is the API URL,
        mex is the compiled MATCH pattern and repos is the list of every
        repository returned by the query (unfiltered).
    '''
    url = opts.URL[0]
    username, token = get_username_and_token(opts)
    mex = re.compile(opts.MATCH)
    if opts.verbose:
        info(f'url: {url}')
        info(f'username: {username}')
        if opts.verbose > 1:
            info(f'token: {token}')
        info(f'all-prs: {opts.all_prs}')
        info(f'colorize: {opts.colorize}')
        info(f'query: "{opts.query}"')
        info(f'match: "{opts.MATCH}"')
    gob = Github(base_url=url, login_or_token=token)
    # check the connection
    check_username(username, gob)
    start = time.time()
    if opts.verbose:
        info(f'loading repos from {url}')
    # Materialize the paginated result once and reuse the list below.
    repolist = list(gob.search_repositories(opts.query))
    if opts.verbose:
        sec = time.time() - start
        info(f'{len(repolist)} repositories found in {sec:.2f} seconds')
    # Width of the widest matching name; 0 when nothing matches.
    maxn = max((len(repo.full_name) for repo in repolist
                if not mex or mex.search(repo.full_name)),
               default=0)
    return maxn, url, mex, repolist
def get_name(rec: github.NamedUser.NamedUser, default: str = '???') -> str:
    '''Return the first non-empty of name, email and login, else the default.
    '''
    # Short-circuit so attributes are consulted in the same priority order.
    return rec.name or rec.email or rec.login or default
def get_date(dts: Optional[datetime.datetime], default: str) -> str:
    '''Format a datetime as an ISO string with second resolution.

    Args:
        dts: The datetime object.
        default: The value returned when dts is missing.

    Returns:
        The formatted string.
    '''
    return dts.isoformat(timespec='seconds') if dts else default
def print_pulls(opts: argparse.ArgumentParser,  # pylint: disable=too-many-locals
                pulls: github.PaginatedList.PaginatedList):
    '''Print one line per pull request followed by its requested reviewers.

    Args:
        opts: The command line options (only "colorize" is read).
        pulls: The paginated list of pull requests.
    '''
    pulls = sorted(pulls, key=lambda x: x.number)
    num = len(pulls)
    for j, pull in enumerate(pulls, start=1):
        # get_review_requests() yields a (users, teams) pair; unpacking
        # enforces the arity even when asserts are disabled.
        rusers, rteams = (list(item) for item in pull.get_review_requests())
        draft = 'Draft' if pull.draft else 'NotDraft'
        user = get_name(pull.user, '???')
        created = get_date(pull.created_at, 'NotCreated')
        closed = get_date(pull.closed_at, 'NotClosed')
        merged = get_date(pull.merged_at, 'NotMerged')
        days = 0.0  # number of days open, only reported for open pulls
        if pull.created_at and not pull.closed_at:
            opened = pull.created_at
            if opened.tzinfo is None:
                # NOTE(review): naive timestamps are presumed to be UTC
                # (as returned by older PyGithub releases) - confirm.
                opened = opened.replace(tzinfo=datetime.timezone.utc)
            age = datetime.datetime.now(datetime.timezone.utc) - opened
            # clamp to zero to tolerate clock skew on same day pulls
            days = max(age.total_seconds(), 0.0) / 3600. / 24.
        if opts.colorize:
            # open pulls are red, closed pulls are green
            if closed == 'NotClosed':
                print('\x1b[31;1m', end='')
            else:
                print('\x1b[32;1m', end='')
        print(f'{"P:":>6} {j:>6} of {num} {days:>5.1f} {pull.number:>4} {pull.commits:>3}'
              f' {pull.state:<8}'
              f' {draft:<8}'
              f' CR:{created:<19}'
              f' CL:{closed:<19}'
              f' M:{merged:<19}'
              f' {len(rusers):>2}'
              f' {len(rteams):>2}'
              f' "{user}"'
              f' "{pull.title}"'
              f' {pull.html_url}'
              )
        for k, ruser in enumerate(rusers, start=1):
            name = get_name(ruser)
            print(f'{k:>15} reviewer "{name}"')
        for k, rteam in enumerate(rteams, start=1):
            print(f'{k:>15} team "{rteam.name}" "{rteam.description}"')
        if opts.colorize:
            print('\x1b[0m', end='')
        sys.stdout.flush()
def print_branches(branches: github.PaginatedList.PaginatedList):
    '''Print one line per branch, sorted case-insensitively by name.

    Nothing is printed when there are no branches.

    Args:
        branches: The paginated list of branches.
    '''
    branches = sorted(branches, key=lambda x: x.name.lower())
    if not branches:
        # max() of an empty sequence would raise ValueError
        return
    max1 = max(len(x.name) for x in branches)
    num = len(branches)
    for j, branch in enumerate(branches, start=1):
        pstat = 'protected ' if branch.protected else 'unprotected'
        print(f'{"B:":>6} {j:>6} of {num}'
              f' {branch.name:<{max1}}'
              f' {pstat}'
              f' {branch.commit.sha}')
def print_entry(opts: argparse.ArgumentParser, repo: github.Repository.Repository, maxn: int):
    '''Print one repository at the level of detail selected by -l.

    No -l prints only the full name; -l adds the dates, size and state;
    -ll adds branch and pull request counts, description and URL; -lll
    adds the pull request details and -llll adds the branch details.

    Args:
        opts: The command line options
        repo: The repository to print
        maxn: The maximum full_name size
    '''
    # The count is None when -l was never given and has no default.
    detail = opts.long or 0
    if detail < 1:
        print(repo.full_name)
        return
    owner = get_name(repo.owner, '???')
    archived = 'archived' if repo.archived else 'active'
    vis = 'private' if repo.private else 'public'
    if opts.colorize:
        print('\x1b[34;1m', end='')
    print(f'C:{repo.created_at}'
          f' U:{repo.updated_at}'
          f' P:{repo.pushed_at}'
          f' {repo.size:>8}'
          f' {archived:<8}'
          f' {vis:<7}'
          f' {repo.open_issues_count:>3}',
          end='')
    if detail > 1:
        branches = repo.get_branches()
        pulls = repo.get_pulls(state='all') if opts.all_prs else repo.get_pulls()
        print(f' {branches.totalCount:>3} '
              f' {pulls.totalCount:>3} '
              f' {repo.id:>5}'
              f' "{owner}"'
              f' {repo.full_name:<{maxn}}'
              f' "{repo.description}"'
              f' {repo.html_url}'
              )
    else:
        # separate the owner from the name with a space
        print(f' {repo.id:>5}'
              f' "{owner}"'
              f' {repo.full_name}')
    if opts.colorize:
        print('\x1b[0m', end='')
    sys.stdout.flush()
    # detail > 2 implies detail > 1, so branches and pulls are bound here
    if detail > 3:  # -llll
        print_branches(branches)
    if detail > 2:  # -lll
        print_pulls(opts, pulls)
def main():  #pylint: disable=too-many-branches,too-many-locals,too-many-nested-blocks
    '''List every repository that matches, then print a summary line.'''
    opts = getopts()
    start = time.time()
    maxn, url, mex, repos = collect(opts)
    # need this logic because the REST API returns duplicates sometimes!
    # first noticed it with platform-rule-utils showing up twice.
    seen = set()
    for repo in sorted(repos, key=lambda x: x.full_name.lower()):
        if mex and not mex.search(repo.full_name):
            continue
        if repo.full_name in seen:
            continue
        seen.add(repo.full_name)
        print_entry(opts, repo, maxn)
    num = len(seen)  # only count unique matches
    etime = time.time() - start
    print(f'\nfound {num} repositories in {url}', end='')
    if mex:
        print(f' that match "{opts.MATCH}"', end='')
    print(f' in {etime:.2f} seconds')


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
git clone all repos. | |
If a repo exists then simply update it using "git pull". | |
''' | |
import argparse | |
import getpass | |
import inspect | |
import os | |
import re | |
import subprocess | |
import sys | |
import time | |
from typing import TextIO, Tuple | |
import github # pylint: disable=unused-import) | |
from github import Github | |
# Set the NOWARN environment variable to a non-zero integer to silence warn().
NOWARN = bool(int(os.getenv('NOWARN', '0')))
# Tool version; reported by the -V/--version option and in the help epilog.
__version__ = '0.1.0'
def info(msg: str, ofp: TextIO = sys.stderr, end='\n', level=1):
    '''Report an informational message together with the caller's line number.

    Args:
        msg: The message
        ofp: The output file.
        end: The message terminator.
        level: The level of the caller in the stack.
    '''
    # Resolve the frame here; a helper would shift the meaning of `level`.
    frame = inspect.stack()[level]
    line = f'INFO:{frame.lineno}: {msg}'
    print(line, end=end, file=ofp)
def warn(msg: str, ofp: TextIO = sys.stderr, end='\n', level=1):
    '''Report a magenta warning together with the caller's line number.

    Nothing is written when the module-level NOWARN flag is set.

    Args:
        msg: The message
        ofp: The output file.
        end: The message terminator.
        level: The level of the caller in the stack.
    '''
    if NOWARN:
        return
    color_on, color_off = '\x1b[35m', '\x1b[0m'
    print(color_on, file=ofp, end='')
    where = inspect.stack()[level].lineno
    print(f'WARNING:{where}: {msg}', file=ofp, end=end)
    print(color_off, file=ofp, end='')
def err(msg: str, ofp: TextIO = sys.stderr, end='\n', level=1):
    '''Report a red error message with the caller's line number, then exit with status 1.

    Args:
        msg: The message
        ofp: The output file.
        end: The message terminator.
        level: The level of the caller in the stack.
    '''
    where = inspect.stack()[level].lineno
    color_on, color_off = '\x1b[31m', '\x1b[0m'
    print(color_on, file=ofp, end='')
    print(f'ERROR:{where}: {msg}', file=ofp, end=end)
    print(color_off, file=ofp, end='')
    sys.exit(1)
def getopts() -> argparse.Namespace:
    '''
    Parse the command line options.

    Returns:
        The parsed options (an argparse.Namespace).
    '''
    def gettext(string):
        '''
        Convert to upper case to make things consistent.
        '''
        lookup = {
            'usage: ': 'USAGE:',
            'positional arguments': 'POSITIONAL ARGUMENTS',
            'optional arguments': 'OPTIONAL ARGUMENTS',
            'show this help message and exit': 'Show this help message and exit.\n ',
        }
        return lookup.get(string, string)

    argparse._ = gettext  # to capitalize help headers
    base = os.path.basename(sys.argv[0])
    usage = f'\n {base} [OPTIONS]'
    desc = 'DESCRIPTION:{0}'.format('\n '.join(__doc__.split('\n')))
    # The epilog is an f-string, so the version must be interpolated by
    # name: a positional "{1}" would be evaluated as the integer 1.
    # Example 3 uses a placeholder so no access token is embedded in the help.
    epilog = f'''
EXAMPLES:
# Example 1: help
$ {base} -h
# Example 2: mirror the eng components
$ {base} -q 'org:eng' -o /tmp/mirror-eng -U jlinoff https://git.eng.esentire.com/api/v3
# Example 3:
$ {base} -cv -q 'org:eng' -o /tmp/mirror-eng -U jlinoff -T $TOKEN https://git.eng.esentire.com/api/v3
VERSION
{base} {__version__}
LICENSE
Copyright (c) 2018 by Joe Linoff
MIT Open Source
'''
    afc = argparse.RawTextHelpFormatter
    parser = argparse.ArgumentParser(formatter_class=afc,
                                     description=desc[:-2],
                                     usage=usage,
                                     epilog=epilog.rstrip() + '\n ')
    parser.add_argument('-c', '--colorize',
                        action='store_true',
                        help='''\
Colorize the output to make it a bit more readable.
''')
    parser.add_argument('-C', '--git-clone',
                        action='store',
                        default='git clone {repo.clone_url}',
                        help='''\
The git clone command to use to create
a repository.
Default: %(default)s
''')
    parser.add_argument('-P', '--git-pull',
                        action='store',
                        default='cd {repo.name} && git pull --stat && cd -',
                        help='''\
The git pull command to use to update
a repository.
Default: %(default)s
''')
    parser.add_argument('-q', '--query',
                        action='store',
                        default='size:>=0',
                        help='''\
THe gituhb api listing query string sent
as the argument to the search_repositories
interface.
For mode details see:
https://docs.github.com/en/search-github/searching-on-github/searching-for-repositories
Default: "%(default)s"
If you want a specific organization try: "org:ORG".
''')
    parser.add_argument('-o', '--out',
                        action='store',
                        default='out',
                        help='''\
The ouput directory, relative to the directory where this tool is run.
It will be created if it does not exist.
Default: %(default)s
''')
    parser.add_argument('-t', '--timeout',
                        action='store',
                        type=int,
                        default=600,
                        help='''\
Subprocess timeout in seconds.
Default: %(default)s.
''')
    parser.add_argument('-T', '--token',
                        action='store',
                        help='''\
The access token.
It will prompt if not specified.
''')
    parser.add_argument('-U', '--username',
                        action='store',
                        help='''\
The username.
''')
    parser.add_argument('-v', '--verbose',
                        action='count',
                        default=0,
                        help='''\
Increase the level of verbosity.
''')
    parser.add_argument('-V', '--version',
                        action='version',
                        version=f'%(prog)s version {__version__}',
                        help='''\
Show program's version number and exit.
''')
    parser.add_argument('URL',
                        nargs=1,
                        help='''\
The repository.
''')
    parser.add_argument('MATCH',
                        nargs='?',
                        default='.',
                        help='''\
Only list repos that match.
''')
    opts = parser.parse_args()
    return opts
def runcmd(opts: argparse.ArgumentParser, cmd: str):
    '''Run a shell command, honoring the configured timeout.

    A non-zero exit status is reported with err(); any other failure
    (a timeout, for example) is reported with warn().

    Args:
        opts: The command line options ("verbose" and "timeout" are read).
        cmd: The shell command line to execute.
    '''
    verbose = opts.verbose
    if verbose:
        info(f'command: {cmd}', level=2)
    try:
        # pylint: disable=line-too-long
        ## https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running
        # pylint: enable=line-too-long
        subprocess.check_output(cmd,
                                shell=True,
                                universal_newlines=True,
                                timeout=opts.timeout)
    except subprocess.CalledProcessError as exc:
        status = exc.returncode
        err(f'command failed with status {status}: {cmd}\n{exc}')
    except Exception as exc:  # pylint: disable=broad-except
        status = -1
        warn(f'command failed with status {status}: {cmd}\n{exc}')
    else:
        status = 0
    if verbose:
        info(f'command completed with status {status}: {cmd}', level=2)
def check_username(username: str, gob: github.Github):
    '''Confirm the connection works by fetching the user; exit with status 1 otherwise.'''
    try:
        # The returned user object is not needed, only the round trip.
        gob.get_user(username)
    except Exception as exc:  # pylint: disable=broad-except
        err(f'connection failed: {exc}')
        sys.exit(1)  # pylint does not know that err() exits
def get_username_and_token(opts: argparse.ArgumentParser) -> Tuple[str, str]:
    '''Return (username, token), asking interactively for any that were not supplied.'''
    username = opts.username if opts.username else input('username?')
    token = opts.token if opts.token else getpass.getpass('token? ')
    return username, token
def collect(opts: argparse.ArgumentParser) -> Tuple[int, str, re.Pattern, list]:
    '''Collect the repository listings and enter the output directory.

    The output directory is created when missing and becomes the current
    working directory, so later clone and pull commands run inside it.

    Args:
        opts: The command line options.

    Returns:
        A tuple (maxn, url, mex, repos) where maxn is the longest
        full_name among the repositories that match, url is the API URL,
        mex is the compiled MATCH pattern and repos is the list of every
        repository returned by the query (unfiltered).
    '''
    url = opts.URL[0]
    username, token = get_username_and_token(opts)
    mex = re.compile(opts.MATCH)
    if opts.verbose:
        info(f'url: {url}')
        info(f'username: {username}')
        if opts.verbose > 1:
            info(f'token: {token}')
        info(f'colorize: {opts.colorize}')
        info(f'out: "{opts.out}"')
        info(f'query: "{opts.query}"')
        info(f'match: "{opts.MATCH}"')
    # Create the directory without a shell so that quotes or "$" in the
    # path cannot be misinterpreted.
    try:
        os.makedirs(opts.out, exist_ok=True)
    except OSError as exc:
        err(f'cannot create output directory "{opts.out}": {exc}')
    os.chdir(opts.out)
    gob = Github(base_url=url, login_or_token=token)
    # check the connection
    check_username(username, gob)
    start = time.time()
    if opts.verbose:
        info(f'loading repos from {url}')
    # Materialize the paginated result once and reuse the list below.
    repolist = list(gob.search_repositories(opts.query))
    if opts.verbose:
        sec = time.time() - start
        info(f'{len(repolist)} repositories found in {sec:.2f} seconds')
    # Width of the widest matching name; 0 when nothing matches.
    maxn = max((len(repo.full_name) for repo in repolist
                if not mex or mex.search(repo.full_name)),
               default=0)
    return maxn, url, mex, repolist
def get_name(rec: github.NamedUser.NamedUser, default: str = '???') -> str:
    '''Pick the best available label for a user: name, then email, then login.
    '''
    # Falls through to the default only when all three are empty.
    return rec.name or rec.email or rec.login or default
def _fstring_source(template: str) -> str:
    '''Wrap a command template in an f-string literal using a delimiter it does not contain.'''
    for quote in ("'", '"', "'''", '"""'):
        # A template ending in the delimiter character would merge with
        # the closing quotes, so that delimiter is skipped too.
        if quote not in template and not template.endswith(quote[0]):
            return f'f{quote}{template}{quote}'
    raise ValueError(f'cannot quote command template: {template!r}')


def checkout_entry(opts: argparse.ArgumentParser, repo: github.Repository.Repository, maxn: int):
    '''Clone the repository, or update it when a directory with its name already exists.

    Existence is tested relative to the current working directory.

    Args:
        opts: The command line options
        repo: The repository to clone or update
        maxn: The maximum full_name size

    Raises:
        ValueError: The command template cannot be quoted safely.
    '''
    exists = os.path.exists(repo.name)
    owner = get_name(repo.owner, '???')
    archived = 'archived' if repo.archived else 'active'
    vis = 'private' if repo.private else 'public'
    update = 'update' if exists else 'create'
    if opts.colorize:
        print('\x1b[34;1m', end='')
    print(f'C:{repo.created_at}'
          f' U:{repo.updated_at}'
          f' P:{repo.pushed_at}'
          f' {repo.size:>8}'
          f' {archived:<8}'
          f' {vis:<7}'
          f' {repo.open_issues_count:>3}',
          end='')
    print(f' {repo.id:>5}'
          f' {update}'
          f' "{owner}"'
          f' {repo.full_name:<{maxn}}'
          f' {repo.clone_url}')
    if opts.colorize:
        print('\x1b[0m', end='')
    sys.stdout.flush()
    # update the repo when it exists, otherwise create it
    template = opts.git_pull if exists else opts.git_clone
    # SECURITY: the template is evaluated as Python and the result is run
    # by a shell. Only use -C/-P values from a trusted source. The eval
    # stays in this function so templates can keep referring to its
    # locals (repo, opts, owner, ...).
    cmd = eval(_fstring_source(template))  # pylint: disable=eval-used
    runcmd(opts, cmd)
def main():  #pylint: disable=too-many-branches,too-many-locals,too-many-nested-blocks
    '''Clone or update every repository that matches, then print a summary line.'''
    opts = getopts()
    start = time.time()
    maxn, url, mex, repos = collect(opts)
    # need this logic because the REST API returns duplicates sometimes!
    # first noticed it with platform-rule-utils showing up twice.
    seen = set()
    for repo in sorted(repos, key=lambda x: x.full_name.lower()):
        if mex and not mex.search(repo.full_name):
            continue
        if repo.full_name in seen:
            continue
        seen.add(repo.full_name)
        try:
            checkout_entry(opts, repo, maxn)
        except Exception as exc:  # pylint: disable=broad-except
            # keep going: one bad repository must not stop the mirror
            warn(f'checkout failed for {repo.full_name}: {exc}')
    num = len(seen)  # only count unique matches
    etime = time.time() - start
    print(f'\nfound {num} repositories in {url}', end='')
    if mex:
        print(f' that match "{opts.MATCH}"', end='')
    print(f' in {etime:.2f} seconds')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment