@wlinds
Created March 20, 2025 12:30
Script to import data from a CSV file into a PostgreSQL table
# Config:
#
# database.ini:
# [postgresql]
# host = your_host
# port = your_port
# user = your_username
#
# .env file:
# DB_PASSWORD=your_password
# DB_NAME=your_database_name (defaults to "test_pool" if not specified)
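#
# Expected CSV layout (a sketch; the column names and values below are
# hypothetical): the three required columns are followed by one column per
# email address, holding lead times in days (decimal commas are accepted).
#
# article_category;article_subcategory;division;anna@example.com;bert@example.com
# Tools;Drills;North;2;1,5
# Tools;Saws;South;0;3
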
import sys
import os
import logging

import psycopg2
import pandas as pd
from configparser import ConfigParser
from dotenv import load_dotenv

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def config(section="postgresql"):
    """Read connection settings from database.ini and credentials from the environment."""
    parser = ConfigParser()
    filename = "database.ini"
    parser.read(filename)
    database = {}
    load_dotenv()
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            database[param[0]] = param[1]
        database["password"] = os.getenv("DB_PASSWORD")
        database["database"] = os.getenv("DB_NAME", "test_pool")
    else:
        raise RuntimeError(f"Section {section} not found in the {filename} file")
    return database

def connect_to_db():
    """Open a psycopg2 connection using the parameters from config()."""
    try:
        params = config()
        return psycopg2.connect(**params)
    except (Exception, psycopg2.Error) as error:
        logging.error(f"Error connecting to PostgreSQL database: {error}")
        raise
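
# The INSERT ... ON CONFLICT below assumes the target table has a unique
# constraint over (email, article_category, article_subcategory, process_code,
# organisation). A hypothetical matching table (names are illustrative only):
#
#   CREATE TABLE IF NOT EXISTS lead_times (
#       email TEXT,
#       article_category TEXT,
#       article_subcategory TEXT,
#       process_code TEXT,
#       days NUMERIC,
#       organisation TEXT,
#       division TEXT,
#       UNIQUE (email, article_category, article_subcategory, process_code, organisation)
#   );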

def import_csv(csv_file, process_code, organisation, table_name):
    """Import data from CSV to table"""
    conn = connect_to_db()
    cursor = conn.cursor()

    logging.info(f"Reading CSV file: {csv_file}")
    df = pd.read_csv(csv_file, sep=';')  # Change separator if needed
    logging.info(f"Found columns: {df.columns.tolist()}")

    # Change required columns to match your CSV and DB table
    required_cols = ['article_category', 'article_subcategory', 'division']
    # Every remaining column is treated as an email address whose values are day counts
    email_cols = [col for col in df.columns if col not in required_cols]
    logging.info(f"Found {len(email_cols)} email columns: {email_cols}")

    total_inserted = 0
    for email in email_cols:
        inserted = 0
        for idx, row in df.iterrows():
            try:
                category = str(row['article_category']).strip()
                subcategory = str(row['article_subcategory']).strip()
                division = str(row['division']).strip()
                days = row[email]
                if pd.isna(days) or days == 0:
                    continue
                if isinstance(days, str):
                    # Normalise values like "1 250,5" to "1250.5" before conversion
                    days = days.replace(' ', '').replace(',', '.')
                days = float(days)
                if days <= 0:
                    continue
                # Update query to match your table columns.
                # Note: table_name is interpolated directly, so it must come from a trusted source.
                cursor.execute(
                    f"""
                    INSERT INTO {table_name}
                        (email, article_category, article_subcategory, process_code, days, organisation, division)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (email, article_category, article_subcategory, process_code, organisation)
                    DO UPDATE SET days = EXCLUDED.days, division = EXCLUDED.division
                    """,
                    (email, category, subcategory, process_code, days, organisation, division)
                )
                inserted += 1
            except Exception as e:
                logging.error(f"Error on row {idx}: {e}")
                continue
        total_inserted += inserted
        logging.info(f"Inserted/updated {inserted} records for {email}")

    conn.commit()
    cursor.close()
    conn.close()
    logging.info(f"Done! Total records inserted/updated: {total_inserted}")
    return total_inserted

if __name__ == "__main__":
    if len(sys.argv) < 5:
        logging.error(f"Usage: {sys.argv[0]} <csv_file> <process_code> <organisation> <table_name>")
        sys.exit(1)

    csv_file = sys.argv[1]
    process_code = sys.argv[2]
    organisation = sys.argv[3]
    table_name = sys.argv[4]

    try:
        total = import_csv(csv_file, process_code, organisation, table_name)
        logging.info(f"Successfully imported {total} records from {csv_file} to {table_name}")
    except Exception as e:
        logging.error(f"Error importing data: {e}")
        sys.exit(1)
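
# Example invocation (the file, process code, organisation and table names are hypothetical):
#   python import_csv.py lead_times.csv PROC01 AcmeOrg lead_times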