@harshavardhana
Last active February 24, 2023 21:07
Revisions

  1. harshavardhana revised this gist Feb 24, 2023. 1 changed file with 6 additions and 6 deletions.
    12 changes: 6 additions & 6 deletions spark-checkpointing.md
    Original file line number Diff line number Diff line change
    @@ -218,29 +218,29 @@ Actual objects on namespace without versioning lookup
    205
    ```

    Optimization can be seen in terms of total time taken for Batch '0'

    #### Optimization can be seen in terms of total time taken for Batch '0'
    | Without Optimization | With Optimization |
    |------------------|-----------------|
    | 72 secs | 17 secs |

    Namespace pollution (delete markers)
    #### Namespace pollution (delete markers)
    | Total DEL markers without optimization | Total DEL markers with optimization |
    |------------------|-----------------|
    | 409 | 0 |

    Total number of excess objects on namespace
    #### Total number of excess objects on namespace
    | Total excess objects without optimization | Total excess objects with optimization |
    |------------------|-----------------|
    | 818 (out of which 409 are DEL markers) | 0 |

    Total number of API calls
    #### Total number of API calls
    | Total number of API calls without optimization | Total number of API calls with optimization |
    |------------------|-----------------|
    | 6938 | 224 |

    Ratio of API calls to objects
    #### Ratio of API calls to objects
    | API Calls / Objects without optimization | API Calls / Objects with optimization |
    |------------------|-----------------|
    | 33.8x | 1.09x |

    This shows that the default checkpoint implementation shipped with Spark is poorly optimized for object storage. Direct Checkpointing is recommended instead: in this test it cut the time for Batch '0' from 72 to 17 seconds, reduced API calls from 6938 to 224, and left no delete markers behind.
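The ratios in the tables above follow directly from the measured counts. As a minimal sketch (the variable names and the `ratio` helper are illustrative, not part of the original test setup), they can be reproduced with plain shell arithmetic:

```shell
#!/bin/sh
# Values copied from the `mc support top api` summaries and `mc ls` counts.
calls_without=6938     # total API calls, default S3A checkpointing
calls_with=224         # total API calls, direct checkpointing
objects=205            # objects visible on the namespace after the run
versions_without=1023  # entries listed with --versions, default checkpointing

# ratio NUMERATOR DENOMINATOR: print the quotient with two decimals.
# (Hypothetical helper; awk is used because POSIX sh has no floating point.)
ratio() {
  awk -v n="$1" -v d="$2" 'BEGIN { printf "%.2f", n / d }'
}

echo "API calls per object without optimization: $(ratio "$calls_without" "$objects")x"
echo "API calls per object with optimization:    $(ratio "$calls_with" "$objects")x"
echo "Excess entries without optimization:       $((versions_without - objects))"
```

This prints 33.84x, 1.09x and 818, which match the table values after rounding.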
  2. harshavardhana revised this gist Feb 24, 2023. 1 changed file with 38 additions and 0 deletions.
    38 changes: 38 additions & 0 deletions spark-checkpointing.md
    Original file line number Diff line number Diff line change
    @@ -110,6 +110,12 @@ and almost `409` delete markers (soft deletes)
    409
    ```

    Actual objects on namespace without versioning lookup
    ```
    ~ mc ls -r myminio/process-runner/ | wc -l
    205
    ```

    ### After Direct Checkpointing Write Optimization

    ```
    @@ -206,3 +212,35 @@ Actual number of valid objects
    205
    ```

    Actual objects on namespace without versioning lookup
    ```
    ~ mc ls -r myminio/process-runner/ | wc -l
    205
    ```

    Optimization can be seen in terms of total time taken for Batch '0'

    | Without Optimization | With Optimization |
    |------------------|-----------------|
    | 72 secs | 17 secs |

    Namespace pollution (delete markers)
    | Total DEL markers without optimization | Total DEL markers with optimization |
    |------------------|-----------------|
    | 409 | 0 |

    Total number of excess objects on namespace
    | Total excess objects without optimization | Total excess objects with optimization |
    |------------------|-----------------|
    | 818 (out of which 409 are DEL markers) | 0 |

    Total number of API calls
    | Total number of API calls without optimization | Total number of API calls with optimization |
    |------------------|-----------------|
    | 6938 | 224 |

    Ratio of API calls to objects
    | API Calls / Objects without optimization | API Calls / Objects with optimization |
    |------------------|-----------------|
    | 33.8x | 1.09x |

  3. harshavardhana revised this gist Feb 24, 2023. 1 changed file with 10 additions and 0 deletions.
    10 changes: 10 additions & 0 deletions spark-checkpointing.md
    Original file line number Diff line number Diff line change
    @@ -1,6 +1,16 @@
    ### Spark-shell with S3A based checkpointing

    ```
    Spark context Web UI available at http://nirisvara:4040
    Spark context available as 'sc' (master = local[*], app id = local-1677271782301).
    Spark session available as 'spark'.
    Welcome to
    ____ __
    / __/__ ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
    /___/ .__/\_,_/_/ /_/\_\ version 3.3.2
    /_/
    Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
    Type in expressions to have them evaluated.
    Type :help for more information.
  4. harshavardhana created this gist Feb 24, 2023.
    198 changes: 198 additions & 0 deletions spark-checkpointing.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,198 @@
    ### Spark-shell with S3A based checkpointing

    ```
    Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> :load SparkStreamingFromDirectory-S3A.scala
    Loading SparkStreamingFromDirectory-S3A.scala...
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    defined object SparkStreamingFromDirectory
    scala> SparkStreamingFromDirectory.main(Array(""))
    23/02/25 02:14:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
    root
    |-- RecordNumber: integer (nullable = true)
    |-- Zipcode: string (nullable = true)
    |-- ZipCodeType: string (nullable = true)
    |-- City: string (nullable = true)
    |-- State: string (nullable = true)
    |-- LocationType: string (nullable = true)
    |-- Lat: string (nullable = true)
    |-- Long: string (nullable = true)
    |-- Xaxis: string (nullable = true)
    |-- Yaxis: string (nullable = true)
    |-- Zaxis: string (nullable = true)
    |-- WorldRegion: string (nullable = true)
    |-- Country: string (nullable = true)
    |-- LocationText: string (nullable = true)
    |-- Location: string (nullable = true)
    |-- Decommisioned: string (nullable = true)
    root
    |-- Zipcode: string (nullable = true)
    |-- count: long (nullable = false)
    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +-------+-----+
    |Zipcode|count|
    +-------+-----+
    |76166 |2 |
    |32564 |2 |
    |85210 |2 |
    |36275 |3 |
    |709 |3 |
    |35146 |3 |
    |708 |2 |
    |35585 |3 |
    |32046 |2 |
    |27203 |4 |
    |34445 |2 |
    |27007 |4 |
    |704 |10 |
    |27204 |4 |
    |34487 |2 |
    |85209 |2 |
    |76177 |4 |
    +-------+-----+
    ```

    Number of API calls
    ```
    mc support top api myminio/
    API RX TX CALLS ERRORS
    s3.CopyObject 48 KiB 47 KiB 208 0
    s3.DeleteMultipleObjects 146 KiB 47 KiB 417 0
    s3.DeleteObject 32 KiB 0 B 211 0
    s3.GetObject 168 B 1.3 KiB 1 0
    s3.HeadObject 441 KiB 0 B 2950 0
    s3.ListObjectsV2 408 KiB 1.4 MiB 2732 0
    s3.PutObject 128 KiB 0 B 419 0
    Summary:
    Total: 6938 CALLS, 1.2 MiB RX, 1.5 MiB TX - in 72.36s
    ```

    The number of object versions left behind by this behavior on a versioned bucket:

    ```
    ~ mc ls -r --versions myminio/process-runner/ | wc -l
    1023
    ```

    Out of which `614` are actual objects

    ```
    ~ mc ls -r --versions myminio/process-runner/ | grep PUT | wc -l
    614
    ```

    and `409` are delete markers (soft deletes)

    ```
    ~ mc ls -r --versions myminio/process-runner/ | grep DEL | wc -l
    409
    ```
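The three listings above each rescan the bucket. A possible single-pass alternative is sketched below. It assumes, as the `grep` commands above do, that each line of `mc ls --versions` output contains the text `PUT` or `DEL`; the `count_versions` function name is hypothetical.

```shell
#!/bin/sh
# count_versions: read a `mc ls -r --versions` listing on stdin and report
# the total number of entries, PUT versions, and DEL markers in one pass.
# Matching is by substring, mirroring `grep PUT` / `grep DEL`, so an object
# key that itself contains "PUT" or "DEL" would be miscounted the same way.
count_versions() {
  awk '
    /PUT/ { put++ }
    /DEL/ { del++ }
    END   { printf "total=%d put=%d del=%d\n", NR, put, del }
  '
}

# Usage against the bucket from this test (requires a configured `mc` alias):
#   mc ls -r --versions myminio/process-runner/ | count_versions
```

For the run above, this should report the same three counts as the separate commands: 1023 entries, 614 PUT and 409 DEL.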

    ### After Direct Checkpointing Write Optimization

    ```
    Spark context Web UI available at http://nirisvara:4040
    Spark context available as 'sc' (master = local[*], app id = local-1677271782301).
    Spark session available as 'spark'.
    Welcome to
    ____ __
    / __/__ ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
    /___/ .__/\_,_/_/ /_/\_\ version 3.3.2
    /_/
    Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> :load SparkStreamingFromDirectory
    SparkStreamingFromDirectory-S3A.scala SparkStreamingFromDirectory.scala
    scala> :load SparkStreamingFromDirectory.scala
    Loading SparkStreamingFromDirectory.scala...
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    defined object SparkStreamingFromDirectory
    scala> SparkStreamingFromDirectory.main(Array(""))
    23/02/25 02:20:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
    root
    |-- RecordNumber: integer (nullable = true)
    |-- Zipcode: string (nullable = true)
    |-- ZipCodeType: string (nullable = true)
    |-- City: string (nullable = true)
    |-- State: string (nullable = true)
    |-- LocationType: string (nullable = true)
    |-- Lat: string (nullable = true)
    |-- Long: string (nullable = true)
    |-- Xaxis: string (nullable = true)
    |-- Yaxis: string (nullable = true)
    |-- Zaxis: string (nullable = true)
    |-- WorldRegion: string (nullable = true)
    |-- Country: string (nullable = true)
    |-- LocationText: string (nullable = true)
    |-- Location: string (nullable = true)
    |-- Decommisioned: string (nullable = true)
    root
    |-- Zipcode: string (nullable = true)
    |-- count: long (nullable = false)
    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +-------+-----+
    |Zipcode|count|
    +-------+-----+
    |76166 |2 |
    |32564 |2 |
    |85210 |2 |
    |36275 |3 |
    |709 |3 |
    |35146 |3 |
    |708 |2 |
    |35585 |3 |
    |32046 |2 |
    |27203 |4 |
    |34445 |2 |
    |27007 |4 |
    |704 |10 |
    |27204 |4 |
    |34487 |2 |
    |85209 |2 |
    |76177 |4 |
    +-------+-----+
    ```

    ```
    ~ mc support top api myminio/
    API RX TX CALLS ERRORS
    s3.GetObject 159 B 1.3 KiB 1 0
    s3.HeadObject 1.5 KiB 0 B 10 0
    s3.ListObjectVersions 765 B 2.0 KiB 5 0
    s3.PutObject 88 KiB 0 B 208 0
    Summary:
    Total: 224 CALLS, 90 KiB RX, 3.3 KiB TX - in 17.00s
    ```

    Actual number of valid objects
    ```
    ~ mc ls -r --versions myminio/process-runner/ | wc -l
    205
    ```