Forked from brandonwillard/pandas_dbms.py
PYTHON PANDAS - Work with Database script - postgres, mysql, oracle, sqlite3
"""
example use of pandas with oracle mysql postgresql sqlite
lightly tested.
to do:
save/restore index (how to check table existence? just do select count(*)?),
finish odbc,
add booleans?,
sql_server?
"""
from datetime import datetime
import cStringIO #for file-like objects
import numpy as np
from pandas import *
import pandas.io.sql as psql
# for building create table schemas with appropriate type names
dbtypes = {
    'mysql':      {'DATE': 'DATE',      'DATETIME': 'DATETIME',  'INT': 'INT',     'FLOAT': 'FLOAT',  'VARCHAR': 'VARCHAR'},
    'oracle':     {'DATE': 'DATE',      'DATETIME': 'DATE',      'INT': 'NUMBER',  'FLOAT': 'NUMBER', 'VARCHAR': 'VARCHAR2'},
    'sqlite':     {'DATE': 'TIMESTAMP', 'DATETIME': 'TIMESTAMP', 'INT': 'NUMBER',  'FLOAT': 'NUMBER', 'VARCHAR': 'VARCHAR2'},
    'postgresql': {'DATE': 'TIMESTAMP', 'DATETIME': 'TIMESTAMP', 'INT': 'INTEGER', 'FLOAT': 'REAL',   'VARCHAR': 'TEXT'},
}
def get_schema(frame, name, flavor):
    '''build a CREATE TABLE statement from a dataframe for the specified flavor of dbms'''
    types = dbtypes[flavor]  # deal with datatype differences
    column_types = []
    dtypes = frame.dtypes
    for i, k in enumerate(dtypes.index):
        dt = dtypes[k]
        #print 'dtype', dt, dt.itemsize
        if str(dt.type) == "<type 'numpy.datetime64'>":
            sqltype = types['DATETIME']
        elif issubclass(dt.type, np.datetime64):
            sqltype = types['DATETIME']
        elif issubclass(dt.type, (np.integer, np.bool_)):
            sqltype = types['INT']
        elif issubclass(dt.type, np.floating):
            sqltype = types['FLOAT']
        else:
            # object column: inspect the first value to pick a type
            sampl = frame[frame.columns[i]][0]
            #print 'other', type(sampl)
            if str(type(sampl)) == "<type 'datetime.datetime'>":
                sqltype = types['DATETIME']
            elif str(type(sampl)) == "<type 'datetime.date'>":
                sqltype = types['DATE']
            else:
                if flavor in ('mysql', 'oracle'):
                    # these flavors need an explicit VARCHAR length
                    size = 2 + max(len(str(a)) for a in frame[k])
                    print k, 'varchar sz', size
                    sqltype = types['VARCHAR'] + '(%s)' % size
                else:
                    sqltype = types['VARCHAR']
        column_types.append((k, sqltype))
    columns = ',\n  '.join('%s %s' % x for x in column_types)
    template_create = """CREATE TABLE %(name)s (
  %(columns)s
);"""
    return template_create % {'name': name, 'columns': columns}
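# Example (added here, not part of the original gist): for a small frame with an
# int, a string and a float column, get_schema should produce roughly the
# following for the 'postgresql' flavor (column order may differ):
#
#   >>> print get_schema(DataFrame({'erank': [1], 'name': ['Joe'], 'score': [1.1]}), 'tmp_table', 'postgresql')
#   CREATE TABLE tmp_table (
#     erank INTEGER,
#     name TEXT,
#     score REAL
#   );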
def read_db(sql, con):
    '''send a SELECT to the server and return a dataframe'''
    return psql.frame_query(sql, con)

def table_exists(name=None, con=None, flavor='sqlite'):
    '''check whether this table already exists on the server. how to do in ODBC?'''
    if flavor == 'sqlite':
        sql = "SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';".replace('MYTABLE', name)
    elif flavor == 'mysql':
        sql = "show tables like 'MYTABLE';".replace('MYTABLE', name)
    elif flavor == 'postgresql':
        sql = "SELECT * FROM pg_tables WHERE tablename='MYTABLE';".replace('MYTABLE', name)
    elif flavor == 'oracle':
        sql = "select table_name from user_tables where table_name='MYTABLE'".replace('MYTABLE', name.upper())
    else:
        raise NotImplementedError
    df = read_db(sql, con)
    print sql, df
    print 'table_exists?', len(df)
    return len(df) > 0
def write_frame(frame, name=None, con=None, flavor='sqlite', if_exists='fail'):
    """
    Write records stored in a DataFrame to the specified dbms.
    if_exists:
        'fail'    - a CREATE TABLE will be attempted and will fail if the table exists
        'replace' - if a table named 'name' exists, it will be dropped first
        'append'  - assume a table with the correct schema exists and add the data;
                    if there is no table or the data is bad, then fail
    TODO: if the table doesn't exist, make it regardless of if_exists.
    """
    if if_exists == 'replace' and table_exists(name, con, flavor):
        cur = con.cursor()
        cur.execute("drop table " + name)
        cur.close()
    cur = con.cursor()  # created here so the 'append' path also has a cursor
    if if_exists in ('fail', 'replace') or (if_exists == 'append' and not table_exists(name, con, flavor)):
        # create the table
        schema = get_schema(frame, name, flavor)
        if flavor == 'oracle':
            schema = schema.replace(';', '')
        print 'schema\n', schema
        cur.execute(schema)
        print 'created table'
    if flavor == 'sqlite':
        wildcards = ','.join(['?'] * len(frame.columns))
        insert_sql = 'INSERT INTO %s VALUES (%s)' % (name, wildcards)
        print 'insert_sql', insert_sql
        data = [tuple(x) for x in frame.values]
        print 'data', data
        cur.executemany(insert_sql, data)
    elif flavor == 'oracle':
        cols = [k for k in frame.dtypes.index]
        colnames = ','.join(cols)
        colpos = ', '.join([':' + str(i + 1) for i, f in enumerate(cols)])
        insert_sql = 'INSERT INTO %s (%s) VALUES (%s)' % (name, colnames, colpos)
        print 'insert_sql', insert_sql
        data = [convertSequenceToDict(rec) for rec in frame.values]
        print data
        cur.executemany(insert_sql, data)
    elif flavor == 'mysql':
        wildcards = ','.join(['%s'] * len(frame.columns))
        cols = [k for k in frame.dtypes.index]
        colnames = ','.join(cols)
        insert_sql = 'INSERT INTO %s (%s) VALUES (%s)' % (name, colnames, wildcards)
        data = [tuple(x) for x in frame.values]
        cur.executemany(insert_sql, data)
    elif flavor == 'postgresql':
        postgresql_copy_from(frame, name, con)
    else:
        raise NotImplementedError
    con.commit()
    #cur.close()
    return
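# Minimal usage sketch (added here, not part of the original gist), assuming an
# open sqlite3 connection named conn and a DataFrame named df:
#
#   write_frame(df, 'tmp_table', con=conn, flavor='sqlite', if_exists='replace')  # drop and recreate
#   write_frame(df, 'tmp_table', con=conn, flavor='sqlite', if_exists='append')   # insert into the existing table
#   df2 = read_db('select * from tmp_table', con=conn)                            # read it back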
def postgresql_copy_from(df, name, con):
    # append data into an existing postgresql table using COPY, for speed
    # 1. convert df to csv, no header
    output = cStringIO.StringIO()
    # deal with the datetime64 to_csv() bug
    have_datetime64 = False
    dtypes = df.dtypes
    for i, k in enumerate(dtypes.index):
        dt = dtypes[k]
        print 'dtype', dt, dt.itemsize
        if str(dt.type) == "<type 'numpy.datetime64'>":
            have_datetime64 = True
    if have_datetime64:
        # convert datetime64 columns to datetime before writing the csv
        d2 = df.copy()
        for i, k in enumerate(dtypes.index):
            dt = dtypes[k]
            if str(dt.type) == "<type 'numpy.datetime64'>":
                d2[k] = [v.to_pydatetime() for v in d2[k]]
        d2.to_csv(output, sep='\t', header=False, index=False, na_rep='\\N')
    else:
        df.to_csv(output, sep='\t', header=False, index=False, na_rep='\\N')
    contents = output.getvalue()
    #print 'contents\n', contents
    # 2. COPY FROM the csv buffer into the table
    cur = con.cursor()
    output.seek(0)
    cur.copy_from(output, name, null='\\N')
    con.commit()
    #cur.close()
    return
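# Illustrative variant (added here, not part of the original gist): psycopg2's
# copy_from() also accepts an explicit column list, which avoids depending on
# the DataFrame's column order matching the table definition. A minimal sketch
# assuming no datetime64 columns; the function name is made up for this example.
def postgresql_copy_from_with_columns(df, name, con):
    output = cStringIO.StringIO()
    df.to_csv(output, sep='\t', header=False, index=False, na_rep='\\N')
    output.seek(0)
    cur = con.cursor()
    # name the target columns explicitly instead of relying on table column order
    cur.copy_from(output, name, null='\\N', columns=[str(c) for c in df.columns])
    con.commit()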
# source: http://www.gingerandjohn.com/archives/2004/02/26/cx_oracle-executemany-example/
def convertSequenceToDict(seq):
    """for cx_Oracle:
    For each element in the sequence, creates a dictionary item equal
    to the element and keyed by the position of the item in the sequence.
    >>> convertSequenceToDict(("Matt", 1))
    {'1': 'Matt', '2': 1}
    """
    d = {}
    for k, v in zip(range(1, len(seq) + 1), seq):
        d[str(k)] = v
    return d
###############################################################################
def test_sqlite(name, testdf):
    print '\nsqlite, using detect_types=sqlite3.PARSE_DECLTYPES for datetimes'
    import sqlite3
    with sqlite3.connect('test.db', detect_types=sqlite3.PARSE_DECLTYPES) as conn:
        #conn.row_factory = sqlite3.Row
        write_frame(testdf, name, con=conn, flavor='sqlite', if_exists='replace')
        df_sqlite = read_db('select * from ' + name, con=conn)
        print 'loaded dataframe from sqlite', len(df_sqlite)
    print 'done with sqlite'
def test_oracle(name, testdf):
    print '\nOracle'
    import cx_Oracle
    with cx_Oracle.connect('YOURCONNECTION') as ora_conn:
        testdf['d64'] = np.datetime64(testdf['hire_date'])
        write_frame(testdf, name, con=ora_conn, flavor='oracle', if_exists='replace')
        df_ora2 = read_db('select * from ' + name, con=ora_conn)
    print 'done with oracle'
    return df_ora2
def test_postgresql(name, testdf):
    #from pg8000 import DBAPI as pg
    import psycopg2 as pg
    print '\nPostgreSQL, Greenplum'
    pgcn = pg.connect(host='localhost', port=5433, database='pgdb', user='vpham', password='TwbSE29y')
    print 'df frame_query'
    try:
        write_frame(testdf, name, con=pgcn, flavor='postgresql', if_exists='replace')
        print 'pg copy_from'
        postgresql_copy_from(testdf, name, con=pgcn)
        df_gp = read_db('select * from ' + name, con=pgcn)
        print 'loaded dataframe from greenplum', len(df_gp)
    finally:
        pgcn.commit()
        pgcn.close()
    print 'done with greenplum'
def test_mysql(name, testdf):
    import MySQLdb
    print '\nmysql'
    cn = MySQLdb.connect(YOURCONNECTION)
    try:
        write_frame(testdf, name=name, con=cn, flavor='mysql', if_exists='replace')
        df_mysql = read_db('select * from ' + name, con=cn)
        print 'loaded dataframe from mysql', len(df_mysql)
    finally:
        cn.close()
    print 'mysql done'
##############################################################################
if __name__ == '__main__':
    print """Aside from sqlite, you'll need to install the driver and set a valid
connection string for each test routine."""
    test_data = {
        "name": ['Joe', 'Bob', 'Jim', 'Suzy', 'Cathy', 'Sarah'],
        "hire_date": [datetime(2012, 1, 1), datetime(2012, 2, 1), datetime(2012, 3, 1),
                      datetime(2012, 4, 1), datetime(2012, 5, 1), datetime(2012, 6, 1)],
        "erank": [1, 2, 3, 4, 5, 6],
        "score": [1.1, 2.2, 3.1, 2.5, 3.6, 1.8],
    }
    df = DataFrame(test_data)
    name = 'tmp_table'
    #test_sqlite(name, df)
    #test_oracle(name, df)
    test_postgresql(name, df)
    #test_mysql(name, df)
    print 'done'
@tinkerbotfoo (Author) commented:
Works with Postgres 9.2 on Heroku Postgres.

Changes:
- Commented out certain print statements to avoid clutter.
- cur.close() was giving an error; commented it out and confirmed the data still loaded successfully.
