-
-
Save anthonny/817517f37f4977e8daae279d38e83bb0 to your computer and use it in GitHub Desktop.
Revisions
-
longcao revised this gist
Jan 4, 2017 . 1 changed file with 6 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -26,10 +26,12 @@ def rowsToInputStream(rows: Iterator[Row], delimiter: String): InputStream = { (row.mkString(delimiter) + "\n").getBytes }.flatten new InputStream { override def read(): Int = if (bytes.hasNext) { bytes.next & 0xff // bitwise AND - make the signed byte an unsigned int from 0-255 } else { -1 } } } -
longcao revised this gist
Jul 11, 2016 . 1 changed file with 4 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -26,8 +26,10 @@ def rowsToInputStream(rows: Iterator[Row], delimiter: String): InputStream = { (row.mkString(delimiter) + "\n").getBytes }.flatten override def read(): Int = if (bytes.hasNext) { bytes.next & 0xff // bitwise AND - make the signed byte an unsigned int from 0-255 } else { -1 } } -
longcao revised this gist
Jul 11, 2016 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -31,6 +31,7 @@ def rowsToInputStream(rows: Iterator[Row], delimiter: String): InputStream = { } } // Beware: this will open a db connection for every partition of your DataFrame. frame.foreachPartition { rows => val conn = cf() val cm = new CopyManager(conn.asInstanceOf[BaseConnection]) -
longcao revised this gist
Jul 11, 2016 . No changes.There are no files selected for viewing
-
longcao created this gist
Jul 11, 2016 .There are no files selected for viewing
import java.io.InputStream
import java.nio.charset.StandardCharsets
import java.sql.Connection

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

val jdbcUrl = s"jdbc:postgresql://..." // db credentials elided

val connectionProperties = {
  val props = new java.util.Properties()
  props.setProperty("driver", "org.postgresql.Driver")
  props
}

// Spark reads the "driver" property to allow users to override the default driver selected, otherwise
// it picks the Redshift driver, which doesn't support JDBC CopyManager.
// https://github.com/apache/spark/blob/v1.6.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L44-51
val cf: () => Connection = JdbcUtils.createConnectionFactory(jdbcUrl, connectionProperties)

/**
 * Lazily converts one partition of rows into a byte stream suitable for COPY ... FROM STDIN.
 *
 * Each row becomes a single line: its fields joined by `delimiter`, terminated by "\n",
 * and encoded as UTF-8. Rows are pulled from the iterator only as the stream is read.
 *
 * NOTE: fields are rendered with `Row.mkString`, i.e. without any quoting or escaping, so a
 * value that itself contains the delimiter, a quote or a newline will produce a malformed line.
 *
 * @param rows      the rows of one partition
 * @param delimiter the separator placed between the fields of a row
 * @return an `InputStream` yielding the encoded lines, then -1 once the rows are exhausted
 */
def rowsToInputStream(rows: Iterator[Row], delimiter: String): InputStream = {
  val bytes: Iterator[Byte] = rows.flatMap { row =>
    // Encode explicitly as UTF-8 instead of relying on the JVM's platform default charset.
    (row.mkString(delimiter) + "\n").getBytes(StandardCharsets.UTF_8).iterator
  }

  new InputStream {
    override def read(): Int =
      if (bytes.hasNext) {
        // InputStream.read() must return 0-255. A plain Byte => Int widening sign-extends,
        // so 0xFF would become -1 (a false end-of-stream) and other high bytes would go negative.
        bytes.next() & 0xff
      } else {
        -1
      }
  }
}

// Beware: this will open a db connection for every partition of your DataFrame.
frame.foreachPartition { rows =>
  val conn = cf()
  try {
    val cm = new CopyManager(conn.asInstanceOf[BaseConnection])
    cm.copyIn(
      """COPY my_schema._mytable FROM STDIN WITH (NULL 'null', FORMAT CSV, DELIMITER E'\t')""", // adjust COPY settings as you desire, options from https://www.postgresql.org/docs/9.5/static/sql-copy.html
      rowsToInputStream(rows, "\t"))
  } finally {
    // Always release the connection, even when the COPY fails part-way through.
    conn.close()
  }
}