Last active
April 19, 2023 16:09
-
-
Save simonwagner/f9b6f66d06eb8a355e2d to your computer and use it in GitHub Desktop.
Git graph renderer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
import fnmatch | |
from itertools import chain | |
from inspect import getframeinfo, stack | |
import math | |
from collections import OrderedDict | |
import pygit2 as git | |
import cairocffi as cairo | |
def hex2rgb(hexvalue):
    """Convert a hex colour string such as ``"1f77b4"`` to a tuple of floats.

    The string is consumed two characters at a time; each pair is parsed as
    a base-16 integer and divided by 255.0.

    :param hexvalue: hex digits without a leading ``#``.
    :returns: tuple with one float per two-character group.
    :raises ValueError: if a group is not valid hexadecimal.
    """
    # range() instead of the Python-2-only xrange() so the helper runs
    # unchanged on both Python 2 and Python 3.
    return tuple(
        int(hexvalue[i:i + 2], 16) / 255.0
        for i in range(0, len(hexvalue), 2)
    )
# Palette used to tint graph lines: each hex code is converted once, at
# import time, into an (r, g, b) tuple of floats by hex2rgb().
COLORS = list(map(hex2rgb, (
    "1f77b4",
    "aec7e8",
    "ff7f0e",
    "ffbb78",
    "2ca02c",
    "98df8a",
    "d62728",
    "ff9896",
    "9467bd",
    "c5b0d5",
    "8c564b",
    "c49c94",
    "e377c2",
    "f7b6d2",
    "7f7f7f",
    "c7c7c7",
    "bcbd22",
    "dbdb8d",
    "17becf",
    "9edae5",
)))
def linecolor(line):
    """Pick the palette colour for *line*.

    The hex id of the line's start commit is read as one big integer and
    reduced modulo the palette size, so the same start commit always maps
    to the same entry of ``COLORS``.
    """
    commit_number = int(line.start_commit.id.hex, 16)
    return COLORS[commit_number % len(COLORS)]
def main():
    """Render the repository's commit graph to ``graph.pdf``.

    The repository is discovered from the first command line argument, or
    from the current working directory when no argument is given.
    """
    cur_path = sys.argv[1] if len(sys.argv) >= 2 else os.getcwd()
    repo = git.Repository(git.discover_repository(cur_path))
    write_rendering(render_graph(generate_graph(repo)), "graph.pdf")
def write_rendering(surface, fname):
    """Copy a recorded drawing into a PDF file sized to fit its content.

    :param surface: the surface that was drawn on; it must provide
        ``ink_extents()`` (``render_graph`` passes a cairo RecordingSurface).
    :param fname: path of the PDF file to write.
    """
    # ink_extents() is consumed as (x, y, width, height): the page gets the
    # size of the drawn area and the drawing is shifted so that its
    # top-left corner lands on the page origin.
    x, y, width, height = surface.ink_extents()
    out_surface = cairo.PDFSurface(fname, width, height)
    try:
        out_ctx = cairo.Context(out_surface)
        out_ctx.set_source_surface(surface, -x, -y)
        out_ctx.paint()
    finally:
        # Finish explicitly so the PDF is completely written when this
        # function returns instead of whenever the surface is collected.
        out_surface.finish()
def render_graph(graph):
    """Draw the commit graph onto a cairo recording surface and return it.

    :param graph: ``(lines, order, commits)`` tuple, i.e. the shape returned
        by ``generate_graph``. ``order`` maps commit ids to positions and
        ``commits`` is iterated once, drawing one row of cells per commit.
    :returns: the ``cairo.RecordingSurface`` holding the drawing.
    """
    # Edge length of one grid cell; rows and columns are multiples of it.
    CELL_SIZE = 12
    COMMIT_CIRCLE_RADIUS = (CELL_SIZE - 6)/2
    lines, order, commits = graph
    #calculate the size
    # NOTE(review): max_order, width and height are only referenced by the
    # commented-out SVGSurface line below; the recording surface is unbounded.
    max_order = max(order.values())
    width = len(lines)*CELL_SIZE
    height = (max_order + 1)*CELL_SIZE
    #surface = cairo.SVGSurface("graph.svg", width, height)
    surface = cairo.RecordingSurface(cairo.CONTENT_COLOR_ALPHA, None)
    ctx = cairo.Context(surface)
    # State carried from one row to the next; previous_commit_col and
    # previous_commit_line are read when drawing "simple" connections.
    previous_commit = None
    previous_commit_col = None
    for i, commit in enumerate(commits):
        # NOTE(review): commit_order is not read again in this function.
        commit_order = order[commit.id]
        # Lines that are active in this row, in the order of `lines`.
        lines_around = lines_at_commit(commit, lines, order)
        # Find the column (index into lines_around) of the line that
        # contains this commit.
        # NOTE(review): if no line contains the commit, the index from the
        # previous row is silently reused (NameError on the first row).
        for idx, line in enumerate(lines_around):
            if commit.id in set(commit.id for commit in line.commits):
                line_with_commit_index = idx
                break
        commit_line = lines_around[line_with_commit_index]
        # Top-left corner and centre of the cell holding the commit.
        commit_x = line_with_commit_index * CELL_SIZE
        commit_y = i * CELL_SIZE
        commit_cx = commit_x + CELL_SIZE/2
        commit_cy = commit_y + CELL_SIZE/2
        commit_lc = linecolor(commit_line)
        #draw all other lines
        # splits: number of split lines drawn so far in this row; every
        # later line's lower end is shifted left by one cell per split.
        # skipped_lines: number of "simple" lines seen so far, which do
        # not occupy a column of their own.
        splits = 0
        skipped_lines = 0
        for j, line in enumerate(lines_around):
            offset = splits * -CELL_SIZE
            if j == line_with_commit_index:
                # The commit's own line is drawn after this loop; remember
                # how many splits were to its left.
                splits_before_commit = splits
                continue
            x = (j - skipped_lines) * CELL_SIZE
            y = i * CELL_SIZE
            cx = x + CELL_SIZE/2
            cy = y + CELL_SIZE/2
            # A line without commits of its own that spans exactly one
            # step in the order.
            simple_line = line.topo_length(order) == 1 and len(line) == 0
            lc = linecolor(line)
            #draw line
            if line.merge is not None and line.merge.id == commit.id and not simple_line:
                #draw merge line
                # from the bottom edge of the line's cell to the commit centre
                ctx.new_path()
                ctx.set_source_rgb(*lc)
                ctx.move_to(cx + offset, y + CELL_SIZE)
                ctx.line_to(commit_cx, commit_cy)
                ctx.stroke()
            if line.split is not None and line.split.id == commit.id and not simple_line:
                #draw split line
                # from the top edge of the line's cell to the commit centre
                ctx.new_path()
                ctx.set_source_rgb(*lc)
                ctx.move_to(cx, y)
                ctx.line_to(commit_cx, commit_cy)
                ctx.stroke()
                splits += 1
            if (line.merge is None or line.merge.id != commit.id) and \
               (line.split is None or line.split.id != commit.id):
                # The line neither merges nor splits at this commit: pass
                # through the cell from top to (shifted) bottom.
                ctx.new_path()
                ctx.set_source_rgb(*lc)
                ctx.move_to(cx, y)
                ctx.line_to(cx + offset, y + CELL_SIZE)
                ctx.stroke()
            if simple_line and line.split.id == commit.id:
                #for all lines that have no commits and a
                #topological distance of 1 (which
                # means the previous commit is our child)
                #simply draw a line connecting both commits
                #and not a special line
                #assert j == len(lines_around) - 1 #we should always be the last line
                previous_commit_cx = previous_commit_col * CELL_SIZE + CELL_SIZE/2
                previous_commit_bottom = y
                ctx.new_path()
                ctx.set_source_rgb(*linecolor(previous_commit_line))
                ctx.move_to(previous_commit_cx, previous_commit_bottom)
                ctx.line_to(commit_cx, commit_cy)
                ctx.stroke()
                skipped_lines += 1
        # Horizontal shift of the lower end of the commit's own line.
        commit_offset = splits_before_commit * -CELL_SIZE
        if commit_line.end.id == commit.id:
            # The commit is the last one on its line.
            if len(commit_line) > 1:
                # stub from the centre to the top edge of the cell
                ctx.new_path()
                ctx.set_source_rgb(*commit_lc)
                ctx.move_to(commit_cx, commit_cy)
                ctx.line_to(commit_cx, commit_cy - CELL_SIZE/2)
                ctx.stroke()
            if commit_line.split is not None:
                # stub from the centre to the bottom edge of the cell
                ctx.new_path()
                ctx.set_source_rgb(*commit_lc)
                ctx.move_to(commit_cx, commit_cy)
                ctx.line_to(commit_cx, commit_cy + CELL_SIZE/2)
                ctx.stroke()
        if commit_line.start_commit.id == commit.id:
            # The commit is the first one on its line.
            if len(commit_line) > 1:
                # stub from the centre to the bottom edge of the cell
                ctx.new_path()
                ctx.set_source_rgb(*commit_lc)
                ctx.move_to(commit_cx, commit_cy)
                ctx.line_to(commit_cx, commit_cy + CELL_SIZE/2)
                ctx.stroke()
            if commit_line.merge is not None:
                # stub from the centre to the top edge of the cell
                ctx.new_path()
                ctx.set_source_rgb(*commit_lc)
                ctx.move_to(commit_cx, commit_cy)
                ctx.line_to(commit_cx, commit_cy - CELL_SIZE/2)
                ctx.stroke()
        if not (commit_line.start_commit.id == commit.id or commit_line.end.id == commit.id):
            # Commit in the middle of its line: top edge -> centre ->
            # bottom edge, the latter shifted by commit_offset.
            ctx.set_source_rgb(*commit_lc)
            ctx.move_to(commit_cx, commit_cy - CELL_SIZE/2)
            ctx.line_to(commit_cx, commit_cy)
            ctx.line_to(commit_cx + commit_offset, commit_cy + CELL_SIZE/2)
            ctx.stroke()
        #draw circle
        # black outline, white fill, drawn last so it covers the line ends
        ctx.new_path()
        ctx.arc(commit_cx, commit_cy, COMMIT_CIRCLE_RADIUS, 0, 2*math.pi)
        ctx.set_source_rgb(0.0, 0.0, 0.0)
        ctx.stroke_preserve()
        ctx.set_source_rgb(1.0, 1.0, 1.0)
        ctx.fill()
        #continue with next step
        previous_commit = commit
        previous_commit_col = line_with_commit_index
        previous_commit_line = commit_line
    return surface
def line_for_commit(commit, lines):
    """Return the first line in *lines* whose commits include *commit*.

    Commits are matched by their ``id`` attribute.

    :param commit: object with an ``id`` attribute.
    :param lines: iterable of objects with a ``commits`` sequence.
    :raises AssertionError: if no line contains the commit.
    """
    for line in lines:
        if any(c.id == commit.id for c in line.commits):
            return line
    # Raised explicitly instead of ``assert False``: assert statements are
    # stripped under ``python -O`` and the function would return None.
    raise AssertionError("commit %r is not on any line" % (commit.id,))
def lines_at_commit(commit, lines, order):
    """Return the lines whose extent covers the position of *commit*.

    A line extends from its merge commit (or, without one, its start
    commit) to its split commit (or, without one, its end commit).
    Positions are looked up in *order* by commit id; the input order of
    *lines* is kept.
    """
    position = order[commit.id]

    def covers_position(line):
        first = order[first_not_none(line.merge, line.start_commit).id]
        last = order[first_not_none(line.split, line.end).id]
        return first <= position <= last

    return [line for line in lines if covers_position(line)]
class GraphLine(object):
    """One finished line (a run of commits drawn in one column) of the graph.

    Attributes, as filled in by ``GraphLineBuilder.build``:

    * ``commits`` -- the commits that lie on the line
    * ``start`` -- order value recorded when the line was started
    * ``start_commit`` -- commit the line starts with
    * ``end`` -- commit the line ends with
    * ``merge`` -- commit at which the line was merged, or None
    * ``split`` -- commit at which the line joins another line, or None
    """

    def __init__(self, commits, start, start_commit, end, merge, split):
        self.start = start
        self.start_commit = start_commit
        self.end = end
        self.merge = merge
        self.split = split
        self.commits = commits

    def __len__(self):
        """Number of commits on the line (may be 0)."""
        return len(self.commits)

    def topo_length(self, order):
        """Distance in *order* between the line's two extreme commits.

        The extremes are merge (falling back to start_commit) and split
        (falling back to end); *order* maps commit ids to positions.
        """
        begin = first_not_none(self.merge, self.start_commit)
        end = first_not_none(self.split, self.end)
        return order[end.id] - order[begin.id]

    # Equality and hashing previously read ``_start``/``_end``/``_merge``/
    # ``_split``, attributes this class never sets, so every comparison
    # raised AttributeError. They now use the public attributes.
    def __eq__(self, other):
        if not isinstance(other, GraphLine):
            return NotImplemented
        return self.start == other.start and \
            self.end == other.end and \
            self.merge == other.merge and \
            self.split == other.split

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        return (hash(self.start) ^ hash(self.end) ^
                hash(self.merge) ^ hash(self.split))
