Created
December 5, 2018 10:02
-
-
Save semyont/3df3a31882cc17225d12fbad168b5633 to your computer and use it in GitHub Desktop.
Redshift Upsert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
import psycopg2 | |
from secret import REDSHIFT_CREDS | |
from secret import AWS_ACCESS_KEY, AWS_SECRET_KEY | |
def get_primary_keys(tablename, db):
    """Return the column names of the first index defined on a table.

    Reads ``indexdef`` from ``pg_indexes`` for *tablename* and parses the
    parenthesised column list out of the first row returned.

    Args:
        tablename: name of the table to look up (str).
        db: an open DB-API connection exposing ``cursor()``.

    Returns:
        A list of column names with surrounding whitespace and double
        quotes removed.

    Raises:
        IndexError: if no index row exists for *tablename*, or the index
            definition contains no parenthesised column list.
    """
    c = db.cursor()
    try:
        # Bind the table name as a parameter instead of formatting it into
        # the SQL text, so quotes in the name cannot alter the statement.
        c.execute(
            "select indexdef from pg_indexes where tablename = %s;",
            (tablename,),
        )
        rows = c.fetchall()
    finally:
        c.close()

    if not rows:
        # Same exception type the unguarded ``[0][0]`` lookup used to raise,
        # but with a message that names the actual problem.
        raise IndexError("no index found for table %r" % (tablename,))

    # NOTE(review): only the first index row is inspected; this presumably
    # is the primary-key index -- confirm for tables with several indexes.
    indexdef = rows[0][0]
    rfields = indexdef.split('(')[1].strip(')').split(',')
    return [field.strip().strip('"') for field in rfields]
def load_s3_file_into_redshift(s3path, tablename, delimiter='|'):
    """Upsert a gzipped, delimited S3 file into a Redshift table.

    The file is COPYed into a temporary table shaped like *tablename*; rows
    in *tablename* that match on the columns returned by
    ``get_primary_keys`` are deleted, then every staged row is inserted.

    Args:
        s3path: location of the gzipped file on Amazon S3 (str).
        tablename: destination table name (str).
        delimiter: field delimiter used in the file (str).

    Returns:
        *s3path*, unchanged.
    """
    # ``UUID.hex`` is an attribute on both Python 2 and Python 3; the
    # ``get_hex()`` method only existed on Python 2.
    temp_tablename = 'temp_%s' % uuid.uuid4().hex

    db = psycopg2.connect(**REDSHIFT_CREDS)
    try:
        primary_keys = get_primary_keys(tablename, db)
        join_clause = ' AND '.join(
            '{dest}.{pk} = {src}.{pk}'.format(
                dest=tablename, src=temp_tablename, pk=pk)
            for pk in primary_keys
        )

        # NOTE(review): table names, the S3 path, the delimiter and the AWS
        # credentials are interpolated straight into the SQL text; callers
        # must only pass trusted values.
        upsert_qry = """\
        CREATE TEMPORARY TABLE {src} (LIKE {dest});
        COPY {src} FROM '{s3path}'
        CREDENTIALS 'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
        DELIMITER '{delimiter}' gzip;
        BEGIN;
        LOCK {dest};
        DELETE FROM {dest} USING {src} WHERE {join_clause};
        INSERT INTO {dest} SELECT * FROM {src};
        END;
        """.format(dest=tablename,
                   src=temp_tablename,
                   s3path=s3path,
                   join_clause=join_clause,
                   access_key=AWS_ACCESS_KEY,
                   secret_key=AWS_SECRET_KEY,
                   delimiter=delimiter)

        c = db.cursor()
        try:
            c.execute(upsert_qry)
            db.commit()
        finally:
            c.close()
    finally:
        # Always release the connection, including when the load fails.
        db.close()
    return s3path
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment