Skip to content

Instantly share code, notes, and snippets.

@aaronkub
Forked from davideicardi/README.md
Created June 3, 2021 16:57
Show Gist options
  • Select an option

  • Save aaronkub/44b058041d7e5736d9c930ac5c9efadb to your computer and use it in GitHub Desktop.

Select an option

Save aaronkub/44b058041d7e5736d9c930ac5c9efadb to your computer and use it in GitHub Desktop.
Write Avro records to, and read them back from, a byte array
/**
 * Sample user record that the snippet below serializes to Avro and reads back.
 *
 * NOTE(review): the snake_case field names are presumably used as the Avro field
 * names, so confirm before renaming them to lowerCamelCase.
 *
 * @param name            the user's name
 * @param favorite_number the user's favorite number
 * @param favorite_color  the user's favorite color
 */
case class User(
  name: String,
  favorite_number: Int,
  favorite_color: String
)
// Write two sample users into an in-memory buffer through AvroOutputStream.data.
val sampleUsers = Seq(User("davide", 6, "red"), User("mark", 4, "white"))
val buffer = new ByteArrayOutputStream()
val writer = AvroOutputStream.data[User](buffer)
writer.write(sampleUsers)
writer.flush()
writer.close()

// Read the users back from the encoded bytes.
val bytes = buffer.toByteArray
val reader = AvroInputStream.data[User](bytes)
// Collected into a Set: duplicate users collapse and the written order is not kept.
val users = reader.iterator.toSet
reader.close()

// Print one user per line.
val rendered = users.mkString("\n")
println(rendered)
/**
 * Round-trips generic Avro records through an in-memory byte array using the
 * plain Apache Avro API (`DataFileWriter` / `DataFileReader`).
 */
object SampleAvro {
  import java.io.ByteArrayOutputStream

  import org.apache.avro.Schema
  import org.apache.avro.file.{DataFileReader, DataFileWriter, SeekableByteArrayInput}
  import org.apache.avro.generic.{GenericData, GenericRecord}
  import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter}

  /** Builds two sample `User` records, serializes them, reads them back and prints them. */
  def main(args: Array[String]): Unit = {
    val schema =
      """
        |{"namespace": "example.avro",
        | "type": "record",
        | "name": "User",
        | "fields": [
        | {"name": "name", "type": "string"},
        | {"name": "favorite_number", "type": "int"},
        | {"name": "favorite_color", "type": "string"}
        | ]
        |}
        |""".stripMargin
    val schemaObj = new Schema.Parser().parse(schema)

    val user1 = new GenericData.Record(schemaObj)
    user1.put("name", "Alyssa")
    user1.put("favorite_number", 256)
    user1.put("favorite_color", "blue")

    val user2 = new GenericData.Record(schemaObj)
    user2.put("name", "Ben")
    user2.put("favorite_number", 7)
    user2.put("favorite_color", "red")

    val bytes = write(List(user1, user2), schemaObj)
    val users = read(bytes, schemaObj)
    println(users.mkString("\n"))
  }

  /**
   * Serializes `records` into a byte array with a `DataFileWriter`.
   *
   * Accepts any `GenericRecord` (not only `GenericData.Record`), so the result of
   * [[read]] can be passed straight back in.
   *
   * @param records the records to write, in order; may be empty
   * @param schema  the schema the records conform to
   * @return the bytes produced by the writer
   */
  def write(records: Seq[GenericRecord], schema: Schema): Array[Byte] = {
    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    try {
      dataFileWriter.create(schema, outputStream)
      records.foreach(record => dataFileWriter.append(record))
    } finally {
      // close() also flushes, so no separate flush() is needed; running it in
      // `finally` releases the writer even when create/append throws.
      dataFileWriter.close()
    }
    // Only read the buffer after the writer has been closed (and thus flushed).
    outputStream.toByteArray
  }

  /**
   * Deserializes all records from bytes previously produced by [[write]].
   *
   * @param bytes  the serialized data
   * @param schema the schema handed to the `GenericDatumReader`
   * @return every record found in `bytes`, in stored order
   */
  def read(bytes: Array[Byte], schema: Schema): List[GenericRecord] = {
    // Kept as JavaConverters (as in the original) so the code still compiles on Scala 2.12.
    import scala.collection.JavaConverters._

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)
    // Materialize the list before closing; `finally` closes the reader even if
    // decoding fails part-way through.
    try dataFileReader.iterator().asScala.toList
    finally dataFileReader.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment