Recursively Scan Unity Catalog Tables
import pandas as pd


def get_table_details(catalogs: list[str]) -> pd.DataFrame:
    """
    Retrieve detailed information about tables across multiple catalogs and schemas,
    including row counts and size statistics.

    This function uses Spark SQL to iterate over a list of catalogs, fetching details
    for each schema and table within each catalog. For each table, it calculates the
    number of rows, the total size in bytes, and the average size per row (in bytes).
    The results are collected in a dictionary and converted into a Pandas DataFrame
    for easy analysis.

    Parameters
    ----------
    catalogs : list of str
        A list of catalog names to analyze.

    Returns
    -------
    pd.DataFrame
        A DataFrame with one row per table and the following columns:
        - 'catalog': The catalog name.
        - 'schema': The schema name.
        - 'table': The table name.
        - 'rows': The number of rows in the table.
        - 'size_mb': The size of the table in megabytes.
        - 'size_bytes': The size of the table in bytes.
        - 'bytes_per_row': The average size of each row in bytes.

    Notes
    -----
    - Temporary tables are excluded from the analysis.
    - Tables with zero rows are skipped, since the bytes-per-row calculation
      would divide by zero.
    - Assumes a Databricks environment where `spark` is available as the
      active SparkSession.
    """
    table_details = {}
    counter = 0
    for catalog in catalogs:
        spark.sql(f"USE CATALOG {catalog}")
        schemas = spark.sql(f"SHOW SCHEMAS IN {catalog}").collect()
        for schema_row in schemas:
            print(schema_row)
            schema_name = schema_row["databaseName"]
            tables = spark.sql(f"SHOW TABLES IN {schema_name}").collect()
            for table_row in tables:
                print(counter, table_row)
                table_name = table_row["tableName"]
                if not table_row["isTemporary"]:
                    df = spark.table(f"{catalog}.{schema_name}.{table_name}")
                    rows = df.count()
                    # Size estimate taken from the optimized logical plan's statistics.
                    size_bytes = df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
                    size_mb = size_bytes / (1024 * 1024)
                    try:
                        table_details[counter] = (
                            catalog, schema_name, table_name,
                            rows, size_mb, size_bytes, size_bytes / rows,
                        )
                        counter += 1
                    except ZeroDivisionError as err:
                        print(err)
    pd_df = pd.DataFrame.from_dict(
        table_details,
        orient="index",
        columns=["catalog", "schema", "table", "rows", "size_mb", "size_bytes", "bytes_per_row"],
    )
    pd_df = pd_df.sort_values(by="rows", ascending=False).reset_index(drop=True)
    return pd_df
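
A minimal usage sketch, assuming this runs in a Databricks notebook where `spark` is the active SparkSession and you have access to the listed catalogs; the catalog names below are hypothetical placeholders.

# Hypothetical catalog names; replace with catalogs visible in your metastore.
details = get_table_details(["main", "dev"])

# Largest tables by row count (the DataFrame is already sorted descending by 'rows').
print(details.head(10))

# Optionally persist the summary for later review.
details.to_csv("table_details.csv", index=False)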