Last active
October 22, 2024 15:49
-
-
Save erickguan/80bbf2ea82a10c69d260c53f0dd2f97b to your computer and use it in GitHub Desktop.
Revisions
-
erickguan revised this gist
Oct 22, 2024. 1 changed file with 72 additions and 117 deletions. There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -38,13 +38,14 @@ index d61b0f3b1b..3eb33eed46 100644 pub fn with_version(mut self, version: bool) -> Self { self.version = version; diff --git a/core/src/services/gdrive/lister.rs b/core/src/services/gdrive/lister.rs index 9e3e08e6e6..009b950fa1 100644 --- a/core/src/services/gdrive/lister.rs +++ b/core/src/services/gdrive/lister.rs @@ -34,9 +34,22 @@ pub struct GdriveLister { op: OpList, } -async fn stat_file(core: Arc<GdriveCore>, path: &str) -> Result<GdriveFile, Error> { +// Handle for Google Drive file metadata request +// Allows deferring `Metadata` creation +struct GdriveStatHandle { @@ -59,103 +60,45 @@ index d2c6bbbbf1..79ebfa0083 100644 +} + +async fn stat_file(core: Arc<GdriveCore>, handle: &GdriveStatHandle) -> Result<GdriveFile, Error> { // reuse gdrive_stat which resolves `file_id` by path via core's `path_cache`. - let resp = core.gdrive_stat(path).await?; + let resp = core.gdrive_stat(handle.path.as_str()).await?; if resp.status() != StatusCode::OK { return Err(parse_error(resp)); @@ -90,19 +103,26 @@ impl oio::PageList for GdriveLister { // Return self at the first page. 
if ctx.token.is_empty() && !ctx.done { - let path = build_rel_path(&self.core.root, &self.path); + let path = Arc::new(build_rel_path(&self.core.root, &self.path)); let mut metadata = Metadata::new(EntryMode::DIR); if stat_file_metadata { - let gdrive_file = stat_file(self.core.clone(), &path).await?; + let handle = GdriveStatHandle { + path: path.clone(), + file_type: EntryMode::DIR, // not used because we have created metadata + }; + let gdrive_file = stat_file(self.core.clone(), &handle).await?; if let Some(v) = gdrive_file.size { metadata.set_content_length(v.parse::<u64>().map_err(|e| { Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) })?); } if let Some(v) = gdrive_file.modified_time { - metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| { - Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e) - })?); + metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err( + |e| { + Error::new(ErrorKind::Unexpected, "parse last modified time") + .set_source(e) + }, + )?); } } let e = oio::Entry::new(&path, metadata); @@ -118,6 +138,29 @@ impl oio::PageList for GdriveLister { ctx.done = true; } @@ -185,16 +128,23 @@ index d2c6bbbbf1..79ebfa0083 100644 for mut file in decoded_response.files { let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" { if !file.name.ends_with('/') { @@ -141,23 +184,53 @@ impl oio::PageList for GdriveLister { let root = &self.core.root; let normalized_path = build_rel_path(root, &path); - let mut metadata = Metadata::new(file_type); if stat_file_metadata { - let gdrive_file = stat_file(self.core.clone(), &normalized_path).await?; - if let Some(v) = gdrive_file.size { - metadata.set_content_length(v.parse::<u64>().map_err(|e| { - Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) - })?); - } - if let Some(v) = gdrive_file.modified_time { - metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| { - 
Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e) - })?); - } + let handle = GdriveStatHandle { + path: Arc::new(normalized_path), + file_type, @@ -211,32 +161,37 @@ index d2c6bbbbf1..79ebfa0083 100644 + let metadata = Metadata::new(file_type); + let entry = oio::Entry::new(&normalized_path, metadata); + ctx.entries.push_back(entry); } - let entry = oio::Entry::new(&normalized_path, metadata); - ctx.entries.push_back(entry); + loop { + match tasks.next().await.transpose() { + Ok(Some(gdrive_stat_meta)) => { + let mut metadata = Metadata::new(gdrive_stat_meta.handle.file_type); + if let Some(v) = gdrive_stat_meta.gdrive_file.size { + let content_length = v.parse::<u64>().map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse content length") + .set_source(e) + })?; + metadata.set_content_length(content_length); + } + if let Some(v) = gdrive_stat_meta.gdrive_file.modified_time { + let last_modified = + v.parse::<chrono::DateTime<Utc>>().map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse last modified time") + .set_source(e) + })?; + metadata.set_last_modified(last_modified); + } + + let entry = + oio::Entry::new(gdrive_stat_meta.handle.path.as_str(), metadata); + ctx.entries.push_back(entry); + } + Ok(None) => break, + Err(_) => continue, + } + } } -
erickguan created this gist
Oct 22, 2024. There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,243 @@ diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index d61b0f3b1b..3eb33eed46 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -107,6 +107,8 @@ pub struct OpList { /// /// Default to `false` version: bool, + /// Executor for concurrent list operations + executor: Option<Executor>, } impl Default for OpList { @@ -119,6 +121,7 @@ impl Default for OpList { metakey: Metakey::Mode.into(), concurrent: 1, version: false, + executor: None, } } } @@ -193,6 +196,17 @@ impl OpList { self.concurrent } + /// Get the executor from option + pub fn executor(&self) -> Option<&Executor> { + self.executor.as_ref() + } + + /// Set the executor of the option + pub fn with_executor(mut self, executor: Executor) -> Self { + self.executor = Some(executor); + self + } + /// Change the version of this list operation pub fn with_version(mut self, version: bool) -> Self { self.version = version; diff --git a/core/src/services/gdrive/lister.rs b/core/src/services/gdrive/lister.rs index d2c6bbbbf1..79ebfa0083 100644 --- a/core/src/services/gdrive/lister.rs +++ b/core/src/services/gdrive/lister.rs @@ -34,44 +34,38 @@ pub struct GdriveLister { op: OpList, } +// Handle for Google Drive file metadata request +// Allows deferring `Metadata` creation +struct GdriveStatHandle { + path: Arc<String>, + file_type: EntryMode, +} + +// Result from gdrive_stat +struct GdriveStatResult { + gdrive_file: GdriveFile, + handle: Arc<GdriveStatHandle>, +} + +async fn stat_file(core: Arc<GdriveCore>, handle: &GdriveStatHandle) -> Result<GdriveFile, Error> { + // reuse gdrive_stat which will get "file id" by `path_cache` in the `core` + let resp = 
core.gdrive_stat(handle.path.as_str()).await?; + + if resp.status() != StatusCode::OK { + return Err(parse_error(resp)); + } + + let bs = resp.into_body(); + let gdrive_file: GdriveFile = + serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; + + Ok(gdrive_file) +} + impl GdriveLister { pub fn new(path: String, core: Arc<GdriveCore>, op: OpList) -> Self { Self { path, core, op } } - - async fn get_meta( - &self, - file_type: EntryMode, - normalized_path: &String, - ) -> Result<Metadata, Error> { - let mut meta = Metadata::new(file_type); - if self.op.metakey().contains(Metakey::ContentLength) - || self.op.metakey().contains(Metakey::LastModified) - { - let resp = self.core.gdrive_stat(&normalized_path).await?; - - if resp.status() != StatusCode::OK { - return Err(parse_error(resp)); - } - - let bs = resp.into_body(); - let gdrive_file: GdriveFile = - serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; - - if let Some(v) = gdrive_file.size { - meta.set_content_length(v.parse::<u64>().map_err(|e| { - Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) - })?); - } - if let Some(v) = gdrive_file.modified_time { - meta.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| { - Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e) - })?); - } - }; - - Ok(meta) - } } impl oio::PageList for GdriveLister { @@ -102,12 +96,36 @@ impl oio::PageList for GdriveLister { return Ok(()); } + let stat_metadata = !self + .op + .metakey() + .is_disjoint(Metakey::ContentLength | Metakey::LastModified); + // Return self at the first page. 
if ctx.token.is_empty() && !ctx.done { - let path = build_rel_path(&self.core.root, &self.path); - let meta = self.get_meta(EntryMode::DIR, &path).await?; - let e = oio::Entry::new(&path, meta); - ctx.entries.push_back(e); + let path = Arc::new(build_rel_path(&self.core.root, &self.path)); + let mut metadata = Metadata::new(EntryMode::DIR); + if stat_metadata { + let handle = GdriveStatHandle { + path: path.clone(), + file_type: EntryMode::DIR, // not used because we have created metadata + }; + let gdrive_file = stat_file(self.core.clone(), &handle).await?; + if let Some(v) = gdrive_file.size { + let content_length = v.parse::<u64>().map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) + })?; + metadata.set_content_length(content_length); + } + if let Some(v) = gdrive_file.modified_time { + let last_modified = v.parse::<chrono::DateTime<Utc>>().map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e) + })?; + metadata.set_last_modified(last_modified); + } + } + let entry = oio::Entry::new(&path, metadata); + ctx.entries.push_back(entry); } let decoded_response = @@ -119,6 +137,29 @@ impl oio::PageList for GdriveLister { ctx.done = true; } + let executor = self.op.executor().cloned().unwrap_or_default(); + let mut tasks = ConcurrentTasks::new( + executor, + self.op.concurrent(), + |input: (Arc<GdriveCore>, Arc<GdriveStatHandle>)| { + Box::pin({ + async move { + let handle = input.1.clone(); + match stat_file(input.0.clone(), &handle).await { + Ok(gdrive_file) => ( + input, + Ok(GdriveStatResult { + gdrive_file, + handle, + }), + ), + Err(err) => (input, Err(err)), + } + } + }) + }, + ); + for mut file in decoded_response.files { let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" { if !file.name.ends_with('/') { @@ -142,10 +183,52 @@ impl oio::PageList for GdriveLister { let root = &self.core.root; let normalized_path = build_rel_path(root, &path); - let meta = 
self.get_meta(file_type, &normalized_path).await?; - let entry = oio::Entry::new(&normalized_path, meta); - ctx.entries.push_back(entry); + // enqueue and run tasks + if stat_metadata { + let handle = GdriveStatHandle { + path: Arc::new(normalized_path), + file_type, + }; + tasks + .execute((self.core.clone(), Arc::new(handle))) + .await + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "executor fails to execute the task") + .set_source(err) + })?; + } else { + // create entry immediately + let metadata = Metadata::new(file_type); + let entry = oio::Entry::new(&normalized_path, metadata); + ctx.entries.push_back(entry); + } + } + + loop { + match tasks.next().await.transpose() { + Ok(Some(gdrive_stat_meta)) => { + let mut metadata = Metadata::new(gdrive_stat_meta.handle.file_type); + if let Some(v) = gdrive_stat_meta.gdrive_file.size { + let content_length = v.parse::<u64>().map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse content length").set_source(e) + })?; + metadata.set_content_length(content_length); + } + if let Some(v) = gdrive_stat_meta.gdrive_file.modified_time { + let last_modified = v.parse::<chrono::DateTime<Utc>>().map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse last modified time") + .set_source(e) + })?; + metadata.set_last_modified(last_modified); + } + + let entry = oio::Entry::new(gdrive_stat_meta.handle.path.as_str(), metadata); + ctx.entries.push_back(entry); + } + Ok(None) => break, + Err(_) => continue, + } } Ok(())