Skip to content

Instantly share code, notes, and snippets.

@mzhang77
Last active May 28, 2025 21:39
Show Gist options
  • Save mzhang77/3ba63cdc2cc0bc18817431622ea46d19 to your computer and use it in GitHub Desktop.
Save mzhang77/3ba63cdc2cc0bc18817431622ea46d19 to your computer and use it in GitHub Desktop.
import time
import os
import subprocess
import json
import sys
from threading import Thread
from queue import Queue
# global variables
cnt = {'total': 0, 'need compaction': 0, 'skipped': 0}
tls = ''
# tls = "--ca-path /path/to/ca.crt --cert-path /path/to/client.crt --key-path /path/to/client.pem"
version = 'v7.5.5'
pd = '10.0.21.72:2379'
thread_per_store = 2
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='app.log',
filemode='a'
)
logger = logging.getLogger(__name__)
def log(msg):
logger.info(msg)
def process_region(region_id, store_address):
cnt['total'] += 1
cmd = f'tiup ctl:{version} tikv {tls} --host {store_address} region-properties -r {region_id}'
log(cmd)
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=True)
result_dict = {}
if result.returncode == 0:
for line in result.stdout.split('\n'):
if len(line) > 0:
key, value = line.split(': ')
result_dict[key] = value
# log(f"mvcc.num_deletes: {result_dict['mvcc.num_deletes']}")
# log(f"mvcc.num_rows: {result_dict['mvcc.num_rows']}")
# log(f"writecf.num_deletes: {result_dict['writecf.num_deletes']}")
# log(f"writecf.num_entries: {result_dict['writecf.num_entries']}")
if (float(result_dict['mvcc.num_deletes']) / float(result_dict['mvcc.num_rows']) > .2 or
float(result_dict['writecf.num_deletes']) / float(result_dict['writecf.num_entries']) > .2):
cnt['need compaction'] += 1
start_time = time.time()
subprocess.run(f'tiup ctl:{version} tikv {tls} --host {store_address} compact --bottommost force -c write -r {region_id}',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
subprocess.run(f'tiup ctl:{version} tikv {tls} --host {store_address} compact --bottommost force -c default -r {region_id}',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
end_time = time.time()
elapsed = end_time - start_time
log(f"Compacted region {region_id} on {store_address} in {elapsed:.2f} seconds.")
else:
cnt['skipped'] += 1
log(f"No need to compact {store_address} region {region_id}")
def get_stores():
store_regions = {}
try:
# Run the shell command and capture the output
result = subprocess.run(
f'tiup ctl:{version} pd -u {pd} {tls} store',
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
shell=True
)
# Parse the JSON output
data = json.loads(result.stdout)
for store_info in data.get("stores", []):
store = store_info.get("store", {})
store_id = store.get("id")
cmd = f"tiup ctl:{version} pd -u http://{pd} {tls} region store {store_id}"
log(cmd)
region_result = subprocess.run(
cmd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
shell=True
)
region_data = json.loads(region_result.stdout)
region_ids = [region["id"] for region in region_data.get("regions", [])]
store_regions[store_id] = region_ids
def store_worker(store_id, region_ids):
address = next(store_info["store"]["address"] for store_info in data["stores"] if store_info["store"]["id"] == store_id)
store_queue = Queue()
for region_id in region_ids:
store_queue.put(region_id)
def region_worker():
while True:
try:
region_id = store_queue.get_nowait()
except:
break
process_region(region_id, address)
threads = [Thread(target=region_worker) for _ in range(thread_per_store)]
for t in threads:
t.start()
for t in threads:
t.join()
store_threads = [Thread(target=store_worker, args=(sid, regions)) for sid, regions in store_regions.items()]
for t in store_threads:
t.start()
for t in store_threads:
t.join()
except subprocess.CalledProcessError as e:
print(f"Command failed with error:\n{e.stderr}", file=sys.stderr)
except json.JSONDecodeError:
print("Failed to parse JSON output.", file=sys.stderr)
except Exception as e:
print(f"Unexpected error: {e}", file=sys.stderr)
if __name__ == "__main__":
get_stores()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment