Skip to content

Instantly share code, notes, and snippets.

@ahmedomarjee
Forked from jivimberg/CoroutinesUtils.kt
Created July 7, 2020 21:13
Show Gist options
  • Save ahmedomarjee/3b37c17a26884956c3bb9561227fd00a to your computer and use it in GitHub Desktop.
Save ahmedomarjee/3b37c17a26884956c3bb9561227fd00a to your computer and use it in GitHub Desktop.

Revisions

  1. @jivimberg jivimberg revised this gist Mar 18, 2019. 4 changed files with 28 additions and 17 deletions.
    23 changes: 23 additions & 0 deletions CoroutinesUtils.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,23 @@
    package com.jivimberg.sqs.published

    import kotlinx.coroutines.CancellationException
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.isActive
    import kotlinx.coroutines.yield
    import java.lang.Thread.currentThread

    suspend fun CoroutineScope.repeatUntilCancelled(block: suspend () -> Unit) {
    while (isActive) {
    try {
    block()
    yield()
    } catch (ex: CancellationException) {
    println("coroutine on ${currentThread().name} cancelled")
    } catch (ex: Exception) {
    println("${currentThread().name} failed with {$ex}. Retrying...")
    ex.printStackTrace()
    }
    }

    println("coroutine on ${currentThread().name} exiting")
    }
    13 changes: 0 additions & 13 deletions FunUtils.kt
    Original file line number Diff line number Diff line change
    @@ -1,13 +0,0 @@
    package com.jivimberg.sqs

    internal inline fun runForever(block: () -> Unit): Nothing {
    while (true) {
    try {
    block()
    } catch (ex: Exception) {
    println("${Thread.currentThread().name} failed with exception. Retrying...")
    ex.printStackTrace()
    continue
    }
    }
    }
    7 changes: 4 additions & 3 deletions SqsConsumer.kt
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,6 @@
    package com.jivimberg.sqs
    package com.jivimberg.sqs.published

    import com.jivimberg.sqs.SQS_URL
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.channels.ReceiveChannel
    @@ -30,7 +31,7 @@ class SqsSampleConsumerChannels(
    }

    private fun CoroutineScope.launchMsgReceiver(channel: SendChannel<Message>) = launch {
    runForever {
    repeatUntilCancelled {
    val receiveRequest = ReceiveMessageRequest.builder()
    .queueUrl(SQS_URL)
    .waitTimeSeconds(20)
    @@ -47,7 +48,7 @@ class SqsSampleConsumerChannels(
    }

    private fun CoroutineScope.launchWorker(channel: ReceiveChannel<Message>) = launch {
    runForever {
    repeatUntilCancelled {
    for (msg in channel) {
    try {
    processMsg(msg)
    2 changes: 1 addition & 1 deletion SqsProducer.kt
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    package com.jivimberg.sqs
    package com.jivimberg.sqs.published

    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking
  2. @jivimberg jivimberg created this gist Mar 11, 2019.
    13 changes: 13 additions & 0 deletions FunUtils.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,13 @@
    package com.jivimberg.sqs

    internal inline fun runForever(block: () -> Unit): Nothing {
    while (true) {
    try {
    block()
    } catch (ex: Exception) {
    println("${Thread.currentThread().name} failed with exception. Retrying...")
    ex.printStackTrace()
    continue
    }
    }
    }
    99 changes: 99 additions & 0 deletions SqsConsumer.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,99 @@
    package com.jivimberg.sqs

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.channels.SendChannel
    import kotlinx.coroutines.future.await
    import software.amazon.awssdk.regions.Region
    import software.amazon.awssdk.services.sqs.SqsAsyncClient
    import software.amazon.awssdk.services.sqs.model.Message
    import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
    import kotlin.coroutines.CoroutineContext

    class SqsSampleConsumerChannels(
    private val sqs: SqsAsyncClient
    ) : CoroutineScope {

    private val supervisorJob = SupervisorJob()
    override val coroutineContext: CoroutineContext
    get() = Dispatchers.IO + supervisorJob

    fun start() = launch {
    val messageChannel = Channel<Message>()
    repeat(N_WORKERS) { launchWorker(messageChannel) }
    launchMsgReceiver(messageChannel)
    }

    fun stop() {
    supervisorJob.cancel()
    }

    private fun CoroutineScope.launchMsgReceiver(channel: SendChannel<Message>) = launch {
    runForever {
    val receiveRequest = ReceiveMessageRequest.builder()
    .queueUrl(SQS_URL)
    .waitTimeSeconds(20)
    .maxNumberOfMessages(10)
    .build()

    val messages = sqs.receiveMessage(receiveRequest).await().messages()
    println("${Thread.currentThread().name} Retrieved ${messages.size} messages")

    messages.forEach {
    channel.send(it)
    }
    }
    }

    private fun CoroutineScope.launchWorker(channel: ReceiveChannel<Message>) = launch {
    runForever {
    for (msg in channel) {
    try {
    processMsg(msg)
    deleteMessage(msg)
    } catch (ex: Exception) {
    println("${Thread.currentThread().name} exception trying to process message ${msg.body()}")
    ex.printStackTrace()
    changeVisibility(msg)
    }
    }
    }
    }

    private suspend fun processMsg(message: Message) {
    println("${Thread.currentThread().name} Started processing message: ${message.body()}")
    delay((1000L..2000L).random())
    println("${Thread.currentThread().name} Finished processing of message: ${message.body()}")
    }

    private suspend fun deleteMessage(message: Message) {
    sqs.deleteMessage { req ->
    req.queueUrl(SQS_URL)
    req.receiptHandle(message.receiptHandle())
    }.await()
    println("${Thread.currentThread().name} Message deleted: ${message.body()}")
    }

    private suspend fun changeVisibility(message: Message) {
    sqs.changeMessageVisibility { req ->
    req.queueUrl(SQS_URL)
    req.receiptHandle(message.receiptHandle())
    req.visibilityTimeout(10)
    }.await()
    println("${Thread.currentThread().name} Changed visibility of message: ${message.body()}")
    }
    }

    fun main() = runBlocking {
    println("${Thread.currentThread().name} Starting program")
    val sqs = SqsAsyncClient.builder()
    .region(Region.US_EAST_1)
    .build()
    val consumer = SqsSampleConsumerChannels(sqs)
    consumer.start()
    delay(30000)
    consumer.stop()
    }

    private const val N_WORKERS = 4
    28 changes: 28 additions & 0 deletions SqsProducer.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,28 @@
    package com.jivimberg.sqs

    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking
    import software.amazon.awssdk.regions.Region
    import software.amazon.awssdk.services.sqs.SqsClient
    import software.amazon.awssdk.services.sqs.model.SendMessageRequest

    fun main() = runBlocking {
    val sqs = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build()

    var id = 0

    while (true) {
    id++

    val sendMsgRequest = SendMessageRequest.builder()
    .queueUrl(SQS_URL)
    .messageBody("hello world $id")
    .build()

    sqs.sendMessage(sendMsgRequest)
    println("Message sent with id: $id")
    delay((1000L..5000L).random())
    }
    }