-
-
Save aaronkub/44b058041d7e5736d9c930ac5c9efadb to your computer and use it in GitHub Desktop.
Revisions
-
davideicardi revised this gist
Oct 22, 2018 . 1 changed file with 96 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,96 @@ import org.apache.avro.Schema import test.avro.User object SampleAvroEvolve { def main(args: Array[String]): Unit = { // writeV1() val result = readV2fromV1() println(result) } def readV1() = { val schemaV1 = new Schema.Parser().parse( """ |{ | "namespace": "test.avro", | "type": "record", | "name": "User", | "fields": [ | {"name": "name", "type": "string"}, | {"name": "favorite_number", "type": "int"}, | {"name": "favorite_color", "type": "string"} | ] |} """.stripMargin ) val bytesV1 = avroUtils.readFile("/tmp/userv1.bin") avroUtils.readSpecificBinary(bytesV1, schemaV1, schemaV1, new User) } def readV2fromV1() = { val schemaV1 = new Schema.Parser().parse( """ |{ | "namespace": "test.avro", | "type": "record", | "name": "User", | "fields": [ | {"name": "name", "type": "string"}, | {"name": "favorite_number", "type": "int"}, | {"name": "favorite_color", "type": "string"} | ] |} """.stripMargin ) val schemaV2 = new Schema.Parser().parse( """ |{ | "namespace": "test.avro", | "type": "record", | "name": "User", | "fields": [ | {"name": "name", "type": "string"}, | {"name": "favorite_number", "type": "int"}, | {"name": "favorite_animal", "type": "string", "default": "tiger"}, | {"name": "favorite_color", "type": "string"} | ] |} """.stripMargin ) val bytesV1 = avroUtils.readFile("/tmp/userv1.bin") avroUtils.readSpecificBinary(bytesV1, schemaV2, schemaV1, new User) } def writeV1(): Unit = { val schemaV1 = new Schema.Parser().parse( """ |{ | "namespace": "test.avro", | "type": "record", | "name": "User", | "fields": [ | {"name": "name", "type": "string"}, | {"name": "favorite_number", "type": "int"}, | {"name": "favorite_color", "type": "string"} | ] |} """.stripMargin ) val userV1 = new User userV1.name = "Alyssa" userV1.favorite_number = 256 userV1.favorite_color = "blue" val bytesV1 = avroUtils.writeSpecificBinary(userV1, schemaV1) avroUtils.writeFile("/tmp/userv1.bin", bytesV1) } } -
davideicardi revised this gist
Oct 22, 2018 . 1 changed file with 5 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -131,7 +131,11 @@ object avroUtils { outputStream.toByteArray } def readSpecificBinary[T]( bytes: Array[Byte], readerSchema: org.apache.avro.Schema, // destination schema writerSchema: org.apache.avro.Schema, // source schema reuse: T): T = { val datumReader = new ReflectDatumReader[T](writerSchema, readerSchema) val inputStream = new SeekableByteArrayInput(bytes) -
davideicardi revised this gist
Oct 22, 2018 . 1 changed file with 22 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -118,6 +118,28 @@ object avroUtils { schema.toString(true) } def writeSpecificBinary[T](value: T, schema: org.apache.avro.Schema): Array[Byte] = { val datumWriter = new ReflectDatumWriter[T](schema) val outputStream = new ByteArrayOutputStream() val encoder = EncoderFactory.get.binaryEncoder(outputStream, null) datumWriter.write(value, encoder) encoder.flush() outputStream.toByteArray } def readSpecificBinary[T](bytes: Array[Byte], readerSchema: org.apache.avro.Schema, writerSchema: org.apache.avro.Schema, reuse: T): T = { val datumReader = new ReflectDatumReader[T](writerSchema, readerSchema) val inputStream = new SeekableByteArrayInput(bytes) val decoder = DecoderFactory.get.binaryDecoder(inputStream, null) datumReader.read(reuse, decoder) } def writeFile(fileName: String, bytes: Array[Byte]): Unit = { val file = new FileOutputStream(fileName) -
davideicardi revised this gist
Oct 17, 2018 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -183,7 +183,7 @@ object SampleAvro { } ``` The same can also be performed using specific class instead of using `GenericRecord`. One way is to use `ReflectDatumWriter/ReflectDatumReader` instead of `GenericDatumWriter/GenericDatumReader`. ## Schema resolution -
davideicardi revised this gist
Oct 17, 2018 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -183,7 +183,7 @@ object SampleAvro { } ``` The same can also be performed using specific class instead of using `GenericData.Record`. One way is to use `ReflectDatumWriter/ReflectDatumReader` instead of `GenericDatumWriter/GenericDatumReader`. ## Schema resolution -
davideicardi revised this gist
Oct 17, 2018 . 1 changed file with 2 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -183,6 +183,8 @@ object SampleAvro { } ``` The same can also be performed using specific class instead of using `GenericData.Record`. One way is to use `ReflectDatumWriter` and `ReflectDatumReader`. ## Schema resolution Rules that must be used to ensure correct schema evolution: https://avro.apache.org/docs/current/spec.html#Schema+Resolution -
davideicardi revised this gist
Oct 17, 2018 . 1 changed file with 5 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -185,5 +185,9 @@ object SampleAvro { ## Schema resolution Rules that must be used to ensure correct schema evolution: https://avro.apache.org/docs/current/spec.html#Schema+Resolution Note that when reading a binary avro you should always provide the original schema used to write it. It can be provided as an header (see data serialization) or from some where else. If you want to read data to a new schema (a new class) you should provide the old and the new schema. The old schema is used to read the binary data, the new schema is used to map old fields to new fields (following the above rules). -
davideicardi revised this gist
Oct 17, 2018 . 1 changed file with 134 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,134 @@ import java.io.{ByteArrayOutputStream, FileOutputStream} import java.nio.file.{Files, Paths} import org.apache.avro.file.{DataFileReader, DataFileWriter, SeekableByteArrayInput} import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord} import org.apache.avro.io.{DecoderFactory, EncoderFactory} import org.apache.avro.reflect.{ReflectData, ReflectDatumReader, ReflectDatumWriter} import scala.collection.mutable import scala.reflect.ClassTag import scala.collection.JavaConverters._ object avroUtils { def writeGenericData(records: Seq[org.apache.avro.generic.GenericData.Record], schema: org.apache.avro.Schema): Array[Byte] = { val outputStream = new ByteArrayOutputStream() val datumWriter = new GenericDatumWriter[GenericRecord](schema) val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) dataFileWriter.create(schema, outputStream) for (record <- records) dataFileWriter.append(record) dataFileWriter.flush() dataFileWriter.close() outputStream.toByteArray } def readGenericData(bytes: Array[Byte], schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = { val datumReader = new GenericDatumReader[GenericRecord](schema) val inputStream = new SeekableByteArrayInput(bytes) val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader) val list = dataFileReader.iterator().asScala.toList dataFileReader.close() list } def writeGenericBinary(records: Seq[org.apache.avro.generic.GenericData.Record], schema: org.apache.avro.Schema): Array[Byte] = { val outputStream = new ByteArrayOutputStream() val datumWriter = new GenericDatumWriter[GenericRecord](schema) val encoder = EncoderFactory.get.binaryEncoder(outputStream, null) for (record <- records) datumWriter.write(record, encoder) encoder.flush() outputStream.toByteArray } def readGenericBinary(bytes: Array[Byte], schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = { val datumReader = new GenericDatumReader[GenericRecord](schema) val inputStream = new SeekableByteArrayInput(bytes) val decoder = DecoderFactory.get.binaryDecoder(inputStream, null) val result = new mutable.MutableList[org.apache.avro.generic.GenericRecord] while (!decoder.isEnd) { val item = datumReader.read(null, decoder) result += item } result.toList } def writeSpecificData[T](records: Seq[T])(implicit classTag: ClassTag[T]): Array[Byte] = { val outputStream = new ByteArrayOutputStream() val schema = ReflectData.get().getSchema(classTag.runtimeClass) val datumWriter = new ReflectDatumWriter[T](schema) val dataFileWriter = new DataFileWriter[T](datumWriter) dataFileWriter.create(schema, outputStream) for (record <- records) dataFileWriter.append(record) dataFileWriter.flush() dataFileWriter.close() outputStream.toByteArray } def readSpecificData[T](bytes: Array[Byte])(implicit classTag: ClassTag[T]): List[T] = { val schema = ReflectData.get().getSchema(classTag.runtimeClass) readSpecificData(bytes, schema) } def readSpecificData[T](bytes: Array[Byte], schema: org.apache.avro.Schema): List[T] = { val datumReader = new ReflectDatumReader[T](schema) val inputStream = new SeekableByteArrayInput(bytes) val dataFileReader = new DataFileReader[T](inputStream, datumReader) val list = dataFileReader.iterator().asScala.toList dataFileReader.close() list } def generateSchema[T]()(implicit classTag: ClassTag[T]): String = { val schema = ReflectData.get().getSchema(classTag.runtimeClass) schema.toString(true) } def writeFile(fileName: String, bytes: Array[Byte]): Unit = { val file = new FileOutputStream(fileName) try { file.write(bytes) } finally { file.close() } } def readFile(fileName: String): Array[Byte] = { Files.readAllBytes(Paths.get(fileName)) } } -
davideicardi revised this gist
Oct 17, 2018 . 1 changed file with 6 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -181,4 +181,9 @@ object SampleAvro { result.toList } } ``` ## Schema resolution https://avro.apache.org/docs/current/spec.html#Schema+Resolution -
davideicardi revised this gist
Jul 31, 2018 . 1 changed file with 51 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -55,9 +55,13 @@ Serialization using the official java library. Add library: `libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"` Example of **Avro Data Serialization** and **Binary Encoding**. ``` import org.apache.avro.io.{DecoderFactory, EncoderFactory} import scala.collection.mutable object SampleAvro { def main(args: Array[String]): Unit = { @@ -88,11 +92,19 @@ object SampleAvro { user2.put("favorite_number", 7) user2.put("favorite_color", "red") // Data serialization (data + schema) val bytes = write(List(user1, user2), schemaObj) val users = read(bytes, schemaObj) println("Data serialization") println(users.mkString("\n")) // Binary encoding only (only data without schema) val bytes2 = writeBinary(List(user1, user2), schemaObj) val users2 = readBinary(bytes2, schemaObj) println("Binary encoding") println(users2.mkString("\n")) } def write(records: Seq[org.apache.avro.generic.GenericData.Record], @@ -131,5 +143,42 @@ object SampleAvro { list } def writeBinary(records: Seq[org.apache.avro.generic.GenericData.Record], schema: org.apache.avro.Schema): Array[Byte] = { import java.io.ByteArrayOutputStream import org.apache.avro.file.DataFileWriter import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} val outputStream = new ByteArrayOutputStream() val datumWriter = new GenericDatumWriter[GenericRecord](schema) val encoder = EncoderFactory.get.binaryEncoder(outputStream, null) for (record <- records) datumWriter.write(record, encoder) encoder.flush() outputStream.toByteArray } def readBinary(bytes: Array[Byte], schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = { import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput} import org.apache.avro.generic.{GenericDatumReader, GenericRecord} val datumReader = new GenericDatumReader[GenericRecord](schema) val inputStream = new SeekableByteArrayInput(bytes) val decoder = DecoderFactory.get.binaryDecoder(inputStream, null) val result = new mutable.MutableList[org.apache.avro.generic.GenericRecord] while (!decoder.isEnd) { val item = datumReader.read(null, decoder) result += item } result.toList } } ``` -
davideicardi revised this gist
Jul 31, 2018 . 1 changed file with 27 additions and 16 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -18,22 +18,33 @@ Add library: `libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "1 Example of **Avro Data Serialization**: ``` import java.io.ByteArrayOutputStream import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream} object SampleAvro4sData { case class User(name: String, favorite_number: Int,favorite_color: String ) def main(args: Array[String]): Unit = { val outputStream = new ByteArrayOutputStream() val os = AvroOutputStream.data[User](outputStream) os.write(Seq( User("davide", 6, "red"), User("mark", 4, "white"))) os.flush() os.close() val bytes = outputStream.toByteArray val is = AvroInputStream.data[User](bytes) val users = is.iterator.toSet is.close() println("len: " + bytes.length) println(users.mkString("\n")) } } ``` To use **Avro Binary Encoding** just change `AvroOutputStream.data` to `AvroOutputStream.binary`. -
davideicardi revised this gist
Jul 31, 2018 . 1 changed file with 4 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,12 +2,12 @@ There are 4 possible serialization format when using avro: - Avro Json encoding - Avro Data Serialization (https://avro.apache.org/docs/current/spec.html#Data+Serialization) Binary format with an header that contains the full schema, this is the format usually used when writing Avro files - Avro Single Object Encoding (https://avro.apache.org/docs/current/spec.html#single_object_encoding) Binary format with an header with only the fingerprint/id of the schema, this it the format used by Kafka (see [this](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java#L85) - Avro Binary Encoding (https://avro.apache.org/docs/current/spec.html#binary_encoding) Binary format without the header, the most compact format ## Avro4s -
davideicardi revised this gist
Jul 31, 2018 . 1 changed file with 20 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,9 +1,22 @@ # Avro serialization There are 4 possible serialization format when using avro: 1- Avro Json encoding 2- Avro Data Serialization (https://avro.apache.org/docs/current/spec.html#Data+Serialization) Binary format with an header that contains the full schema, this is the format usually used when writing Avro files 3- Avro Single Object Encoding (https://avro.apache.org/docs/current/spec.html#single_object_encoding) Binary format with an header with only the fingerprint/id of the schema, this it the format used by Kafka (see [this](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java#L85) 4- Avro Binary Encoding (https://avro.apache.org/docs/current/spec.html#binary_encoding) Binary format without the header, the most compact format ## Avro4s Serialization using the avro4s library, that have the feature to generate a schema and a record (GenericRecord) given a case class. Add library: `libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "1.8.3"` Example of **Avro Data Serialization**: ``` case class User(name: String, favorite_number: Int,favorite_color: String ) @@ -22,11 +35,17 @@ val users = is.iterator.toSet is.close() println(users.mkString("\n")) ``` To use **Avro Binary Encoding** just change `AvroOutputStream.data` to `AvroOutputStream.binary`. ## org.apache.avro Serialization using the official java library. Add library: `libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"` Example of **Avro Data Serialization**. ``` object SampleAvro { def main(args: Array[String]): Unit = { -
davideicardi revised this gist
Jul 31, 2018 . 2 changed files with 31 additions and 16 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,33 @@ # Avro serialization ## Scala4s Add library: `libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "1.8.3"` ``` case class User(name: String, favorite_number: Int,favorite_color: String ) val outputStream = new ByteArrayOutputStream() val os = AvroOutputStream.data[User](outputStream) os.write(Seq( User("davide", 6, "red"), User("mark", 4, "white"))) os.flush() os.close() val bytes = outputStream.toByteArray val is = AvroInputStream.data[User](bytes) val users = is.iterator.toSet is.close() println(users.mkString("\n")) ``` ## org.apache.avro Add library: `libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"` ``` object SampleAvro { def main(args: Array[String]): Unit = { @@ -72,3 +102,4 @@ object SampleAvro { list } } ``` This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,16 +0,0 @@ -
davideicardi created this gist
Jul 24, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,16 @@ case class User(name: String, favorite_number: Int,favorite_color: String ) val outputStream = new ByteArrayOutputStream() val os = AvroOutputStream.data[User](outputStream) os.write(Seq( User("davide", 6, "red"), User("mark", 4, "white"))) os.flush() os.close() val bytes = outputStream.toByteArray val is = AvroInputStream.data[User](bytes) val users = is.iterator.toSet is.close() println(users.mkString("\n")) This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,74 @@ object SampleAvro { def main(args: Array[String]): Unit = { val schema = """ |{"namespace": "example.avro", | "type": "record", | "name": "User", | "fields": [ | {"name": "name", "type": "string"}, | {"name": "favorite_number", "type": "int"}, | {"name": "favorite_color", "type": "string"} | ] |} """.stripMargin import org.apache.avro.generic.GenericData val schemaObj = new org.apache.avro.Schema.Parser().parse(schema) val user1 = new GenericData.Record(schemaObj) user1.put("name", "Alyssa") user1.put("favorite_number", 256) user1.put("favorite_color", "blue") val user2 = new GenericData.Record(schemaObj) user2.put("name", "Ben") user2.put("favorite_number", 7) user2.put("favorite_color", "red") val bytes = write(List(user1, user2), schemaObj) val users = read(bytes, schemaObj) println(users.mkString("\n")) } def write(records: Seq[org.apache.avro.generic.GenericData.Record], schema: org.apache.avro.Schema): Array[Byte] = { import java.io.ByteArrayOutputStream import org.apache.avro.file.DataFileWriter import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} val outputStream = new ByteArrayOutputStream() val datumWriter = new GenericDatumWriter[GenericRecord](schema) val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter) dataFileWriter.create(schema, outputStream) for (record <- records) dataFileWriter.append(record) dataFileWriter.flush() dataFileWriter.close() outputStream.toByteArray } def read(bytes: Array[Byte], schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = { import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput} import org.apache.avro.generic.{GenericDatumReader, GenericRecord} val datumReader = new GenericDatumReader[GenericRecord](schema) val inputStream = new SeekableByteArrayInput(bytes) val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader) import scala.collection.JavaConverters._ val list = dataFileReader.iterator().asScala.toList dataFileReader.close() list } }