Created
January 13, 2022 13:22
-
-
Save voutilad/e0670a94f643ef1ddd43f8d4a533d7e4 to your computer and use it in GitHub Desktop.
Example remote bulk import with neo4j-arrow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
## When pointed at an import directory (see https://neo4j.com/docs/operations-manual/current/tools/neo4j-admin/neo4j-admin-import/)
## this script identifies the graph entities and builds the stream of nodes and relationships.
##
## Usage: <script> [import directory]
##
## Exit status is non-zero when the script is imported instead of executed, when the
## argument count is wrong, or when the import job does not complete within the timeout.
from sys import argv, exit, stderr
from time import time

import neo4j_arrow as na
import pyimport as pi  # see https://github.com/neo4j-field/neo4j-arrow/blob/bulk-import/src/python/pyimport.py

# This file is a script only: refuse to be imported.
if __name__ != '__main__':
    print('this is not a module!', file=stderr)
    exit(1)

# Exactly one positional argument (the import directory) is required.
if len(argv) != 2:
    print(f'usage: {argv[0]} [import directory]', file=stderr)
    exit(1)

import_dir = argv[-1]

# NOTE(review): host, port and credentials are hard-coded, and TLS certificate
# verification is switched off -- presumably a test server; confirm before reuse.
client = na.Neo4jArrow('neo4j', 'password', ('voutila-arrow-test', 9999), tls=True, verifyTls=False)

nodes, rels = pi.load_dir(import_dir)

# NOTE(review): the database name is the raw command-line argument plus a timestamp,
# so a path containing separators ends up in the name -- verify the server accepts it.
db = f'{import_dir}{int(time())}'  # use a timestamped name for now
print(f'importing {len(nodes):,} nodes & {len(rels):,} rels into db {db}')

ticket = client.bulk_import(db, idField='ID', sourceField='START_ID', targetField='END_ID')

# Push the nodes to the server. Doesn't _need_ to be done first, but bulkimport API starts working on nodes
rows, nbytes = client.put_stream(ticket, nodes, metadata={'stream.type': 'node'})
print(f'streamed {rows:,} nodes ({round(nbytes / (1 << 20), 3):,} MiB)')

# Push the relationships to the server. This could be done concurrently with the node stream.
rows, nbytes = client.put_stream(ticket, rels, metadata={'stream.type': 'rels'})
print(f'streamed {rows:,} relationships ({round(nbytes / (1 << 20), 3):,} MiB)')

# At this point, it's just waiting for the server to finish the import.
timeout = 5 * 60
print(f'waiting {timeout}s for job completion')
if client.wait_for_job(ticket, status=na.JobStatus.COMPLETE, timeout=timeout):
    print('finished!')
else:
    # Report the failure through the exit status so callers can detect it.
    print('timed out.', file=stderr)
    exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment