Rough
// Define the path to the new folder in ADLS
val adlsFolderPath = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<folder-path>"

// Create the new folder (mkdirs also creates any missing parent directories)
dbutils.fs.mkdirs(adlsFolderPath)

// Verify the folder was created. Note: ls(path).nonEmpty would report false for
// an existing-but-empty folder, so treat a successful ls call itself as the check.
val folderExists =
  try { dbutils.fs.ls(adlsFolderPath); true }
  catch { case _: Exception => false }

if (folderExists) {
  println(s"Folder created successfully at: $adlsFolderPath")
} else {
  println(s"Failed to create folder at: $adlsFolderPath")
}
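Note that mkdirs only succeeds if the cluster can already authenticate to the storage account. A minimal sketch of session-scoped OAuth configuration in Python, using the documented ABFS config keys (the service principal, tenant, secret scope, and key names are placeholders, not part of the original snippet):

# Sketch: session-scoped service-principal auth for ABFS (all <...> values are placeholders)
storage_account = "<storage-account-name>"
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net",
               "<application-id>")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net",
               dbutils.secrets.get(scope="<secret-scope>", key="<secret-key>"))
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
               "https://login.microsoftonline.com/<tenant-id>/oauth2/token")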
====================================================================
// Quick check if a folder exists and has contents
// (note: this returns false for a folder that exists but is empty)
dbutils.fs.ls("/path/to/folder").nonEmpty

// Alternative, more explicit way: ls throws if the path does not exist
def folderExists(path: String): Boolean = {
  try {
    dbutils.fs.ls(path)
    true
  } catch {
    case _: Exception => false
  }
}

// Usage example
val folderPath = "/path/to/folder"
if (folderExists(folderPath)) {
  println(s"Folder $folderPath exists")
} else {
  println(s"Folder $folderPath does not exist")
}
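The same helper works in a PySpark notebook (a sketch; dbutils.fs.ls raises an exception for a missing path in Python as well):

def folder_exists(path: str) -> bool:
    # ls raises if the path does not exist; a successful call means it does
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False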
====================================================================
from datetime import datetime, timedelta

def is_last_weekday_of_month():
    """Return True if today is the last weekday (Mon-Fri) of the current month."""
    today = datetime.now()
    # First day of next month: day 1 plus 32 days always lands in the next month
    first_of_next_month = (today.replace(day=1) + timedelta(days=32)).replace(day=1)
    # Last day of the current month
    last_day = first_of_next_month - timedelta(days=1)
    # Walk back from month end until we hit a weekday
    while last_day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        last_day -= timedelta(days=1)
    return today.date() == last_day.date()
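Because the function reads datetime.now() internally, it is awkward to unit-test; a sketch of a parameterised variant (the explicit date argument is an addition for testability, not in the original):

from datetime import date, timedelta

def is_last_weekday_of_month_on(d: date) -> bool:
    # Same logic as above, applied to an arbitrary date
    first_of_next_month = (d.replace(day=1) + timedelta(days=32)).replace(day=1)
    last_day = first_of_next_month - timedelta(days=1)
    while last_day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        last_day -= timedelta(days=1)
    return d == last_day

# Example: 2025-02-28 was a Friday and the last day of February 2025
assert is_last_weekday_of_month_on(date(2025, 2, 28))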
====================================================================
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
import requests
import json
# Default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# UAMI and Databricks configuration
UAMI_CLIENT_ID = "your-uami-client-id"  # Replace with your actual UAMI client ID
DATABRICKS_URL = "https://your-databricks-workspace.azuredatabricks.net"  # Replace with your Databricks workspace URL
DATABRICKS_JOB_ID = "your-databricks-job-id"  # Replace with your Databricks job ID

# Connection ID that will be created programmatically
DATABRICKS_CONN_ID = "databricks_uami_conn"
def get_token_from_uami():
    """
    Get an access token using the User-Assigned Managed Identity.
    This uses the Azure Instance Metadata Service (IMDS) endpoint,
    which is only reachable from inside an Azure-hosted VM or container.
    """
    metadata_url = "http://169.254.169.254/metadata/identity/oauth2/token"
    params = {
        "api-version": "2018-02-01",
        "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d",  # Resource ID for Azure Databricks
        "client_id": UAMI_CLIENT_ID,
    }
    headers = {"Metadata": "true"}
    response = requests.get(metadata_url, headers=headers, params=params)
    response.raise_for_status()
    return response.json()["access_token"]
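
# Alternative sketch (assumes the azure-identity package is installed):
# ManagedIdentityCredential wraps the same IMDS endpoint with built-in retries.
#   from azure.identity import ManagedIdentityCredential
#   credential = ManagedIdentityCredential(client_id=UAMI_CLIENT_ID)
#   token = credential.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token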
def create_databricks_connection(**kwargs):
    """
    Create a Databricks connection using UAMI authentication.
    This function will be run as the first task in the DAG.
    """
    try:
        # Check if the connection already exists
        try:
            BaseHook.get_connection(DATABRICKS_CONN_ID)
            # Note: this reuses the stored (possibly expired) token as-is;
            # see the caveat after the DAG definition.
            print(f"Connection {DATABRICKS_CONN_ID} already exists, no need to create it.")
            return
        except Exception:
            print(f"Connection {DATABRICKS_CONN_ID} does not exist, creating it...")

        # Get a token from the UAMI
        token = get_token_from_uami()

        # Create a new connection
        conn = Connection(
            conn_id=DATABRICKS_CONN_ID,
            conn_type='databricks',
            host=DATABRICKS_URL,
            extra=json.dumps({
                "token": token,
                "host": DATABRICKS_URL,
            })
        )

        # Add the connection to the metadata database
        from airflow.settings import Session
        session = Session()
        session.add(conn)
        session.commit()
        print(f"Created connection: {DATABRICKS_CONN_ID}")
    except Exception as e:
        print(f"Error creating connection: {str(e)}")
        raise
# Define the DAG
with DAG(
    'databricks_uami_job',
    default_args=default_args,
    description='Run Azure Databricks job using UAMI',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2025, 3, 15),
    catchup=False,
    tags=['databricks', 'uami'],
) as dag:
    # Task to create the Databricks connection
    # (provide_context is deprecated since Airflow 2.0; context is passed automatically)
    create_connection = PythonOperator(
        task_id='create_databricks_connection',
        python_callable=create_databricks_connection,
    )
    # Task to run the Databricks job
    run_databricks_job = DatabricksRunNowOperator(
        task_id='run_databricks_job',
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id=DATABRICKS_JOB_ID,
    )

    # Set task dependencies
    create_connection >> run_databricks_job
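Caveat: the stored token is a short-lived Azure AD access token (typically valid for about an hour), and the early return in create_databricks_connection reuses an existing connection without refreshing it. For anything long-running, refresh the token on every run, or check whether your version of apache-airflow-providers-databricks can acquire AAD tokens from the managed identity itself via connection extras, which removes the need to store a token at all.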