Skip to content

Instantly share code, notes, and snippets.

@rajvermacas
Last active March 16, 2025 08:47
Show Gist options
  • Save rajvermacas/b6169945a41e70e85398f20ec6c9e731 to your computer and use it in GitHub Desktop.
Rough
// Define the path to the new folder in ADLS
val adlsFolderPath = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<folder-path>"

// Create the new folder (no-op if it already exists)
dbutils.fs.mkdirs(adlsFolderPath)

// Verify the folder was created. `dbutils.fs.ls` succeeds for an existing
// path (returning an empty Seq for an empty directory) and throws when the
// path does not exist — so checking `.nonEmpty` would wrongly report
// failure for a freshly created, still-empty folder. Existence is whether
// the listing call succeeds at all.
val folderExists =
  try {
    dbutils.fs.ls(adlsFolderPath)
    true
  } catch {
    case _: Exception => false
  }

if (folderExists) {
  println(s"Folder created successfully at: $adlsFolderPath")
} else {
  println(s"Failed to create folder at: $adlsFolderPath")
}
====================================================================
// Quick existence probe (throws if the path is missing, so prefer the helper below)
dbutils.fs.ls("/path/to/folder").nonEmpty

// Explicit helper: returns true iff `path` exists.
// Only FileNotFoundException is treated as "does not exist"; other
// failures (e.g. authentication or network errors) propagate instead of
// being silently misreported as a missing folder.
def folderExists(path: String): Boolean = {
  try {
    dbutils.fs.ls(path)
    true
  } catch {
    case _: java.io.FileNotFoundException => false
  }
}

// Usage example
val folderPath = "/path/to/folder"
if (folderExists(folderPath)) {
  println(s"Folder $folderPath exists")
} else {
  println(s"Folder $folderPath does not exist")
}
====================================================================
def is_last_weekday_of_month(today=None):
    """Return True when *today* is the last weekday (Mon-Fri) of its month.

    Args:
        today: Optional datetime to test. Defaults to the current local
            time, preserving the original zero-argument behavior.

    Returns:
        bool: True iff today's date equals the month's final weekday.
    """
    from datetime import datetime, timedelta

    if today is None:
        today = datetime.now()
    # Day 1 of this month plus 32 days always lands in the next month;
    # snapping back to day 1 gives the first day of the next month.
    first_of_next_month = (today.replace(day=1) + timedelta(days=32)).replace(day=1)
    # The day before that is the last calendar day of the current month.
    last_day = first_of_next_month - timedelta(days=1)
    # Walk backwards over weekend days (5 = Saturday, 6 = Sunday).
    while last_day.weekday() >= 5:
        last_day -= timedelta(days=1)
    return today.date() == last_day.date()
====================================================================
import json
import os
from datetime import datetime, timedelta

import requests

from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
# Default arguments applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# UAMI and Databricks configuration — replace the placeholders with real values.
UAMI_CLIENT_ID = "your-uami-client-id"  # Client ID of the User-Assigned Managed Identity
DATABRICKS_URL = "https://your-databricks-workspace.azuredatabricks.net"  # Workspace URL
DATABRICKS_JOB_ID = "your-databricks-job-id"  # Databricks job to trigger

# Connection ID that will be created programmatically at run time.
DATABRICKS_CONN_ID = "databricks_uami_conn"
def get_token_from_uami():
    """Fetch an Azure AD access token for Databricks via the UAMI.

    Queries the Azure Instance Metadata Service (IMDS), which is only
    reachable from inside an Azure VM at the well-known link-local address.

    Returns:
        str: A bearer access token scoped to the Azure Databricks resource.

    Raises:
        requests.HTTPError: If IMDS rejects the request.
        requests.Timeout: If IMDS does not answer within the timeout.
    """
    metadata_url = "http://169.254.169.254/metadata/identity/oauth2/token"
    params = {
        "api-version": "2018-02-01",
        # Well-known Azure AD resource ID for Azure Databricks.
        "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d",
        # Selects which managed identity to use when several are attached.
        "client_id": UAMI_CLIENT_ID,
    }
    # IMDS requires this header (it rejects requests routed via a proxy).
    headers = {"Metadata": "true"}
    # Without a timeout this call hangs forever when IMDS is unreachable,
    # e.g. when the DAG is parsed/run outside Azure.
    response = requests.get(metadata_url, headers=headers, params=params, timeout=10)
    response.raise_for_status()
    return response.json()["access_token"]
def create_databricks_connection(**kwargs):
    """Create the Databricks Airflow connection using UAMI authentication.

    Runs as the first task in the DAG. If the connection already exists it
    is left untouched; otherwise an access token is fetched from the UAMI
    and stored in a new connection record in the Airflow metadata DB.

    NOTE(review): the stored token is short-lived, so an existing
    connection may hold an expired token — consider refreshing instead of
    returning early. TODO confirm desired behavior.

    Raises:
        Exception: Re-raised after logging if connection creation fails.
    """
    try:
        # Check whether the connection already exists. A bare `except:`
        # here would also swallow SystemExit/KeyboardInterrupt;
        # Exception is the widest class we should catch.
        try:
            BaseHook.get_connection(DATABRICKS_CONN_ID)
            print(f"Connection {DATABRICKS_CONN_ID} already exists, no need to create it.")
            return
        except Exception:
            print(f"Connection {DATABRICKS_CONN_ID} does not exist, creating it...")
        # Get a fresh token from the UAMI.
        token = get_token_from_uami()
        # Build the new connection record.
        conn = Connection(
            conn_id=DATABRICKS_CONN_ID,
            conn_type='databricks',
            host=DATABRICKS_URL,
            extra=json.dumps({
                "token": token,
                "host": DATABRICKS_URL
            })
        )
        # Persist it via a SQLAlchemy session against the Airflow metadata DB.
        from airflow.settings import Session
        session = Session()
        session.add(conn)
        session.commit()
        print(f"Created connection: {DATABRICKS_CONN_ID}")
    except Exception as e:
        print(f"Error creating connection: {str(e)}")
        raise
# Define the DAG: runs daily starting 2025-03-15, with no backfill of
# missed intervals (catchup=False).
with DAG(
    'databricks_uami_job',
    default_args=default_args,
    description='Run Azure Databricks job using UAMI',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 3, 15),
    catchup=False,
    tags=['databricks', 'uami'],
) as dag:
    # Task 1: ensure the Databricks connection exists (created via UAMI).
    # NOTE(review): PythonOperator is never imported in this file — add
    # `from airflow.operators.python import PythonOperator` at the top,
    # otherwise DAG parsing raises NameError.
    # NOTE(review): provide_context=True is an Airflow 1.x flag; in
    # Airflow 2 context is passed automatically — confirm target version.
    create_connection = PythonOperator(
        task_id='create_databricks_connection',
        python_callable=create_databricks_connection,
        provide_context=True,
    )
    # Task 2: trigger the pre-configured Databricks job through the
    # connection created above.
    run_databricks_job = DatabricksRunNowOperator(
        task_id='run_databricks_job',
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id=DATABRICKS_JOB_ID,
    )
    # Create the connection before running the job.
    create_connection >> run_databricks_job
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment