A script for converting OpenWPM SQLite databases to Parquet on S3. It also requires the `parquet_schema.py` file that matches the SQLite schema. See: https://github.com/mozilla/OpenWPM/blob/master/automation/DataAggregator/parquet_schema.py
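For reference, the script expects `parquet_schema.py` to expose `PQ_SCHEMAS`, a dict mapping each table name to a `pyarrow.Schema`. A minimal sketch of that shape (the columns shown are illustrative, not the full OpenWPM schema):

import pyarrow as pa

PQ_SCHEMAS = {
    'site_visits': pa.schema([
        pa.field('visit_id', pa.int64()),
        pa.field('crawl_id', pa.int32()),
        pa.field('instance_id', pa.int32()),  # filled in by the converter (copied from crawl_id)
        pa.field('site_url', pa.string()),
    ]),
    # ... one entry per table ('javascript', 'http_requests', ...)
}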
""" This script reads a sqlite database and writes the content to a parquet | |
database on S3 formatted as OpenWPM would format. It's best to just run this | |
on AWS as it bottlenecks on the S3 upload. This is a lightly modified version | |
of OpenWPM's S3Aggregator class. | |
""" | |
import os | |
import sqlite3 | |
import sys | |
from collections import defaultdict | |
import boto3 | |
import pandas as pd | |
import pyarrow as pa | |
import pyarrow.parquet as pq | |
import s3fs | |
import six | |
from parquet_schema import PQ_SCHEMAS | |
from pyarrow.filesystem import S3FSWrapper # noqa | |
from tqdm import tqdm | |
BATCH_SIZE = 500 | |
SITE_VISITS_INDEX = '_site_visits_index' | |
BUCKET = 'openwpm-crawls' | |
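# Output lands in S3 under a Hive-style partitioned layout (the
# `instance_id=<crawl_id>` directories are created by `write_to_dataset`):
#   s3://openwpm-crawls/<s3_directory>/visits/<table_name>/instance_id=<crawl_id>/*.parquet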
class Sqlite2Parquet(object):
    def __init__(self, sqlite_db, s3_directory):
        self.dir = s3_directory
        self._records = defaultdict(list)  # maps table_name to a list of records
        self._batches = dict()  # maps table_name to a list of batches
        self._visit_ids = set()
        self._bucket = BUCKET
        self._s3 = boto3.client('s3')
        self._s3_resource = boto3.resource('s3')
        self._fs = s3fs.S3FileSystem()
        self._s3_bucket_uri = 's3://%s/%s/visits/%%s' % (
            self._bucket, self.dir)
        self._sqlite_connection = sqlite3.connect(sqlite_db)
        self._sqlite_connection.row_factory = sqlite3.Row
        self._sqlite_cursor = self._sqlite_connection.cursor()

    def _fetchiter(self, arraysize=10000):
        """Generator for sqlite row results"""
        while True:
            rows = self._sqlite_cursor.fetchmany(arraysize)
            if not rows:
                break
            for row in rows:
                yield row

    def _write_record(self, table, data):
        """Insert data into a RecordBatch"""
        # Add nulls for any schema columns missing from this record
        for item in PQ_SCHEMAS[table].names:
            if item not in data:
                data[item] = None
        # Add instance_id (for partitioning)
        data['instance_id'] = data['crawl_id']
        self._records[table].append(data)
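    # Upload happens in two phases: buffered records are first converted to
    # Arrow RecordBatches (one per table), then all accumulated batches for a
    # table are combined into a single Table and written to S3 as a
    # snappy-compressed Parquet dataset partitioned on instance_id.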
    def _upload_batch_to_s3(self):
        """Copy in-memory data to s3"""
        for table_name, data in self._records.items():
            if table_name not in self._batches:
                self._batches[table_name] = list()
            try:
                df = pd.DataFrame(data)
                batch = pa.RecordBatch.from_pandas(
                    df, schema=PQ_SCHEMAS[table_name], preserve_index=False
                )
                self._batches[table_name].append(batch)
                print(
                    "\nSuccessfully created batch for table %s, "
                    "consisting of %d records." % (table_name, len(data))
                )
            except pa.lib.ArrowInvalid as e:
                print(
                    "Error while creating record batch:\n%s\n%s\n%s\n"
                    % (table_name, type(e), e)
                )
        self._records = defaultdict(list)
        for table_name, batches in self._batches.items():
            try:
                table = pa.Table.from_batches(batches)
                pq.write_to_dataset(
                    table, self._s3_bucket_uri % table_name,
                    filesystem=self._fs,
                    preserve_index=False,
                    partition_cols=['instance_id'],
                    compression='snappy',
                    flavor='spark'
                )
                print(
                    "\nSuccessfully uploaded batch for table %s "
                    "to %s." %
                    (table_name, self._s3_bucket_uri % table_name)
                )
            except pa.lib.ArrowInvalid as e:
                print(
                    "Error while writing table %s:\n%s\n%s\n"
                    % (table_name, type(e), e)
                )
            self._batches[table_name] = list()
    def _process_record(self, table, data):
        """Add `data` to the in-memory record store"""
        # Upload buffered data every BATCH_SIZE sites
        self._visit_ids.add(data['visit_id'])
        if len(self._visit_ids) > BATCH_SIZE:
            self._upload_batch_to_s3()
            self._visit_ids = set()
        # Convert data to text type
        for k, v in data.items():
            if isinstance(v, six.binary_type):
                data[k] = six.text_type(v, errors='ignore')
            elif callable(v):
                data[k] = six.text_type(v)
        # Save record to the in-memory store
        self._write_record(table, data)
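    # Note: MAX(id) is only an upper bound on the row count (ids may have
    # gaps), so the tqdm progress-bar total below is approximate.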
    def process_table(self, table):
        self._sqlite_cursor.execute("SELECT MAX(id) as max_id FROM %s" % table)
        total_rows = self._sqlite_cursor.fetchone()['max_id']
        self._sqlite_cursor.execute("SELECT * FROM %s" % table)
        pbar = tqdm(total=total_rows)
        for row in self._fetchiter():
            pbar.update(1)
            self._process_record(table, dict(row))
        pbar.close()
        # Flush any remaining buffered records
        self._upload_batch_to_s3()

    def close(self):
        self._sqlite_connection.close()
def main():
    tables = [
        'javascript',
        'http_requests',
        'http_responses',
        'http_redirects',
        'site_visits'
    ]
    if len(sys.argv) < 3:
        print(
            "Usage: sqlite2parquet.py [SQLITE_DB] [S3 FOLDER] "
            "(optional: [TABLES])"
        )
        sys.exit(1)
    converter = Sqlite2Parquet(
        os.path.expanduser(sys.argv[1]),
        sys.argv[2]
    )
    if len(sys.argv) >= 4:
        tables = sys.argv[3].split(',')
        print("Processing tables: %s" % tables)
    else:
        print("Processing default set of tables: %s" % tables)
    for table in tables:
        print("\nStarting processing of %s" % table)
        converter.process_table(table)
    converter.close()


if __name__ == '__main__':
    main()
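For reference, a typical invocation looks like the following (the database path and S3 folder name are hypothetical):

python sqlite2parquet.py ~/crawl-data.sqlite 2019-08-crawl javascript,site_visits

And a minimal sketch for reading one table back with pyarrow, assuming AWS credentials are configured and the crawl above was uploaded to the 'openwpm-crawls' bucket:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
    'openwpm-crawls/2019-08-crawl/visits/javascript', filesystem=fs)
df = dataset.read().to_pandas()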