@samisalkosuo
Created November 17, 2025 11:22
Sample Spark application running on IBM watsonx.data, decorated with the IBM Databand SDK.
# %% [markdown]
# Drone Ops raw data from PostgreSQL to watsonx.data
#
# Uses the Databand Python SDK to decorate the code with Databand tracking.
#
# See https://www.ibm.com/docs/en/dobd?topic=python-tracking-functions.
#
# See also Databand docs https://www.ibm.com/docs/en/dobd?topic=integrations-code-based-workflows.
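#
# The decoration pattern used below, as a minimal sketch (the names in this snippet are
# illustrative only, not part of this job):
#
#     from dbnd import dbnd_tracking, task, log_metric
#
#     @task(task_family="Example step")
#     def example_step():
#         log_metric("rows-processed", 42)
#         return 42
#
#     with dbnd_tracking(job_name="example-job", run_name="example-run"):
#         example_step()
#
# Each @task-decorated function becomes a tracked task in Databand, and dbnd_tracking()
# groups the calls made inside the with-block into a single tracked run.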
# %%
#variables
APP_NAME="drone-ops-bronze-to-silver-1min"
SCHEMA_NAME = "drone_swarm_ops"
CATALOG_NAME = "bucket_01"
BUCKET_NAME = "bucket-01"
JDBC_URL = "jdbc:postgresql://postgres.lab-postgres.svc.cluster.local:5432/drone_swarm_ops"
JDBC_PROPS = {"user":"demo", "password":"passw0rd", "driver":"org.postgresql.Driver"}
#imports
from pyspark.sql import SparkSession
import os
from datetime import datetime, timedelta
import time
from pyspark.sql import functions as F, types as T
from pyspark.sql.functions import col, to_utc_timestamp
import base64,getpass
import warnings
from dbnd import dbnd_tracking, task, dataset_op_logger, log_metric, log_dataframe
warnings.filterwarnings('ignore')
# %%
#init Spark
#Spark config copied from watsonx.data Infrastructure Manager > Spark engines > Default Spark configuration
spark_config="""
# Replace these placeholder lines with the actual key=value pairs from your engine configuration:
# spark.config.line.1
# spark.config.line.2
"""
# Parse non-empty, non-comment lines into key/value pairs
spark_conf_lines = [
    line.strip().split("=", 1)
    for line in spark_config.strip().splitlines()
    if line.strip() and not line.strip().startswith("#")
]
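# The key/value pairs parsed above are applied to the SparkSession builder in init_spark() below.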
# %%
LASTTIMESTAMP_TABLE_NAME="lasttimestamp"
#limit telemetry read, use empty string to disable
LIMIT=" limit 100000"
@task(task_family="Spark initialization")
def init_spark():
    builder = SparkSession.builder
    for k, v in spark_conf_lines:
        builder = builder.config(k.strip(), v.strip())
    spark = builder.appName(APP_NAME).enableHiveSupport().getOrCreate()
    return spark
#functions
@task(task_family=f"Create database {CATALOG_NAME}.{SCHEMA_NAME} if not exists")
def create_database(spark):
spark.sql(f"create database if not exists {CATALOG_NAME}.{SCHEMA_NAME} LOCATION 's3a://{BUCKET_NAME}/'")
@task(task_family=f"Create table {LASTTIMESTAMP_TABLE_NAME} if not exists")
def create_lasttimestamp_table(spark):
spark.sql(f"create table if not exists {CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME} (last_timestamp timestamp) using iceberg")
@task(task_family=f"Create table drone_telemetry_1min if not exists")
def create_dronetelemetry1min_table(spark):
# 1-minute aggregate table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min (
bucket TIMESTAMP, -- window start (UTC minute)
drone_id STRING,
lat_last DOUBLE,
lon_last DOUBLE,
alt_last DOUBLE,
speed_avg DOUBLE,
speed_max DOUBLE,
airspeed_avg DOUBLE,
heading_avg DOUBLE,
battery_avg DOUBLE,
battery_min DOUBLE,
hdop_avg DOUBLE,
sats_avg DOUBLE,
motor_temp_avg DOUBLE,
motor_temp_max DOUBLE,
link_quality_avg DOUBLE,
cam_active_samples BIGINT,
cam_thermal_samples BIGINT,
samples BIGINT
)
USING iceberg
""")
@task(task_family=f"Get raw drone telemetry data from PostgresQL")
def get_drone_telemetry_from_postgres(spark):
df = (spark.read.format("jdbc")
.option("url", JDBC_URL)
.option("dbtable", "public.drone_telemetry")
.options(**JDBC_PROPS)
.load()
)
unique_file_name = "drone-telemetry-data"
with dataset_op_logger(unique_file_name, "read", with_schema=True, with_preview=True, with_stats=True) as logger:
logger.set(data=df)
return df
@task(task_family=f"Get latest timestamp")
def getLastTimeStamp(spark):
# Read watermark from a small Delta table
wm = spark.read.format("iceberg").load(f"{CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME}").first()
last_ts = wm["last_timestamp"] if wm else "1970-01-01T00:00:00Z"
return last_ts
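# Watermark pattern: only rows with ts greater than the stored last_timestamp are fetched from
# PostgreSQL, and updateLatestTimeStamp() below advances the watermark after each run.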
@task(task_family=f"Get drone telemetry since latest timestamp")
def getTelemetrySinceLastTimeStamp(spark,lastTimeStamp):
bronze = (spark.read.format("jdbc")
.option("url", JDBC_URL)
.option("dbtable", f"(select * from public.drone_telemetry where ts > '{lastTimeStamp}' order by ts asc {LIMIT}) as q")
.options(**JDBC_PROPS)
.load()
)
unique_file_name = "drone-telemetry-data-since-latest-timestamp"
with dataset_op_logger(unique_file_name, "read", with_schema=True, with_preview=True, with_stats=True) as logger:
logger.set(data=bronze)
return bronze
#create 1 minute bucket columns to telemetry data
@task(task_family=f"Create 1 minute bucket columns to telemetry data")
def create1MinuteBucketColumns(spark, droneTelemetryDF):
droneTelemetryDF1Min = droneTelemetryDF.withColumn("bucket", F.window("ts", "1 minute").getField("start"))
#droneTelemetryDF1Min.count()
print("Added 1 minute bucket columns to telemetry data")
# struct-max trick to pick the last lat/lon/alt within each bucket
last_lat_struct = F.max(F.struct("ts", "lat")).alias("s_lat")
last_lon_struct = F.max(F.struct("ts", "lon")).alias("s_lon")
last_alt_struct = F.max(F.struct("ts", "alt_m")).alias("s_alt")
agg = (droneTelemetryDF1Min.groupBy("bucket", "drone_id")
.agg(
last_lat_struct, last_lon_struct, last_alt_struct,
F.avg("speed_mps").alias("speed_avg"),
F.max("speed_mps").alias("speed_max"),
F.avg("airspeed_mps").alias("airspeed_avg"),
F.avg("heading_deg").alias("heading_avg"),
F.avg("battery_pct").alias("battery_avg"),
F.min("battery_pct").alias("battery_min"),
F.avg("gnss_hdop").alias("hdop_avg"),
F.avg("gnss_satellites").alias("sats_avg"),
F.avg("motor_temp_c").alias("motor_temp_avg"),
F.max("motor_temp_c").alias("motor_temp_max"),
F.avg("link_quality_pct").alias("link_quality_avg"),
F.sum(F.col("camera_thermal_active").cast("int")).alias("cam_thermal_samples"),
F.sum(F.col("camera_active").cast("int")).alias("cam_active_samples"),
F.count(F.lit(1)).alias("samples"),
)
.select(
"bucket", "drone_id",
F.col("s_lat").getField("lat").alias("lat_last"),
F.col("s_lon").getField("lon").alias("lon_last"),
F.col("s_alt").getField("alt_m").alias("alt_last"),
"speed_avg","speed_max","airspeed_avg","heading_avg",
"battery_avg","battery_min",
"hdop_avg","sats_avg",
"motor_temp_avg","motor_temp_max",
"link_quality_avg",
"cam_active_samples","cam_thermal_samples","samples"
)
)
print(f"Created aggregated 1 min telemetry data. Aggregated rows: {agg.count()}")
unique_file_name = "drone-telemetry-data-with-1-minute-buckets"
with dataset_op_logger(unique_file_name, "write", with_schema=True, with_preview=True, with_stats=True) as logger:
logger.set(data=agg)
#write/merge data to table so that there are no duplicates
agg.createOrReplaceTempView("agg_tmp")
spark.sql(f"""
MERGE INTO {CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min t
USING agg_tmp s
ON t.bucket = s.bucket AND t.drone_id = s.drone_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
print(f"Wrote {agg.count()} rows of aggregated 1 min telemetry data to table '{CATALOG_NAME}.{SCHEMA_NAME}.drone_telemetry_1min'")
# Log writing the Drone telemetry 1 minute buckets
log_dataframe("drone-telemetry-data-with-1-minute-buckets", agg, with_schema=True, with_stats=True)
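# Note: the MERGE above keys on (bucket, drone_id), so re-running the job updates existing
# 1-minute rows instead of inserting duplicates.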
@task(task_family=f"Update latest telemetry timestamp")
def updateLatestTimeStamp(spark, droneTelemetryDF):
max_ts = droneTelemetryDF.agg(F.max("ts").alias("m")).first()["m"]
print(f"Latest timestamp of telemetry data: {max_ts}")
if max_ts:
df = spark.createDataFrame([(max_ts,)], "last_timestamp timestamp")
df.write.format("iceberg").mode("overwrite").save(f"{CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME}")
print(f"Stored latest timestamp {max_ts} to '{CATALOG_NAME}.{SCHEMA_NAME}.{LASTTIMESTAMP_TABLE_NAME}' table")
unique_file_name = "drone-telemetry-data-since-latest-timestamp"
with dataset_op_logger(unique_file_name, "write", with_schema=True, with_preview=True, with_stats=True) as logger:
logger.set(data=df)
#main function
def aggregateRawDataTo1MinBuckets():
    # Current date and time
    nowISOString = datetime.now().isoformat()
    with dbnd_tracking(job_name="drone-ops-bronze-to-silver-1min",
                       run_name=f"drone-ops-bronze-to-silver-1min-{nowISOString}",
                       conf={
                           "tracking": {
                               "track_source_code": True
                           },
                           "log": {
                               "preview_head_bytes": 512,
                               "preview_tail_bytes": 512
                           }
                       }
                       ):
        start_time_milliseconds = int(round(time.time() * 1000))
        spark = init_spark()
        print("Spark initialized")
        create_database(spark)
        create_lasttimestamp_table(spark)
        create_dronetelemetry1min_table(spark)
        print("Database and tables created if they didn't exist")
        #get telemetry since last timestamp
        lastTimeStamp = getLastTimeStamp(spark)
        print(f"Last timestamp: {lastTimeStamp}")
        droneTelemetryDF = getTelemetrySinceLastTimeStamp(spark, lastTimeStamp)
        telemetryCount = droneTelemetryDF.count()
        print(f"Fetched {telemetryCount} rows of telemetry data since {lastTimeStamp}")
        create1MinuteBucketColumns(spark, droneTelemetryDF)
        updateLatestTimeStamp(spark, droneTelemetryDF)
        end_time_milliseconds = int(round(time.time() * 1000))
        elapsed_time = end_time_milliseconds - start_time_milliseconds
        log_metric('elapsed-time', elapsed_time)
print("Functions declared")
# %%
print("Raw data aggregation start.")
aggregateRawDataTo1MinBuckets()
print("Raw data aggregation end.")