Grabs log times from EasyRedmine and generates reports.
'''
er_pass_key.py :: various tools for analyzing EasyRedmine (ER) through its REST API

 - grabs log times from EasyRedmine and generates reports
 - lists issues in which a user made any changes during the last week

API documentation:
 https://easyredmine.docs.apiary.io/
 https://www.redmine.org/projects/redmine/wiki/Rest_api

Info: requires Python 3.7+. You must also create an er_pass_key_settings.py file defining
the S_AUTH_USER, S_AUTH_PASS, S_API_KEY, S_SERVER_HOST and S_TIMEOUT_SECONDS variables
(example below).
'''
import concurrent.futures
import datetime
import functools
import http.client
import os
import sys
import getopt
import time
import xml.etree.ElementTree as ElementTree
from base64 import b64encode
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from time import strftime, gmtime
import sqlite3
import numpy as np
usage_text = '''
usage, e.g. to print the last 7 days of history for Tom Kelly: er_pass_key.py -h -u "Tom Kelly" -d 7
'''
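# Further examples (the flags mirror the getopt handling in main() below; the user name is illustrative):
#   er_pass_key.py -r -u "Tom Kelly" -d 7                   # log-time report for the previous week
#   er_pass_key.py -r -t -u "Tom Kelly"                     # log-time report for the current week
#   er_pass_key.py -h -u "Tom Kelly" -d 14 -e 2019-01-15    # journal history for the 14 days before the given date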
# Used by get_log_time_report_for_user: public holidays excluded from the business-day count
holidays = ['2018-11-01', '2018-11-02', '2018-11-12', '2018-12-31', '2019-01-01']
# Example contents of er_pass_key_settings.py:
'''
S_API_KEY = "0123456789001234567890012345678900123456"
S_AUTH_USER = ""  # optional, for HTTP basic auth
S_AUTH_PASS = ""  # optional, for HTTP basic auth
S_SERVER_HOST = "er.someserver.com"
S_TIMEOUT_SECONDS = 10
'''
from er_pass_key_settings import S_AUTH_USER, S_AUTH_PASS, S_API_KEY, S_SERVER_HOST, S_TIMEOUT_SECONDS
headers = {
    'cache-control': 'no-cache',
}

# If a basic-auth user is configured, add the Authorization header to every request.
if S_AUTH_USER != "":
    userAndPass = b64encode(bytes(S_AUTH_USER + ":" + S_AUTH_PASS, "utf-8")).decode("ascii")
    headers['Authorization'] = 'Basic ' + userAndPass
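# Note: the plain Redmine REST API also accepts the key via an X-Redmine-API-Key header instead of
# the "key" query parameter used throughout this script (see the Rest_api wiki page linked above);
# whether EasyRedmine honours the header as well has not been verified here.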
# In-memory caches, kept for the lifetime of the process
request_response_cache = {}
issueNameDict = {}
# Simple key-value store backed by sqlite3. Tables and columns are created lazily
# the first time a (table, key) pair is used.
class KeyValueDB(object):
    def __init__(self, name="er_cache.db"):
        self.db_conn = sqlite3.connect(name, check_same_thread=False)

    def put(self, table, main_id, key, value):
        """main_id is the unique id associated with this key-value pair."""
        self.__verify(table, key)
        # UPSERT syntax (ON CONFLICT ... DO UPDATE) requires SQLite 3.24.0 or newer.
        self.db_conn.execute('''INSERT INTO {0} (main_id, {1})
                                VALUES (?, ?)
                                ON CONFLICT (main_id)
                                DO UPDATE SET {1} = excluded.{1}
                             '''.format(table, key),
                             (main_id, value))
        self.db_conn.commit()

    def get(self, table, main_id, key, def_value=""):
        self.__verify(table, key)
        result = def_value
        with closing(self.db_conn.execute(
                '''SELECT {0} FROM {1} WHERE main_id = ?'''.format(key, table), (main_id,))) as cursor:
            row_one = cursor.fetchone()
            if row_one is not None and row_one[0] is not None:
                result = row_one[0]
        return result

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.db_conn.close()

    def __verify(self, table, column):
        self.__add_table_if_not_exists(table)
        self.__add_column_if_not_exists(table, column)

    def __add_table_if_not_exists(self, table):
        self.db_conn.execute('''CREATE TABLE IF NOT EXISTS {0} (
                                    main_id TEXT PRIMARY KEY
                                )'''.format(table))

    def __add_column_if_not_exists(self, table, column):
        if self.__column_exists(table, column):
            return
        self.db_conn.execute('''ALTER TABLE {0} ADD COLUMN {1} TEXT'''
                             .format(table, column))

    def __table_exists(self, table):
        with closing(self.db_conn.cursor()) as c:
            c.execute(''' SELECT count(name) FROM sqlite_master WHERE type='table' AND name='{0}' '''.format(table))
            return c.fetchone()[0] == 1

    def __column_exists(self, table, column_name):
        with closing(self.db_conn.cursor()) as c:
            data = c.execute('''SELECT * FROM {0}'''.format(table))
            return len(list(filter(lambda x: x[0] == column_name, data.description))) > 0
def test_data_cache():
    test_db = "test_cache.db"
    if os.path.exists(test_db):
        os.remove(test_db)
    try:
        with KeyValueDB(test_db) as dc:
            dc.put("students", "1000", "name", "tim")
            dc.put("students", "1000", "surname", "cook")
            dc.put("grades", "1000", "math", "4")
            assert dc.get("students", "1000", "name") == "tim"
            assert dc.get("students", "1000", "surname") == "cook"
            assert dc.get("grades", "1000", "math") == "4"
            dc.put("students", "1000", "age", "18")
            assert dc.get("students", "1000", "age") == "18"
            assert dc.get("students", "1001", "age", "?") == "?"
        with KeyValueDB(test_db) as dc:
            assert dc.get("students", "1000", "name") == "tim"
            assert dc.get("students", "1000", "surname") == "cook"
            assert dc.get("grades", "1000", "math") == "4"
            assert dc.get("students", "1000", "age") == "18"
            assert dc.get("students", "1001", "age", "?") == "?"
            dc.put("students", "1000", "name", "timmi")
            assert dc.get("students", "1000", "name") == "timmi"
    finally:
        os.remove(test_db)
def get_issue_name(id):
    '''
    Returns the name (subject) of the issue with the given ID.
    Args:
        id: ID of the issue, as a string
    Returns:
        Name of the issue
    '''
    if id in issueNameDict:
        return issueNameDict[id]
    if id == -1:
        return "out of office/no log times"
    if id == -2:
        return "(out of case entry)"
    conn = http.client.HTTPSConnection(S_SERVER_HOST)
    conn.request("GET", "/issues/" + id + ".xml?key=" + S_API_KEY, headers=headers)
    res = conn.getresponse()
    data = res.read()
    root = ElementTree.fromstring(data)
    issueNameDict[id] = root.find("subject").text
    return issueNameDict[id]
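# Note: subjects are cached only in issueNameDict for the lifetime of the process. A sketch of
# persisting them across runs with the KeyValueDB above (an assumption, not part of the original flow):
#
#   with KeyValueDB("er_cache.db") as db:
#       subject = db.get("issues", issue_id, "subject", "")
#       if not subject:
#           subject = get_issue_name(issue_id)
#           db.put("issues", issue_id, "subject", subject)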
# The function below will not work, because the API does not allow retrieving journals for many
# issues in one request: each issue must be fetched with a separate request, which then contains
# its journals (see get_issue_journals below).
'''
def get_issues_journals(issues_ids, for_user_name, create_after_date_time):
    print("issue:{0}, {1}, {2}".format(str(",".join(issues_ids)), for_user_name, create_after_date_time))
    found_journals = []
    while True:
        conn = http.client.HTTPSConnection(S_SERVER_HOST)
        url = "/issues.xml?key={0}&issue_id={1}&include=journals".format(S_API_KEY, ",".join(issues_ids))
        conn.request("GET", url, headers=headers)
        res = conn.getresponse()
        data = res.read()
        try:
            root = ElementTree.fromstring(data)
        except ElementTree.ParseError as e:
            print(str(data) + " .. waiting 5s")
            time.sleep(5)
            continue
        # TODO: journal iteration is not finished. The API does not support it and probably never will.
        issue_id = root.find("id").text
        subject = root.find("subject").text
        journalsElement = root.find("journals")
        for journal in journalsElement.iter("journal"):
            user = journal.find("user")
            user_name = user.attrib['name']
            user_id = user.attrib['id']
            if user_name != for_user_name:
                continue
            created_on = journal.find('created_on').text
            created_on_date_time = datetime.datetime.strptime(created_on, "%Y-%m-%dT%H:%M:%SZ")
            if created_on_date_time.date() >= create_after_date_time.date():
                found_journals.append(
                    {"issue_id": issue_id, "subject": subject, "journal": journal, "created_on": created_on})
        break
    return found_journals
'''
def get_issue_journals(id, updated_on, for_user_name, create_after_date_time, db):
    # print("issue:{0}, {1}, {2}".format(id, for_user_name, create_after_date_time))
    found_journals = []
    while True:
        data = ""
        # If the issue has not changed since it was last fetched, reuse the cached XML.
        db_updated_on = db.get("issues", id, "updated_on", "")
        if db_updated_on and db_updated_on == updated_on:
            data = db.get("issues", id, "server_data", "")
        if not data:
            conn = http.client.HTTPSConnection(S_SERVER_HOST)
            conn.request("GET", "/issues/{0}.xml?key={1}&include=journals".format(id, S_API_KEY), headers=headers)
            res = conn.getresponse()
            data = res.read().decode('utf-8')
        try:
            root = ElementTree.fromstring(data)
        except ElementTree.ParseError as e:
            print(str(data) + " .. waiting {0}s".format(S_TIMEOUT_SECONDS))
            time.sleep(S_TIMEOUT_SECONDS)
            continue
        issue_id = root.find("id").text
        subject = root.find("subject").text
        journalsElement = root.find("journals")
        for journal in journalsElement.iter("journal"):
            user = journal.find("user")
            user_name = user.attrib['name']
            user_id = user.attrib['id']
            if user_name != for_user_name:
                continue
            created_on = journal.find('created_on').text
            created_on_date_time = datetime.datetime.strptime(created_on, "%Y-%m-%dT%H:%M:%SZ")
            if created_on_date_time.date() >= create_after_date_time.date():
                found_journals.append(
                    {"issue_id": issue_id, "subject": subject, "journal": journal, "created_on": created_on})
        db.put("issues", id, "updated_on", updated_on)
        db.put("issues", id, "server_data", data)
        break
    return found_journals
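# Each entry appended above is a dict of the shape (values illustrative):
#   {"issue_id": "123", "subject": "<issue subject>",
#    "journal": <xml.etree Element 'journal'>, "created_on": "2019-01-08T09:12:33Z"}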
def init_log_time_reports(from_date, to_date, for_user_name=""):
    '''
    Retrieves the time-entry elements from the server as XML.
    Args:
        from_date: start date
        to_date: end date
        for_user_name: optional, filter by user name
    Returns:
        array of matching xml.etree.ElementTree.Element objects
    '''
    time_entries = []
    current_offset = 0
    while True:
        path = "/time_entries.xml?key={0}&limit={1}&period_type={2}&from={3}&to={4}&offset={5}".format(
            S_API_KEY,
            400,  # limit
            2,
            from_date,
            to_date,
            str(current_offset)
        )
        if path in request_response_cache:
            data = request_response_cache[path]
        else:
            conn = http.client.HTTPSConnection(S_SERVER_HOST)
            conn.request("GET", path, headers=headers)
            res = conn.getresponse()
            data = res.read()
        try:
            root = ElementTree.fromstring(data)
        except ElementTree.ParseError as e:
            print(str(data) + " .. waiting 2s")
            time.sleep(2)
            continue
        request_response_cache[path] = data
        total_count = root.attrib['total_count']
        limit = root.attrib['limit']
        offset = root.attrib['offset']
        for child in root:
            current_offset = current_offset + 1
            percent = (float(current_offset) / float(total_count)) * 100.0
            sys.stdout.write('\r')
            sys.stdout.write(
                "Getting time entries: [%-20s] %d%%" % ('=' * int(int(percent + 0.5) / 5.0), int(percent + 0.5)))
            sys.stdout.flush()
            if for_user_name != "":
                entry_user_name = child.find('user').attrib['name']
                if entry_user_name != for_user_name:
                    continue
            time_entries.append(child)
        if current_offset >= int(total_count):
            break
    return time_entries
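# Illustrative use of the returned elements (these are the fields read elsewhere in this script;
# the dates and user name are hypothetical):
#   entries = init_log_time_reports("2019-01-07", "2019-01-13", "Tom Kelly")
#   for e in entries:
#       print(e.find('user').attrib['name'], e.find('spent_on').text, e.find('hours').text)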
def get_user_list_from_time_entries(time_entries):
    '''
    Lists the users found in the time entries array.
    Args:
        time_entries: must be generated using init_log_time_reports
    Returns:
        list of [user_name, user_id] pairs
    '''
    user_list = []
    for entry in time_entries:
        user_name = entry.find('user').attrib['name']
        user_id = entry.find('user').attrib['id']
        if [user_name, user_id] not in user_list:
            user_list.append([user_name, user_id])
    return user_list
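# Illustrative result (name and id come from the <user> element attributes; values hypothetical):
#   [['Tom Kelly', '42']]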
def get_log_time_report_for_user(time_entries, reportUserName, reportUserId, hours_per_day, from_date, to_date):
    '''
    Generates a report, in string form, from the provided time entries.
    Args:
        time_entries: collection of time entries, must be generated using init_log_time_reports
        reportUserName: user name for whom to generate this report
        reportUserId: user id, mandatory - used in the output report
        hours_per_day: how many hours a day this person works (a 5-day work week is assumed)
        from_date: start date
        to_date: end date
    Returns:
        dictionary containing the three parts of the report:
         - header
         - daily work
         - percentage of work spent on the various cases
        :{"header": header_report_str, "daily": daily_report_str, "by_case_percentage": by_case_report_str}
    '''
    daily_report_str = ""
    index = 1
    loop_index = 0
    total_day_hours = {}
    total_by_case_in_week_hours = {}
    last_date = ""
    total_week_hours = 0
    total_entries_for_user = 0
    d_start = datetime.datetime.strptime(from_date, '%Y-%m-%d').date()
    d_end = datetime.datetime.strptime(to_date, '%Y-%m-%d').date()
    work_days = np.busday_count(d_start, d_end, holidays=holidays)
    print("\n")
    for child in reversed(time_entries):
        loop_index += 1
        percent = (float(loop_index) / float(len(time_entries))) * 100.0
        sys.stdout.write('\r')
        sys.stdout.write("Generating report: [%-20s] %d%% - %s" % (
            '=' * int(int(percent + 0.5) / 5.0), int(percent + 0.5), reportUserName))
        sys.stdout.flush()
        user_name = child.find('user').attrib['name']
        if user_name != reportUserName:
            continue
        total_entries_for_user += 1
        spent_on_str = child.find('spent_on').text
        spent_on = datetime.datetime.strptime(spent_on_str, '%Y-%m-%d')
        spent_on_date = spent_on.date()
        spent_on_week_day_name = spent_on_date.strftime("%A")
        if last_date != spent_on_str:
            if last_date != "":
                daily_report_str += "total: {0}h\n".format(total_day_hours[last_date])
            daily_report_str += "\n{0} {1}\n".format(spent_on_week_day_name, spent_on_str)
        hours_float = float(child.find('hours').text)
        total_week_hours += hours_float
        if spent_on_str not in total_day_hours:
            total_day_hours[spent_on_str] = hours_float
        else:
            total_day_hours[spent_on_str] += hours_float
        last_date = spent_on_str
        if child.find('issue') is not None:
            case_id = child.find('issue').attrib['id']
        else:
            case_id = -2
        if case_id not in total_by_case_in_week_hours:
            total_by_case_in_week_hours[case_id] = hours_float
        else:
            total_by_case_in_week_hours[case_id] += hours_float
        if case_id == -1 or case_id == -2:
            case_field = "{0}".format(get_issue_name(case_id))
        else:
            case_field = "case {0}: {1}".format(case_id, get_issue_name(case_id))
        daily_report_str += "- {3} [{1} - https://{6}/issues/{2}]\n {4}h - {5}\n".format(
            index,
            child.find('project').attrib['name'],
            case_id, case_field,
            hours_float,
            child.find('comments').text,
            S_SERVER_HOST
        )
        index += 1
    daily_report_str += "total: {0}h".format(total_day_hours[last_date])
    daily_report_str += "\n"
    # Any missing hours up to the expected work time are reported as "out of office/no log times" (case -1).
    if total_week_hours < work_days * hours_per_day:
        total_by_case_in_week_hours[-1] = (work_days * hours_per_day) - total_week_hours
    by_case_report_str = ""
    percent_by_case_id = {}
    for case_id, hours in total_by_case_in_week_hours.items():
        # The same formula applies to real cases and to the synthetic -1/-2 entries.
        percent_by_case_id[case_id] = (hours / (hours_per_day * work_days)) * 100.0
    for elem in sorted(percent_by_case_id.items(), key=lambda x: x[1], reverse=True):
        if elem[0] == -1 or elem[0] == -2:
            case_field = "{0}".format(get_issue_name(elem[0]))
        else:
            case_field = "case {0}: {1}".format(elem[0], get_issue_name(elem[0]))
        by_case_report_str += "{0:4.1f}% {1:4.1f}h - {2} \n".format(elem[1], total_by_case_in_week_hours[elem[0]],
                                                                    case_field)
    header_report_str = "Report for: {0} (ID={1})\n".format(reportUserName, reportUserId)
    header_report_str += "Report date: {0}\n".format(strftime("%Y-%m-%d %H:%M:%S", gmtime()))
    header_report_str += "Report span: from {0} to {1}\n".format(from_date, to_date)
    header_report_str += "Total work days: {0}\n".format(work_days)
    header_report_str += "Number of entries: {0}\n".format(total_entries_for_user)
    header_report_str += "Work week: {0}h/day\n".format(hours_per_day)
    return {"header": header_report_str, "daily": daily_report_str, "by_case_percentage": by_case_report_str}
def generate_report(from_date, to_date, db, user_name=""):
    '''
    Args:
        from_date: start date
        to_date: end date
        db: KeyValueDB instance (not used here; kept for symmetry with the other report functions)
        user_name: name of the user; may be empty, in which case all found users are included in the report
    Returns: report in string form
    '''
    report_str = ""
    time_entries = init_log_time_reports(from_date, to_date, user_name)
    user_list = get_user_list_from_time_entries(time_entries)
    for userName in user_list:
        report = get_log_time_report_for_user(time_entries, userName[0], userName[1], 8, from_date, to_date)
        report_str += "\n*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***\n\n{0}\n{2}{1}".format(
            report["header"], report["daily"], report["by_case_percentage"])
    report_str += "\n*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***"
    return report_str
def generate_activity_history(after_date, for_user_name, db):
    # GET /issues.xml?updated_on=%3E%3D2014-01-02T00:00:00Z
    after_date_date_time = datetime.datetime.strptime(after_date, "%Y-%m-%d")
    journal_entries = []
    current_offset = 0
    total_count = 0
    total_found = 0

    def journal_get_callback(after_date, fut):
        # Progress callback; current_offset also serves as the pagination offset for the next page request.
        nonlocal total_found
        nonlocal current_offset
        total_found += len(fut.result())
        current_offset = current_offset + 1
        percent = (float(current_offset) / float(total_count)) * 100.0
        sys.stdout.write(
            "Analyzing journals: [%-20s] %d%% - Found %d journal entries since %s\r"
            % ('=' * int(int(percent + 0.5) / 5.0), int(percent + 0.5), total_found, after_date))
        sys.stdout.flush()

    # max_workers=1 keeps the per-issue requests effectively serial; raising it would parallelize them.
    with ThreadPoolExecutor(max_workers=1) as pool:
        while True:
            path = "/issues.xml?key={0}&limit={1}&offset={2}&updated_on=%3E%3D{3}T00:00:00Z".format(
                S_API_KEY,
                400,  # limit
                str(current_offset),
                after_date
            )
            conn = http.client.HTTPSConnection(S_SERVER_HOST)
            conn.request("GET", path, headers=headers)
            res = conn.getresponse()
            data = res.read()
            try:
                root = ElementTree.fromstring(data)
            except ElementTree.ParseError as e:
                print(str(data) + " .. waiting 1s")
                time.sleep(1)
                continue
            request_response_cache[path] = data
            total_count = int(root.attrib['total_count'])
            limit = root.attrib['limit']
            offset = root.attrib['offset']
            issue_entries = []
            for child in root.iter("issue"):
                issue_id = child.find('id').text
                issue_subject = child.find('subject').text
                updated_on = child.find('updated_on').text
                issue_entries.append({"id": issue_id, "subject": issue_subject, "updated_on": updated_on})
            '''
            # Fetching journals in bunches of 5 - this is not working because the ER API does not allow it.
            window = 5
            windowed_issues_entries = [[id[0] for id in issue_entries[i:i + window]] for i in range(len(issue_entries)) if i % window == 0]
            futures = {pool.submit(get_issues_journals, id_entries, for_user_name, after_date_date_time) for id_entries in
                       windowed_issues_entries}
            '''
            futures = [pool.submit(get_issue_journals, identry["id"], identry["updated_on"],
                                   for_user_name, after_date_date_time, db) for identry in
                       issue_entries]
            for fut in futures:
                fut.add_done_callback(functools.partial(journal_get_callback, after_date))
            concurrent.futures.wait(futures)
            for fut in futures:
                for entry in fut.result():
                    journal_entries.append({
                        "issue_id": entry['issue_id'],
                        "issue_name": entry['subject'],
                        "journal_entry_time": entry['created_on']
                    })
            if current_offset >= total_count:
                break
    return journal_entries
def generate_history_report(after_date, for_user_name, db):
    journal_entries = generate_activity_history(after_date, for_user_name, db)
    print('\n')
    journal_entries.sort(
        key=lambda x: datetime.datetime.strptime(x['journal_entry_time'], "%Y-%m-%dT%H:%M:%SZ").timestamp())
    report = ""
    current_issue_id = -1
    for entry in journal_entries:
        if entry['issue_id'] != current_issue_id:
            current_issue_id = entry['issue_id']
            report += 'Issue: https://{0}/issues/{1} - {2}\n'.format(S_SERVER_HOST, current_issue_id,
                                                                     entry['issue_name'])
        dt = datetime.datetime.strptime(entry['journal_entry_time'], "%Y-%m-%dT%H:%M:%SZ")
        report += ' : {0} {1}\n'.format(str(dt.date()), str(dt.time()))
    return report
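# Illustrative output of generate_history_report (issue id, subject and timestamps are hypothetical):
#   Issue: https://er.someserver.com/issues/123 - Some issue subject
#    : 2019-01-08 09:12:33
#    : 2019-01-09 14:02:10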
def main():
    argv = sys.argv[1:]
    print_report = False
    print_history = False
    this_week_report = False
    history_days_period = 7
    report_for_user = ''
    reference_date = datetime.datetime.now()
    try:
        # Define the getopt parameters (long options taking a value need a trailing '=')
        opts, args = getopt.getopt(argv, 'rthu:d:e:',
                                   ['report', 'this_week_report', 'history', 'user=', 'days_period=',
                                    'reference_date='])
        if len(opts) == 0 or len(opts) > 5:
            print(usage_text)
        else:
            # Iterate the options and get the corresponding values
            for o, a in opts:
                if o in ('-r', '--report'):
                    print_report = True
                elif o in ('-t', '--this_week_report'):
                    this_week_report = True
                elif o in ('-h', '--history'):
                    print_history = True
                elif o in ('-u', '--user'):
                    report_for_user = a
                elif o in ('-d', '--days_period'):
                    history_days_period = int(a)
                elif o in ('-e', '--reference_date'):
                    reference_date = datetime.datetime.strptime(a, '%Y-%m-%d')
    except getopt.GetoptError:
        # Print something useful
        print(usage_text)
        sys.exit(2)
    with KeyValueDB("er_" + S_SERVER_HOST + ".db") as db:
        if print_report:
            date_time_days_ago = reference_date
            if not this_week_report:
                date_time_days_ago = date_time_days_ago - datetime.timedelta(days=history_days_period)
            start = date_time_days_ago - datetime.timedelta(days=date_time_days_ago.weekday())
            end = start + datetime.timedelta(days=history_days_period - 1)
            print(generate_report(str(start.date()), str(end.date()), db, report_for_user))
        if print_history:
            if print_report:
                print("\n\n")
            # History covers the last history_days_period days before the reference date.
            from_date_time = reference_date - datetime.timedelta(days=history_days_period)
            date_time_days_ago = str(from_date_time.date())
            print(generate_history_report(date_time_days_ago, report_for_user, db))


if __name__ == '__main__':
    # test_data_cache()
    main()