Created
August 26, 2021 18:06
-
-
Save williballenthin/8566a1043532479c51ebb22a5b590db1 to your computer and use it in GitHub Desktop.
Compare vivisect analysis results across versions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
'''
Compare vivisect analysis results across versions.

Install dependencies with: pip install devtools[pygments] pydantic viv-utils termcolor
'''
import sys | |
import time | |
import os.path | |
import logging | |
import argparse | |
from typing import List, Literal, Optional, Union | |
import viv_utils | |
import termcolor | |
from devtools import debug | |
from pydantic import BaseModel, Field | |
# Module-level logger, named after this module; used for the progress and
# "wrote spec" messages emitted by the command-line actions.
logger = logging.getLogger(__name__)
class MemoryMapEntry(BaseModel):
    """One memory map of an analyzed workspace: a start address and a size."""

    # start address of the map
    address: int
    # size of the map
    size: int

    def __str__(self):
        """Render the entry as zero-padded upper-case hex: address (8 digits) then size (6 digits)."""
        return f"0x{self.address:08X} 0x{self.size:06X}"
# Type alias: the memory layout of a workspace, as a list of MemoryMapEntry.
MemoryMap = List[MemoryMapEntry]
class CoverageResult(BaseModel):
    """
    Outcome of analyzing the sample with one recorded version.

    `version`, `status` and `duration` are always present. Every other field
    is populated only when `status == "ok"`, and defaults to None otherwise.

    The None defaults are spelled out explicitly: the failure path builds a
    CoverageResult from only version/status/duration, which relies on the
    implicit-None behavior of Optional fields in pydantic v1. Newer pydantic
    releases treat a default-less Optional field as required, so the explicit
    default keeps that construction valid there too.
    """

    # name under which this result was recorded
    version: str
    # "ok" on success; any other string describes the failure
    status: Union[Literal["ok"], str]
    # wall-clock time spent loading/analyzing the sample, in seconds
    duration: float

    # all following fields are present only if status == "ok"
    memory_map: Optional[MemoryMap] = None
    discovered_count: Optional[int] = None
    undiscovered_count: Optional[int] = None
    xref_count: Optional[int] = None
    location_count: Optional[int] = None
    function_count: Optional[int] = None
    block_count: Optional[int] = None
    instruction_count: Optional[int] = None
    unicode_count: Optional[int] = None
    ascii_count: Optional[int] = None
    number_count: Optional[int] = None
    pointer_count: Optional[int] = None
    vtable_count: Optional[int] = None
    import_count: Optional[int] = None
    export_count: Optional[int] = None
class Spec(BaseModel):
    """A sample to analyze plus the coverage results recorded for it so far."""

    # path to the sample; resolved relative to the directory holding the spec file
    sample: str
    # one entry per recorded version
    results: List[CoverageResult] = Field(default_factory=list)

    def resolve_sample_path(self, spec_path):
        """
        Return the normalized path to the sample.

        `self.sample` is joined onto the directory containing `spec_path`,
        so a relative sample path is interpreted relative to the spec file.
        """
        spec_dir = os.path.dirname(spec_path)
        joined = os.path.join(spec_dir, self.sample)
        return os.path.normpath(joined)
def tuple_get(t, index, default=None):
    """
    Return `t[index]`, or `default` when `index` is out of range.

    Works like `dict.get` for sequences. Out-of-range covers both directions:
    a negative index that reaches before the start of `t` also yields
    `default` rather than raising IndexError.
    """
    try:
        return t[index]
    except IndexError:
        return default
def compute_coverage_result(version, vw, duration, status):
    """
    Build a CoverageResult describing the workspace `vw`.

    The statistics come from `vw.getDiscoveredInfo()`, whose elements are
    mapped to fields by position (see `info_fields`); a position missing from
    the returned sequence is recorded as 0. The memory map is taken from
    `vw.getMemoryMaps()`, and the import/export counts are the lengths of
    `vw.getImports()` and `vw.getExports()`.

    `version`, `duration` and `status` are stored as given.
    """
    info = vw.getDiscoveredInfo()

    # field name for each position of the discovered-info sequence
    info_fields = (
        "discovered_count",
        "undiscovered_count",
        "xref_count",
        "location_count",
        "function_count",
        "block_count",
        "instruction_count",
        "unicode_count",
        "ascii_count",
        "number_count",
        "pointer_count",
        "vtable_count",
    )

    # only the first two elements (address, size) of each map tuple are kept
    memory_map = []
    for va, size, _, _ in vw.getMemoryMaps():
        memory_map.append(MemoryMapEntry(address=va, size=size))

    counts = {}
    for position, field_name in enumerate(info_fields):
        counts[field_name] = tuple_get(info, position, 0)

    return CoverageResult(
        version=version,
        status=status,
        duration=duration,
        memory_map=memory_map,
        import_count=len(vw.getImports()),
        export_count=len(vw.getExports()),
        **counts,
    )
