This script sets up a Spark session configured for Apache Iceberg, creates necessary tables, and performs data ingestion from CSV files.
#!/usr/bin/env python
# encoding: utf-8
"""
This script sets up a Spark session configured for Apache Iceberg,
creates necessary tables, and performs data ingestion from CSV files.

Dependencies:
- PySpark
- Apache Iceberg
- Hadoop AWS

Environment Variables:
- JAVA_HOME: Path to jdk-17.0.2/
- JAVA_OPTS: Java options for the Spark driver
  - e.g.: "-Dhttp.proxyHost=yourproxy.com -Dhttp.proxyPort=8080 -Dhttps.proxyHost=yourproxy.com -Dhttps.proxyPort=8080"
- PROXY_HOST: yourproxy.com
- PROXY_PORT: 8080
- S3A_REGION: AWS region for S3
- S3A_ENDPOINT: Endpoint for S3
- S3A_ACCESS_KEY: Access key for S3
- S3A_SECRET_KEY: Secret key for S3
"""
import os

from pyspark.sql import SparkSession

SPARK_ICEBERG_VERSION = "1.7.1"
HADOOP_AWS_VERSION = "3.3.4"
CATALOG_NAME = "catalog_iceberg"
WAREHOUSE_URL = "s3a://datalakehouse/iceberg"

spark: SparkSession = (
    SparkSession.builder.appName("yourapp")
    # .config("spark.log.level", "DEBUG")
    .config("spark.driver.defaultJavaOptions", os.getenv("JAVA_OPTS") or "")
    # Fetch the Iceberg Spark runtime and hadoop-aws jars at startup
    .config(
        "spark.jars.packages",
        f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{SPARK_ICEBERG_VERSION}"
        f",org.apache.hadoop:hadoop-aws:{HADOOP_AWS_VERSION}",
    )
    # Enable Iceberg's SQL extensions (MERGE INTO, CALL procedures, etc.)
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    # Wrap the built-in session catalog so non-Iceberg tables keep working
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.iceberg.spark.SparkSessionCatalog",
    )
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    # A dedicated Hadoop (filesystem-based) catalog backed by the S3 warehouse
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog"
    )
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop"
    )  # https://iceberg.apache.org/docs/1.5.0/spark-configuration/
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", WAREHOUSE_URL)
    # S3A connectivity: optional proxy, endpoint, and credentials from env vars
    .config("spark.hadoop.fs.s3a.proxy.host", os.getenv("PROXY_HOST") or "")
    .config("spark.hadoop.fs.s3a.proxy.port", os.getenv("PROXY_PORT") or -1)
    .config("spark.hadoop.fs.s3a.endpoint.region", os.getenv("S3A_REGION") or "")
    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("S3A_ENDPOINT") or "")
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("S3A_ACCESS_KEY") or "")
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("S3A_SECRET_KEY") or "")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Path-style access for S3-compatible object stores (e.g. MinIO)
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .getOrCreate()
)
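
The script above only builds the session; the table creation and CSV ingestion mentioned in the docstring are not shown. A minimal sketch of how that step might look with this session — the `db.events` namespace, the column schema, and the `events.csv` path are hypothetical placeholders, not part of the original script:

# Hypothetical usage sketch (not from the original gist): table name,
# schema, and CSV path are placeholders.
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.db.events (
        id BIGINT,
        name STRING,
        ts TIMESTAMP
    ) USING iceberg
    """
)

# Read the CSV with an explicit DDL schema so column types match the table,
# then append into the Iceberg table via the DataFrameWriterV2 API.
df = spark.read.csv(
    "events.csv", header=True, schema="id BIGINT, name STRING, ts TIMESTAMP"
)
df.writeTo(f"{CATALOG_NAME}.db.events").append()

`.append()` adds a new snapshot to the table; `createOrReplace()` would rewrite it instead. With the `hadoop` catalog type, namespaces map to directories under the warehouse path, so a separate `CREATE NAMESPACE` step should not be needed here.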