Skip to content

Instantly share code, notes, and snippets.

@cyberdelia
Created October 21, 2020 23:29
Show Gist options
  • Select an option

  • Save cyberdelia/1b9dfd259a64c0c3c50455587f79d41c to your computer and use it in GitHub Desktop.

Select an option

Save cyberdelia/1b9dfd259a64c0c3c50455587f79d41c to your computer and use it in GitHub Desktop.

Revisions

  1. cyberdelia revised this gist Oct 21, 2020. No changes.
  2. cyberdelia created this gist Oct 21, 2020.
    13 changes: 13 additions & 0 deletions Consumer.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,13 @@
    package com.lapanthere.bohemia

    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.flow
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import java.time.Duration


    fun <K, V> KafkaConsumer<K, V>.asFlow(timeout: Duration = Duration.ofMillis(500)): Flow<ConsumerRecord<K, V>> =
    flow {
    poll(timeout).forEach { emit(it) }
    }
    19 changes: 19 additions & 0 deletions Producer.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,19 @@
    package com.lapanthere.bohemia

    import kotlinx.coroutines.CompletableDeferred
    import kotlinx.coroutines.Deferred
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.clients.producer.RecordMetadata


    fun <K, V> KafkaProducer<K, V>.sendAsync(record: ProducerRecord<K, V>): Deferred<RecordMetadata> =
    CompletableDeferred<RecordMetadata>().apply {
    send(record) { metadata, exception ->
    if (exception != null) {
    completeExceptionally(exception)
    } else {
    complete(metadata)
    }
    }
    }