Using lakeFS and Delta Lake with PySpark in a Jupyter Notebook
Created April 6, 2023 20:21
{
"cells": [
{
"cell_type": "markdown",
"id": "78797e22",
"metadata": {},
"source": [
"# lakeFS and Delta\n",
"\n",
"This uses the [Everything Bagel](https://github.com/treeverse/lakeFS/tree/master/deployments/compose) Docker Compose environment.\n",
"\n",
"[@rmoff](https://twitter.com/rmoff/) "
]
},
{
"cell_type": "markdown",
"id": "3a642edb",
"metadata": {},
"source": [
"## Setup"
]
},
{
"cell_type": "markdown",
"id": "3180968f",
"metadata": {},
"source": [
"Display version numbers just for info"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "45ba6754",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Kernel: /opt/conda/bin/python\n",
"Python version: 3.9.7 | packaged by conda-forge | (default, Oct 10 2021, 15:08:54) \n",
"[GCC 9.4.0]\n",
"PySpark version: 3.2.0\n"
]
}
],
"source": [
"import sys\n",
"print(\"Kernel:\", sys.executable)\n",
"print(\"Python version:\", sys.version)\n",
"\n",
"import pyspark\n",
"print(\"PySpark version:\", pyspark.__version__)\n"
]
},
{
"cell_type": "markdown",
"id": "c1e3db35",
"metadata": {},
"source": [
"### Spark\n",
"\n",
"_With the necessary Delta Lake config too_"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "6d3747db",
"metadata": {
"scrolled": false
},
"outputs": [],
"source": [
"from pyspark import SparkFiles\n",
"from pyspark.sql.session import SparkSession\n",
"\n",
"spark = (\n",
" SparkSession.builder.master(\"local[*]\")\n",
" .config(\"spark.jars.packages\", \"io.delta:delta-core_2.12:2.0.0\")\n",
" .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n",
" .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\")\n",
" .config(\"spark.delta.logStore.class\", \"org.apache.spark.sql.delta.storage.S3SingleDriverLogStore\")\n",
" .config(\"spark.hadoop.fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n",
" .config(\"spark.hadoop.fs.s3a.endpoint\", \"http://lakefs:8000\")\n",
" .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\")\n",
" .config(\"spark.hadoop.fs.s3a.access.key\", \"AKIAIOSFODNN7EXAMPLE\")\n",
" .config(\"spark.hadoop.fs.s3a.secret.key\", \"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY\") \n",
" .getOrCreate()\n",
")"
]
},
{
"cell_type": "markdown",
"id": "644a7daa",
"metadata": {},
"source": [
"#### Test delta - write/read local"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "e70664be",
"metadata": {},
"outputs": [],
"source": [
"data = spark.range(0, 5)\n",
"data.write.format(\"delta\").mode(\"overwrite\").save(\"/tmp/delta-table\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "b53a65c0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+\n",
"| id|\n",
"+---+\n",
"|  4|\n",
"|  3|\n",
"|  2|\n",
"|  0|\n",
"|  1|\n",
"+---+\n",
"\n"
]
}
],
"source": [
"df = spark.read.format(\"delta\").load(\"/tmp/delta-table\")\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "ffd24df2",
"metadata": {},
"source": [
"#### Test delta - write/read lakeFS"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "a1d64961",
"metadata": {},
"outputs": [],
"source": [
"data = spark.range(0, 5)\n",
"data.write.format(\"delta\").mode('overwrite').save('s3a://example/main/test')"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "b5b4b61e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+\n",
"| id|\n",
"+---+\n",
"|  0|\n",
"|  4|\n",
"|  3|\n",
"|  1|\n",
"|  2|\n",
"+---+\n",
"\n"
]
}
],
"source": [
"df = spark.read.format(\"delta\").load('s3a://example/main/test')\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "f824b929",
"metadata": {},
"source": [
"### lakeFS"
]
},
{
"cell_type": "markdown",
"id": "49c3136c",
"metadata": {},
"source": [
"#### Install libraries\n",
"\n",
"(could be built into the `Dockerfile`)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "f3c34c28",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: lakefs_client in /opt/conda/lib/python3.9/site-packages (0.98.0)\r\n",
"Requirement already satisfied: urllib3>=1.25.3 in /opt/conda/lib/python3.9/site-packages (from lakefs_client) (1.26.8)\r\n",
"Requirement already satisfied: python-dateutil in /opt/conda/lib/python3.9/site-packages (from lakefs_client) (2.8.2)\r\n",
"Requirement already satisfied: six>=1.5 in /opt/conda/lib/python3.9/site-packages (from python-dateutil->lakefs_client) (1.16.0)\r\n"
]
}
],
"source": [
"import sys\n",
"!{sys.executable} -m pip install lakefs_client"
]
},
{
"cell_type": "markdown",
"id": "ef108b55",
"metadata": {},
"source": [
"#### Config"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "b2e44584",
"metadata": {},
"outputs": [],
"source": [
"import lakefs_client\n",
"from lakefs_client import models\n",
"from lakefs_client.client import LakeFSClient\n",
"from lakefs_client.api import branches_api\n",
"from lakefs_client.api import commits_api\n",
"\n",
"# lakeFS credentials and endpoint\n",
"configuration = lakefs_client.Configuration()\n",
"configuration.username = 'AKIAIOSFODNN7EXAMPLE'\n",
"configuration.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'\n",
"configuration.host = 'http://lakefs:8000'\n",
"\n",
"client = LakeFSClient(configuration)\n",
"api_client = lakefs_client.ApiClient(configuration)"
]
},
{
"cell_type": "markdown",
"id": "0047139f",
"metadata": {},
"source": [
"#### List the current branches in the repository\n",
"\n",
"https://pydocs.lakefs.io/docs/BranchesApi.html#list_branches"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "9651e04e",
"metadata": {},
"outputs": [],
"source": [
"repo='example'"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "bdaa7756",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'main'"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"for b in client.branches.list_branches(repo).results:\n",
"    display(b.id)"
]
},
{
"cell_type": "markdown",
"id": "0a45118a",
"metadata": {},
"source": [
"## Load some data into lakeFS"
]
},
{
"cell_type": "markdown",
"id": "ff22dfb5",
"metadata": {},
"source": [
"Read a parquet file from URL"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "8a87c7ef",
"metadata": {},
"outputs": [],
"source": [
"# The sample parquet file is Apache 2.0 licensed so perhaps include it in the Everything Bagel distribution? \n",
"url='https://github.com/Teradata/kylo/blob/master/samples/sample-data/parquet/userdata1.parquet?raw=true'\n",
"spark.sparkContext.addFile(url)\n",
"df = spark.read.parquet(\"file://\" + SparkFiles.get(\"userdata1.parquet\"))"
]
},
{
"cell_type": "markdown",
"id": "7c22768b",
"metadata": {},
"source": [
"How many rows of data?"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "6ec747fe",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1000"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"display(df.count())"
]
},
{
"cell_type": "markdown",
"id": "08f17847",
"metadata": {},
"source": [
"What does the data look like?"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "b6268496",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-RECORD 0--------------------------------\n",
" registration_dttm | 2016-02-03 07:55:29 \n",
" id                | 1                   \n",
" first_name        | Amanda              \n",
" last_name         | Jordan              \n",
" email             | [email protected]   \n",
" gender            | Female              \n",
" ip_address        | 1.197.201.2         \n",
" cc                | 6759521864920116    \n",
" country           | Indonesia           \n",
" birthdate         | 3/8/1971            \n",
" salary            | 49756.53            \n",
" title             | Internal Auditor    \n",
" comments          | 1E+02               \n",
"only showing top 1 row\n",
"\n"
]
},
{
"data": {
"text/plain": [
"None"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"display(df.show(n=1,vertical=True))"
]
},
{
"cell_type": "markdown",
"id": "b41dab2b",
"metadata": {},
"source": [
"## Write data to lakeFS (on the `main` branch) in Delta format"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "40be34f5",
"metadata": {},
"outputs": [],
"source": [
"branch='main'"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "68718621",
"metadata": {},
"outputs": [],
"source": [
"df.write.format(\"delta\").mode('overwrite').save('s3a://'+repo+'/'+branch+'/demo/users')"
]
},
{
"cell_type": "markdown",
"id": "6963a378",
"metadata": {},
"source": [
"#### [The data as seen from lakeFS](http://localhost:8000/repositories/example/objects?ref=main&path=demo%2Fusers%2F)"
]
},
{
"cell_type": "markdown",
"id": "1bcdcb3c",
"metadata": {},
"source": [
"### Commit the new file in `main`\n",
"\n",
"https://pydocs.lakefs.io/docs/CommitsApi.html#commit"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "0078cc8a",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'committer': 'docker',\n",
" 'creation_date': 1680812443,\n",
" 'id': '642d398f9279554c2f2c643492959cf3cd92f2db05a8c666265c45d19eccd130',\n",
" 'message': 'Initial user data load',\n",
" 'meta_range_id': '',\n",
" 'metadata': {},\n",
" 'parents': ['7328d2f124d036865a3101e76b9fff9b77425c1ca5cf872329442fe76f97ad5c']}"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from lakefs_client.api import commits_api\n",
"from lakefs_client.model.commit import Commit\n",
"from lakefs_client.model.commit_creation import CommitCreation\n",
"\n",
"api_instance = commits_api.CommitsApi(api_client)\n",
"commit_creation = CommitCreation(\n",
"    message=\"Initial user data load\"\n",
") \n",
"\n",
"api_instance.commit(repo, branch, commit_creation)"
]
},
{
"cell_type": "markdown",
"id": "dd3c5e94",
"metadata": {},
"source": [
"## Create a branch"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "0fa39ade",
"metadata": {},
"outputs": [],
"source": [
"branch='modify_user_data'"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "fecd1769",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'642d398f9279554c2f2c643492959cf3cd92f2db05a8c666265c45d19eccd130'"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"from lakefs_client.model.branch_creation import BranchCreation\n",
"\n",
"api_instance = branches_api.BranchesApi(api_client)\n",
"branch_creation = BranchCreation(\n",
"    name=branch,\n",
"    source=\"main\",\n",
") \n",
"\n",
"api_response = api_instance.create_branch(repo, branch_creation)\n",
"display(api_response)"
]
},
{
"cell_type": "markdown",
"id": "69e1a416",
"metadata": {},
"source": [
"### List the current branches in the `example` repository"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "118732d6",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'main'"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"'modify_user_data'"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"for b in client.branches.list_branches(repo).results:\n",
"    display(b.id)"
]
},
{
"cell_type": "markdown",
"id": "1378435c",
"metadata": {},
"source": [
"## Add some new data with merge"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "ad246bf7",
"metadata": {},
"outputs": [],
"source": [
"from delta.tables import *\n",
"from pyspark.sql.functions import *"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "fc637861",
"metadata": {},
"outputs": [],
"source": [
"# The sample parquet file is Apache 2.0 licensed so perhaps include it in the Everything Bagel distribution? \n",
"url='https://github.com/Teradata/kylo/blob/master/samples/sample-data/parquet/userdata2.parquet?raw=true'\n",
"spark.sparkContext.addFile(url)\n",
"new_df = spark.read.parquet(\"file://\" + SparkFiles.get(\"userdata2.parquet\"))"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "d2f818de",
"metadata": {},
"outputs": [],
"source": [
"users_deltaTable = DeltaTable.forPath(spark, 's3a://'+repo+'/'+branch+'/demo/users')"
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "b57e93b9",
"metadata": {},
"outputs": [],
"source": [
"users_deltaTable.alias(\"users\").merge(\n",
"    source = new_df.alias(\"new_users\"),\n",
"    condition = \"users.id = new_users.id\") \\\n",
"  .whenNotMatchedInsertAll() \\\n",
"  .execute()"
]
},
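{
"cell_type": "markdown",
"id": "delta-history-md",
"metadata": {},
"source": [
"As a quick check of what the merge did (a sketch added for illustration; this cell was not part of the original run), the Delta transaction log for the table can be inspected with `DeltaTable.history()` — the latest entry should be the MERGE operation:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "delta-history-code",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: show the Delta transaction log for the users table on this branch.\n",
"# history() is part of the Delta Lake Python API and returns a DataFrame.\n",
"users_deltaTable.history().select(\"version\", \"operation\", \"timestamp\").show()"
]
},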
{
"cell_type": "markdown",
"id": "bb92fada",
"metadata": {},
"source": [
"Commit in lakeFS"
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "86ae6261",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'committer': 'docker',\n",
" 'creation_date': 1680812447,\n",
" 'id': '2356793bc4723c62b829ea76d241a5d9728094812df22700923c4d495e22944a',\n",
" 'message': 'Merge in new user data',\n",
" 'meta_range_id': '',\n",
" 'metadata': {},\n",
" 'parents': ['642d398f9279554c2f2c643492959cf3cd92f2db05a8c666265c45d19eccd130']}"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"api_instance = commits_api.CommitsApi(api_client)\n",
"commit_creation = CommitCreation(\n",
"    message=\"Merge in new user data\"\n",
") \n",
"\n",
"api_instance.commit(repo, branch, commit_creation)"
]
},
{
"cell_type": "markdown",
"id": "1b64c675",
"metadata": {},
"source": [
"## Update some data"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "28d2041a",
"metadata": {},
"outputs": [],
"source": [
"deltaTable = DeltaTable.forPath(spark, 's3a://'+repo+'/'+branch+'/demo/users')"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "28b1cce1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+---------------+\n",
"| country|     ip_address|\n",
"+--------+---------------+\n",
"|   China|  140.35.109.83|\n",
"|Portugal| 232.234.81.197|\n",
"|   China| 246.225.12.189|\n",
"|   China|172.215.104.127|\n",
"|   China| 191.88.236.116|\n",
"+--------+---------------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"deltaTable.toDF().filter(col(\"country\").isin(\"Portugal\", \"China\")).select(\"country\",\"ip_address\").show(5)"
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "6a2dd8c2",
"metadata": {},
"outputs": [],
"source": [
"deltaTable.update(\n",
"    condition = \"country == 'Portugal'\",\n",
"    set = { \"ip_address\" : \"'x.x.x.x'\" })"
]
},
{
"cell_type": "code",
"execution_count": 28,
"id": "ab657830",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+---------------+\n",
"| country|     ip_address|\n",
"+--------+---------------+\n",
"|   China|  140.35.109.83|\n",
"|Portugal|        x.x.x.x|\n",
"|   China| 246.225.12.189|\n",
"|   China|172.215.104.127|\n",
"|   China| 191.88.236.116|\n",
"|   China| 65.111.200.146|\n",
"|   China| 252.20.193.145|\n",
"|Portugal|        x.x.x.x|\n",
"|   China|   152.6.235.33|\n",
"|   China|  80.111.141.47|\n",
"+--------+---------------+\n",
"only showing top 10 rows\n",
"\n"
]
}
],
"source": [
"deltaTable.toDF().filter(col(\"country\").isin(\"Portugal\", \"China\")).select(\"country\",\"ip_address\").show(10)"
]
},
{
"cell_type": "markdown",
"id": "00cc8abc",
"metadata": {},
"source": [
"Commit in lakeFS"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "dd7a6843",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'committer': 'docker',\n",
" 'creation_date': 1680812449,\n",
" 'id': 'ff744a27ac9d092a8dc8b209b21699922a7c32829639987a77ce668837f6f69d',\n",
" 'message': 'Mask all IPs for users in Portugal',\n",
" 'meta_range_id': '',\n",
" 'metadata': {},\n",
" 'parents': ['2356793bc4723c62b829ea76d241a5d9728094812df22700923c4d495e22944a']}"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"api_instance = commits_api.CommitsApi(api_client)\n",
"commit_creation = CommitCreation(\n",
"    message=\"Mask all IPs for users in Portugal\"\n",
") \n",
"\n",
"api_instance.commit(repo, branch, commit_creation)"
]
},
{
"cell_type": "markdown",
"id": "97239613",
"metadata": {},
"source": [
"## Delete some data"
]
},
{
"cell_type": "code",
"execution_count": 30,
"id": "60544aeb",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"765"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"deltaTable.toDF().filter(col(\"salary\") > 60000).count()"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "ab9a8a5c",
"metadata": {},
"outputs": [],
"source": [
"deltaTable.delete(col(\"salary\") > 60000)"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "c094678f",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"deltaTable.toDF().filter(col(\"salary\") > 60000).count()"
]
},
{
"cell_type": "markdown",
"id": "295fedaa",
"metadata": {},
"source": [
"Commit in lakeFS"
]
},
{
"cell_type": "code",
"execution_count": 33,
"id": "210fd6b0",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'committer': 'docker',\n",
" 'creation_date': 1680812451,\n",
" 'id': 'a0ddd0a1b3091534e4e7b0384e72c65cad818c32810b52561c2fb69d2ef0b813',\n",
" 'message': 'Delete users with salary over 60k',\n",
" 'meta_range_id': '',\n",
" 'metadata': {},\n",
" 'parents': ['ff744a27ac9d092a8dc8b209b21699922a7c32829639987a77ce668837f6f69d']}"
]
},
"execution_count": 33,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"api_instance = commits_api.CommitsApi(api_client)\n",
"commit_creation = CommitCreation(\n",
"    message=\"Delete users with salary over 60k\"\n",
") \n",
"\n",
"api_instance.commit(repo, branch, commit_creation)"
]
},
{
"cell_type": "markdown",
"id": "f24fcd35",
"metadata": {},
"source": [
"### Look at the data and diffs in lakeFS"
]
},
{
"cell_type": "markdown",
"id": "e127c37e",
"metadata": {},
"source": [
"#### [`main`](http://localhost:8000/repositories/example/objects?ref=main&path=demo%2Fusers%2F)"
]
},
{
"cell_type": "markdown",
"id": "0bb2e574",
"metadata": {},
"source": [
"#### [`modify_user_data`](http://localhost:8000/repositories/example/objects?ref=modify_user_data&path=demo%2Fusers%2F)"
]
},
{
"cell_type": "code",
"execution_count": 34,
"id": "6fc556d7",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1000"
]
},
"execution_count": 34,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"DeltaTable.forPath(spark, 's3a://example/main/demo/users').toDF().count()"
]
},
{
"cell_type": "code",
"execution_count": 35,
"id": "7b05d9a8",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"236"
]
},
"execution_count": 35,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"DeltaTable.forPath(spark, 's3a://example/modify_user_data/demo/users').toDF().count()"
]
}
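,
{
"cell_type": "markdown",
"id": "merge-back-md",
"metadata": {},
"source": [
"## Merge the branch back into `main`\n",
"\n",
"A possible next step, sketched here for illustration (not executed as part of this notebook): merge `modify_user_data` back into `main` with the lakeFS Python client. This assumes the `merge_into_branch` method of the `RefsApi` (https://pydocs.lakefs.io/docs/RefsApi.html#merge_into_branch)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "merge-back-code",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: merge the working branch back into main via the lakeFS API.\n",
"from lakefs_client.api import refs_api\n",
"\n",
"refs_instance = refs_api.RefsApi(api_client)\n",
"refs_instance.merge_into_branch(repo, 'modify_user_data', 'main')"
]
}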
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}