def blue(s: str) -> str:
    """Return `s` wrapped by termcolor so that it renders in blue."""
    return termcolor.colored(s, color="blue")
def red(s: str) -> str:
    """Return `s` wrapped by termcolor so that it renders in red."""
    return termcolor.colored(s, color="red")
def green(s: str) -> str:
    """Return `s` wrapped by termcolor so that it renders in green."""
    return termcolor.colored(s, color="green")
def _write_spec(path, spec):
    """Serialize `spec` to JSON and write it to `path` as UTF-8, replacing any existing file."""
    with open(path, "wb") as f:
        f.write(spec.json().encode("utf-8"))
    logger.info("wrote spec to: %s", path)


def _count_field_names(cov):
    """Return the names of the `*_count` fields of `cov`, in the order `__fields__` yields them."""
    return [name for name in cov.__fields__.keys() if name.endswith("_count")]


def _print_coverage(cov):
    """Print a successful result in full: header line, memory map, then every count."""
    print(blue(cov.version) + " in %.2fs" % cov.duration)
    print(" memory map:")
    for entry in cov.memory_map:
        print(" %s" % (str(entry)))
    for k in _count_field_names(cov):
        print(" %s %d" % ((k + ":").ljust(20), getattr(cov, k)))


def _print_memory_map_diff(current, prior):
    """
    Print the current memory map, flagging entries that differ from `prior`.

    Entries are compared position by position. Entries beyond the shorter of
    the two lists are reported as added (only in `current`) or removed (only
    in `prior`) rather than being silently dropped.
    """
    print(" memory map:")
    for entry, prior_entry in zip(current, prior):
        if entry != prior_entry:
            print((" %s" % (str(entry))).ljust(28) + "\t(" + red("changed") + ")")
        else:
            print(" %s" % (str(entry)))

    shared = min(len(current), len(prior))
    for entry in current[shared:]:
        print((" %s" % (str(entry))).ljust(28) + "\t(" + green("added") + ")")
    for prior_entry in prior[shared:]:
        print((" %s" % (str(prior_entry))).ljust(28) + "\t(" + red("removed") + ")")


def _print_coverage_diff(cov, prior, duration_threshold=0.1):
    """
    Print `cov` as a diff against `prior`, the most recent earlier successful result.

    Duration changes smaller than `duration_threshold` seconds are treated as
    noise. The comparisons are inclusive so that every delta, including one
    exactly equal to the threshold, falls into one of the three cases.
    """
    found_change = False

    duration_delta = cov.duration - prior.duration
    if duration_delta <= -duration_threshold:
        # got faster
        print(blue(cov.version) + " in %.2fs (%s)" % (cov.duration, green("-%.2fs" % abs(duration_delta))))
    elif duration_delta >= duration_threshold:
        # got slower
        print(blue(cov.version) + " in %.2fs (%s)" % (cov.duration, red("+%.2fs" % abs(duration_delta))))
    else:
        # no meaningful change
        print(blue(cov.version) + " in %.2fs" % cov.duration)

    if cov.memory_map != prior.memory_map:
        found_change = True
        _print_memory_map_diff(cov.memory_map, prior.memory_map)

    for k in _count_field_names(cov):
        val = getattr(cov, k)
        delta = val - getattr(prior, k)
        if delta > 0:
            # more results
            found_change = True
            print(" %s %d\t(%s)" % ((k + ":").ljust(20), val, green("+" + str(delta))))
        elif delta < 0:
            # fewer results
            found_change = True
            print(" %s %d\t(%s)" % ((k + ":").ljust(20), val, red("-" + str(abs(delta)))))

    if not found_change:
        print(" no change.")


def _display_results(spec):
    """
    Print every recorded result of `spec`, ordered by version name.

    A failed result is printed with its status message. A successful result is
    diffed against the closest earlier successful result, or printed in full
    when there is none.
    """
    cov_by_version = {cov.version: cov for cov in spec.results}
    versions = sorted(cov_by_version.keys())

    for i, version in enumerate(versions):
        cov = cov_by_version[version]

        if cov.status != "ok":
            print(blue(cov.version) + " in %.2fs" % cov.duration)
            print(" " + red(cov.status))
            print()
            continue

        # find most recent result that was successful
        prior = next(
            (cov_by_version[v] for v in reversed(versions[:i]) if cov_by_version[v].status == "ok"),
            None,
        )

        if prior is None:
            # no prior version to diff against
            _print_coverage(cov)
        else:
            _print_coverage_diff(cov, prior)
        print()


def main(argv=None):
    """
    Command-line entry point.

    Actions:
      create  - write a new, empty spec that points at a sample
      record  - analyze the sample and append the result under a version name
      display - print the recorded results, diffing each against its predecessor
      clear   - drop all recorded results from a spec

    Args:
        argv: argument list without the program name; defaults to sys.argv[1:].

    Returns:
        0 on success; -1 when no action is given or an action's precondition
        fails (spec already exists / does not exist, version already recorded).

    Preconditions are checked explicitly instead of with `assert`, so they
    still hold when Python runs with -O.
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="A program.")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable all output but errors")

    action = parser.add_subparsers(dest="action")

    action_create = action.add_parser("create", help="create a new spec")
    action_create.add_argument("spec", type=str, help="Path to spec file")
    action_create.add_argument("sample", type=str, help="Path to sample")

    action_record = action.add_parser("record", help="record results for a version")
    action_record.add_argument("spec", type=str, help="Path to spec file")
    action_record.add_argument("version", type=str, help="Name of version to record")

    action_display = action.add_parser("display", help="display results across versions")
    action_display.add_argument("spec", type=str, help="Path to spec file")

    action_clear = action.add_parser("clear", help="clear all results from spec")
    action_clear.add_argument("spec", type=str, help="Path to spec file")

    args = parser.parse_args(args=argv)

    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    # basicConfig does nothing if the root logger already has handlers,
    # so the level is also set directly.
    logging.getLogger().setLevel(level)

    # the analysis libraries are noisy; only surface their errors
    for noisy in ("vivisect", "vivisect.base", "vivisect.impemu", "vtrace", "envi", "envi.codeflow"):
        logging.getLogger(noisy).setLevel(logging.ERROR)

    if args.action is None:
        parser.print_help()
        return -1

    elif args.action == "create":
        logger.info("action: create")

        if os.path.exists(args.spec):
            logger.error("spec already exists: %s", args.spec)
            return -1

        # store the sample path relative to the spec so the pair can be moved together
        spec = Spec(sample=os.path.relpath(args.sample, os.path.dirname(args.spec)))
        _write_spec(args.spec, spec)

    elif args.action == "record":
        logger.info("action: record")

        spec = Spec.parse_file(args.spec)
        if any(result.version == args.version for result in spec.results):
            logger.error("version already recorded: %s", args.version)
            return -1

        sample_path = spec.resolve_sample_path(args.spec)
        logger.info("spec: %s", os.path.abspath(args.spec))
        logger.info("sample: %s", os.path.abspath(sample_path))

        t0 = time.time()
        try:
            vw = viv_utils.getWorkspaceFromFile(sample_path)
        except Exception as e:
            # deliberately broad: a failed analysis is itself a result to record
            duration = float(time.time() - t0)
            status = "error: %s" % (str(e))
            cov = CoverageResult(
                version=args.version,
                status=status,
                duration=duration,
            )
            print(blue(cov.version) + " in %.2fs" % cov.duration)
            print(red(status))
        else:
            duration = float(time.time() - t0)
            cov = compute_coverage_result(args.version, vw, duration, "ok")
            _print_coverage(cov)

        spec.results.append(cov)
        _write_spec(args.spec, spec)

    elif args.action == "display":
        logger.info("action: display")
        _display_results(Spec.parse_file(args.spec))

    elif args.action == "clear":
        logger.info("action: clear")

        if not os.path.exists(args.spec):
            logger.error("spec does not exist: %s", args.spec)
            return -1

        spec = Spec.parse_file(args.spec)
        # keep the sample reference, drop every recorded result
        _write_spec(args.spec, Spec(sample=spec.sample))

    else:
        raise NotImplementedError(args.action)

    return 0
# Script entry point: run the CLI and use main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
record results from each commit since v1.0.0
git checkout master && git log --decorate=full v1.0.0..HEAD | grep "^commit" | choose 1 | tac | /bin/cat --number | sed -e 's/^ //g' -e 's/^ /00/g' -e 's/^ /0/g' | while read -r LINE; do I=$(echo "$LINE" | choose 0); COMMIT=$(echo "$LINE" | choose 1); echo "$COMMIT $I"; git checkout "$COMMIT"; for SPEC in regression/*; do python ~/code/capa-pub/scripts/compare-viv-analysis.py record "$SPEC" "$I $COMMIT"; done; done;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
for reference, commands for bulk recording results: