Troll the disks for async_pendings and gather some stats
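A typical run against a node's device tree; the script name check_asyncs.py is only an illustrative stand-in, and the options map to the argparse flags defined below. While scanning it logs a summary of the gathered stats every second, plus a final one once the workers drain:

    python check_asyncs.py /srv/node --policy-index 0 --workers 24 --limit 100000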
#!/usr/bin/env python
import sys
import os
import errno
from argparse import ArgumentParser
from collections import defaultdict
import pickle
import logging
import thread
import threading
from Queue import Queue, Empty
import time
import random

from swift.common.storage_policy import POLICIES
from swift.common.ring import Ring
from swift.obj.diskfile import get_async_dir

# fix monkey-patch lp bug #1380815
logging.threading = threading
logging.thread = thread
logging._lock = threading.RLock()

parser = ArgumentParser()
parser.add_argument('devices', help='root of devices tree for node',
                    nargs='?', default='/srv/node')
parser.add_argument('--policy-index', help='the policy index',
                    type=int, default=0)
parser.add_argument('--limit', help='max number of asyncs to check',
                    default=None, type=int)
parser.add_argument('--top-stats', help='display N top account & container',
                    default=10, type=int)
parser.add_argument('--workers', help='number of workers', type=int,
                    default=24)
parser.add_argument('--verbose', help='log at debug', action='store_true')
parser.add_argument('--swift-dir', help='y u no use /etc/swift',
                    default='/etc/swift')


class AtomicStats(object):

    def __init__(self):
        self.stats = defaultdict(int)
        self.lock = threading.RLock()

    def incr(self, key, amount=1):
        with self.lock:
            self.stats[key] += amount

    def __iter__(self):
        return iter(self.stats.items())


STATS = AtomicStats()
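
# Each async_pending file is a pickled dict that the object server drops when
# a container update can't be delivered.  Going by Swift's object updater, it
# carries at least 'op', 'account' and 'container', and on releases that track
# partial success a 'successes' list of container node ids that already got
# the update; any replica not in that list is still waiting.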
def handle_update(update_path, container_ring):
    # open in binary mode since the payload is a pickle, not text
    with open(update_path, 'rb') as f:
        update_data = pickle.load(f)
    num_success = len(update_data.get('successes', []))
    _part, nodes = container_ring.get_nodes(update_data['account'],
                                            update_data['container'])
    bad_devs = [n['device'] for n in nodes
                if n['id'] not in update_data.get('successes', [])]
    if len(bad_devs) == 1:
        logging.debug('Notice %r waiting on update to %s',
                      update_path, ','.join(bad_devs))
    return {
        'op': update_data['op'],
        'account': update_data.get('account'),
        'container': update_data.get('container'),
        'num_success': num_success,
        'bad_devs': bad_devs,
    }

def consumer(q, args, ring):
    while True:
        update_path = q.get()
        if update_path is None:
            return
        logging.debug('Checking %r', update_path)
        STATS.incr('count')
        update_data = handle_update(update_path, ring)
        update_stats(STATS, update_data)


def update_stats(stats, update):
    stats.incr('op_%s' % update['op'])
    stats.incr('acct_%s' % update['account'])
    key = 'cont_%s/%s' % (update['account'], update['container'])
    stats.incr(key)
    key = 'success_%s' % update['num_success']
    stats.incr(key)
    for dev in update['bad_devs']:
        key = 'dev_%s' % dev
        stats.incr(key)


def _display_stats(stats, args):
    accounts = []
    containers = []
    success_counts = []
    ops = []
    devs = []
    logging.info('=' * 50)
    for k, v in stats:
        if k.startswith('acct_'):
            accounts.append((v, k[5:]))
        elif k.startswith('cont_'):
            containers.append((v, k[5:]))
        elif k.startswith('success_'):
            success_counts.append((k, v))
        elif k.startswith('op_'):
            ops.append((k[3:], v))
        elif k.startswith('dev_'):
            devs.append((v, k[4:]))
        else:
            logging.info('%-9s: %s', k, v)
    for k, v in ops:
        logging.info('%-9s: %s', k, v)
    success_counts.sort()
    for k, v in success_counts:
        logging.info('%s: %s', k, v)
    logging.info('-' * 50)
    accounts.sort(reverse=True)
    for v, k in accounts[:args.top_stats]:
        logging.info('%s: %s', k, v)
    containers.sort(reverse=True)
    for v, k in containers[:args.top_stats]:
        logging.info('%s: %s', k, v)
    devs.sort(reverse=True)
    for v, k in devs[:args.top_stats]:
        logging.info('%s: %s', k, v)


def display_stats(q, args):
    while True:
        try:
            q.get(block=False)
        except Empty:
            _display_stats(STATS, args)
            time.sleep(1.0)
        else:
            return
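
# feed_queue walks what should be the updater's on-disk layout:
#   <devices>/<device>/<async dir>/<hex suffix>/<update file>
# where get_async_dir() names the async dir for the chosen policy
# ('async_pending', or 'async_pending-<index>' for non-zero policies).
# Devices and suffixes are shuffled so limited runs sample different
# parts of the tree each time.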
def feed_queue(q, args):
    policy = POLICIES[args.policy_index]
    device_root = args.devices
    asyncdir = get_async_dir(policy)
    device_dirs = os.listdir(device_root)
    random.shuffle(device_dirs)
    num_updates = 0
    for device_dir in device_dirs:
        async_path = os.path.join(device_root, device_dir, asyncdir)
        try:
            suffixes = os.listdir(async_path)
        except OSError as e:
            if e.errno == errno.ENOENT:
                continue
            else:
                raise
        random.shuffle(suffixes)
        for suffix in suffixes:
            try:
                int(suffix, 16)
            except ValueError:
                continue
            suffix_path = os.path.join(async_path, suffix)
            updates = os.listdir(suffix_path)
            for update in updates:
                update_path = os.path.join(suffix_path, update)
                q.put(update_path)
                num_updates += 1
                if args.limit and num_updates >= args.limit:
                    return


def main():
    args = parser.parse_args()
    if args.verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    container_ring = Ring(os.path.join(args.swift_dir, 'container.ring.gz'))
    stats_kill_q = Queue(1)
    stats_worker = threading.Thread(target=display_stats, args=(
        stats_kill_q, args))
    stats_worker.start()
    q = Queue(1000)
    workers = []
    try:
        for i in range(args.workers):
            t = threading.Thread(target=consumer, args=(
                q, args, container_ring))
            t.start()
            workers.append(t)
        feed_queue(q, args)
    finally:
        logging.info('queue finished')
        for t in workers:
            q.put(None)
        for t in workers:
            t.join()
        logging.info('workers finished')
        stats_kill_q.put(None)
        stats_worker.join()
        _display_stats(STATS, args)


if __name__ == "__main__":
    sys.exit(main())