class GraphLineBuilder(object):
    """Mutable accumulator for a line while the commit walk is in progress.

    A builder records where the line starts, the commits added to it, the
    commit expected next on the line and, once known, its end, merge and
    split commits. ``build()`` turns it into an immutable-style GraphLine.
    """

    def __init__(self):
        self._start = None
        self._end = None
        self._start_commit = None
        self._commits = []
        self._next = None
        self._merge = None
        self._split = None

    def start(self, start, commit):
        """Record the order value and the commit the line starts with."""
        self._start = start
        self._start_commit = commit

    def get_start_order(self):
        """Return the order value passed to ``start()``."""
        return self._start

    def get_start(self):
        """Return the commit passed to ``start()``."""
        return self._start_commit

    def end(self, end):
        """Mark the line as finished at *end*; no commit is expected next."""
        self._end = end
        self._next = None

    def next(self, next):
        """Set the commit expected to come next on this line (or None)."""
        self._next = next

    def merge(self, merge):
        """Record the commit at which this line was merged."""
        self._merge = merge

    def split(self, split):
        """Record the commit at which this line joins another line."""
        self._split = split

    def is_next(self, possible_next):
        """Return True if *possible_next* is the commit this line expects.

        A line that expects nothing (no next commit set) matches no commit;
        previously that case raised AttributeError.
        """
        return self._next is not None and self._next.id == possible_next.id

    def get_next(self):
        """Return the commit expected next, or None."""
        return self._next

    def current(self):
        """Return the most recently added commit, or None if there is none."""
        if len(self._commits) > 0:
            return self._commits[-1]
        else:
            return None

    def is_last_commit(self, commit):
        """Return True if *commit* equals the most recently added commit."""
        if len(self._commits) == 0:
            return False
        else:
            return self._commits[-1] == commit

    def add_commit(self, order, commit):
        """Append *commit* to the line; *order* is accepted but not stored."""
        self._commits.append(commit)

    def is_finished(self):
        """Return True once ``end()`` has been called with a commit."""
        return self._end is not None

    def build(self):
        """Create the GraphLine described by this builder.

        :raises ValueError: if start, end or start commit were never set.
        """
        requirements = ("_start", "_end", "_start_commit")
        missing_requirements = [requirement for requirement in requirements
                                if getattr(self, requirement) is None]
        if len(missing_requirements) > 0:
            raise ValueError("missing values for: %s in %r" % (",".join(missing_requirements), self))
        return GraphLine(
            commits=self._commits,
            start=self._start,
            start_commit=self._start_commit,
            end=self._end,
            merge=self._merge,
            split=self._split
        )

    def __repr__(self):
        # Finished lines show start and end commit; unfinished ones show
        # the start commit and the commit added last.
        if self.is_finished():
            return "<GLB %d: [%r - %r]>" % (id(self), hex_or_none(self._start_commit), hex_or_none(self._end))
        else:
            return "<GLB %d: [%r - %r >" % (id(self), hex_or_none(self._start_commit), hex_or_none(self.current()))
class GraphBuilder(object):
    """Container collecting lines, a commit->line lookup and a commit order."""

    def __init__(self):
        self._lines = []
        self._commit_lookup = {}
        self._commit_order = {}

    def add_line(self, line):
        """Append *line* to the collected lines."""
        self._lines.append(line)

    def add_commit_to_line(self, commit, line):
        """Remember that *commit* belongs to *line*."""
        self._commit_lookup[commit] = line

    def add_commit(self, commit, order):
        """Remember the order value of *commit*.

        The method was declared without ``self`` and therefore could not be
        called on an instance; the signature is now a regular method.
        """
        self._commit_order[commit] = order
def log(message):
    """Debug hook; currently a no-op that discards *message*."""
    return None
class logged_list(list):
    """A list with a name that reports every ``append`` through ``log``."""

    def __init__(self, name, *args, **kwargs):
        super(logged_list, self).__init__(*args, **kwargs)
        self._name = name

    def append(self, *args):
        # stack()[1] is the frame of whoever called append(); its line
        # number is included in the log message.
        calling_frame = stack()[1][0]
        lineno = getframeinfo(calling_frame).lineno
        log("At l. %d: appending to %s: %r" % (lineno, self._name, args[0]))
        super(logged_list, self).append(*args)
def generate_graph(repo, ref_globs=("refs/heads/*","refs/remotes/*")):
    """Walk the repository history and partition the commits into lines.

    All references matching *ref_globs* (plus HEAD) are used as starting
    points of a topologically sorted walk. Each commit is put on exactly
    one line; a line follows first parents, additional parents of a merge
    open new lines, and a line ends when it runs into a commit that
    belongs to another line or into a commit without parents.

    :param repo: pygit2 repository to read.
    :param ref_globs: fnmatch patterns selecting the references to include.
    :returns: tuple ``(lines, order, commits)`` where ``lines`` is the
        sorted list of GraphLine objects, ``order`` maps commit id to its
        position in the walk and ``commits`` is the tuple of walked commits.

    A summary of the matched references and of every line is printed to
    standard output.
    """
    # sorted() lost its cmp= argument in Python 3; cmp_to_key (available
    # since 2.7) adapts the comparison function for both versions.
    from functools import cmp_to_key

    all_refs = repo.listall_references()
    ref_names = list(chain.from_iterable(
        fnmatch.filter(all_refs, glob) for glob in ref_globs
    ))
    print(ref_names)
    refs = (repo.lookup_reference(ref_name) for ref_name in ref_names)
    resolved_refs = set(ref.peel(git.GIT_OBJ_COMMIT) for ref in refs)

    walker = repo.walk(repo.head.target, git.GIT_SORT_TOPOLOGICAL | git.GIT_SORT_TIME)
    for ref in resolved_refs:
        walker.push(ref.id)

    commits = tuple(walker)
    topo_order = tuple(commit.id for commit in commits)
    topo_order_lookup = OrderedDict(
        (commit_id, position) for position, commit_id in enumerate(topo_order)
    )

    # The first line starts at the first commit of the walk and expects
    # exactly that commit next.
    first_commit = repo[topo_order[0]]
    line = GraphLineBuilder()
    line.next(first_commit)
    line.start(start=0, commit=first_commit)

    current_lines = [line]
    finished_lines = []

    for order, oid in enumerate(topo_order):
        commit = repo[oid]
        log("At %d, %s - %s" % (order, oid.hex, summary(commit)))
        log("\tCurrent line is %d" % id(line))
        log("\tQueue: %r" % current_lines)
        log("")

        # Which open lines are waiting for this commit?
        possible_lines = [line for line in current_lines if line.is_next(commit)]
        if len(possible_lines) == 0:
            line = GraphLineBuilder()
            line.start(commit=commit, start=order)
            line.next(first_or_none(commit.parents))
            current_lines.append(line)
            log("\tNo line found which expects this commit, created new line %r" % line)
        elif len(possible_lines) == 1:
            line = possible_lines[0]
            log("\tCommit is on line: %r" % line)
        else:
            #use the line with the lowest topological order
            log("\tCommit is possibly on lines %r" % possible_lines)
            line = min(possible_lines, key=lambda line: line.get_start_order())
            log("\tSelected line: %r" % line)

        log("\tAdding commit %s to line %r" % (commit.id.hex, line))
        line.add_commit(order, commit)
        assert line is not None

        #now, what should be the next commit under the assumption that the line
        #continues?
        parents = commit.parents
        log("\tParents: %r" % [parent.id.hex for parent in parents])

        if len(parents) > 0:
            #the first parent should be our next commit on the line
            line.next(parents[0])
            log("\tNext expected commit on line %d: %s" % (id(line), line.get_next().id))
            #if we have more than one parent, we need to prepare the
            #new lines for them
            for parent_on_other_line in parents[1:]:
                new_line = GraphLineBuilder()
                new_line.start(
                    commit=parent_on_other_line,
                    start=topo_order_lookup[parent_on_other_line.id]
                )
                #we are the point where the new line was merged with the current line
                new_line.merge(commit)
                new_line.next(parent_on_other_line)
                #queue up the new line
                current_lines.append(new_line)
                log("\tNew line %d added: starting at %s" % (id(new_line), parent_on_other_line.id.hex))
                assert new_line.get_next() is not None
            assert line.get_next() is not None
        else:
            #we have reached the end of this path in the tree
            #we won't merge with anyone, so finish up this line
            #and go to the next
            line.end(commit)
            line.next(None)
            #we are a leaf, so we didn't split from any commit
            #e.g. we are the initial commit
            line.split(None)
            #finish this line
            finished_lines.append(line)
            current_lines.remove(line)
            log("\tEnd of line %r (no parents anymore)" % line)

        assert line.get_next() is not None or len(commit.parents) == 0, "Line has not next commit, but this commit has parents"

        #now check if this line joins with any other line
        #this is the case if the current commit on the line is
        #already the next commit of another line
        assert line.current() is not None
        joined_lines = []
        for other_line in current_lines:
            if other_line is line:
                continue #skip current line
            #All lines that have no commits anymore should already have been removed
            assert other_line.get_next() is not None
            if other_line.get_next().id == line.current().id:
                #so this line joins with other_line
                #therefore we end this line and set the
                #next commit as the split point where we join
                #the lines
                #who joins whom?
                joining_line = other_line
                continuing_line = line
                log("\tEnd of line %r (joining %d)" % (joining_line, id(continuing_line)))
                if joining_line.current() is not None:
                    #this is a line with at least 1 commit
                    joining_line.split(joining_line.get_next())
                    joining_line.end(joining_line.current())
                else:
                    #this line does not have any commits
                    joining_line.split(joining_line.get_start())
                    joining_line.end(joining_line.get_next())
                joined_lines.append(joining_line)
        for joined_line in joined_lines:
            #remove lines here, otherwise the iteration above breaks :(
            current_lines.remove(joined_line)
            finished_lines.append(joined_line)

        if line.is_finished():
            #if we are finished, create a new line in the next round
            line = None
        log("-----------------------------------------------------------------------------")

    assert len(current_lines) == 0

    finished_lines = [line.build() for line in finished_lines]
    finished_lines = sorted(finished_lines, key=cmp_to_key(line_order_cmp(topo_order_lookup)))

    # Dump every line. Each print call takes one pre-formatted string so
    # the output is the same on Python 2 and Python 3.
    for i, line in enumerate(finished_lines):
        print("%d:" % (i))
        print("\tmerge:  %s" % (summary_with_id_and_order(line.merge, topo_order_lookup),))
        print("\tstart:  %s" % (summary_with_id_and_order(line.start_commit, topo_order_lookup),))
        print("\tend:  %s" % (summary_with_id_and_order(line.end, topo_order_lookup),))
        print("\tsplit:  %s" % (summary_with_id_and_order(line.split, topo_order_lookup),))
        print("\tcommits:")
        for commit in line.commits:
            print("\t\t%d: %s" % (topo_order_lookup[commit.id], summary_with_id(commit)))

    return (finished_lines, topo_order_lookup, commits)
