# Avro serialization

There are 4 possible serialization formats when using Avro:

- Avro JSON encoding
- Avro Data Serialization (https://avro.apache.org/docs/current/spec.html#Data+Serialization): binary format with a header that contains the full schema; this is the format usually used when writing Avro files
- Avro Single Object Encoding (https://avro.apache.org/docs/current/spec.html#single_object_encoding): binary format with a header that contains only the fingerprint/id of the schema; this is the format used by Kafka (see [this](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java#L85))
- Avro Binary Encoding (https://avro.apache.org/docs/current/spec.html#binary_encoding): binary format without any header, the most compact format

## Avro4s

Serialization using the avro4s library, which can generate a schema and a record (`GenericRecord`) from a case class.

Add the library:

`libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "1.8.3"`

Example of **Avro Data Serialization**:

```
import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream}

object SampleAvro4sData {

  case class User(name: String, favorite_number: Int, favorite_color: String)

  def main(args: Array[String]): Unit = {
    val outputStream = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[User](outputStream)
    os.write(Seq(
      User("davide", 6, "red"),
      User("mark", 4, "white")))
    os.flush()
    os.close()

    val bytes = outputStream.toByteArray

    val is = AvroInputStream.data[User](bytes)
    val users = is.iterator.toSet
    is.close()

    println("len: " + bytes.length)
    println(users.mkString("\n"))
  }
}
```

To use **Avro Binary Encoding** just change `AvroOutputStream.data` to `AvroOutputStream.binary`; a sketch of the binary variant follows.
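A minimal sketch of the binary variant, mirroring the data example above. The write side uses `AvroOutputStream.binary` as stated; the read side assumes the matching `AvroInputStream.binary` reader of the avro4s 1.8.x API (an assumption, not verified against other versions). Object and values are only illustrative.

```
import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream}

object SampleAvro4sBinary {

  case class User(name: String, favorite_number: Int, favorite_color: String)

  def main(args: Array[String]): Unit = {
    val outputStream = new ByteArrayOutputStream()
    // binary instead of data: no file header, so the schema is not embedded
    val os = AvroOutputStream.binary[User](outputStream)
    os.write(Seq(
      User("davide", 6, "red"),
      User("mark", 4, "white")))
    os.flush()
    os.close()

    val bytes = outputStream.toByteArray

    // the reader cannot discover the schema from the payload,
    // it is derived again from the case class (assumed reader API)
    val is = AvroInputStream.binary[User](bytes)
    val users = is.iterator.toSet
    is.close()

    println("len: " + bytes.length) // expected to be smaller than with AvroOutputStream.data
    println(users.mkString("\n"))
  }
}
```

The printed length should be noticeably smaller than in the data example, since there is no schema header in the payload.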
## org.apache.avro

Serialization using the official Java library.

Add the library:

`libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"`

Example of **Avro Data Serialization** and **Binary Encoding**:

```
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

import scala.collection.mutable

object SampleAvro {

  def main(args: Array[String]): Unit = {
    val schema =
      """
        |{"namespace": "example.avro",
        | "type": "record",
        | "name": "User",
        | "fields": [
        |  {"name": "name", "type": "string"},
        |  {"name": "favorite_number", "type": "int"},
        |  {"name": "favorite_color", "type": "string"}
        | ]
        |}
      """.stripMargin

    import org.apache.avro.generic.GenericData
    val schemaObj = new org.apache.avro.Schema.Parser().parse(schema)

    val user1 = new GenericData.Record(schemaObj)
    user1.put("name", "Alyssa")
    user1.put("favorite_number", 256)
    user1.put("favorite_color", "blue")

    val user2 = new GenericData.Record(schemaObj)
    user2.put("name", "Ben")
    user2.put("favorite_number", 7)
    user2.put("favorite_color", "red")

    // Data serialization (data + schema)
    val bytes = write(List(user1, user2), schemaObj)
    val users = read(bytes, schemaObj)
    println("Data serialization")
    println(users.mkString("\n"))

    // Binary encoding (only data, without schema)
    val bytes2 = writeBinary(List(user1, user2), schemaObj)
    val users2 = readBinary(bytes2, schemaObj)
    println("Binary encoding")
    println(users2.mkString("\n"))
  }

  def write(records: Seq[org.apache.avro.generic.GenericData.Record],
            schema: org.apache.avro.Schema): Array[Byte] = {
    import java.io.ByteArrayOutputStream

    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    dataFileWriter.create(schema, outputStream)
    for (record <- records) dataFileWriter.append(record)
    dataFileWriter.flush()
    dataFileWriter.close()
    outputStream.toByteArray
  }

  def read(bytes: Array[Byte],
           schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)

    import scala.collection.JavaConverters._
    val list = dataFileReader.iterator().asScala.toList
    dataFileReader.close()
    list
  }

  def writeBinary(records: Seq[org.apache.avro.generic.GenericData.Record],
                  schema: org.apache.avro.Schema): Array[Byte] = {
    import java.io.ByteArrayOutputStream

    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)
    for (record <- records) datumWriter.write(record, encoder)
    encoder.flush()
    outputStream.toByteArray
  }

  def readBinary(bytes: Array[Byte],
                 schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    import org.apache.avro.file.SeekableByteArrayInput
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)

    val result = new mutable.MutableList[org.apache.avro.generic.GenericRecord]
    while (!decoder.isEnd) {
      val item = datumReader.read(null, decoder)
      result += item
    }
    result.toList
  }
}
```

The same can also be performed using specific classes instead of `GenericRecord`. One way is to use `ReflectDatumWriter`/`ReflectDatumReader` instead of `GenericDatumWriter`/`GenericDatumReader`, as in the sketch below.
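A minimal sketch of the reflection-based approach, under the assumption that a mutable class with a no-arg constructor is used (the reflect reader instantiates it and fills its fields via reflection, so a plain Scala case class without a no-arg constructor will not work here). The class and object names are made up for illustration.

```
import java.io.ByteArrayOutputStream

import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.{ReflectData, ReflectDatumReader, ReflectDatumWriter}

import scala.collection.mutable

// plain class with vars and a no-arg constructor: Avro reflection
// needs to instantiate it and set its fields when reading
class User(var name: String, var favorite_number: Int, var favorite_color: String) {
  def this() = this("", 0, "")
  override def toString = s"User($name, $favorite_number, $favorite_color)"
}

object SampleAvroReflect {

  def main(args: Array[String]): Unit = {
    // the schema is derived from the class via reflection
    val schema = ReflectData.get().getSchema(classOf[User])
    println(schema.toString(true))

    // Binary encoding with ReflectDatumWriter
    val outputStream = new ByteArrayOutputStream()
    val writer = new ReflectDatumWriter[User](schema)
    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)
    writer.write(new User("Alyssa", 256, "blue"), encoder)
    writer.write(new User("Ben", 7, "red"), encoder)
    encoder.flush()
    val bytes = outputStream.toByteArray

    // Decoding back to User instances with ReflectDatumReader
    val reader = new ReflectDatumReader[User](schema)
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    val result = mutable.ListBuffer[User]()
    while (!decoder.isEnd) result += reader.read(null, decoder)

    println(result.mkString("\n"))
  }
}
```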
## Schema resolution

Rules that must be followed to ensure correct schema evolution: https://avro.apache.org/docs/current/spec.html#Schema+Resolution

Note that when reading binary Avro data you must always provide the original schema used to write it. It can come from a header (see Data Serialization) or from somewhere else. If you want to read the data into a new schema (a new class), you must provide both the old and the new schema: the old (writer) schema is used to decode the binary data, the new (reader) schema is used to map old fields to new fields (following the rules above).
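A minimal sketch of a two-schema read with the official library, using the `GenericDatumReader(writerSchema, readerSchema)` constructor. The evolved schema and the `favorite_food` field with its default value are made up for illustration; per the resolution rules, a reader field missing from the writer schema must have a default.

```
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object SampleAvroSchemaResolution {

  // schema originally used to write the data
  val writerSchemaJson =
    """
      |{"namespace": "example.avro",
      | "type": "record",
      | "name": "User",
      | "fields": [
      |  {"name": "name", "type": "string"},
      |  {"name": "favorite_number", "type": "int"},
      |  {"name": "favorite_color", "type": "string"}
      | ]
      |}
    """.stripMargin

  // evolved schema: favorite_color was dropped, favorite_food was added with a default
  val readerSchemaJson =
    """
      |{"namespace": "example.avro",
      | "type": "record",
      | "name": "User",
      | "fields": [
      |  {"name": "name", "type": "string"},
      |  {"name": "favorite_number", "type": "int"},
      |  {"name": "favorite_food", "type": "string", "default": "pizza"}
      | ]
      |}
    """.stripMargin

  def main(args: Array[String]): Unit = {
    val writerSchema = new Schema.Parser().parse(writerSchemaJson)
    val readerSchema = new Schema.Parser().parse(readerSchemaJson)

    // write with the old schema (binary encoding, no header)
    val user = new GenericData.Record(writerSchema)
    user.put("name", "Alyssa")
    user.put("favorite_number", 256)
    user.put("favorite_color", "blue")

    val out = new ByteArrayOutputStream()
    val writer = new GenericDatumWriter[GenericRecord](writerSchema)
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    writer.write(user, encoder)
    encoder.flush()

    // read with both schemas: the writer schema decodes the bytes,
    // the reader schema maps them onto the new shape
    val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
    val decoder = DecoderFactory.get.binaryDecoder(out.toByteArray, null)
    val evolved = reader.read(null, decoder)

    // favorite_color is dropped, favorite_food gets its default value
    println(evolved)
  }
}
```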