import re
import sys
import json
import httplib

from datetime import datetime
from HTMLParser import HTMLParser
from optparse import OptionParser

# pre-compiled regular expressions
rowRe = re.compile(r'^\s*<row')                  # detects a row
attrRe = re.compile(r'(\w+)="(.*?)"')            # extracts all attributes and values
cleanupRe = re.compile(r'<[^<]+?>|[\r\n]+|\s+')  # strips out html and extra whitespace
tagsRe = re.compile(r'<(.*?)>')                  # splits tags into a list
intRe = re.compile(r'^\d+$')                     # determines if a field is an integer


def http(method, path, data):
    """ Execute a generic HTTP request against ElasticSearch """
    conn = httplib.HTTPConnection(HOST)
    conn.request(method, path, data)
    return conn.getresponse()


def bulk(data):
    """ Submit a bulk ElasticSearch request """
    resp = http('POST', '/%s/_bulk' % INDEX, data)

    # load the response and report any failed items
    status = json.loads(resp.read())
    for stat in status['items']:
        if not stat['index']['ok']:
            print json.dumps(stat)

    return resp


def main(fileName):
    """ Parse StackExchange data into ElasticSearch bulk format """
    # html parser used to unescape the body and title
    html = HTMLParser()

    with open(fileName) as f:
        docs = []
        for i, line in enumerate(f):
            # skip the line if it is not a row
            if rowRe.match(line) is None:
                continue

            # build the document to be indexed
            doc = {}
            for field, val in attrRe.findall(line):
                # strip whitespace and skip the field if the value is empty
                val = val.strip()
                if not val:
                    continue

                # clean up title and body by stripping html and extra whitespace
                if field in ['Body', 'Title']:
                    val = cleanupRe.sub(' ', html.unescape(unicode(val, 'utf-8', 'ignore')))
                # make sure dates are in the correct format
                elif field in ['CreationDate', 'LastActivityDate', 'LastEditDate']:
                    # e.g. 2008-07-31T21:42:52.667
                    val = '%sZ' % val

                    # parse out creation month, day, hour, and minute
                    if field == 'CreationDate':
                        dateObj = datetime.strptime(val, '%Y-%m-%dT%H:%M:%S.%fZ')
                        doc['CreationMonth'] = dateObj.strftime('%B')
                        doc['CreationDay'] = dateObj.strftime('%A')
                        doc['CreationHour'] = dateObj.strftime('%H')
                        doc['CreationMinute'] = dateObj.strftime('%M')
                # split tags into an array of tags; the dump stores them
                # entity-escaped (&lt;tag&gt;), so unescape before splitting
                elif field == 'Tags':
                    val = tagsRe.findall(html.unescape(val))
                # convert vals to integers if needed
                elif intRe.match(val) is not None:
                    val = int(val)

                doc[field] = val

            # create the bulk action; answers are indexed as children
            # of the question they belong to
            action = {'_id': '%s' % doc['Id'], '_type': 'question'}
            if doc['PostTypeId'] == 2:
                action['_type'] = 'answer'
                action['_parent'] = '%s' % doc['ParentId']

            # queue the bulk json request
            docs.append(json.dumps({'index': action}))
            docs.append(json.dumps(doc))

            # submit the bulk request
            if len(docs) == BULK_SIZE * 2:  # multiply by 2 to account for action lines
                if VERBOSE:
                    print 'Submitting %s docs...' % BULK_SIZE
                bulk('%s\n' % '\n'.join(docs))  # trailing newline so the last item is processed
                docs = []

            # only index a subset of the posts; set max to -1 for all docs
            # (the first two lines of the dump are the xml declaration and
            # the <posts> element, hence the +1 offset)
            if MAX_DOCS != -1 and i == MAX_DOCS + 1:
                if VERBOSE:
                    print 'Hit max document count of %s' % MAX_DOCS
                break

    # submit any remaining queued requests
    if len(docs) > 0:
        if VERBOSE:
            print 'Submitting remaining %s docs in queue...' % (len(docs) / 2)
        bulk('%s\n' % '\n'.join(docs))

    return 0
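
# ---------------------------------------------------------------------------
# The `_parent` links built in main() only resolve if the index was created
# with a parent/child mapping before any documents are indexed. The sketch
# below shows that one-time setup; it is an assumption, not part of the
# original script: `create_index` is a hypothetical helper, and the mapping
# syntax targets the 0.90/1.x-era ElasticSearch API that the bulk-response
# handling above (`stat['index']['ok']`) implies.
# ---------------------------------------------------------------------------
def create_index(host, index):
    """ Create the index with an answer -> question parent mapping (sketch) """
    conn = httplib.HTTPConnection(host)
    body = json.dumps({
        'mappings': {
            'question': {},  # questions need no special mapping
            'answer': {'_parent': {'type': 'question'}},  # route answers to their parent question
        }
    })
    conn.request('PUT', '/%s' % index, body)
    return conn.getresponse()

# Call it once before bulk indexing, e.g. create_index(HOST, INDEX) right
# after the globals are set below, or create the index out of band with curl.
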
if __name__ == '__main__':
    usage = 'usage: %prog [options] file'
    parser = OptionParser(usage)
    parser.add_option('-s', '--server', dest='server', default='localhost:9200',
                      help='ElasticSearch server')
    parser.add_option('-i', '--index', dest='index', default='stackoverflow',
                      help='Index name to use')
    parser.add_option('-b', '--bulk-size', dest='bulkSize', type='int', default=100000,
                      help='Number of docs to submit in each bulk request')
    parser.add_option('-m', '--max-docs', dest='maxDocs', type='int', default=-1,
                      help='Max number of docs to index')
    parser.add_option('-v', '--verbose', dest='verbose', action='store_true',
                      default=False, help='Enable verbose output')

    options, args = parser.parse_args()

    if len(args) != 1:
        parser.error('The StackOverflow posts.xml file location must be specified')

    # globals used by http(), bulk(), and main()
    HOST = options.server
    INDEX = options.index
    BULK_SIZE = options.bulkSize
    MAX_DOCS = options.maxDocs
    VERBOSE = options.verbose

    ret = main(args[0])
    sys.exit(ret)
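
# Example invocation (the script filename is hypothetical; use whatever name
# this file was saved under). Indexes the first million posts from a local
# dump in 50,000-doc bulk batches with verbose output:
#
#   python stackexchange2es.py -v -b 50000 -m 1000000 posts.xml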