def summary(commit):
    """Return the first line of a commit's message.

    :param commit: object with a ``message`` attribute, or None.
    :returns: None for a None commit, ``"<unknown encoding>"`` if reading
        the message raises LookupError, ``""`` for an empty message,
        otherwise the first line of the message.
    """
    if commit is None:
        return None
    try:
        message = commit.message
    except LookupError:
        # presumably raised when the message's declared encoding is not
        # known to Python -- the placeholder text suggests as much
        return "<unknown encoding>"
    # Reuse the value fetched above; the message used to be read a second
    # time outside the try block.
    lines = message.splitlines()
    return lines[0] if len(lines) > 0 else ""
def summary_with_id(commit):
    """Format *commit* as ``"<hex id> - <summary>"``; None stays None."""
    if commit is not None:
        return "%s - %s" % (commit.id.hex, summary(commit))
    return None
def summary_with_id_and_order(commit, order_lookup):
    """Format *commit* as ``"<order>: <hex id> - <summary>"``.

    The order is looked up in *order_lookup* by commit id and zero-padded
    to three digits. A None commit yields None.
    """
    if commit is not None:
        return "%03d: %s" % (order_lookup[commit.id], summary_with_id(commit))
    return None
def first_or_none(l):
    """Return the first element of sequence *l*, or None if it is empty."""
    return l[0] if len(l) > 0 else None
def hex_or_none(commit):
    """Return ``commit.id.hex``, or None when *commit* is None."""
    return commit.id.hex if commit is not None else None
def first_not_none(*args):
    """Return the first argument that is not None, or None if all are."""
    return next((arg for arg in args if arg is not None), None)
def line_order_cmp(order_lookup):
    """Build a cmp-style comparison function for sorting graph lines.

    A line sorts by the order of its merge commit or, when it has none, by
    the order of its start commit minus one.

    :param order_lookup: mapping from commit id to order.
    :returns: function ``f(a, b)`` returning -1, 0 or 1.
    """
    def _sort_position(line):
        if line.merge is not None:
            return order_lookup[line.merge.id]
        return order_lookup[line.start_commit.id] - 1

    def _line_order_cmp(a, b):
        a_order = _sort_position(a)
        b_order = _sort_position(b)
        # Portable replacement for the cmp() builtin, which was removed
        # in Python 3.
        return (a_order > b_order) - (a_order < b_order)

    return _line_order_cmp
# Render the graph only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment