# Source: GitHub gist mzhang77/3ba63cdc2cc0bc18817431622ea46d19
# (last active May 28, 2025 21:39).
# NOTE: GitHub flagged this file as containing hidden or bidirectional Unicode
# text that may be interpreted or compiled differently than it appears. To
# review, open the file in an editor that reveals hidden Unicode characters.
import json
import os
import subprocess
import sys
import time
from queue import Empty, Queue
from threading import Thread
# Global settings and shared state for the whole script.

# Tallies updated by process_region(): regions examined, regions that were
# compacted, and regions that were checked but left alone.
cnt = {'total': 0, 'need compaction': 0, 'skipped': 0}

# Extra TLS flags interpolated into every `tiup ctl` command; leave empty for
# a cluster without TLS.
tls = ''
# tls = "--ca-path /path/to/ca.crt --cert-path /path/to/client.crt --key-path /path/to/client.pem"

# Version tag used in `tiup ctl:{version}`.
version = 'v7.5.5'

# PD endpoint (host:port) passed to `pd -u`.
pd = '10.0.21.72:2379'

# Number of region worker threads started for each store.
thread_per_store = 2
import logging

# Append timestamped INFO-and-above records to app.log in the current
# working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='app.log',
    filemode='a'
)

# Module-level logger used by log().
logger = logging.getLogger(__name__)
def log(msg):
    """Write *msg* to the module logger at INFO level."""
    logger.info(msg)
def _parse_region_properties(output):
    """Parse the ``key: value`` lines of region-properties output into a dict."""
    props = {}
    for line in output.splitlines():
        # partition() tolerates values that themselves contain ': ' and
        # lines without a separator (which are ignored).
        key, sep, value = line.partition(': ')
        if sep:
            props[key.strip()] = value.strip()
    return props


def _delete_ratio(props, deletes_key, total_key):
    """Return props[deletes_key] / props[total_key] as a float.

    A zero total yields ``inf`` when there are deletes (nothing but deletes)
    and ``0.0`` otherwise, instead of raising ZeroDivisionError.

    Raises:
        KeyError: if either property is missing.
        ValueError: if either property is not numeric.
    """
    deletes = float(props[deletes_key])
    total = float(props[total_key])
    if total <= 0:
        return float('inf') if deletes > 0 else 0.0
    return deletes / total


def process_region(region_id, store_address):
    """Check one region on one store and force-compact it when delete-heavy.

    Runs ``region-properties`` for the region, and when either
    mvcc.num_deletes / mvcc.num_rows or writecf.num_deletes /
    writecf.num_entries exceeds 0.2, runs ``compact --bottommost force`` on
    the ``write`` and ``default`` column families. Progress and failures are
    written through log(); nothing is returned.

    Args:
        region_id: Region id passed to ``-r``.
        store_address: Store address passed to ``--host``.
    """
    # NOTE: cnt is updated from several worker threads without a lock, so
    # the tallies are best-effort.
    cnt['total'] += 1
    cmd = f'tiup ctl:{version} tikv {tls} --host {store_address} region-properties -r {region_id}'
    log(cmd)
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=True)
    if result.returncode != 0:
        log(f"region-properties failed for region {region_id} on {store_address} "
            f"(exit {result.returncode}): {result.stderr.strip()}")
        return

    props = _parse_region_properties(result.stdout)
    try:
        mvcc_ratio = _delete_ratio(props, 'mvcc.num_deletes', 'mvcc.num_rows')
        write_ratio = _delete_ratio(props, 'writecf.num_deletes', 'writecf.num_entries')
    except (KeyError, ValueError) as exc:
        log(f"Cannot evaluate region {region_id} on {store_address}: {exc!r}")
        return

    if mvcc_ratio <= .2 and write_ratio <= .2:
        cnt['skipped'] += 1
        log(f"No need to compact {store_address} region {region_id}")
        return

    cnt['need compaction'] += 1
    start_time = time.time()
    succeeded = True
    for cf in ('write', 'default'):
        compact = subprocess.run(
            f'tiup ctl:{version} tikv {tls} --host {store_address} compact --bottommost force -c {cf} -r {region_id}',
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True)
        if compact.returncode != 0:
            # Report the failure instead of claiming the region was compacted.
            succeeded = False
            log(f"Compaction of cf {cf} failed for region {region_id} on {store_address} "
                f"(exit {compact.returncode}): {compact.stderr.strip()}")
    elapsed = time.time() - start_time
    if succeeded:
        log(f"Compacted region {region_id} on {store_address} in {elapsed:.2f} seconds.")
def get_stores():
    """List all stores from PD and process every region of every store.

    Queries PD for the store list, then for the regions on each store, and
    finally starts one thread per store; each of those drains its region
    list with ``thread_per_store`` worker threads calling process_region().
    Command and JSON errors are reported on stderr; nothing is returned.
    """
    store_regions = {}
    store_addresses = {}
    try:
        # Run the shell command and capture the output
        result = subprocess.run(
            f'tiup ctl:{version} pd -u {pd} {tls} store',
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            shell=True
        )
        # Parse the JSON output
        data = json.loads(result.stdout)
        # `or []` also covers a JSON null, which .get(key, []) would pass through.
        for store_info in data.get("stores") or []:
            store = store_info.get("store", {})
            store_id = store.get("id")
            address = store.get("address")
            if store_id is None or not address:
                log(f"Skipping store entry without id or address: {store}")
                continue
            # NOTE(review): this command prefixes the PD address with http://
            # while the store listing above does not - confirm which form is
            # intended, especially when tls is set.
            cmd = f"tiup ctl:{version} pd -u http://{pd} {tls} region store {store_id}"
            log(cmd)
            region_result = subprocess.run(
                cmd,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                shell=True
            )
            region_data = json.loads(region_result.stdout)
            store_addresses[store_id] = address
            store_regions[store_id] = [region["id"] for region in region_data.get("regions") or []]

        def store_worker(store_id, region_ids):
            """Process all regions of one store with a small pool of threads."""
            address = store_addresses[store_id]
            store_queue = Queue()
            for region_id in region_ids:
                store_queue.put(region_id)

            def region_worker():
                """Pull region ids off the queue until it is empty."""
                while True:
                    try:
                        region_id = store_queue.get_nowait()
                    except Empty:
                        break
                    try:
                        process_region(region_id, address)
                    except Exception as exc:
                        # One bad region must not kill the worker and strand
                        # the rest of the queue.
                        log(f"Failed to process region {region_id} on {address}: {exc!r}")

            threads = [Thread(target=region_worker) for _ in range(thread_per_store)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()

        store_threads = [Thread(target=store_worker, args=(sid, regions)) for sid, regions in store_regions.items()]
        for t in store_threads:
            t.start()
        for t in store_threads:
            t.join()
        # The tallies are otherwise never reported anywhere.
        log(f"Finished: {cnt}")
    except subprocess.CalledProcessError as e:
        print(f"Command failed with error:\n{e.stderr}", file=sys.stderr)
    except json.JSONDecodeError:
        print("Failed to parse JSON output.", file=sys.stderr)
    except Exception as e:
        print(f"Unexpected error: {e}", file=sys.stderr)
# Run the scan only when executed as a script, not when imported.
if __name__ == "__main__":
    get_stores()
# End of gist. (GitHub page footer: sign up or sign in on GitHub to comment.)