import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Consumer.java: reads raw byte[] values from a Kafka topic and decodes them
// into Avro specific records using a schema loaded from the classpath.
public class Consumer {

    private final KafkaConsumer<String, byte[]> consumer;
    private final String topic;
    private final Schema avroSchema;

    public Consumer(Properties props, String topic, String avroSchemaPath) throws IOException {
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        // Parse the .avsc schema bundled as a classpath resource.
        this.avroSchema = new Schema.Parser()
                .parse(getClass().getClassLoader().getResourceAsStream(avroSchemaPath));
    }

    public <T extends SpecificRecordBase> List<T> getConsumedData() {
        consumer.subscribe(Collections.singletonList(this.topic));
        // Block for up to one second waiting for new records.
        ConsumerRecords<String, byte[]> records = consumer.poll(1000L);
        List<T> consumedData = new ArrayList<>();
        for (ConsumerRecord<String, byte[]> record : records) {
            // Decode each raw payload back into a specific record.
            DatumReader<T> datumReader = new SpecificDatumReader<>(avroSchema);
            ByteArrayInputStream stream = new ByteArrayInputStream(record.value());
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
            try {
                T result = datumReader.read(null, decoder);
                consumedData.add(result);
                System.out.println(result.toString());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return consumedData;
    }
}
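A minimal usage sketch for the consumer above; the property values mirror the configuration file below, and "user.avsc" is a hypothetical schema resource name, not part of the gist:

import java.io.IOException;
import java.util.List;
import java.util.Properties;
import org.apache.avro.specific.SpecificRecordBase;

public class ConsumerDemo {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "sample");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // "user.avsc" is a placeholder; use whatever schema the topic carries.
        Consumer consumer = new Consumer(props, "test", "user.avsc");
        List<SpecificRecordBase> records = consumer.getConsumedData();
        System.out.println("Fetched " + records.size() + " records");
    }
}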
# Kafka client configuration; thread.name and topic.name are
# application-level keys, not Kafka client settings.
bootstrap.servers= localhost:9092
group.id= sample
key.deserializer= org.apache.kafka.common.serialization.StringDeserializer
value.deserializer= org.apache.kafka.common.serialization.ByteArrayDeserializer
key.serializer= org.apache.kafka.common.serialization.StringSerializer
value.serializer= org.apache.kafka.common.serialization.ByteArraySerializer
thread.name= kafkaSample
topic.name= test
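The file above can be loaded from the classpath in the usual way; a sketch, assuming it is saved as kafka.properties (a hypothetical name):

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Config {
    // Loads the configuration file from the classpath.
    public static Properties load() throws IOException {
        Properties props = new Properties();
        // "kafka.properties" is a hypothetical resource name.
        try (InputStream in = Config.class.getClassLoader().getResourceAsStream("kafka.properties")) {
            props.load(in);
        }
        return props;
    }
}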
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Properties;

// Producer.java: serializes Avro specific records to byte[] and publishes
// them to a Kafka topic.
public class Producer {

    private final KafkaProducer<String, byte[]> producer;
    private final String topic;

    public Producer(Properties props, String topic) {
        this.producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public <T extends SpecificRecordBase> void send(ArrayList<T> lstData) {
        lstData.forEach(data -> {
            // Each record carries its own schema, so the writer is built per record.
            DatumWriter<T> datumWriter = new SpecificDatumWriter<>(data.getSchema());
            ByteArrayOutputStream stream = new ByteArrayOutputStream(8192);
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
            try {
                datumWriter.write(data, encoder);
                encoder.flush();
                byte[] bytes = stream.toByteArray();
                // Publish the Avro-encoded payload; the message key is left null.
                ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, bytes);
                producer.send(record);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        });
    }
}
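A usage sketch for the producer, assuming User is a hypothetical class generated by Avro from an .avsc schema (generated classes extend SpecificRecordBase and expose a newBuilder() API):

import java.util.ArrayList;
import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer producer = new Producer(props, "test");

        // User is a hypothetical Avro-generated record; setName is assumed
        // to be a field in its schema.
        ArrayList<User> users = new ArrayList<>();
        users.add(User.newBuilder().setName("alice").build());
        producer.send(users);
    }
}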
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tr.com.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- The lambda in Producer.send requires Java 8 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.13</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.13</version>
        </dependency>
    </dependencies>
</project>
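Both classes depend on Avro-generated SpecificRecordBase subclasses, which this POM does not yet generate; a sketch of the avro-maven-plugin execution that would produce them, assuming schemas live under src/main/avro (the directory layout is an assumption):

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.8.0</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>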