Skip to content

Instantly share code, notes, and snippets.

@aaronkub
Forked from davideicardi/README.md
Created June 3, 2021 16:57
Show Gist options
  • Save aaronkub/44b058041d7e5736d9c930ac5c9efadb to your computer and use it in GitHub Desktop.
Save aaronkub/44b058041d7e5736d9c930ac5c9efadb to your computer and use it in GitHub Desktop.

Revisions

  1. @davideicardi davideicardi revised this gist Oct 22, 2018. 1 changed file with 96 additions and 0 deletions.
    96 changes: 96 additions & 0 deletions SampleAvroEvolve.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,96 @@
    import org.apache.avro.Schema
    import test.avro.User

    // Demonstrates Avro schema evolution: a User record written with schema V1
    // is read back with schema V2, which adds a "favorite_animal" field with a
    // default value ("tiger") so that old V1 records remain readable.
    // NOTE(review): depends on avroUtils and test.avro.User defined elsewhere in
    // this gist; run writeV1() once first so that /tmp/userv1.bin exists.
    object SampleAvroEvolve {

    def main(args: Array[String]): Unit = {

    // writeV1() // uncomment on the first run to produce /tmp/userv1.bin

    // Read the V1-encoded bytes using the V2 reader schema.
    val result = readV2fromV1()

    println(result)
    }

    // Reads /tmp/userv1.bin with the same schema it was written with (V1 -> V1).
    def readV1() = {
    val schemaV1 = new Schema.Parser().parse(
    """
    |{
    | "namespace": "test.avro",
    | "type": "record",
    | "name": "User",
    | "fields": [
    | {"name": "name", "type": "string"},
    | {"name": "favorite_number", "type": "int"},
    | {"name": "favorite_color", "type": "string"}
    | ]
    |}
    """.stripMargin
    )

    val bytesV1 = avroUtils.readFile("/tmp/userv1.bin")
    // Writer schema == reader schema: no schema resolution takes place.
    avroUtils.readSpecificBinary(bytesV1, schemaV1, schemaV1, new User)
    }

    // Reads the V1 binary but resolves it to the newer schema V2: the added
    // "favorite_animal" field is absent from the bytes and is filled in from
    // its schema default.
    def readV2fromV1() = {
    val schemaV1 = new Schema.Parser().parse(
    """
    |{
    | "namespace": "test.avro",
    | "type": "record",
    | "name": "User",
    | "fields": [
    | {"name": "name", "type": "string"},
    | {"name": "favorite_number", "type": "int"},
    | {"name": "favorite_color", "type": "string"}
    | ]
    |}
    """.stripMargin
    )

    val schemaV2 = new Schema.Parser().parse(
    """
    |{
    | "namespace": "test.avro",
    | "type": "record",
    | "name": "User",
    | "fields": [
    | {"name": "name", "type": "string"},
    | {"name": "favorite_number", "type": "int"},
    | {"name": "favorite_animal", "type": "string", "default": "tiger"},
    | {"name": "favorite_color", "type": "string"}
    | ]
    |}
    """.stripMargin
    )

    val bytesV1 = avroUtils.readFile("/tmp/userv1.bin")
    // readerSchema = V2 (destination), writerSchema = V1 (what the bytes hold).
    avroUtils.readSpecificBinary(bytesV1, schemaV2, schemaV1, new User)
    }

    // Writes a sample User encoded with schema V1 to /tmp/userv1.bin using raw
    // binary encoding (no embedded schema header).
    def writeV1(): Unit = {
    val schemaV1 = new Schema.Parser().parse(
    """
    |{
    | "namespace": "test.avro",
    | "type": "record",
    | "name": "User",
    | "fields": [
    | {"name": "name", "type": "string"},
    | {"name": "favorite_number", "type": "int"},
    | {"name": "favorite_color", "type": "string"}
    | ]
    |}
    """.stripMargin
    )

    val userV1 = new User
    userV1.name = "Alyssa"
    userV1.favorite_number = 256
    userV1.favorite_color = "blue"

    val bytesV1 = avroUtils.writeSpecificBinary(userV1, schemaV1)

    avroUtils.writeFile("/tmp/userv1.bin", bytesV1)
    }
    }
  2. @davideicardi davideicardi revised this gist Oct 22, 2018. 1 changed file with 5 additions and 1 deletion.
    6 changes: 5 additions & 1 deletion avroUtils.scala
    Original file line number Diff line number Diff line change
    @@ -131,7 +131,11 @@ object avroUtils {
    outputStream.toByteArray
    }

    def readSpecificBinary[T](bytes: Array[Byte], readerSchema: org.apache.avro.Schema, writerSchema: org.apache.avro.Schema, reuse: T): T = {
    def readSpecificBinary[T](
    bytes: Array[Byte],
    readerSchema: org.apache.avro.Schema, // destination schema
    writerSchema: org.apache.avro.Schema, // source schema
    reuse: T): T = {
    val datumReader = new ReflectDatumReader[T](writerSchema, readerSchema)
    val inputStream = new SeekableByteArrayInput(bytes)

  3. @davideicardi davideicardi revised this gist Oct 22, 2018. 1 changed file with 22 additions and 0 deletions.
    22 changes: 22 additions & 0 deletions avroUtils.scala
    Original file line number Diff line number Diff line change
    @@ -118,6 +118,28 @@ object avroUtils {
    schema.toString(true)
    }

    // Serializes a single value to raw Avro binary encoding (no file header,
    // no embedded schema) using reflection against the given schema.
    def writeSpecificBinary[T](value: T, schema: org.apache.avro.Schema): Array[Byte] = {
    val datumWriter = new ReflectDatumWriter[T](schema)
    val outputStream = new ByteArrayOutputStream()

    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)

    datumWriter.write(value, encoder)

    // The binary encoder buffers internally; flush before taking the bytes.
    encoder.flush()

    outputStream.toByteArray
    }

    // Deserializes raw Avro binary bytes into `reuse`, resolving from the
    // writer schema (the one the bytes were encoded with) to the reader schema
    // (the destination shape the caller wants).
    def readSpecificBinary[T](bytes: Array[Byte], readerSchema: org.apache.avro.Schema, writerSchema: org.apache.avro.Schema, reuse: T): T = {
    val datumReader = new ReflectDatumReader[T](writerSchema, readerSchema)
    val inputStream = new SeekableByteArrayInput(bytes)

    val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)

    datumReader.read(reuse, decoder)
    }

    def writeFile(fileName: String, bytes: Array[Byte]): Unit = {
    val file = new FileOutputStream(fileName)

  4. @davideicardi davideicardi revised this gist Oct 17, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion README.md
    Original file line number Diff line number Diff line change
    @@ -183,7 +183,7 @@ object SampleAvro {
    }
    ```

    The same can also be performed using specific class instead of using `GenericData.Record`. One way is to use `ReflectDatumWriter/ReflectDatumReader` instead of `GenericDatumWriter/GenericDatumReader`.
    The same can also be performed using specific class instead of using `GenericRecord`. One way is to use `ReflectDatumWriter/ReflectDatumReader` instead of `GenericDatumWriter/GenericDatumReader`.

    ## Schema resolution

  5. @davideicardi davideicardi revised this gist Oct 17, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion README.md
    Original file line number Diff line number Diff line change
    @@ -183,7 +183,7 @@ object SampleAvro {
    }
    ```

    The same can also be performed using specific class instead of using `GenericData.Record`. One way is to use `ReflectDatumWriter` and `ReflectDatumReader`.
    The same can also be performed using specific class instead of using `GenericData.Record`. One way is to use `ReflectDatumWriter/ReflectDatumReader` instead of `GenericDatumWriter/GenericDatumReader`.

    ## Schema resolution

  6. @davideicardi davideicardi revised this gist Oct 17, 2018. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions README.md
    Original file line number Diff line number Diff line change
    @@ -183,6 +183,8 @@ object SampleAvro {
    }
    ```

    The same can also be performed using specific class instead of using `GenericData.Record`. One way is to use `ReflectDatumWriter` and `ReflectDatumReader`.

    ## Schema resolution

    Rules that must be used to ensure correct schema evolution: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
  7. @davideicardi davideicardi revised this gist Oct 17, 2018. 1 changed file with 5 additions and 1 deletion.
    6 changes: 5 additions & 1 deletion README.md
    Original file line number Diff line number Diff line change
    @@ -185,5 +185,9 @@ object SampleAvro {

    ## Schema resolution

    https://avro.apache.org/docs/current/spec.html#Schema+Resolution
    Rules that must be used to ensure correct schema evolution: https://avro.apache.org/docs/current/spec.html#Schema+Resolution

    Note that when reading binary Avro data you should always provide the original schema used to write it. It can be provided as a header (see data serialization) or from somewhere else.
    If you want to read data into a new schema (a new class) you should provide both the old and the new schema. The old schema is used to read the binary data, the new schema is used to map old fields to new fields (following the above rules).


  8. @davideicardi davideicardi revised this gist Oct 17, 2018. 1 changed file with 134 additions and 0 deletions.
    134 changes: 134 additions & 0 deletions avroUtils.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,134 @@
    import java.io.{ByteArrayOutputStream, FileOutputStream}
    import java.nio.file.{Files, Paths}

    import org.apache.avro.file.{DataFileReader, DataFileWriter, SeekableByteArrayInput}
    import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}
    import org.apache.avro.reflect.{ReflectData, ReflectDatumReader, ReflectDatumWriter}

    import scala.collection.mutable
    import scala.reflect.ClassTag
    import scala.collection.JavaConverters._

    // Helper routines for Avro serialization, covering both the "data file"
    // format (DataFileWriter/DataFileReader: payload preceded by a header that
    // embeds the full schema) and raw binary encoding (binaryEncoder/binaryDecoder:
    // payload only, no header). Generic* methods work with GenericData.Record;
    // Specific* methods use reflection (ReflectDatum*) against a concrete class.
    object avroUtils {
    // Serializes generic records in Avro data-file format (the schema is
    // embedded in the output), returning the resulting bytes.
    def writeGenericData(records: Seq[org.apache.avro.generic.GenericData.Record],
    schema: org.apache.avro.Schema): Array[Byte] = {

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    dataFileWriter.create(schema, outputStream)

    for (record <- records)
    dataFileWriter.append(record)

    dataFileWriter.flush()
    dataFileWriter.close()

    outputStream.toByteArray
    }

    // Reads all generic records from bytes in Avro data-file format.
    def readGenericData(bytes: Array[Byte],
    schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)

    // Materialize eagerly so the reader can be closed before returning.
    val list = dataFileReader.iterator().asScala.toList

    dataFileReader.close()

    list
    }

    // Serializes generic records using raw binary encoding (no header, no
    // embedded schema) — the most compact form.
    def writeGenericBinary(records: Seq[org.apache.avro.generic.GenericData.Record],
    schema: org.apache.avro.Schema): Array[Byte] = {

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)

    for (record <- records)
    datumWriter.write(record, encoder)

    // The binary encoder buffers internally; flush before taking the bytes.
    encoder.flush()

    outputStream.toByteArray
    }

    // Reads raw binary-encoded records until the input is exhausted. The caller
    // must supply the same schema that was used to write the bytes.
    def readGenericBinary(bytes: Array[Byte],
    schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)

    val result = new mutable.MutableList[org.apache.avro.generic.GenericRecord]
    while (!decoder.isEnd) {
    val item = datumReader.read(null, decoder)

    result += item
    }

    result.toList
    }

    // Serializes instances of T in data-file format; the schema is derived from
    // T's runtime class via reflection.
    def writeSpecificData[T](records: Seq[T])(implicit classTag: ClassTag[T]): Array[Byte] = {

    val outputStream = new ByteArrayOutputStream()
    val schema = ReflectData.get().getSchema(classTag.runtimeClass)

    val datumWriter = new ReflectDatumWriter[T](schema)
    val dataFileWriter = new DataFileWriter[T](datumWriter)
    dataFileWriter.create(schema, outputStream)

    for (record <- records)
    dataFileWriter.append(record)

    dataFileWriter.flush()
    dataFileWriter.close()

    outputStream.toByteArray
    }

    // Reads instances of T from data-file bytes, deriving the schema from T's
    // runtime class via reflection, then delegating to the explicit-schema overload.
    def readSpecificData[T](bytes: Array[Byte])(implicit classTag: ClassTag[T]): List[T] = {

    val schema = ReflectData.get().getSchema(classTag.runtimeClass)

    readSpecificData(bytes, schema)
    }

    // Reads instances of T from data-file bytes using an explicitly provided schema.
    def readSpecificData[T](bytes: Array[Byte], schema: org.apache.avro.Schema): List[T] = {

    val datumReader = new ReflectDatumReader[T](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[T](inputStream, datumReader)

    // Materialize eagerly so the reader can be closed before returning.
    val list = dataFileReader.iterator().asScala.toList

    dataFileReader.close()

    list
    }

    // Returns the pretty-printed JSON schema reflected from T's runtime class.
    def generateSchema[T]()(implicit classTag: ClassTag[T]): String = {
    val schema = ReflectData.get().getSchema(classTag.runtimeClass)

    schema.toString(true)
    }

    // Writes bytes to fileName, always closing the stream.
    def writeFile(fileName: String, bytes: Array[Byte]): Unit = {
    val file = new FileOutputStream(fileName)

    try {
    file.write(bytes)
    } finally {
    file.close()
    }
    }

    // Reads the whole file into memory as a byte array.
    def readFile(fileName: String): Array[Byte] = {
    Files.readAllBytes(Paths.get(fileName))
    }
    }
  9. @davideicardi davideicardi revised this gist Oct 17, 2018. 1 changed file with 6 additions and 1 deletion.
    7 changes: 6 additions & 1 deletion README.md
    Original file line number Diff line number Diff line change
    @@ -181,4 +181,9 @@ object SampleAvro {
    result.toList
    }
    }
    ```
    ```

    ## Schema resolution

    https://avro.apache.org/docs/current/spec.html#Schema+Resolution

  10. @davideicardi davideicardi revised this gist Jul 31, 2018. 1 changed file with 51 additions and 2 deletions.
    53 changes: 51 additions & 2 deletions README.md
    Original file line number Diff line number Diff line change
    @@ -55,9 +55,13 @@ Serialization using the official java library.
    Add library: `libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"`


    Example of **Avro Data Serialization**.
    Example of **Avro Data Serialization** and **Binary Encoding**.

    ```
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}
    import scala.collection.mutable
    object SampleAvro {
    def main(args: Array[String]): Unit = {
    @@ -88,11 +92,19 @@ object SampleAvro {
    user2.put("favorite_number", 7)
    user2.put("favorite_color", "red")
    // Data serialization (data + schema)
    val bytes = write(List(user1, user2), schemaObj)
    val users = read(bytes, schemaObj)
    println("Data serialization")
    println(users.mkString("\n"))
    // Binary encoding only (only data without schema)
    val bytes2 = writeBinary(List(user1, user2), schemaObj)
    val users2 = readBinary(bytes2, schemaObj)
    println("Binary encoding")
    println(users2.mkString("\n"))
    }
    def write(records: Seq[org.apache.avro.generic.GenericData.Record],
    @@ -131,5 +143,42 @@ object SampleAvro {
    list
    }
    def writeBinary(records: Seq[org.apache.avro.generic.GenericData.Record],
    schema: org.apache.avro.Schema): Array[Byte] = {
    import java.io.ByteArrayOutputStream
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)
    for (record <- records)
    datumWriter.write(record, encoder)
    encoder.flush()
    outputStream.toByteArray
    }
    def readBinary(bytes: Array[Byte],
    schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)
    val result = new mutable.MutableList[org.apache.avro.generic.GenericRecord]
    while (!decoder.isEnd) {
    val item = datumReader.read(null, decoder)
    result += item
    }
    result.toList
    }
    }
    ```
  11. @davideicardi davideicardi revised this gist Jul 31, 2018. 1 changed file with 27 additions and 16 deletions.
    43 changes: 27 additions & 16 deletions README.md
    Original file line number Diff line number Diff line change
    @@ -18,22 +18,33 @@ Add library: `libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "1

    Example of **Avro Data Serialization**:
    ```
    case class User(name: String, favorite_number: Int,favorite_color: String )
    val outputStream = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[User](outputStream)
    os.write(Seq(
    User("davide", 6, "red"),
    User("mark", 4, "white")))
    os.flush()
    os.close()
    val bytes = outputStream.toByteArray
    val is = AvroInputStream.data[User](bytes)
    val users = is.iterator.toSet
    is.close()
    println(users.mkString("\n"))
    import java.io.ByteArrayOutputStream
    import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream}
    object SampleAvro4sData {
    case class User(name: String, favorite_number: Int,favorite_color: String )
    def main(args: Array[String]): Unit = {
    val outputStream = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[User](outputStream)
    os.write(Seq(
    User("davide", 6, "red"),
    User("mark", 4, "white")))
    os.flush()
    os.close()
    val bytes = outputStream.toByteArray
    val is = AvroInputStream.data[User](bytes)
    val users = is.iterator.toSet
    is.close()
    println("len: " + bytes.length)
    println(users.mkString("\n"))
    }
    }
    ```
    To use **Avro Binary Encoding** just change `AvroOutputStream.data` to `AvroOutputStream.binary`.

  12. @davideicardi davideicardi revised this gist Jul 31, 2018. 1 changed file with 4 additions and 4 deletions.
    8 changes: 4 additions & 4 deletions README.md
    Original file line number Diff line number Diff line change
    @@ -2,12 +2,12 @@

    There are 4 possible serialization formats when using Avro:

    1- Avro Json encoding
    2- Avro Data Serialization (https://avro.apache.org/docs/current/spec.html#Data+Serialization)
    - Avro Json encoding
    - Avro Data Serialization (https://avro.apache.org/docs/current/spec.html#Data+Serialization)
    Binary format with a header that contains the full schema; this is the format usually used when writing Avro files
    3- Avro Single Object Encoding (https://avro.apache.org/docs/current/spec.html#single_object_encoding)
    - Avro Single Object Encoding (https://avro.apache.org/docs/current/spec.html#single_object_encoding)
    Binary format with a header containing only the fingerprint/id of the schema; this is the format used by Kafka (see [this](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java#L85))
    4- Avro Binary Encoding (https://avro.apache.org/docs/current/spec.html#binary_encoding)
    - Avro Binary Encoding (https://avro.apache.org/docs/current/spec.html#binary_encoding)
    Binary format without the header, the most compact format

    ## Avro4s
  13. @davideicardi davideicardi revised this gist Jul 31, 2018. 1 changed file with 20 additions and 1 deletion.
    21 changes: 20 additions & 1 deletion README.md
    Original file line number Diff line number Diff line change
    @@ -1,9 +1,22 @@
    # Avro serialization

    ## Scala4s
    There are 4 possible serialization formats when using Avro:

    1- Avro Json encoding
    2- Avro Data Serialization (https://avro.apache.org/docs/current/spec.html#Data+Serialization)
    Binary format with a header that contains the full schema; this is the format usually used when writing Avro files
    3- Avro Single Object Encoding (https://avro.apache.org/docs/current/spec.html#single_object_encoding)
    Binary format with a header containing only the fingerprint/id of the schema; this is the format used by Kafka (see [this](https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java#L85))
    4- Avro Binary Encoding (https://avro.apache.org/docs/current/spec.html#binary_encoding)
    Binary format without the header, the most compact format

    ## Avro4s

    Serialization using the avro4s library, that have the feature to generate a schema and a record (GenericRecord) given a case class.

    Add library: `libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "1.8.3"`

    Example of **Avro Data Serialization**:
    ```
    case class User(name: String, favorite_number: Int,favorite_color: String )
    @@ -22,11 +35,17 @@ val users = is.iterator.toSet
    is.close()
    println(users.mkString("\n"))
    ```
    To use **Avro Binary Encoding** just change `AvroOutputStream.data` to `AvroOutputStream.binary`.

    ## org.apache.avro

    Serialization using the official java library.

    Add library: `libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"`


    Example of **Avro Data Serialization**.

    ```
    object SampleAvro {
    def main(args: Array[String]): Unit = {
  14. @davideicardi davideicardi revised this gist Jul 31, 2018. 2 changed files with 31 additions and 16 deletions.
    31 changes: 31 additions & 0 deletions plain-avro.scala → README.md
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,33 @@
    # Avro serialization

    ## Scala4s

    Add library: `libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "1.8.3"`

    ```
    case class User(name: String, favorite_number: Int,favorite_color: String )
    val outputStream = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[User](outputStream)
    os.write(Seq(
    User("davide", 6, "red"),
    User("mark", 4, "white")))
    os.flush()
    os.close()
    val bytes = outputStream.toByteArray
    val is = AvroInputStream.data[User](bytes)
    val users = is.iterator.toSet
    is.close()
    println(users.mkString("\n"))
    ```

    ## org.apache.avro

    Add library: `libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"`

    ```
    object SampleAvro {
    def main(args: Array[String]): Unit = {
    @@ -72,3 +102,4 @@ object SampleAvro {
    list
    }
    }
    ```
    16 changes: 0 additions & 16 deletions avro4s.scala
    Original file line number Diff line number Diff line change
    @@ -1,16 +0,0 @@
    case class User(name: String, favorite_number: Int,favorite_color: String )

    val outputStream = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[User](outputStream)
    os.write(Seq(
    User("davide", 6, "red"),
    User("mark", 4, "white")))
    os.flush()
    os.close()

    val bytes = outputStream.toByteArray

    val is = AvroInputStream.data[User](bytes)
    val users = is.iterator.toSet
    is.close()
    println(users.mkString("\n"))
  15. @davideicardi davideicardi created this gist Jul 24, 2018.
    16 changes: 16 additions & 0 deletions avro4s.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,16 @@
    // avro4s example (REPL-style script): round-trips a Seq of case-class
    // instances through Avro data serialization.
    // NOTE(review): assumes java.io.ByteArrayOutputStream and
    // com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream} are already in scope.
    case class User(name: String, favorite_number: Int,favorite_color: String )

    // Write two users; avro4s derives the Avro schema from the case class.
    val outputStream = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[User](outputStream)
    os.write(Seq(
    User("davide", 6, "red"),
    User("mark", 4, "white")))
    os.flush()
    os.close()

    val bytes = outputStream.toByteArray

    // Read them back and print; toSet drops ordering (and any duplicates).
    val is = AvroInputStream.data[User](bytes)
    val users = is.iterator.toSet
    is.close()
    println(users.mkString("\n"))
    74 changes: 74 additions & 0 deletions plain-avro.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,74 @@
    // Example of Avro data serialization (data-file format, schema embedded in
    // the output) using the official Java library with GenericData records.
    object SampleAvro {
    def main(args: Array[String]): Unit = {

    val schema =
    """
    |{"namespace": "example.avro",
    | "type": "record",
    | "name": "User",
    | "fields": [
    | {"name": "name", "type": "string"},
    | {"name": "favorite_number", "type": "int"},
    | {"name": "favorite_color", "type": "string"}
    | ]
    |}
    """.stripMargin

    import org.apache.avro.generic.GenericData

    val schemaObj = new org.apache.avro.Schema.Parser().parse(schema)

    // Build two sample records keyed by the field names declared in the schema.
    val user1 = new GenericData.Record(schemaObj)
    user1.put("name", "Alyssa")
    user1.put("favorite_number", 256)
    user1.put("favorite_color", "blue")

    val user2 = new GenericData.Record(schemaObj)
    user2.put("name", "Ben")
    user2.put("favorite_number", 7)
    user2.put("favorite_color", "red")

    // Round-trip: serialize, deserialize, print.
    val bytes = write(List(user1, user2), schemaObj)

    val users = read(bytes, schemaObj)

    println(users.mkString("\n"))
    }

    // Serializes the records in Avro data-file format (the schema is embedded
    // in the output header), returning the bytes.
    def write(records: Seq[org.apache.avro.generic.GenericData.Record],
    schema: org.apache.avro.Schema): Array[Byte] = {
    import java.io.ByteArrayOutputStream
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    dataFileWriter.create(schema, outputStream)

    for (record <- records)
    dataFileWriter.append(record)

    dataFileWriter.flush()
    dataFileWriter.close()

    outputStream.toByteArray
    }

    // Reads all records back from data-file bytes using the given schema.
    def read(bytes: Array[Byte],
    schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)

    // Materialize the iterator eagerly so the reader can be closed safely.
    import scala.collection.JavaConverters._
    val list = dataFileReader.iterator().asScala.toList

    dataFileReader.close()

    list
    }
    }