Last active
October 22, 2024 15:49
-
-
Save erickguan/80bbf2ea82a10c69d260c53f0dd2f97b to your computer and use it in GitHub Desktop.
Concurrent list for OpenDAL
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs | |
index d61b0f3b1b..3eb33eed46 100644 | |
--- a/core/src/raw/ops.rs | |
+++ b/core/src/raw/ops.rs | |
@@ -107,6 +107,8 @@ pub struct OpList { | |
/// | |
/// Default to `false` | |
version: bool, | |
+ /// Executor for concurrent list operations | |
+ executor: Option<Executor>, | |
} | |
impl Default for OpList { | |
@@ -119,6 +121,7 @@ impl Default for OpList { | |
metakey: Metakey::Mode.into(), | |
concurrent: 1, | |
version: false, | |
+ executor: None, | |
} | |
} | |
} | |
@@ -193,6 +196,17 @@ impl OpList { | |
self.concurrent | |
} | |
+ /// Get the executor set on this list operation, if any | |
+ pub fn executor(&self) -> Option<&Executor> { | |
+ self.executor.as_ref() | |
+ } | |
+ | |
+ /// Set the executor of this list operation | |
+ pub fn with_executor(mut self, executor: Executor) -> Self { | |
+ self.executor = Some(executor); | |
+ self | |
+ } | |
+ | |
/// Change the version of this list operation | |
pub fn with_version(mut self, version: bool) -> Self { | |
self.version = version; | |
diff --git a/core/src/services/gdrive/lister.rs b/core/src/services/gdrive/lister.rs | |
index 9e3e08e6e6..009b950fa1 100644 | |
--- a/core/src/services/gdrive/lister.rs | |
+++ b/core/src/services/gdrive/lister.rs | |
@@ -34,9 +34,22 @@ pub struct GdriveLister { | |
op: OpList, | |
} | |
-async fn stat_file(core: Arc<GdriveCore>, path: &str) -> Result<GdriveFile, Error> { | |
+ // Handle describing one Google Drive file metadata (stat) request. | |
+ // Carries the entry's path and mode so `Metadata` creation can be deferred until the stat completes. | |
+struct GdriveStatHandle { | |
+ path: Arc<String>, | |
+ file_type: EntryMode, | |
+} | |
+ | |
+ // Result of a `stat_file` call, paired with the handle that requested it | |
+struct GdriveStatResult { | |
+ gdrive_file: GdriveFile, | |
+ handle: Arc<GdriveStatHandle>, | |
+} | |
+ | |
+async fn stat_file(core: Arc<GdriveCore>, handle: &GdriveStatHandle) -> Result<GdriveFile, Error> { | |
// reuse gdrive_stat which resolves `file_id` by path via core's `path_cache`. | |
- let resp = core.gdrive_stat(path).await?; | |
+ let resp = core.gdrive_stat(handle.path.as_str()).await?; | |
if resp.status() != StatusCode::OK { | |
return Err(parse_error(resp)); | |
@@ -90,19 +103,26 @@ impl oio::PageList for GdriveLister { | |
// Return self at the first page. | |
if ctx.token.is_empty() && !ctx.done { | |
- let path = build_rel_path(&self.core.root, &self.path); | |
+ let path = Arc::new(build_rel_path(&self.core.root, &self.path)); | |
let mut metadata = Metadata::new(EntryMode::DIR); | |
if stat_file_metadata { | |
- let gdrive_file = stat_file(self.core.clone(), &path).await?; | |
+ let handle = GdriveStatHandle { | |
+ path: path.clone(), | |
+ file_type: EntryMode::DIR, // unused here: `metadata` has already been created above | |
+ }; | |
+ let gdrive_file = stat_file(self.core.clone(), &handle).await?; | |
if let Some(v) = gdrive_file.size { | |
metadata.set_content_length(v.parse::<u64>().map_err(|e| { | |
Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) | |
})?); | |
} | |
if let Some(v) = gdrive_file.modified_time { | |
- metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| { | |
- Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e) | |
- })?); | |
+ metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err( | |
+ |e| { | |
+ Error::new(ErrorKind::Unexpected, "parse last modified time") | |
+ .set_source(e) | |
+ }, | |
+ )?); | |
} | |
} | |
let e = oio::Entry::new(&path, metadata); | |
@@ -118,6 +138,29 @@ impl oio::PageList for GdriveLister { | |
ctx.done = true; | |
} | |
+ let executor = self.op.executor().cloned().unwrap_or_default(); | |
+ let mut tasks = ConcurrentTasks::new( | |
+ executor, | |
+ self.op.concurrent(), | |
+ |input: (Arc<GdriveCore>, Arc<GdriveStatHandle>)| { | |
+ Box::pin({ | |
+ async move { | |
+ let handle = input.1.clone(); | |
+ match stat_file(input.0.clone(), &handle).await { | |
+ Ok(gdrive_file) => ( | |
+ input, | |
+ Ok(GdriveStatResult { | |
+ gdrive_file, | |
+ handle, | |
+ }), | |
+ ), | |
+ Err(err) => (input, Err(err)), | |
+ } | |
+ } | |
+ }) | |
+ }, | |
+ ); | |
+ | |
for mut file in decoded_response.files { | |
let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" { | |
if !file.name.ends_with('/') { | |
@@ -141,23 +184,53 @@ impl oio::PageList for GdriveLister { | |
let root = &self.core.root; | |
let normalized_path = build_rel_path(root, &path); | |
- let mut metadata = Metadata::new(file_type); | |
if stat_file_metadata { | |
- let gdrive_file = stat_file(self.core.clone(), &normalized_path).await?; | |
- if let Some(v) = gdrive_file.size { | |
- metadata.set_content_length(v.parse::<u64>().map_err(|e| { | |
- Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) | |
- })?); | |
- } | |
- if let Some(v) = gdrive_file.modified_time { | |
- metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| { | |
- Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e) | |
- })?); | |
- } | |
+ let handle = GdriveStatHandle { | |
+ path: Arc::new(normalized_path), | |
+ file_type, | |
+ }; | |
+ tasks | |
+ .execute((self.core.clone(), Arc::new(handle))) | |
+ .await | |
+ .map_err(|err| { | |
+ Error::new(ErrorKind::Unexpected, "executor fails to execute the task") | |
+ .set_source(err) | |
+ })?; | |
+ } else { | |
+ // No stat requested: create the entry immediately | |
+ let metadata = Metadata::new(file_type); | |
+ let entry = oio::Entry::new(&normalized_path, metadata); | |
+ ctx.entries.push_back(entry); | |
} | |
- let entry = oio::Entry::new(&normalized_path, metadata); | |
- ctx.entries.push_back(entry); | |
+ loop { | |
+ match tasks.next().await.transpose() { | |
+ Ok(Some(gdrive_stat_meta)) => { | |
+ let mut metadata = Metadata::new(gdrive_stat_meta.handle.file_type); | |
+ if let Some(v) = gdrive_stat_meta.gdrive_file.size { | |
+ let content_length = v.parse::<u64>().map_err(|e| { | |
+ Error::new(ErrorKind::Unexpected, "parse content length") | |
+ .set_source(e) | |
+ })?; | |
+ metadata.set_content_length(content_length); | |
+ } | |
+ if let Some(v) = gdrive_stat_meta.gdrive_file.modified_time { | |
+ let last_modified = | |
+ v.parse::<chrono::DateTime<Utc>>().map_err(|e| { | |
+ Error::new(ErrorKind::Unexpected, "parse last modified time") | |
+ .set_source(e) | |
+ })?; | |
+ metadata.set_last_modified(last_modified); | |
+ } | |
+ | |
+ let entry = | |
+ oio::Entry::new(gdrive_stat_meta.handle.path.as_str(), metadata); | |
+ ctx.entries.push_back(entry); | |
+ } | |
+ Ok(None) => break, | |
+ Err(_) => continue, | |
+ } | |
+ } | |
} | |
Ok(()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment