import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Controller; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Properties; public class KafkaProducer{ public void produce() { Integer Partition = 1; Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:8080"); properties.put("acks","all"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer"); properties.put("block.on.buffer.full","false"); String schemaStr = "{\"namespace\": \"org.test.data\",\n" + " \"type\": \"record\",\n" + " \"name\": \"user\",\n" + " \"fields\": [\n" + " {\"name\": \"firstName\", \"type\": \"string\"},\n" + " ]\n" + "}"; Schema schema = new Schema.Parser().parse(schemaStr); GenericRecord datum = new GenericData.Record(schema); datum.put("firstName", "yaron"); KafkaProducer messageProducer = new KafkaProducer(properties); ProducerRecord producerRecord = null; try { producerRecord = new ProducerRecord("topic", Partition, "key", datumToByteArray(schema, datum)); } catch (IOException e) { e.printStackTrace(); } messageProducer.send(producerRecord); } public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException { GenericDatumWriter writer = new GenericDatumWriter(schema); ByteArrayOutputStream os = new ByteArrayOutputStream(); try { Encoder e = EncoderFactory.get().binaryEncoder(os, null); writer.write(datum, e); e.flush(); byte[] byteData = os.toByteArray(); return byteData; } finally { os.close(); } } }