Skip to content

Instantly share code, notes, and snippets.

@erickguan
Last active October 22, 2024 15:49
Show Gist options
Save erickguan/80bbf2ea82a10c69d260c53f0dd2f97b to your computer and use it in GitHub Desktop.
Concurrent list for OpenDAL: stat Google Drive entries concurrently while listing
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index d61b0f3b1b..3eb33eed46 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -107,6 +107,8 @@ pub struct OpList {
///
/// Default to `false`
version: bool,
+ /// Executor for concurrent list operations; `None` until `with_executor` is called.
+ executor: Option<Executor>,
}
impl Default for OpList {
@@ -119,6 +121,7 @@ impl Default for OpList {
metakey: Metakey::Mode.into(),
concurrent: 1,
version: false,
+ executor: None,
}
}
}
@@ -193,6 +196,17 @@ impl OpList {
self.concurrent
}
+ /// Get the executor set via `with_executor`, or `None` if none was set.
+ pub fn executor(&self) -> Option<&Executor> {
+ self.executor.as_ref()
+ }
+
+ /// Set the executor used for concurrent list operations.
+ pub fn with_executor(mut self, executor: Executor) -> Self {
+ self.executor = Some(executor);
+ self
+ }
+
/// Change the version of this list operation
pub fn with_version(mut self, version: bool) -> Self {
self.version = version;
diff --git a/core/src/services/gdrive/lister.rs b/core/src/services/gdrive/lister.rs
index 9e3e08e6e6..009b950fa1 100644
--- a/core/src/services/gdrive/lister.rs
+++ b/core/src/services/gdrive/lister.rs
@@ -34,6 +34,33 @@ pub struct GdriveLister {
     op: OpList,
 }
 
+/// Stat `path` and build its list entry of kind `mode` from the size and
+/// modified time reported by Google Drive.
+///
+/// Runs as the task body of the `ConcurrentTasks` used by `GdriveLister`, so a
+/// failed stat or an unparsable field becomes that task's error.
+async fn stat_entry(
+    core: Arc<GdriveCore>,
+    path: &str,
+    mode: EntryMode,
+) -> Result<oio::Entry, Error> {
+    let gdrive_file = stat_file(core, path).await?;
+
+    let mut metadata = Metadata::new(mode);
+    if let Some(v) = gdrive_file.size {
+        metadata.set_content_length(v.parse::<u64>().map_err(|e| {
+            Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
+        })?);
+    }
+    if let Some(v) = gdrive_file.modified_time {
+        metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
+            Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
+        })?);
+    }
+
+    Ok(oio::Entry::new(path, metadata))
+}
+
 async fn stat_file(core: Arc<GdriveCore>, path: &str) -> Result<GdriveFile, Error> {
     // reuse gdrive_stat which resolves `file_id` by path via core's `path_cache`.
     let resp = core.gdrive_stat(path).await?;
@@ -118,6 +145,21 @@ impl oio::PageList for GdriveLister {
             ctx.done = true;
         }
 
+        // Per-entry stats go through `ConcurrentTasks`, using the executor set on
+        // `OpList` (`Executor::default()` when unset) and `self.op.concurrent()`.
+        let executor = self.op.executor().cloned().unwrap_or_default();
+        let mut tasks = ConcurrentTasks::new(
+            executor,
+            self.op.concurrent(),
+            |(core, path, mode): (Arc<GdriveCore>, String, EntryMode)| {
+                Box::pin(async move {
+                    // The factory must hand its input back alongside the result.
+                    let res = stat_entry(core.clone(), &path, mode).await;
+                    ((core, path, mode), res)
+                })
+            },
+        );
+
         for mut file in decoded_response.files {
             let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" {
                 if !file.name.ends_with('/') {
@@ -141,23 +183,24 @@ impl oio::PageList for GdriveLister {
             let root = &self.core.root;
             let normalized_path = build_rel_path(root, &path);
 
-            let mut metadata = Metadata::new(file_type);
             if stat_file_metadata {
-                let gdrive_file = stat_file(self.core.clone(), &normalized_path).await?;
-                if let Some(v) = gdrive_file.size {
-                    metadata.set_content_length(v.parse::<u64>().map_err(|e| {
-                        Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
-                    })?);
-                }
-                if let Some(v) = gdrive_file.modified_time {
-                    metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
-                        Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
-                    })?);
-                }
+                // Only submit the stat here; its entry is pushed when the tasks are drained
+                // after the loop. Draining per file would await every stat before the next
+                // one is submitted.
+                tasks
+                    .execute((self.core.clone(), normalized_path, file_type))
+                    .await?;
+            } else {
+                let metadata = Metadata::new(file_type);
+                let entry = oio::Entry::new(&normalized_path, metadata);
+                ctx.entries.push_back(entry);
             }
-
-            let entry = oio::Entry::new(&normalized_path, metadata);
-            ctx.entries.push_back(entry);
         }
 
+        // Collect the stat'd entries. Errors are propagated, as in the sequential version,
+        // instead of being skipped, so a failed stat fails this page.
+        while let Some(entry) = tasks.next().await.transpose()? {
+            ctx.entries.push_back(entry);
+        }
+
         Ok(())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment