Skip to content

Instantly share code, notes, and snippets.

@syakesaba
Created January 11, 2025 06:08
Show Gist options
  • Save syakesaba/f42f041d93b6e138989d625bd498297f to your computer and use it in GitHub Desktop.
This script sets up a Spark session configured for Apache Iceberg, creates necessary tables, and performs data ingestion from CSV files.
#!/usr/bin/env python
# encoding: utf-8
"""
This script sets up a Spark session configured for Apache Iceberg,
creates necessary tables, and performs data ingestion from CSV files.
Dependencies:
- PySpark
- Apache Iceberg
- Hadoop AWS
Environment Variables:
- JAVA_HOME: Path to jdk-17.0.2/
- JAVA_OPTS: Java options for the Spark driver
- e.g.: "-Dhttp.proxyHost=yourproxy.com -Dhttp.proxyPort=8080 -Dhttps.proxyHost=yourproxy.com -Dhttps.proxyPort=8080"
- PROXY_HOST: yourproxy.com
- PROXY_PORT: 8080
- S3A_REGION: AWS region for S3
- S3A_ENDPOINT: Endpoint for S3
- S3A_ACCESS_KEY: Access key for S3
- S3A_SECRET_KEY: Secret key for S3
"""
import os
from pyspark.sql import SparkSession
# Versions of the runtime jars fetched via spark.jars.packages.
SPARK_ICEBERG_VERSION = "1.7.1"
HADOOP_AWS_VERSION = "3.3.4"
# Name of the Iceberg catalog exposed to Spark SQL, and its S3 warehouse root.
CATALOG_NAME = "catalog_iceberg"
WAREHOUSE_URL = "s3a://datalakehouse/iceberg"


def _build_spark_session() -> "SparkSession":
    """Create a SparkSession configured for Iceberg (Hadoop catalog) on S3A.

    S3A and proxy settings are applied only when the corresponding
    environment variable is set. Unconditionally writing empty strings
    (as a plain ``os.getenv(...) or ""`` would) clobbers Hadoop's own
    defaults — e.g. an empty ``fs.s3a.endpoint`` or empty credentials
    disables the normal AWS credential provider chain and fails with
    confusing errors. Skipping unset variables preserves those defaults
    (S3A's default proxy port is already -1, i.e. "no proxy").
    """
    builder = (
        SparkSession.builder.appName("yourapp")
        # Extra driver JVM flags, e.g. HTTP(S) proxy options needed to
        # download the jar packages below (see JAVA_OPTS in module docstring).
        .config("spark.driver.defaultJavaOptions", os.getenv("JAVA_OPTS") or "")
        .config(
            "spark.jars.packages",
            f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{SPARK_ICEBERG_VERSION}"
            f",org.apache.hadoop:hadoop-aws:{HADOOP_AWS_VERSION}",
        )
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )
        # Make the built-in catalog Iceberg-aware while delegating
        # non-Iceberg tables to the underlying Hive catalog.
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog",
        )
        .config("spark.sql.catalog.spark_catalog.type", "hive")
        # Dedicated Iceberg catalog backed by a Hadoop (filesystem) warehouse.
        # https://iceberg.apache.org/docs/1.5.0/spark-configuration/
        .config(
            f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog"
        )
        .config(f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop")
        .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", WAREHOUSE_URL)
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        # Hadoop config values are strings; use "true", not the Python bool.
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
    )

    # Optional S3A settings, applied only when present in the environment.
    optional_s3a = {
        "spark.hadoop.fs.s3a.proxy.host": os.getenv("PROXY_HOST"),
        "spark.hadoop.fs.s3a.proxy.port": os.getenv("PROXY_PORT"),
        "spark.hadoop.fs.s3a.endpoint.region": os.getenv("S3A_REGION"),
        "spark.hadoop.fs.s3a.endpoint": os.getenv("S3A_ENDPOINT"),
        "spark.hadoop.fs.s3a.access.key": os.getenv("S3A_ACCESS_KEY"),
        "spark.hadoop.fs.s3a.secret.key": os.getenv("S3A_SECRET_KEY"),
    }
    for key, value in optional_s3a.items():
        if value:
            builder = builder.config(key, value)

    return builder.getOrCreate()


spark: SparkSession = _build_spark_session()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment