Skip to content

Instantly share code, notes, and snippets.

@erickguan
Last active October 22, 2024 15:49
Show Gist options
  • Select an option

  • Save erickguan/80bbf2ea82a10c69d260c53f0dd2f97b to your computer and use it in GitHub Desktop.

Select an option

Save erickguan/80bbf2ea82a10c69d260c53f0dd2f97b to your computer and use it in GitHub Desktop.

Revisions

  1. erickguan revised this gist Oct 22, 2024. 1 changed file with 72 additions and 117 deletions.
    189 changes: 72 additions & 117 deletions gdrive.patch
    Original file line number Diff line number Diff line change
    @@ -38,13 +38,14 @@ index d61b0f3b1b..3eb33eed46 100644
    pub fn with_version(mut self, version: bool) -> Self {
    self.version = version;
    diff --git a/core/src/services/gdrive/lister.rs b/core/src/services/gdrive/lister.rs
    index d2c6bbbbf1..79ebfa0083 100644
    index 9e3e08e6e6..009b950fa1 100644
    --- a/core/src/services/gdrive/lister.rs
    +++ b/core/src/services/gdrive/lister.rs
    @@ -34,44 +34,38 @@ pub struct GdriveLister {
    @@ -34,9 +34,22 @@ pub struct GdriveLister {
    op: OpList,
    }

    -async fn stat_file(core: Arc<GdriveCore>, path: &str) -> Result<GdriveFile, Error> {
    +// Handle for Google Drive file metadata request
    +// Allows deferring `Metadata` creation
    +struct GdriveStatHandle {
    @@ -59,103 +60,45 @@ index d2c6bbbbf1..79ebfa0083 100644
    +}
    +
    +async fn stat_file(core: Arc<GdriveCore>, handle: &GdriveStatHandle) -> Result<GdriveFile, Error> {
    + // reuse gdrive_stat which will get "file id" by `path_cache` in the `core`
    // reuse gdrive_stat which resolves `file_id` by path via core's `path_cache`.
    - let resp = core.gdrive_stat(path).await?;
    + let resp = core.gdrive_stat(handle.path.as_str()).await?;
    +
    + if resp.status() != StatusCode::OK {
    + return Err(parse_error(resp));
    + }
    +
    + let bs = resp.into_body();
    + let gdrive_file: GdriveFile =
    + serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
    +
    + Ok(gdrive_file)
    +}
    +
    impl GdriveLister {
    pub fn new(path: String, core: Arc<GdriveCore>, op: OpList) -> Self {
    Self { path, core, op }
    }
    -
    - async fn get_meta(
    - &self,
    - file_type: EntryMode,
    - normalized_path: &String,
    - ) -> Result<Metadata, Error> {
    - let mut meta = Metadata::new(file_type);
    - if self.op.metakey().contains(Metakey::ContentLength)
    - || self.op.metakey().contains(Metakey::LastModified)
    - {
    - let resp = self.core.gdrive_stat(&normalized_path).await?;
    -
    - if resp.status() != StatusCode::OK {
    - return Err(parse_error(resp));
    - }
    -
    - let bs = resp.into_body();
    - let gdrive_file: GdriveFile =
    - serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
    -
    - if let Some(v) = gdrive_file.size {
    - meta.set_content_length(v.parse::<u64>().map_err(|e| {
    - Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
    - })?);
    - }
    - if let Some(v) = gdrive_file.modified_time {
    - meta.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
    - Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
    - })?);
    - }
    - };
    -
    - Ok(meta)
    - }
    }

    impl oio::PageList for GdriveLister {
    @@ -102,12 +96,36 @@ impl oio::PageList for GdriveLister {
    return Ok(());
    }
    if resp.status() != StatusCode::OK {
    return Err(parse_error(resp));
    @@ -90,19 +103,26 @@ impl oio::PageList for GdriveLister {

    + let stat_metadata = !self
    + .op
    + .metakey()
    + .is_disjoint(Metakey::ContentLength | Metakey::LastModified);
    +
    // Return self at the first page.
    if ctx.token.is_empty() && !ctx.done {
    - let path = build_rel_path(&self.core.root, &self.path);
    - let meta = self.get_meta(EntryMode::DIR, &path).await?;
    - let e = oio::Entry::new(&path, meta);
    - ctx.entries.push_back(e);
    + let path = Arc::new(build_rel_path(&self.core.root, &self.path));
    + let mut metadata = Metadata::new(EntryMode::DIR);
    + if stat_metadata {
    let mut metadata = Metadata::new(EntryMode::DIR);
    if stat_file_metadata {
    - let gdrive_file = stat_file(self.core.clone(), &path).await?;
    + let handle = GdriveStatHandle {
    + path: path.clone(),
    + file_type: EntryMode::DIR, // not used because we have created metadata
    + };
    + let gdrive_file = stat_file(self.core.clone(), &handle).await?;
    + if let Some(v) = gdrive_file.size {
    + let content_length = v.parse::<u64>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
    + })?;
    + metadata.set_content_length(content_length);
    + }
    + if let Some(v) = gdrive_file.modified_time {
    + let last_modified = v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
    + })?;
    + metadata.set_last_modified(last_modified);
    + }
    + }
    + let entry = oio::Entry::new(&path, metadata);
    + ctx.entries.push_back(entry);
    }

    let decoded_response =
    @@ -119,6 +137,29 @@ impl oio::PageList for GdriveLister {
    if let Some(v) = gdrive_file.size {
    metadata.set_content_length(v.parse::<u64>().map_err(|e| {
    Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
    })?);
    }
    if let Some(v) = gdrive_file.modified_time {
    - metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
    - Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
    - })?);
    + metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(
    + |e| {
    + Error::new(ErrorKind::Unexpected, "parse last modified time")
    + .set_source(e)
    + },
    + )?);
    }
    }
    let e = oio::Entry::new(&path, metadata);
    @@ -118,6 +138,29 @@ impl oio::PageList for GdriveLister {
    ctx.done = true;
    }

    @@ -185,16 +128,23 @@ index d2c6bbbbf1..79ebfa0083 100644
    for mut file in decoded_response.files {
    let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" {
    if !file.name.ends_with('/') {
    @@ -142,10 +183,52 @@ impl oio::PageList for GdriveLister {
    @@ -141,23 +184,53 @@ impl oio::PageList for GdriveLister {

    let root = &self.core.root;
    let normalized_path = build_rel_path(root, &path);
    - let meta = self.get_meta(file_type, &normalized_path).await?;

    - let entry = oio::Entry::new(&normalized_path, meta);
    - ctx.entries.push_back(entry);
    + // enqueue and run tasks
    + if stat_metadata {
    - let mut metadata = Metadata::new(file_type);
    if stat_file_metadata {
    - let gdrive_file = stat_file(self.core.clone(), &normalized_path).await?;
    - if let Some(v) = gdrive_file.size {
    - metadata.set_content_length(v.parse::<u64>().map_err(|e| {
    - Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
    - })?);
    - }
    - if let Some(v) = gdrive_file.modified_time {
    - metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
    - Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
    - })?);
    - }
    + let handle = GdriveStatHandle {
    + path: Arc::new(normalized_path),
    + file_type,
    @@ -211,32 +161,37 @@ index d2c6bbbbf1..79ebfa0083 100644
    + let metadata = Metadata::new(file_type);
    + let entry = oio::Entry::new(&normalized_path, metadata);
    + ctx.entries.push_back(entry);
    + }
    + }
    }

    - let entry = oio::Entry::new(&normalized_path, metadata);
    - ctx.entries.push_back(entry);
    + loop {
    + match tasks.next().await.transpose() {
    + Ok(Some(gdrive_stat_meta)) => {
    + let mut metadata = Metadata::new(gdrive_stat_meta.handle.file_type);
    + if let Some(v) = gdrive_stat_meta.gdrive_file.size {
    + let content_length = v.parse::<u64>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse content length")
    + .set_source(e)
    + })?;
    + metadata.set_content_length(content_length);
    + }
    + if let Some(v) = gdrive_stat_meta.gdrive_file.modified_time {
    + let last_modified =
    + v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse last modified time")
    + .set_source(e)
    + })?;
    + metadata.set_last_modified(last_modified);
    + }
    +
    + loop {
    + match tasks.next().await.transpose() {
    + Ok(Some(gdrive_stat_meta)) => {
    + let mut metadata = Metadata::new(gdrive_stat_meta.handle.file_type);
    + if let Some(v) = gdrive_stat_meta.gdrive_file.size {
    + let content_length = v.parse::<u64>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
    + })?;
    + metadata.set_content_length(content_length);
    + let entry =
    + oio::Entry::new(gdrive_stat_meta.handle.path.as_str(), metadata);
    + ctx.entries.push_back(entry);
    + }
    + if let Some(v) = gdrive_stat_meta.gdrive_file.modified_time {
    + let last_modified = v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse last modified time")
    + .set_source(e)
    + })?;
    + metadata.set_last_modified(last_modified);
    + }
    +
    + let entry = oio::Entry::new(gdrive_stat_meta.handle.path.as_str(), metadata);
    + ctx.entries.push_back(entry);
    + Ok(None) => break,
    + Err(_) => continue,
    + }
    + Ok(None) => break,
    + Err(_) => continue,
    + }
    }

  2. erickguan created this gist Oct 22, 2024.
    243 changes: 243 additions & 0 deletions gdrive.patch
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,243 @@
    diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
    index d61b0f3b1b..3eb33eed46 100644
    --- a/core/src/raw/ops.rs
    +++ b/core/src/raw/ops.rs
    @@ -107,6 +107,8 @@ pub struct OpList {
    ///
    /// Default to `false`
    version: bool,
    + /// Executor for concurrent list operations
    + executor: Option<Executor>,
    }

    impl Default for OpList {
    @@ -119,6 +121,7 @@ impl Default for OpList {
    metakey: Metakey::Mode.into(),
    concurrent: 1,
    version: false,
    + executor: None,
    }
    }
    }
    @@ -193,6 +196,17 @@ impl OpList {
    self.concurrent
    }

    + /// Get the executor from option
    + pub fn executor(&self) -> Option<&Executor> {
    + self.executor.as_ref()
    + }
    +
    + /// Set the executor of the option
    + pub fn with_executor(mut self, executor: Executor) -> Self {
    + self.executor = Some(executor);
    + self
    + }
    +
    /// Change the version of this list operation
    pub fn with_version(mut self, version: bool) -> Self {
    self.version = version;
    diff --git a/core/src/services/gdrive/lister.rs b/core/src/services/gdrive/lister.rs
    index d2c6bbbbf1..79ebfa0083 100644
    --- a/core/src/services/gdrive/lister.rs
    +++ b/core/src/services/gdrive/lister.rs
    @@ -34,44 +34,38 @@ pub struct GdriveLister {
    op: OpList,
    }

    +// Handle for Google Drive file metadata request
    +// Allows deferring `Metadata` creation
    +struct GdriveStatHandle {
    + path: Arc<String>,
    + file_type: EntryMode,
    +}
    +
    +// Result from gdrive_stat
    +struct GdriveStatResult {
    + gdrive_file: GdriveFile,
    + handle: Arc<GdriveStatHandle>,
    +}
    +
    +async fn stat_file(core: Arc<GdriveCore>, handle: &GdriveStatHandle) -> Result<GdriveFile, Error> {
    + // reuse gdrive_stat which will get "file id" by `path_cache` in the `core`
    + let resp = core.gdrive_stat(handle.path.as_str()).await?;
    +
    + if resp.status() != StatusCode::OK {
    + return Err(parse_error(resp));
    + }
    +
    + let bs = resp.into_body();
    + let gdrive_file: GdriveFile =
    + serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
    +
    + Ok(gdrive_file)
    +}
    +
    impl GdriveLister {
    pub fn new(path: String, core: Arc<GdriveCore>, op: OpList) -> Self {
    Self { path, core, op }
    }
    -
    - async fn get_meta(
    - &self,
    - file_type: EntryMode,
    - normalized_path: &String,
    - ) -> Result<Metadata, Error> {
    - let mut meta = Metadata::new(file_type);
    - if self.op.metakey().contains(Metakey::ContentLength)
    - || self.op.metakey().contains(Metakey::LastModified)
    - {
    - let resp = self.core.gdrive_stat(&normalized_path).await?;
    -
    - if resp.status() != StatusCode::OK {
    - return Err(parse_error(resp));
    - }
    -
    - let bs = resp.into_body();
    - let gdrive_file: GdriveFile =
    - serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
    -
    - if let Some(v) = gdrive_file.size {
    - meta.set_content_length(v.parse::<u64>().map_err(|e| {
    - Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
    - })?);
    - }
    - if let Some(v) = gdrive_file.modified_time {
    - meta.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
    - Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
    - })?);
    - }
    - };
    -
    - Ok(meta)
    - }
    }

    impl oio::PageList for GdriveLister {
    @@ -102,12 +96,36 @@ impl oio::PageList for GdriveLister {
    return Ok(());
    }

    + let stat_metadata = !self
    + .op
    + .metakey()
    + .is_disjoint(Metakey::ContentLength | Metakey::LastModified);
    +
    // Return self at the first page.
    if ctx.token.is_empty() && !ctx.done {
    - let path = build_rel_path(&self.core.root, &self.path);
    - let meta = self.get_meta(EntryMode::DIR, &path).await?;
    - let e = oio::Entry::new(&path, meta);
    - ctx.entries.push_back(e);
    + let path = Arc::new(build_rel_path(&self.core.root, &self.path));
    + let mut metadata = Metadata::new(EntryMode::DIR);
    + if stat_metadata {
    + let handle = GdriveStatHandle {
    + path: path.clone(),
    + file_type: EntryMode::DIR, // not used because we have created metadata
    + };
    + let gdrive_file = stat_file(self.core.clone(), &handle).await?;
    + if let Some(v) = gdrive_file.size {
    + let content_length = v.parse::<u64>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
    + })?;
    + metadata.set_content_length(content_length);
    + }
    + if let Some(v) = gdrive_file.modified_time {
    + let last_modified = v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse last modified time").set_source(e)
    + })?;
    + metadata.set_last_modified(last_modified);
    + }
    + }
    + let entry = oio::Entry::new(&path, metadata);
    + ctx.entries.push_back(entry);
    }

    let decoded_response =
    @@ -119,6 +137,29 @@ impl oio::PageList for GdriveLister {
    ctx.done = true;
    }

    + let executor = self.op.executor().cloned().unwrap_or_default();
    + let mut tasks = ConcurrentTasks::new(
    + executor,
    + self.op.concurrent(),
    + |input: (Arc<GdriveCore>, Arc<GdriveStatHandle>)| {
    + Box::pin({
    + async move {
    + let handle = input.1.clone();
    + match stat_file(input.0.clone(), &handle).await {
    + Ok(gdrive_file) => (
    + input,
    + Ok(GdriveStatResult {
    + gdrive_file,
    + handle,
    + }),
    + ),
    + Err(err) => (input, Err(err)),
    + }
    + }
    + })
    + },
    + );
    +
    for mut file in decoded_response.files {
    let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" {
    if !file.name.ends_with('/') {
    @@ -142,10 +183,52 @@ impl oio::PageList for GdriveLister {

    let root = &self.core.root;
    let normalized_path = build_rel_path(root, &path);
    - let meta = self.get_meta(file_type, &normalized_path).await?;

    - let entry = oio::Entry::new(&normalized_path, meta);
    - ctx.entries.push_back(entry);
    + // enqueue and run tasks
    + if stat_metadata {
    + let handle = GdriveStatHandle {
    + path: Arc::new(normalized_path),
    + file_type,
    + };
    + tasks
    + .execute((self.core.clone(), Arc::new(handle)))
    + .await
    + .map_err(|err| {
    + Error::new(ErrorKind::Unexpected, "executor fails to execute the task")
    + .set_source(err)
    + })?;
    + } else {
    + // create entry immediately
    + let metadata = Metadata::new(file_type);
    + let entry = oio::Entry::new(&normalized_path, metadata);
    + ctx.entries.push_back(entry);
    + }
    + }
    +
    + loop {
    + match tasks.next().await.transpose() {
    + Ok(Some(gdrive_stat_meta)) => {
    + let mut metadata = Metadata::new(gdrive_stat_meta.handle.file_type);
    + if let Some(v) = gdrive_stat_meta.gdrive_file.size {
    + let content_length = v.parse::<u64>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
    + })?;
    + metadata.set_content_length(content_length);
    + }
    + if let Some(v) = gdrive_stat_meta.gdrive_file.modified_time {
    + let last_modified = v.parse::<chrono::DateTime<Utc>>().map_err(|e| {
    + Error::new(ErrorKind::Unexpected, "parse last modified time")
    + .set_source(e)
    + })?;
    + metadata.set_last_modified(last_modified);
    + }
    +
    + let entry = oio::Entry::new(gdrive_stat_meta.handle.path.as_str(), metadata);
    + ctx.entries.push_back(entry);
    + }
    + Ok(None) => break,
    + Err(_) => continue,
    + }
    }

    Ok(())