Last active
February 2, 2021 07:46
-
-
Save mattarderne/163060f149de85f9f011d8e225974cff to your computer and use it in GitHub Desktop.
snowflake_bulk_create
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlalchemy as db | |
import pandas as pd | |
import snowflake.connector | |
import os | |
from snowflake.sqlalchemy import URL | |
from datetime import datetime | |
# import records | |
def connect(database, role='TRANSFORMER', warehouse='TRANSFORMING', account='<account>'):
    """Open a Snowflake connection and select the working warehouse and database.

    The user name and password are read from the ``user`` and ``pdw``
    environment variables via ``os.getenv``.

    Args:
        database: Database made current with ``USE DATABASE``.
        role: Role passed to the Snowflake connection URL.
        warehouse: Warehouse passed to the connection URL and made current
            with ``USE WAREHOUSE``.
        account: Snowflake account identifier passed to the connection URL.
            The default is the ``'<account>'`` placeholder and has to be
            replaced with a real account before the connection can succeed.

    Returns:
        The open SQLAlchemy connection (not a cursor). The caller is
        responsible for closing it.

    Raises:
        Any exception raised while creating the engine, connecting, or
        running the ``USE`` statements is propagated unchanged.
    """
    engine = db.create_engine(
        URL(
            user=os.getenv("user"),
            password=os.getenv("pdw"),
            account=account,
            role=role,
            warehouse=warehouse,
        )
    )
    connection = engine.connect()
    try:
        # text() keeps these statements valid on SQLAlchemy versions that no
        # longer accept a plain string in Connection.execute().
        connection.execute(db.text(f'USE WAREHOUSE {warehouse}'))
        connection.execute(db.text(f'USE DATABASE {database}'))
    except Exception:
        # Do not leak a half-initialised connection when session setup fails.
        connection.close()
        raise
    return connection
def query_table(connection, database=None, schema=None, table=None, sql=None):
    """Run a query on ``connection`` and return the result as a DataFrame.

    Args:
        connection: Open database connection accepted by ``pandas.read_sql``.
            It is closed before this function returns, whether or not the
            query succeeds, so it cannot be reused afterwards.
        database: Database part of the default query's table reference.
        schema: Schema part of the default query's table reference.
        table: Table part of the default query's table reference.
        sql: Query to run. When empty or ``None``, a ``SELECT *`` over
            ``"database"."schema"."table"`` is run instead.

    Returns:
        A ``pandas.DataFrame`` holding the query result.
    """
    if not sql:
        sql = f'SELECT\n*\nFROM "{database}"."{schema}"."{table}"\n'
    try:
        return pd.read_sql(sql, connection)
    finally:
        # Close the connection that was passed in, not a module-level global.
        connection.close()
### Generate one .sqlx view definition per table found in the source schema.

# Connection settings, the schema to scan and the output folder, named once
# instead of being repeated as literals in every call below.
# NOTE(review): the template further down still hard-codes the schema name.
DATABASE = 'RAW_DEV'
SCHEMA = 'CANVAS_INSHOSTEDDATA'
ROLE = 'ROLE_DATA_ENGINEERING'
WAREHOUSE = 'WAREHOUSE_TRANSFORM_DEV'
OUTPUT_DIR = 'canvas/base'

### template for the views
# SNOWFLAKE_TABLE_NAME_VAR and ALL_FIELD_NAMES are placeholders substituted
# per table. Every generated column line ends with a comma, so the trailing
# `'' as hack` column keeps the select list syntactically valid.
template = """
config {
type: "view",
schema:"CANVAS_INSHOSTEDDATA"
}
with source as (
SELECT * FROM ${getRawDatabase()}.${canvas.CANVAS_INSHOSTEDDATA_SCHEMA}."SNOWFLAKE_TABLE_NAME_VAR"
),
renamed as (
select
ALL_FIELD_NAMES
'' as hack
from source
)
select * from renamed"""

tables_sql = f"""show tables like '%%' in {DATABASE}.{SCHEMA}"""
columns_sql = f"""desc table {DATABASE}.{SCHEMA}.SNOWFLAKE_TABLE_NAME_VAR;"""

# query_table() closes the connection it is given, so a fresh connection is
# opened before every query.
con = connect(DATABASE, ROLE, WAREHOUSE)
results = query_table(con, DATABASE, SCHEMA, '', tables_sql)

# Make sure the target folder exists before the first file is written.
os.makedirs(OUTPUT_DIR, exist_ok=True)

for name in results.name:
    print(name)

    ## get columns
    columns_query = columns_sql.replace('SNOWFLAKE_TABLE_NAME_VAR', name)
    con = connect(DATABASE, ROLE, WAREHOUSE)
    columns = query_table(con, DATABASE, SCHEMA, '', columns_query)

    ## create columns list: one "<column>," line per column
    all_columns = ''.join(f' {column},\n' for column in columns.name)

    ## create SQL by replacing placeholders with variables
    sql_result = (
        template
        .replace('SNOWFLAKE_TABLE_NAME_VAR', name)
        .replace('ALL_FIELD_NAMES', all_columns)
    )

    ## write to a file named after the lower-cased table name
    file_name = f'{OUTPUT_DIR}/canvas_base_{name.lower()}.sqlx'
    with open(file_name, 'w', encoding='utf-8') as f:
        f.write(sql_result)

print('done')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
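TODO:
- Remove the hard-coded `CANVAS_INSHOSTEDDATA` from the script and replace it with a variable.
- Decide whether each column is emitted as `col as col_name` or just `col`.
- Fix the `'' as hack` hack that precedes the closing `)`.
- Turn the `${getRawDatabase()}.${canvas.CANVAS_INSHOSTEDDATA_SCHEMA}.` prefix into proper JavaScript, or remove it.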