Skip to content

Instantly share code, notes, and snippets.

@matthewpick
Last active August 7, 2020 20:44
Show Gist options
  • Save matthewpick/d7ca9504179c84df945d37a8c34ed107 to your computer and use it in GitHub Desktop.

Revisions

  1. matthewpick revised this gist Aug 5, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion DeltaWriter.scala
    Original file line number Diff line number Diff line change
    @@ -36,7 +36,7 @@ object DeltaWriter {
    df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(deltaPath)
    generateSymlinkManifest(deltaPath, df.sparkSession)

    } else if (deltaTableExists && deltaTableEmpty) {
    } else if (deltaTableEmpty) {

    df.write.format("delta").save(deltaPath)
    generateSymlinkManifest(deltaPath, df.sparkSession)
  2. matthewpick created this gist Aug 5, 2020.
    73 changes: 73 additions & 0 deletions DeltaWriter.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,73 @@
    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}


    /**
     * Utility for writing DataFrames to Delta Lake paths with upsert (merge) semantics.
     *
     * Write strategy:
     *  - `overwrite = true`            -> full overwrite with schema merging
     *  - table missing/unreadable/empty -> plain initial write
     *  - table populated               -> merge (upsert + soft-delete) keyed on `primaryKey`
     *
     * A symlink manifest is regenerated after every successful write so external
     * readers (e.g. Presto/Athena) see the new files.
     */
    object DeltaWriter {

      /**
       * Regenerates the symlink-format manifest for the Delta table at `deltaPath`.
       *
       * @param deltaPath    path of an existing Delta table
       * @param sparkSession active session used to load the table
       */
      def generateSymlinkManifest(deltaPath: String, sparkSession: SparkSession): Unit = {
        val deltaTable = DeltaTable.forPath(sparkSession, deltaPath)
        deltaTable.generate("symlink_format_manifest")
      }

      /**
       * Writes `df` to `deltaPath`, choosing between overwrite, initial write, and merge.
       *
       * Merge semantics: rows matching on `primaryKey` with `newData.deleted = true`
       * are deleted; other matches are updated; non-matches are inserted.
       *
       * @param deltaPath  destination Delta table path
       * @param df         data to persist
       * @param primaryKey column name used as the merge join key
       * @param overwrite  when true, replace the table contents entirely
       * @throws AnalysisException from the merge branch after logging both schemas,
       *                           typically on a schema mismatch between `df` and the table
       */
      def write(deltaPath: String, df: DataFrame, primaryKey: String, overwrite: Boolean = false): Unit = {
        val spark = df.sparkSession

        // Existing table paired with an "is empty" flag (zero history entries).
        // None means the table is missing or could not be loaded — both cases
        // are handled with a plain initial write below. This fixes the earlier
        // bug where a missing table fell through to the merge branch and NPE'd
        // on a null DeltaTable.
        val existing: Option[(DeltaTable, Boolean)] =
          if (DeltaTable.isDeltaTable(spark, deltaPath)) {
            try {
              val table = DeltaTable.forPath(spark, deltaPath)
              Some((table, table.history().count() == 0L))
            } catch {
              case e: AnalysisException =>
                // Best-effort load: log and fall back to treating the table as absent.
                println("Error with " + deltaPath)
                e.printStackTrace()
                None
            }
          } else None

        if (overwrite) {
          // Full replace; mergeSchema lets new columns in df widen the table schema.
          df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(deltaPath)
          generateSymlinkManifest(deltaPath, spark)
        } else {
          existing match {
            case Some((table, false)) =>
              // Populated table: upsert keyed on primaryKey, honoring soft deletes.
              try {
                table.as("oldData")
                  .merge(df.as("newData"), s"oldData.${primaryKey} = newData.${primaryKey}")
                  .whenMatched("newData.deleted = true")
                  .delete()
                  .whenMatched
                  .updateAll()
                  .whenNotMatched
                  .insertAll()
                  .execute()
                generateSymlinkManifest(deltaPath, spark)
              } catch {
                case e: AnalysisException =>
                  // Surface both schemas before rethrowing to make mismatches debuggable.
                  println("Schemas for " + deltaPath)
                  table.toDF.printSchema()
                  df.printSchema()
                  throw e
              }

            case _ =>
              // Missing, unreadable, or empty table: plain initial write.
              df.write.format("delta").save(deltaPath)
              generateSymlinkManifest(deltaPath, spark)
          }
        }
      }

      /**
       * Removes files no longer referenced by the Delta table at `deltaPath`
       * using the default retention period.
       *
       * @param deltaPath    path of an existing Delta table
       * @param sparkSession active session used to load the table
       */
      def vacuum(deltaPath: String, sparkSession: SparkSession): Unit = {
        val deltaTable = DeltaTable.forPath(sparkSession, deltaPath)
        deltaTable.vacuum()
      }

    }