Understanding GC in lakeFS
// Databricks notebook source
// MAGIC %md
// MAGIC ### Understanding Garbage Collection in lakeFS
// MAGIC
// MAGIC This notebook will allow you to investigate the results of a GC dry run.
// MAGIC
// MAGIC Run the cells of this notebook one by one.
// MAGIC
// MAGIC **In the next cell, fill in the repository name.**
// COMMAND ----------
val GC_REPO_NAME = "gctest"
// COMMAND ----------
// MAGIC %md
// MAGIC #### Configuring this notebook
// MAGIC
// MAGIC Make sure the cluster running this notebook is configured with your lakeFS credentials using the `spark.hadoop.lakefs.api.*` configurations.
// MAGIC
// MAGIC **If the next cell completes without errors, you're good to go.**
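// MAGIC
// MAGIC For reference, the check below reads these keys, so your cluster's Spark configuration should contain entries along these lines (the values are placeholders; use your own installation's details):
// MAGIC
// MAGIC ```
// MAGIC spark.hadoop.lakefs.api.url https://<your-lakefs-endpoint>/api/v1
// MAGIC spark.hadoop.lakefs.api.access_key <lakefs-access-key-id>
// MAGIC spark.hadoop.lakefs.api.secret_key <lakefs-secret-access-key>
// MAGIC ```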
// COMMAND ----------
// Read the lakeFS API endpoint and credentials from the cluster's Hadoop configuration
val apiURL = sc.hadoopConfiguration.get("lakefs.api.url")
val apiAccessKey = sc.hadoopConfiguration.get("lakefs.api.access_key")
val apiSecretKey = sc.hadoopConfiguration.get("lakefs.api.secret_key")

import io.lakefs.clients.api
import io.lakefs.clients.api.{ConfigApi, RetentionApi, ObjectsApi, RepositoriesApi, CommitsApi, RefsApi}
import io.lakefs.clients.api.model._

// Initialize the lakeFS API clients used throughout this notebook
val client = new api.ApiClient
client.setUsername(apiAccessKey)
client.setPassword(apiSecretKey)
client.setBasePath(apiURL)
client.setReadTimeout(30000)
client.setConnectTimeout(10000)

val commitsApi = new CommitsApi(client)
val objectsApi = new ObjectsApi(client)
val repositoriesApi = new RepositoriesApi(client)
val refsApi = new RefsApi(client)
val configApi = new ConfigApi(client)

// Verify the repository exists and find the block store type of your lakeFS installation
repositoriesApi.getRepository(GC_REPO_NAME)
val blockstoreType = configApi.getStorageConfig.getBlockstoreType
// COMMAND ----------
// MAGIC %md
// MAGIC #### Performing the dry run
// MAGIC 1. Run GC with the `spark.hadoop.lakefs.debug.gc.no_delete` configuration set to "true" (see the example below).
// MAGIC 2. Take note of the GC run ID (a UUIDv4 string) and paste it into the following cell.
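// MAGIC
// MAGIC For example, the dry-run flag is just another entry in the Spark configuration of the cluster or job that runs GC, alongside the `spark.hadoop.lakefs.api.*` settings:
// MAGIC
// MAGIC ```
// MAGIC spark.hadoop.lakefs.debug.gc.no_delete true
// MAGIC ```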
// COMMAND ----------
val RUN_ID = "7824a532-60a1-4155-800c-e6f29a808e62"
// COMMAND ----------
// Prepare some storage paths for reading and writing metadata:
import java.time.format.DateTimeFormatter
import org.apache.hadoop.fs._
import scala.collection.mutable.ListBuffer
import spark.implicits._
import io.treeverse.clients.ApiClient
import java.net.URI
val r = repositoriesApi.getRepository(GC_REPO_NAME)
val storageNamespace = ApiClient.translateURI(URI.create(r.getStorageNamespace), blockstoreType).toString
val GC_COMMITS_PATH = s"${storageNamespace}/_lakefs/retention/gc/commits/run_id=${RUN_ID}/"
val GC_ADDRESSES_PATH = s"${storageNamespace}/_lakefs/retention/gc/addresses/run_id=${RUN_ID}/"
val GC_PARQUET_METADATA_PREFIX = s"${storageNamespace}/_lakefs/parquet_metadata/"
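// For illustration only (bucket and repository names are made up): for a repository whose storage
// namespace is s3://example-bucket/example-repo on an S3 block store, GC_COMMITS_PATH will look
// something like s3a://example-bucket/example-repo/_lakefs/retention/gc/commits/run_id=<RUN_ID>/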
val commits = spark.read.option("header", value = true).option("inferSchema", value = true).csv(GC_COMMITS_PATH)
// COMMAND ----------
// MAGIC %md
// MAGIC #### Exploring expired commits
// MAGIC
// MAGIC Expired commits are determined according to the garbage collection rules.
// MAGIC When all commits pointing to an object are expired, the object can be deleted from the underlying storage.
// MAGIC
// MAGIC Let's see how many commits are expired, and find some examples.
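// COMMAND ----------
// Before exploring the commits, you can print the repository's GC rules.
// This is a sketch: it assumes your version of the lakeFS API client exposes
// RetentionApi#getGarbageCollectionRules for fetching the rules; adjust if it differs.
val retentionApi = new RetentionApi(client)
println(retentionApi.getGarbageCollectionRules(GC_REPO_NAME))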
// COMMAND ----------
// Prepare configurations and clients
import io.treeverse.clients.ConfigMapper
import spark.implicits._
import scala.collection.JavaConverters._

// Broadcast the relevant Hadoop configuration (fs.* and lakefs.*) so it is available on the executors
val hcValues = sc.broadcast(sc.hadoopConfiguration.iterator.asScala
  .filter(c => c.getKey.startsWith("fs.") || c.getKey.startsWith("lakefs."))
  .map(entry => (entry.getKey, entry.getValue))
  .toArray)
val configMapper = new ConfigMapper(hcValues)
val apiConf = new io.treeverse.clients.APIConfigurations(apiURL, apiAccessKey, apiSecretKey, "1", "10")
val getter = new io.treeverse.clients.LakeFSRangeGetter(apiConf, configMapper)
val helper = new io.treeverse.clients.LakeFSRangeHelper(getter)
val gc = new io.treeverse.clients.GarbageCollector(getter, configMapper)

import scala.collection.mutable.HashMap
var commitMap = new HashMap[String, Commit]
// COMMAND ----------
// Get some information about our commits from lakeFS. This may take a few minutes.
import io.lakefs.clients.api.model.Commit
import scala.collection.JavaConverters._
import spark.implicits._
import org.apache.spark.sql.functions._
import io.treeverse.clients.RequestRetryWrapper
val retryWrapper = new RequestRetryWrapper(30000)
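// Walk the ancestry of each commit listed by the GC run: page through its commit log
// (1,000 commits per call) and add every commit we haven't seen yet to commitMap,
// stopping once we hit a commit that was already visited (its ancestors are already in the map).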
for (c <- commits.collect()) {
  var wasHereAlready = commitMap.contains(c(0).asInstanceOf[String])
  var offset = ""
  var hasMore = true
  while (!wasHereAlready && hasMore) {
    print(".")
    val cl = retryWrapper.wrapWithRetry(() => {
      refsApi.logCommits(GC_REPO_NAME, c(0).asInstanceOf[String], offset, 1000, null, null)
    })
    for (r <- cl.getResults.asScala) {
      if (commitMap.contains(r.getId)) {
        wasHereAlready = true
      }
      commitMap.put(r.getId, r)
    }
    offset = cl.getPagination.getNextOffset
    hasMore = cl.getPagination.getHasMore
  }
}

// Save our commits as a DataFrame
var commitsDF = commitMap.values.map(c => (c.getId, c.getCreationDate, c.getParents.asScala)).toSeq.toDF("commit_id", "creation_date", "parents")
commitsDF.write.mode("overwrite").parquet(s"${GC_PARQUET_METADATA_PREFIX}commits")
commitsDF = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}commits").withColumn("creation_date", from_unixtime(col("creation_date"))).join(commits, "commit_id")
// COMMAND ----------
// How many of our commits are expired?
display(commitsDF.groupBy("expired").count)
// COMMAND ----------
// Some examples of expired commits:
display(commitsDF.filter(commitsDF("expired") === true))
// COMMAND ----------
// Some examples of non-expired commits:
display(commitsDF.filter(commitsDF("expired") === false))
// COMMAND ----------
// MAGIC %md
// MAGIC #### Exploring specific deletions
// MAGIC
// MAGIC The following cells will allow you to dive in and understand why specific objects were marked for deletion.
// COMMAND ----------
// Collect metadata about the GC run and about your repository.
// This is the heavy stuff. It can take 15 minutes or more, depending on the size of your cluster.
// Change the number of partitions to 4 * (number of cores in your cluster).
var NUM_PARTITIONS = 4 * 192
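// For example (an assumption, not part of the original run), you can derive it from the cluster instead of hard-coding it:
// var NUM_PARTITIONS = 4 * sc.defaultParallelism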
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.functions._
var addresses = spark.read.parquet(GC_ADDRESSES_PATH)
val commitIDs = commits.select("commit_id").as[String].repartition(NUM_PARTITIONS)

// Get a dataset of all commit-range pairs:
helper.getRangeIDsWithCommitID(commitIDs, GC_REPO_NAME).toDF("commit_id", "range_id").write.mode("overwrite").parquet(s"${GC_PARQUET_METADATA_PREFIX}commit_ranges")
val commitRanges = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}commit_ranges")

// Get all distinct range IDs
commitRanges.select("range_id").distinct.write.mode("overwrite").parquet(s"${GC_PARQUET_METADATA_PREFIX}range_ids")
val rangeIDs = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}range_ids").select("range_id").as[String].repartition(NUM_PARTITIONS)

// Get a dataset of all (range, address, logical key) tuples:
val rangeAddresses = helper.getRangeEntriesWithRangeID(rangeIDs, GC_REPO_NAME).toDF("range_id", "address", "key")
rangeAddresses.write.partitionBy("range_id").mode("overwrite").option("partitionOverwriteMode", "dynamic").parquet(s"${GC_PARQUET_METADATA_PREFIX}/addresses")
val allRanges = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}/addresses/")
// COMMAND ----------
// Write a summary of this GC run: each object marked for deletion, joined with the commits that contain it
addresses
  .join(allRanges.as("ar"), "address")
  .join(commitRanges.as("cr"), "range_id")
  .join(commitsDF.as("c"), "commit_id")
  .select("ar.address", "ar.key", "cr.commit_id", "c.expired", "c.creation_date")
  .write.mode("overwrite").parquet(s"${GC_PARQUET_METADATA_PREFIX}summary/run_id=${RUN_ID}")
val summaryDF = spark.read.parquet(s"${GC_PARQUET_METADATA_PREFIX}summary/run_id=${RUN_ID}")
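// COMMAND ----------
// Optional sanity check (a sketch based on the rule above: an object is only collected once all commits
// containing it are expired). Any rows returned here are objects marked for deletion that also appear
// in a non-expired commit, and deserve a closer look.
display(
  summaryDF
    .filter(col("expired") === false)
    .select("address", "key", "commit_id", "creation_date")
    .limit(100)
)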
// COMMAND ----------
// How many objects were marked for deletion?
addresses.count
// COMMAND ----------
// These are some objects that were marked for deletion from your storage.
// They are physical addresses in the underlying storage, so they won't mean much on their own.
import org.apache.spark.sql.functions.rand
display(addresses.orderBy(rand()).limit(10))
// COMMAND ----------
// Choose an address from the previous cell and paste it here:
val ADDRESS_TO_RESEARCH = "b563daccbbff464ea430b551dbf14431"
// We will see the commits containing this object. These are not necessarily commits where it was changed, but commits where the object was present.
// The commits are sorted in descending creation order, so the ones at the top are likely the last ones in which the object appeared.
// Therefore, the children of these commits are good candidates for the commits in which the object was deleted.
// The "children" column contains links to the lakeFS UI, where you can check whether the object was indeed deleted in that commit.
display(
  summaryDF.as("f")
    .filter(col("address") === ADDRESS_TO_RESEARCH)
    .join(commitsDF.as("cd"), array_contains(col("cd.parents"), col("f.commit_id")))
    .select(col("f.*"), concat(lit(apiURL.replace("api/v1", s"repositories/${GC_REPO_NAME}/commits/")), col("cd.commit_id"), lit("?prefix="), col("key")).as("children"))
    .groupBy("address", "key", "f.commit_id")
    .agg(min("expired").as("expired"), max("f.creation_date").as("creation_date"), collect_set("children").as("children"))
    .orderBy(desc("creation_date")).limit(1000)
    .select("f.key", "f.commit_id", "creation_date", "expired", "children")
)