Skip to content

Instantly share code, notes, and snippets.

@mheler
Forked from clayg/.gitignore
Created March 5, 2019 01:19
Show Gist options
  • Save mheler/7a086f37457c9607ea50b8f4739091ea to your computer and use it in GitHub Desktop.
Save mheler/7a086f37457c9607ea50b8f4739091ea to your computer and use it in GitHub Desktop.
Troll the disks for async_pendings and gather some stats
#!/usr/bin/env python
import sys
import os
import errno
from argparse import ArgumentParser
from collections import defaultdict
import pickle
import logging
import thread
import threading
from Queue import Queue, Empty
import time
import random
from swift.common.storage_policy import POLICIES
from swift.common.ring import Ring
from swift.obj.diskfile import get_async_dir
# fix monkey-patch lp bug #1380815
# Re-point the logging module's `threading` / `thread` attributes at the
# modules imported above and replace its module-level lock with a fresh
# re-entrant lock.
# NOTE(review): presumably required because importing swift monkey-patches
# the threading primitives logging captured at import time -- confirm
# against Launchpad bug #1380815.
logging.threading = threading
logging.thread = thread
logging._lock = threading.RLock()
# Command-line interface.  Every option carries a default, so the script can
# be started with no arguments at all.
parser = ArgumentParser()
parser.add_argument(
    'devices',
    nargs='?',
    default='/srv/node',
    help='root of devices tree for node',
)
parser.add_argument(
    '--policy-index',
    type=int,
    default=0,
    help='the policy index',
)
parser.add_argument(
    '--limit',
    type=int,
    default=None,
    help='max number of asyncs to check',
)
parser.add_argument(
    '--top-stats',
    type=int,
    default=10,
    help='display N top account & container',
)
parser.add_argument(
    '--workers',
    type=int,
    default=24,
    help='number of workers',
)
parser.add_argument(
    '--verbose',
    action='store_true',
    help='log at debug',
)
parser.add_argument(
    '--swift-dir',
    default='/etc/swift',
    help='y u no use /etc/swift',
)
class AtomicStats(object):
    """A set of named integer counters guarded by a re-entrant lock.

    Counters spring into existence at zero the first time they are
    incremented.  Iterating yields ``(key, value)`` pairs.
    """

    def __init__(self):
        self.stats = defaultdict(int)
        self.lock = threading.RLock()

    def incr(self, key, amount=1):
        """Add ``amount`` (default 1) to the counter named ``key``."""
        with self.lock:
            self.stats[key] += amount

    def __iter__(self):
        """Return an iterator over a snapshot of ``(key, value)`` pairs.

        The snapshot is copied while holding the lock so that a concurrent
        (or re-entrant) ``incr`` adding a new key cannot invalidate the
        iterator half-way through.
        """
        with self.lock:
            snapshot = list(self.stats.items())
        return iter(snapshot)


# Module-wide counters: incremented by the consumer threads and read by the
# stats display.
STATS = AtomicStats()
def handle_update(update_path, container_ring):
    """Load one pickled async update and summarise its state.

    :param update_path: path of the pickled update file to read
    :param container_ring: object whose ``get_nodes(account, container)``
        returns ``(partition, nodes)``, each node being a dict with at
        least ``'id'`` and ``'device'`` keys
    :returns: dict with keys ``op``, ``account``, ``container``,
        ``num_success`` (number of recorded successes) and ``bad_devs``
        (devices of ring nodes whose id is not among the successes)
    :raises IOError: if the file cannot be opened
    :raises KeyError: if the update lacks ``op``, ``account`` or
        ``container``
    """
    # Pickles are binary data, so the file has to be opened in binary mode.
    # NOTE: unpickling can execute arbitrary code -- only point this script
    # at directories whose contents are trusted.
    with open(update_path, 'rb') as f:
        update_data = pickle.load(f)
    successes = update_data.get('successes', [])
    _part, nodes = container_ring.get_nodes(update_data['account'],
                                            update_data['container'])
    bad_devs = [n['device'] for n in nodes if n['id'] not in successes]
    if len(bad_devs) == 1:
        # Exactly one node outstanding: worth calling out which device.
        logging.debug('Notice %r waiting on update to %s',
                      update_path, ','.join(bad_devs))
    return {
        'op': update_data['op'],
        'account': update_data.get('account'),
        'container': update_data.get('container'),
        'num_success': len(successes),
        'bad_devs': bad_devs,
    }
def consumer(q, args, ring):
    """Worker loop: pull update paths off ``q`` and fold them into STATS.

    :param q: queue of update file paths; a ``None`` item tells the worker
        to exit
    :param args: parsed command-line options (not used by this function)
    :param ring: container ring handed through to ``handle_update``
    """
    while True:
        update_path = q.get()
        if update_path is None:
            return
        logging.debug('Checking %r', update_path)
        STATS.incr('count')
        try:
            update_data = handle_update(update_path, ring)
        except Exception as err:
            # One unreadable update (file gone, truncated pickle, ...) must
            # not kill the worker: with every worker dead the producer would
            # block forever on the bounded queue.
            logging.warning('Skipping %r: %s', update_path, err)
            STATS.incr('errors')
            continue
        update_stats(STATS, update_data)
def update_stats(stats, update):
    """Bump one counter per facet of a summarised update.

    :param stats: object exposing ``incr(key)``
    :param update: dict with ``op``, ``account``, ``container``,
        ``num_success`` and ``bad_devs`` entries
    """
    def counter_names():
        # Lazily produced so each name is built right before it is counted.
        yield 'op_%s' % update['op']
        yield 'acct_%s' % update['account']
        yield 'cont_%s/%s' % (update['account'], update['container'])
        yield 'success_%s' % update['num_success']
        for device in update['bad_devs']:
            yield 'dev_%s' % device

    for name in counter_names():
        stats.incr(name)
def _display_stats(stats, args):
accounts = []
containers = []
success_counts = []
ops = []
devs = []
logging.info('=' * 50)
for k, v in stats:
if k.startswith('acct_'):
accounts.append((v, k[5:]))
elif k.startswith('cont_'):
containers.append((v, k[5:]))
elif k.startswith('success_'):
success_counts.append((k, v))
elif k.startswith('op_'):
ops.append((k[3:], v))
elif k.startswith('dev_'):
devs.append((v, k[4:]))
else:
logging.info('%-9s: %s', k, v)
for k, v in ops:
logging.info('%-9s: %s' % (k, v))
success_counts.sort()
for k, v in success_counts:
logging.info('%s: %s', k, v)
logging.info('-' * 50)
accounts.sort(reverse=True)
for v, k in accounts[:args.top_stats]:
logging.info('%s: %s', k, v)
containers.sort(reverse=True)
for v, k in containers[:args.top_stats]:
logging.info('%s: %s', k, v)
devs.sort(reverse=True)
for v, k in devs[:args.top_stats]:
logging.info('%s: %s', k, v)
def display_stats(q, args):
    """Log the running stats about once a second until told to stop.

    :param q: kill queue; the loop ends as soon as any item can be taken
        from it
    :param args: options object passed through to ``_display_stats``
    """
    stop_requested = False
    while not stop_requested:
        try:
            q.get_nowait()
        except Empty:
            # Nothing on the kill queue yet: report, then pause.
            _display_stats(STATS, args)
            time.sleep(1.0)
        else:
            stop_requested = True
def _listdir_or_none(path):
    """Return ``os.listdir(path)``, or None if path is missing or not a dir."""
    try:
        return os.listdir(path)
    except OSError as e:
        if e.errno in (errno.ENOENT, errno.ENOTDIR):
            return None
        raise


def feed_queue(q, args):
    """Walk the async directories and put every update path on ``q``.

    Devices under ``args.devices`` and the suffix directories inside each
    device's async directory are visited in random order.  Only suffix
    entries whose name parses as hexadecimal are descended into.

    :param q: queue receiving one path per update file
    :param args: options object providing ``policy_index``, ``devices`` and
        ``limit``; a truthy ``limit`` stops the walk once that many paths
        have been queued
    :raises OSError: if ``args.devices`` itself cannot be listed, or on any
        listing error other than a missing / non-directory path
    """
    policy = POLICIES[args.policy_index]
    device_root = args.devices
    asyncdir = get_async_dir(policy)
    device_dirs = os.listdir(device_root)
    random.shuffle(device_dirs)
    num_updates = 0
    for device_dir in device_dirs:
        async_path = os.path.join(device_root, device_dir, asyncdir)
        suffixes = _listdir_or_none(async_path)
        if suffixes is None:
            # No async dir for this policy on this device (or a stray file).
            continue
        random.shuffle(suffixes)
        for suffix in suffixes:
            try:
                int(suffix, 16)
            except ValueError:
                continue
            suffix_path = os.path.join(async_path, suffix)
            # The directory can disappear between the two listings; treat
            # that like the missing async dir above instead of aborting the
            # whole scan.
            updates = _listdir_or_none(suffix_path)
            if updates is None:
                continue
            for update in updates:
                q.put(os.path.join(suffix_path, update))
                num_updates += 1
                if args.limit and num_updates >= args.limit:
                    return
def main():
    """Parse options, run the scan with a pool of workers, print the stats.

    Starts one thread that periodically logs the running stats and
    ``args.workers`` consumer threads, feeds them every update path found,
    then shuts everything down and logs the final report.
    """
    args = parser.parse_args()
    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=level)
    container_ring = Ring(os.path.join(args.swift_dir, 'container.ring.gz'))

    stats_kill_q = Queue(1)
    stats_worker = threading.Thread(target=display_stats, args=(
        stats_kill_q, args))
    stats_worker.start()

    q = Queue(1000)
    workers = []
    try:
        try:
            for _ in range(args.workers):
                t = threading.Thread(target=consumer, args=(
                    q, args, container_ring))
                t.start()
                workers.append(t)
            feed_queue(q, args)
        finally:
            logging.info('queue finished')
            # One sentinel per started worker, then wait for them to drain.
            for _ in workers:
                q.put(None)
            for t in workers:
                t.join()
            logging.info('workers finished')
    finally:
        # Always release the stats thread, even if feeding or worker
        # shutdown was interrupted; it is a non-daemon thread and would
        # otherwise keep the process alive forever.
        stats_kill_q.put(None)
        stats_worker.join()
    _display_stats(STATS, args)


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment