Last active
October 29, 2019 06:00
-
-
Save ramazancesur/b47830d2f0b67be5341264af8f9e43b6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.apache.avro.Schema; | |
| import org.apache.avro.io.BinaryDecoder; | |
| import org.apache.avro.io.DatumReader; | |
| import org.apache.avro.io.DecoderFactory; | |
| import org.apache.avro.specific.SpecificDatumReader; | |
| import org.apache.avro.specific.SpecificRecordBase; | |
| import org.apache.kafka.clients.consumer.ConsumerRecord; | |
| import org.apache.kafka.clients.consumer.ConsumerRecords; | |
| import org.apache.kafka.clients.consumer.KafkaConsumer; | |
| import java.io.ByteArrayInputStream; | |
| import java.io.IOException; | |
| import java.util.ArrayList; | |
| import java.util.Collections; | |
| import java.util.List; | |
| import java.util.Properties; | |
| public class Consumer { | |
| private final KafkaConsumer<String,byte[]> consumer; | |
| private final String topic; | |
| private Schema avroSchema; | |
| public Consumer(Properties props, String topic, String avroSchemaPath) throws IOException { | |
| consumer= new KafkaConsumer<String, byte[]>(props); | |
| this.topic= topic; | |
| avroSchema= new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream(avroSchemaPath)); | |
| } | |
| public <T extends SpecificRecordBase> List<T> getConsumedData(){ | |
| consumer.subscribe(Collections.singletonList(this.topic)); | |
| ConsumerRecords<String, byte[]> records= consumer.poll(1000l); | |
| List<T> lstDataTemp= new ArrayList<>(); | |
| for (ConsumerRecord<String, byte[]> record : records) { | |
| DatumReader<T> datumReader= new SpecificDatumReader<>(avroSchema); | |
| ByteArrayInputStream stream= new ByteArrayInputStream(record.value()); | |
| BinaryDecoder decoder= DecoderFactory.get().directBinaryDecoder(stream,null); | |
| try { | |
| T result= datumReader.read( null,decoder); | |
| lstDataTemp.add(result); | |
| System.out.println(result.toString()); | |
| } catch (IOException e) { | |
| e.printStackTrace(); | |
| } | |
| } | |
| return lstDataTemp; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| bootstramp.server= localhost:9092group.id= sample | |
| key.deserializer= com.apache.kafka.common.serialization.StringDeserializer | |
| value.deserializer= com.apache.kafka.common.serialization.ByteArrayDeserializer | |
| key.serializer= com.apache.kafka.common.serialization.StringSerializer | |
| value.serializer= com.apache.kafka.common.serialization.ByteArraySerializer | |
| thread.name= kafkaSample | |
| topic.name= test | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.apache.avro.io.BinaryEncoder; | |
| import org.apache.avro.io.DatumWriter; | |
| import org.apache.avro.io.EncoderFactory; | |
| import org.apache.avro.specific.SpecificDatumWriter; | |
| import org.apache.avro.specific.SpecificRecordBase; | |
| import org.apache.kafka.clients.producer.KafkaProducer; | |
| import org.apache.kafka.clients.producer.ProducerRecord; | |
| import java.io.ByteArrayOutputStream; | |
| import java.util.ArrayList; | |
| import java.util.Properties; | |
| public class Producer { | |
| private final KafkaProducer<String,byte[]> producer; | |
| private final String topic; | |
| public Producer(Properties props, String topic){ | |
| producer= new KafkaProducer<String, byte[]>(props); | |
| this.topic=topic; | |
| } | |
| public <T extends SpecificRecordBase> void send(ArrayList<T> lstData){ | |
| lstData.forEach(data->{ | |
| byte[] bytes= null; | |
| DatumWriter<T> datumWriter= new SpecificDatumWriter<>(data.getSchema()); | |
| ByteArrayOutputStream stream= new ByteArrayOutputStream(8192); | |
| BinaryEncoder encoder= EncoderFactory.get().directBinaryEncoder(stream,null); | |
| try{ | |
| datumWriter.write(data,encoder); | |
| encoder.flush(); | |
| bytes= stream.toByteArray(); | |
| ProducerRecord<String,byte[]> record= new ProducerRecord<>(topic,bytes); | |
| producer.send(record); | |
| } catch (Exception ex){ | |
| ex.printStackTrace(); | |
| } | |
| }); | |
| } | |
| } | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?xml version="1.0" encoding="UTF-8"?> | |
| <project xmlns="http://maven.apache.org/POM/4.0.0" | |
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
| xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
| <modelVersion>4.0.0</modelVersion> | |
| <groupId>tr.com.kafka</groupId> | |
| <artifactId>kafka-producer</artifactId> | |
| <version>1.0-SNAPSHOT</version> | |
| <build> | |
| <plugins> | |
| <plugin> | |
| <groupId>org.apache.maven.plugins</groupId> | |
| <artifactId>maven-compiler-plugin</artifactId> | |
| <configuration> | |
| <source>1.7</source> | |
| <target>1.7</target> | |
| </configuration> | |
| </plugin> | |
| </plugins> | |
| </build> | |
| <dependencies> | |
| <dependency> | |
| <groupId>org.apache.kafka</groupId> | |
| <artifactId>kafka-clients</artifactId> | |
| <version>0.9.0.1</version> | |
| </dependency> | |
| <dependency> | |
| <groupId>org.apache.avro</groupId> | |
| <artifactId>avro</artifactId> | |
| <version>1.8.0</version> | |
| </dependency> | |
| <dependency> | |
| <groupId>commons-io</groupId> | |
| <artifactId>commons-io</artifactId> | |
| <version>2.4</version> | |
| </dependency> | |
| <dependency> | |
| <groupId>org.slf4j</groupId> | |
| <artifactId>slf4j-api</artifactId> | |
| <version>1.7.13</version> | |
| </dependency> | |
| <dependency> | |
| <groupId>org.slf4j</groupId> | |
| <artifactId>slf4j-simple</artifactId> | |
| <version>1.7.13</version> | |
| </dependency> | |
| </dependencies> | |
| </project> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment