Skip to content

Instantly share code, notes, and snippets.

@AlexeyLevin
Created February 26, 2019 21:10
Show Gist options
  • Select an option

  • Save AlexeyLevin/f649979f3139e6a11de7a9a85580bcaa to your computer and use it in GitHub Desktop.

Select an option

Save AlexeyLevin/f649979f3139e6a11de7a9a85580bcaa to your computer and use it in GitHub Desktop.
package ru.utair.esb._1c.adapter.script;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * One-off script that publishes a series of date strings to the
 * {@code cronCorpDocEvent} Kafka topic.
 *
 * <p>The messages are the dates from 2019-02-25 going back 55 days (56 values in
 * total, newest first), each formatted as {@code yyyyMMdd}. Every record is sent
 * synchronously, so the partition and offset can be printed right after the send.
 */
public class KafkaDocumentsEventProducer {
    /** Pattern used both to parse the start date and to render each message. */
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");

    /**
     * Builds the list of dates, connects to the broker at {@code localhost:9092}
     * and sends one record per date.
     *
     * <p>A failed send is reported on standard output and the remaining messages
     * are still attempted. If the thread is interrupted while waiting for a send,
     * the loop stops, the producer is closed and the interrupt status is restored.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        LocalDate localDate = LocalDate.parse("20190225", FORMAT);
        // Offsets 0..55 are subtracted from the start date, so the start date itself is sent first.
        List<String> messages = IntStream.range(0, 56)
                .mapToObj(localDate::minusDays)
                .map(date -> date.format(FORMAT))
                .collect(Collectors.toList());

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "scriptCorpDocEventProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        boolean interrupted = false;
        // try-with-resources guarantees the producer is closed on every exit path.
        try (KafkaProducer<Long, String> kafkaProducer = new KafkaProducer<>(props)) {
            for (String message : messages) {
                // Records are created with a value only; no key is supplied.
                ProducerRecord<Long, String> record = new ProducerRecord<>("cronCorpDocEvent", message);
                try {
                    // get() blocks until the send completes, making each send synchronous.
                    RecordMetadata metadata = kafkaProducer.send(record).get();
                    System.out.println("Message " + message + " sent to partition " + metadata.partition()
                            + " with offset " + metadata.offset());
                } catch (ExecutionException e) {
                    System.out.println("Error in sending record");
                    System.out.println(e);
                } catch (InterruptedException e) {
                    System.out.println("Error in sending record");
                    System.out.println(e);
                    // Stop sending; the flag is restored only after the producer has been
                    // closed so that close() itself is not disturbed by a pending interrupt.
                    interrupted = true;
                    break;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment