@oliverangelil
Created November 8, 2024 21:21
Recursively Scan Unity Catalog Tables
import pandas as pd

# Assumes a Databricks (or similar) environment where `spark` is already
# bound to an active SparkSession.


def get_table_details(catalogs: list[str]) -> pd.DataFrame:
    """
    Retrieve details about the tables in one or more catalogs, including row
    counts and size statistics.

    The function uses Spark SQL to iterate over a list of catalogs, fetching
    every schema and table within each one. For each table it records the row
    count, the total size in bytes, and the average size per row (in bytes).
    The results are collected in a dictionary and returned as a pandas
    DataFrame for easy analysis.

    Parameters
    ----------
    catalogs : list of str
        Catalog names to analyze.

    Returns
    -------
    pd.DataFrame
        One row per table, with the following columns:
        - 'catalog': the catalog name.
        - 'schema': the schema name.
        - 'table': the table name.
        - 'rows': the number of rows in the table.
        - 'size_mb': the table size in megabytes.
        - 'size_bytes': the table size in bytes.
        - 'bytes_per_row': the average size of each row in bytes.

    Notes
    -----
    - Temporary tables are excluded from the analysis.
    - If a table has zero rows, 'bytes_per_row' is set to None to avoid a
      division-by-zero error.
    """
    table_details = {}
    counter = 0
    for catalog in catalogs:
        spark.sql(f"USE CATALOG {catalog}")
        schemas = spark.sql(f"SHOW SCHEMAS IN {catalog}").collect()
        for schema_row in schemas:
            schema_name = schema_row["databaseName"]
            tables = spark.sql(f"SHOW TABLES IN {schema_name}").collect()
            for table_row in tables:
                table_name = table_row["tableName"]
                if table_row["isTemporary"]:
                    continue
                print(counter, f"{catalog}.{schema_name}.{table_name}")
                df = spark.table(f"{catalog}.{schema_name}.{table_name}")
                rows = df.count()
                # Size estimate from the optimized logical plan's statistics.
                # This reaches into Spark internals via `_jdf`, so it is an
                # estimate and may change between Spark versions.
                size_bytes = df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
                size_mb = size_bytes / (1024 * 1024)
                bytes_per_row = size_bytes / rows if rows else None
                table_details[counter] = (
                    catalog,
                    schema_name,
                    table_name,
                    rows,
                    size_mb,
                    size_bytes,
                    bytes_per_row,
                )
                counter += 1
    pd_df = pd.DataFrame.from_dict(
        table_details,
        orient="index",
        columns=["catalog", "schema", "table", "rows", "size_mb", "size_bytes", "bytes_per_row"],
    )
    return pd_df.sort_values(by="rows", ascending=False).reset_index(drop=True)
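
For reference, a minimal usage sketch; the catalog names below are placeholders for catalogs in your own metastore:

# Hypothetical catalog names; substitute catalogs from your Unity Catalog metastore.
details = get_table_details(["main", "dev"])
print(details.head(10))  # largest tables by row count first

The size reported here comes from Catalyst's plan statistics, so treat it as an estimate; for Delta tables, running DESCRIBE DETAIL on a table reports the on-disk size of the current snapshot if you need a more reliable figure.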