-
-
Save ahmedomarjee/3b37c17a26884956c3bb9561227fd00a to your computer and use it in GitHub Desktop.
Revisions
-
jivimberg revised this gist
Mar 18, 2019 . 4 changed files with 28 additions and 17 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,23 @@ package com.jivimberg.sqs.published import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.isActive import kotlinx.coroutines.yield import java.lang.Thread.currentThread suspend fun CoroutineScope.repeatUntilCancelled(block: suspend () -> Unit) { while (isActive) { try { block() yield() } catch (ex: CancellationException) { println("coroutine on ${currentThread().name} cancelled") } catch (ex: Exception) { println("${currentThread().name} failed with {$ex}. Retrying...") ex.printStackTrace() } } println("coroutine on ${currentThread().name} exiting") } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,13 +0,0 @@ This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,6 @@ package com.jivimberg.sqs.published import com.jivimberg.sqs.SQS_URL import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel @@ -30,7 +31,7 @@ class SqsSampleConsumerChannels( } private fun CoroutineScope.launchMsgReceiver(channel: SendChannel<Message>) = launch { repeatUntilCancelled { val receiveRequest = ReceiveMessageRequest.builder() .queueUrl(SQS_URL) .waitTimeSeconds(20) @@ -47,7 +48,7 @@ class SqsSampleConsumerChannels( } private fun CoroutineScope.launchWorker(channel: ReceiveChannel<Message>) = launch { repeatUntilCancelled { for (msg in channel) { try { processMsg(msg) This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,4 @@ package com.jivimberg.sqs.published import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking -
jivimberg created this gist
Mar 11, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,13 @@ package com.jivimberg.sqs internal inline fun runForever(block: () -> Unit): Nothing { while (true) { try { block() } catch (ex: Exception) { println("${Thread.currentThread().name} failed with exception. Retrying...") ex.printStackTrace() continue } } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,99 @@ package com.jivimberg.sqs import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.future.await import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.Message import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest import kotlin.coroutines.CoroutineContext class SqsSampleConsumerChannels( private val sqs: SqsAsyncClient ) : CoroutineScope { private val supervisorJob = SupervisorJob() override val coroutineContext: CoroutineContext get() = Dispatchers.IO + supervisorJob fun start() = launch { val messageChannel = Channel<Message>() repeat(N_WORKERS) { launchWorker(messageChannel) } launchMsgReceiver(messageChannel) } fun stop() { supervisorJob.cancel() } private fun CoroutineScope.launchMsgReceiver(channel: SendChannel<Message>) = launch { runForever { val receiveRequest = ReceiveMessageRequest.builder() .queueUrl(SQS_URL) .waitTimeSeconds(20) .maxNumberOfMessages(10) .build() val messages = sqs.receiveMessage(receiveRequest).await().messages() println("${Thread.currentThread().name} Retrieved ${messages.size} messages") messages.forEach { channel.send(it) } } } private fun CoroutineScope.launchWorker(channel: ReceiveChannel<Message>) = launch { runForever { for (msg in channel) { try { processMsg(msg) deleteMessage(msg) } catch (ex: Exception) { println("${Thread.currentThread().name} exception trying to process message ${msg.body()}") ex.printStackTrace() changeVisibility(msg) } } } } private suspend fun processMsg(message: Message) { println("${Thread.currentThread().name} Started processing message: ${message.body()}") delay((1000L..2000L).random()) println("${Thread.currentThread().name} Finished processing of message: ${message.body()}") } private suspend fun deleteMessage(message: Message) { sqs.deleteMessage { req -> req.queueUrl(SQS_URL) req.receiptHandle(message.receiptHandle()) }.await() println("${Thread.currentThread().name} Message deleted: ${message.body()}") } private suspend fun changeVisibility(message: Message) { sqs.changeMessageVisibility { req -> req.queueUrl(SQS_URL) req.receiptHandle(message.receiptHandle()) req.visibilityTimeout(10) }.await() println("${Thread.currentThread().name} Changed visibility of message: ${message.body()}") } } fun main() = runBlocking { println("${Thread.currentThread().name} Starting program") val sqs = SqsAsyncClient.builder() .region(Region.US_EAST_1) .build() val consumer = SqsSampleConsumerChannels(sqs) consumer.start() delay(30000) consumer.stop() } private const val N_WORKERS = 4 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,28 @@ package com.jivimberg.sqs import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.sqs.SqsClient import software.amazon.awssdk.services.sqs.model.SendMessageRequest fun main() = runBlocking { val sqs = SqsClient.builder() .region(Region.US_EAST_1) .build() var id = 0 while (true) { id++ val sendMsgRequest = SendMessageRequest.builder() .queueUrl(SQS_URL) .messageBody("hello world $id") .build() sqs.sendMessage(sendMsgRequest) println("Message sent with id: $id") delay((1000L..5000L).random()) } }