-
-
Save aaronkub/44b058041d7e5736d9c930ac5c9efadb to your computer and use it in GitHub Desktop.
Write and read Avro records to and from a byte array
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// In-memory round trip: encode two User values to Avro bytes, decode them again,
// and print the result. AvroOutputStream / AvroInputStream are presumably the
// avro4s helpers; their imports are not shown in this snippet -- TODO confirm.

// NOTE(review): the snake_case field names look like they are meant to line up with
// Avro schema field names, so they are deliberately left un-camel-cased.
case class User(name: String, favorite_number: Int, favorite_color: String)

// --- encode ---------------------------------------------------------------
val buffer = new ByteArrayOutputStream()
val avroOut = AvroOutputStream.data[User](buffer)
val sampleUsers = Seq(
  User("davide", 6, "red"),
  User("mark", 4, "white")
)
avroOut.write(sampleUsers)
avroOut.flush()
avroOut.close()

// Snapshot of everything written to the in-memory buffer.
val bytes = buffer.toByteArray

// --- decode ---------------------------------------------------------------
val avroIn = AvroInputStream.data[User](bytes)
// Materialise the records into a Set before the input stream is closed.
val users = avroIn.iterator.toSet
avroIn.close()

// One decoded user per line.
println(users.mkString("\n"))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Demonstrates writing Avro generic records to a byte array and reading them back,
 * using `DataFileWriter` / `DataFileReader` from the Apache Avro Java library.
 */
object SampleAvro {

  /**
   * Builds two sample `User` records against an inline schema, serializes them with
   * [[write]], deserializes them with [[read]], and prints one record per line.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    import org.apache.avro.generic.GenericData

    // Avro schema (JSON) describing a User record with three required fields.
    val schema =
      """
        |{"namespace": "example.avro",
        | "type": "record",
        | "name": "User",
        | "fields": [
        |     {"name": "name", "type": "string"},
        |     {"name": "favorite_number", "type": "int"},
        |     {"name": "favorite_color", "type": "string"}
        | ]
        |}
      """.stripMargin

    val schemaObj = new org.apache.avro.Schema.Parser().parse(schema)

    val user1 = new GenericData.Record(schemaObj)
    user1.put("name", "Alyssa")
    user1.put("favorite_number", 256)
    user1.put("favorite_color", "blue")

    val user2 = new GenericData.Record(schemaObj)
    user2.put("name", "Ben")
    user2.put("favorite_number", 7)
    user2.put("favorite_color", "red")

    val bytes = write(List(user1, user2), schemaObj)
    val users = read(bytes, schemaObj)
    println(users.mkString("\n"))
  }

  /**
   * Serializes the given records into an in-memory byte array via `DataFileWriter`.
   *
   * The writer is closed in a `finally` block, so it is released even when
   * `create` or `append` throws.
   *
   * @param records records to append, in order
   * @param schema  schema handed to both the datum writer and `DataFileWriter.create`
   * @return the bytes produced by the writer
   */
  def write(records: Seq[org.apache.avro.generic.GenericData.Record],
            schema: org.apache.avro.Schema): Array[Byte] = {
    import java.io.ByteArrayOutputStream
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    try {
      dataFileWriter.create(schema, outputStream)
      records.foreach(record => dataFileWriter.append(record))
    } finally {
      // close() flushes pending data before closing, so no separate flush() is needed.
      dataFileWriter.close()
    }
    // Read the buffer only after close() so all flushed data is included.
    outputStream.toByteArray
  }

  /**
   * Deserializes every record contained in `bytes` via `DataFileReader`.
   *
   * The reader is closed in a `finally` block, so it is released even when
   * decoding a record throws part-way through iteration.
   *
   * @param bytes  data previously produced by [[write]]
   * @param schema schema handed to the datum reader
   * @return all records, in the order the reader yields them
   */
  def read(bytes: Array[Byte],
           schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import scala.collection.JavaConverters._

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)
    try {
      // Materialise into a strict List before the reader is closed.
      dataFileReader.iterator().asScala.toList
    } finally {
      dataFileReader.close()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment