Created
October 21, 2020 23:29
-
-
Save cyberdelia/1b9dfd259a64c0c3c50455587f79d41c to your computer and use it in GitHub Desktop.
Revisions
-
cyberdelia revised this gist
Oct 21, 2020 . No changes.There are no files selected for viewing
-
cyberdelia created this gist
Oct 21, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,13 @@ package com.lapanthere.bohemia import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import java.time.Duration fun <K, V> KafkaConsumer<K, V>.asFlow(timeout: Duration = Duration.ofMillis(500)): Flow<ConsumerRecord<K, V>> = flow { poll(timeout).forEach { emit(it) } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,19 @@ package com.lapanthere.bohemia import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.Deferred import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata fun <K, V> KafkaProducer<K, V>.sendAsync(record: ProducerRecord<K, V>): Deferred<RecordMetadata> = CompletableDeferred<RecordMetadata>().apply { send(record) { metadata, exception -> if (exception != null) { completeExceptionally(exception) } else { complete(metadata) } } }