Skip to content

Instantly share code, notes, and snippets.

@AlexeyLevin
Created February 26, 2019 21:10
Show Gist options
  • Select an option

  • Save AlexeyLevin/f649979f3139e6a11de7a9a85580bcaa to your computer and use it in GitHub Desktop.

Select an option

Save AlexeyLevin/f649979f3139e6a11de7a9a85580bcaa to your computer and use it in GitHub Desktop.

Revisions

  1. AlexeyLevin created this gist Feb 26, 2019.
    47 changes: 47 additions & 0 deletions SimpleKafkaProducer
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,47 @@
    package ru.utair.esb._1c.adapter.script;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class KafkaDocumentsEventProducer {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    public static void main(String[] args) {
    LocalDate localDate = LocalDate.parse("20190225", FORMAT);
    List<String> messages = IntStream.range(0, 56)
    .mapToObj(localDate::minusDays)
    .map(date -> date.format(FORMAT))
    .collect(Collectors.toList());

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "scriptCorpDocEventProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());


    KafkaProducer<Long,String> kafkaProducer = new KafkaProducer<>(props);
    for (String message : messages) {
    ProducerRecord<Long, String> record = new ProducerRecord<>("cronCorpDocEvent", message);
    try {
    RecordMetadata metadata = kafkaProducer.send(record).get();
    System.out.println("Message " + message + " sent to partition " + metadata.partition()
    + " with offset " + metadata.offset());
    } catch (ExecutionException | InterruptedException e) {
    System.out.println("Error in sending record");
    System.out.println(e);
    }
    }
    }
    }