Skip to content

Instantly share code, notes, and snippets.

@yaroncon
Last active November 19, 2020 06:12
Show Gist options
  • Select an option

  • Save yaroncon/6ce06005429b2c4d6aa8 to your computer and use it in GitHub Desktop.

Select an option

Save yaroncon/6ce06005429b2c4d6aa8 to your computer and use it in GitHub Desktop.

Revisions

  1. Yaron renamed this gist Jun 4, 2015. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. Yaron revised this gist Jun 4, 2015. 1 changed file with 10 additions and 10 deletions.
    20 changes: 10 additions & 10 deletions PartailPom.xml
    Original file line number Diff line number Diff line change
    @@ -1,10 +1,10 @@
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
    </dependency>
  3. Yaron revised this gist Jun 4, 2015. 1 changed file with 10 additions and 0 deletions.
    10 changes: 10 additions & 0 deletions PartailPom.xml
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,10 @@
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
    </dependency>
  4. Yaron revised this gist Jun 4, 2015. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion KafkaProducer.java
    Original file line number Diff line number Diff line change
    @@ -15,7 +15,7 @@
    import java.io.IOException;
    import java.util.Properties;

    public class test{
    public class KafkaProducer{

    public void produce()
    {
  5. Yaron revised this gist Jun 4, 2015. 1 changed file with 1 addition and 2 deletions.
    3 changes: 1 addition & 2 deletions KafkaProducer.java
    Original file line number Diff line number Diff line change
    @@ -40,8 +40,7 @@ public void produce()
    datum.put("firstName", "yaron");

    KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(properties);
    ProducerRecord<String, byte[]> producerRecord =
    null;
    ProducerRecord<String, byte[]> producerRecord = null;
    try {
    producerRecord = new ProducerRecord<String, byte[]>("topic",
    Partition,
  6. Yaron renamed this gist Jun 4, 2015. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  7. Yaron renamed this gist Jun 4, 2015. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  8. Yaron created this gist Jun 4, 2015.
    70 changes: 70 additions & 0 deletions gistfile1.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,70 @@
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.Encoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.stereotype.Controller;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Properties;

    public class test{

    public void produce()
    {
    Integer Partition = 1;

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:8080");
    properties.put("acks","all");
    properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
    properties.put("block.on.buffer.full","false");

    String schemaStr = "{\"namespace\": \"org.test.data\",\n" +
    " \"type\": \"record\",\n" +
    " \"name\": \"user\",\n" +
    " \"fields\": [\n" +
    " {\"name\": \"firstName\", \"type\": \"string\"},\n" +
    " ]\n" +
    "}";
    Schema schema = new Schema.Parser().parse(schemaStr);
    GenericRecord datum = new GenericData.Record(schema);
    datum.put("firstName", "yaron");

    KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(properties);
    ProducerRecord<String, byte[]> producerRecord =
    null;
    try {
    producerRecord = new ProducerRecord<String, byte[]>("topic",
    Partition,
    "key",
    datumToByteArray(schema, datum));
    } catch (IOException e) {
    e.printStackTrace();
    }
    messageProducer.send(producerRecord);
    }

    public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {
    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    try {
    Encoder e = EncoderFactory.get().binaryEncoder(os, null);
    writer.write(datum, e);
    e.flush();
    byte[] byteData = os.toByteArray();
    return byteData;
    } finally {
    os.close();
    }
    }

    }