Sample Spark application running on IBM watsonx.data, instrumented with the IBM Databand SDK.
# %% [markdown]
# Drone Ops raw data from PostgreSQL to watsonx.data
#
# Uses the Databand Python SDK to decorate the pipeline functions with Databand tracking.
#
# See https://www.ibm.com/docs/en/dobd?topic=python-tracking-functions.
#
# See also the Databand docs https://www.ibm.com/docs/en/dobd?topic=integrations-code-based-workflows.
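# %% [markdown]
# The tracking pattern used throughout this file, in minimal form: each step is a plain
# function wrapped with `@task`, and the whole run is wrapped in a `dbnd_tracking` context.
# A sketch with illustrative names only (not part of this pipeline):
#
# ```python
# from dbnd import dbnd_tracking, task
#
# @task(task_family="Example step")
# def double(x):
#     return x * 2
#
# with dbnd_tracking(job_name="example-job"):
#     double(21)
# ```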
# %%
#variables
APP_NAME = "drone-ops-bronze-to-silver-1min"
SCHEMA_NAME = "drone_swarm_ops"
CATALOG_NAME = "bucket_01"
BUCKET_NAME = "bucket-01"
JDBC_URL = "jdbc:postgresql://postgres.lab-postgres.svc.cluster.local:5432/drone_swarm_ops"
JDBC_PROPS = {"user":"demo", "password":"passw0rd", "driver":"org.postgresql.Driver"}

#imports
import os
from datetime import datetime, timedelta
import time
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col, to_utc_timestamp
import base64,getpass
import warnings
from dbnd import dbnd_tracking, task, dataset_op_logger, log_metric, log_dataframe
warnings.filterwarnings('ignore')
# %%
#init Spark
#Spark config copied from watsonx.data Infrastructure Manager > Spark engines > Default Spark configuration (key=value lines)
spark_config="""
spark.config.line.1
spark.config.line.2
"""
# Parse non-empty, non-comment lines containing "=" into key/value pairs
spark_conf_lines = [
    line.strip().split("=", 1)
    for line in spark_config.strip().splitlines()
    if line.strip() and not line.strip().startswith("#") and "=" in line
]
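# Illustrative example of the expected key=value format (placeholder values, not the real engine config):
#   spark.hadoop.fs.s3a.endpoint=https://s3.example.com
#   spark.sql.catalogImplementation=hive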
# %%
LASTTIMESTAMP_TABLE_NAME = "lasttimestamp"
#limit telemetry read, use empty string to disable
LIMIT = " limit 100000"

@task(task_family="Spark initialization")
def init_spark():
    builder = SparkSession.builder
    for k, v in spark_conf_lines:
        builder = builder.config(k.strip(), v.strip())
    spark = builder.appName(APP_NAME).enableHiveSupport().getOrCreate()
    return spark
#functions
@task(task_family=f"Create database {CATALOG_NAME}.{SCHEMA_NAME} if not exists")
def create_database(spark):
    spark.sql(f"create database if not exists {CATALOG_NAME}.{SCHEMA_NAME} LOCATION 's3a://{BUCKET_NAME}/'")

@task(task_family=f"Create table {LASTTIMESTAMP_TABLE_NAME} if not exists")
def create_lasttimestamp_table(spark):
    spark.sql(f"create table if not exists {CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME} (last_timestamp timestamp) using iceberg")
| @task(task_family=f"Create table drone_telemetry_1min if not exists") | |
| def create_dronetelemetry1min_table(spark): | |
| # 1-minute aggregate table | |
| spark.sql(f""" | |
| CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min ( | |
| bucket TIMESTAMP, -- window start (UTC minute) | |
| drone_id STRING, | |
| lat_last DOUBLE, | |
| lon_last DOUBLE, | |
| alt_last DOUBLE, | |
| speed_avg DOUBLE, | |
| speed_max DOUBLE, | |
| airspeed_avg DOUBLE, | |
| heading_avg DOUBLE, | |
| battery_avg DOUBLE, | |
| battery_min DOUBLE, | |
| hdop_avg DOUBLE, | |
| sats_avg DOUBLE, | |
| motor_temp_avg DOUBLE, | |
| motor_temp_max DOUBLE, | |
| link_quality_avg DOUBLE, | |
| cam_active_samples BIGINT, | |
| cam_thermal_samples BIGINT, | |
| samples BIGINT | |
| ) | |
| USING iceberg | |
| """) | |
| @task(task_family=f"Get raw drone telemetry data from PostgresQL") | |
| def get_drone_telemetry_from_postgres(spark): | |
| df = (spark.read.format("jdbc") | |
| .option("url", JDBC_URL) | |
| .option("dbtable", "public.drone_telemetry") | |
| .options(**JDBC_PROPS) | |
| .load() | |
| ) | |
| unique_file_name = "drone-telemetry-data" | |
| with dataset_op_logger(unique_file_name, "read", with_schema=True, with_preview=True, with_stats=True) as logger: | |
| logger.set(data=df) | |
| return df | |
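# Note: get_drone_telemetry_from_postgres() reads the whole table and is not called by the
# main flow below; the incremental read in getTelemetrySinceLastTimeStamp() is used instead.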
| @task(task_family=f"Get latest timestamp") | |
| def getLastTimeStamp(spark): | |
| # Read watermark from a small Delta table | |
| wm = spark.read.format("iceberg").load(f"{CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME}").first() | |
| last_ts = wm["last_timestamp"] if wm else "1970-01-01T00:00:00Z" | |
| return last_ts | |
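# On the first run the watermark table is empty, so the epoch fallback above makes the
# incremental query below fetch all telemetry rows (capped by LIMIT).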
| @task(task_family=f"Get drone telemetry since latest timestamp") | |
| def getTelemetrySinceLastTimeStamp(spark,lastTimeStamp): | |
| bronze = (spark.read.format("jdbc") | |
| .option("url", JDBC_URL) | |
| .option("dbtable", f"(select * from public.drone_telemetry where ts > '{lastTimeStamp}' order by ts asc {LIMIT}) as q") | |
| .options(**JDBC_PROPS) | |
| .load() | |
| ) | |
| unique_file_name = "drone-telemetry-data-since-latest-timestamp" | |
| with dataset_op_logger(unique_file_name, "read", with_schema=True, with_preview=True, with_stats=True) as logger: | |
| logger.set(data=bronze) | |
| return bronze | |
#create a 1 minute bucket column for the telemetry data, aggregate per bucket and merge into the 1-minute table
@task(task_family="Create 1 minute bucket columns to telemetry data")
def create1MinuteBucketColumns(spark, droneTelemetryDF):
    droneTelemetryDF1Min = droneTelemetryDF.withColumn("bucket", F.window("ts", "1 minute").getField("start"))
    #droneTelemetryDF1Min.count()
    print("Added 1 minute bucket columns to telemetry data")
    # struct-max trick to pick the last lat/lon/alt within each bucket
    last_lat_struct = F.max(F.struct("ts", "lat")).alias("s_lat")
    last_lon_struct = F.max(F.struct("ts", "lon")).alias("s_lon")
    last_alt_struct = F.max(F.struct("ts", "alt_m")).alias("s_alt")
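    # Why this works: F.max over a struct compares fields left to right, so with "ts" as the
    # first field the max struct is the sample with the latest timestamp in each group; the
    # .getField() calls in the select below then extract that sample's lat/lon/alt values.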
    agg = (droneTelemetryDF1Min.groupBy("bucket", "drone_id")
           .agg(
               last_lat_struct, last_lon_struct, last_alt_struct,
               F.avg("speed_mps").alias("speed_avg"),
               F.max("speed_mps").alias("speed_max"),
               F.avg("airspeed_mps").alias("airspeed_avg"),
               F.avg("heading_deg").alias("heading_avg"),
               F.avg("battery_pct").alias("battery_avg"),
               F.min("battery_pct").alias("battery_min"),
               F.avg("gnss_hdop").alias("hdop_avg"),
               F.avg("gnss_satellites").alias("sats_avg"),
               F.avg("motor_temp_c").alias("motor_temp_avg"),
               F.max("motor_temp_c").alias("motor_temp_max"),
               F.avg("link_quality_pct").alias("link_quality_avg"),
               F.sum(F.col("camera_thermal_active").cast("int")).alias("cam_thermal_samples"),
               F.sum(F.col("camera_active").cast("int")).alias("cam_active_samples"),
               F.count(F.lit(1)).alias("samples"),
           )
           .select(
               "bucket", "drone_id",
               F.col("s_lat").getField("lat").alias("lat_last"),
               F.col("s_lon").getField("lon").alias("lon_last"),
               F.col("s_alt").getField("alt_m").alias("alt_last"),
               "speed_avg","speed_max","airspeed_avg","heading_avg",
               "battery_avg","battery_min",
               "hdop_avg","sats_avg",
               "motor_temp_avg","motor_temp_max",
               "link_quality_avg",
               "cam_active_samples","cam_thermal_samples","samples"
           )
    )
    print(f"Created aggregated 1 min telemetry data. Aggregated rows: {agg.count()}")
    unique_file_name = "drone-telemetry-data-with-1-minute-buckets"
    with dataset_op_logger(unique_file_name, "write", with_schema=True, with_preview=True, with_stats=True) as logger:
        logger.set(data=agg)
    #write/merge data to table so that there are no duplicates
    agg.createOrReplaceTempView("agg_tmp")
    spark.sql(f"""
        MERGE INTO {CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min t
        USING agg_tmp s
        ON t.bucket = s.bucket AND t.drone_id = s.drone_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
| print(f"Wrote {agg.count()} rows of aggregated 1 min telemetry data to table '{CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min'") | |
| # Log writing the Drone telemetry 1 minute buckets | |
| log_dataframe("drone-telemetry-data-with-1-minute-buckets", agg, with_schema=True, with_stats=True) | |
| @task(task_family=f"Update latest telemetry timestamp") | |
| def updateLatestTimeStamp(spark, droneTelemetryDF): | |
| max_ts = droneTelemetryDF.agg(F.max("ts").alias("m")).first()["m"] | |
| print(f"Latest timestamp of telemetry data: {max_ts}") | |
| if max_ts: | |
| df = spark.createDataFrame([(max_ts,)], "last_timestamp timestamp") | |
| df.write.format("iceberg").mode("overwrite").save(f"{CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME}") | |
| print(f"Stored latest timestamp {max_ts} to '{CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME}' table") | |
| unique_file_name = "drone-telemetry-data-since-latest-timestamp" | |
| with dataset_op_logger(unique_file_name, "write", with_schema=True, with_preview=True, with_stats=True) as logger: | |
| logger.set(data=df) | |
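# Overwriting the single-row watermark table advances the high-water mark, so the next run's
# "ts > last_timestamp" filter only pulls telemetry rows newer than this batch.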
#main function
def aggregateRawDataTo1MinBuckets():
    # Current date and time
    nowISOString = datetime.now().isoformat()
    with dbnd_tracking(job_name="drone-ops-bronze-to-silver-1min",
                       run_name=f"drone-ops-bronze-to-silver-1min-{nowISOString}",
                       conf={
                           "tracking": {
                               "track_source_code": True
                           },
                           "log": {
                               "preview_head_bytes": 512,
                               "preview_tail_bytes": 512
                           }
                       }
    ):
        start_time_milliseconds = int(round(time.time() * 1000))
        spark = init_spark()
        print("Spark initialized")
        create_database(spark)
        create_lasttimestamp_table(spark)
        create_dronetelemetry1min_table(spark)
        print("Database and tables created if they didn't exist")
        #get telemetry since last timestamp
        lastTimeStamp = getLastTimeStamp(spark)
        print(f"Last timestamp: {lastTimeStamp}")
        droneTelemetryDF = getTelemetrySinceLastTimeStamp(spark, lastTimeStamp)
        telemetryCount = droneTelemetryDF.count()
        print(f"Fetched {telemetryCount} rows of telemetry data since {lastTimeStamp}")
        create1MinuteBucketColumns(spark, droneTelemetryDF)
        updateLatestTimeStamp(spark, droneTelemetryDF)
        end_time_milliseconds = int(round(time.time() * 1000))
        elapsed_time = end_time_milliseconds - start_time_milliseconds
        log_metric('elapsed-time', elapsed_time)

print("Functions declared")
# %%
print("Raw data aggregation start.")
aggregateRawDataTo1MinBuckets()
print("Raw data aggregation end.")