Last active
          January 6, 2023 14:03 
        
      - 
      
- 
        Save sphrak/d3a60f33b3ba8f51a77be5fb77bf0e27 to your computer and use it in GitHub Desktop. 
Revisions
- 
        sphrak revised this gist Jan 6, 2023 . 1 changed file with 5 additions and 4 deletions.There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,3 @@ @Serialized data class MessageDto( @SerialName(value = "id") @@ -7,15 +6,17 @@ data class MessageDto( val message: String ) class MessageService constructor( private val httpClient: HttpClient ) { private val sendMessageFlow: Flow<MessageDto> = MutableSharedFlow() fun sendMessage(messageDto) { sendMessageFlow.emit(messageDto) } fun messages(): Flow<MessageDto> = flow { httpClient.wss( method = HttpMethod.Get, host = getHostname(), @@ -28,7 +29,7 @@ class MessageService constructor(private val httpClient: HttpClient) { .map(::sendSerialized) .launchIn(this) while (true) { emit(receiveDeserialized<MessageDto>()) } } } 
- 
        sphrak created this gist Jan 6, 2023 .There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,37 @@ @Serialized data class MessageDto( @SerialName(value = "id") val id: String, @SerialName(value = "message") val message: String ) class MessageService constructor(private val httpClient: HttpClient) { private val sendMessageFlow: Flow<MessageDto> = MutableSharedFlow() fun sendMessage(messageDto) { sendMessageFlow.emit(messageDto) } fun messages(): Flow<List<MessageDto>> = flow { httpClient.wss( method = HttpMethod.Get, host = getHostname(), port = getPort(), path = "/v1/messages" ) { coroutineScope { sendMessageFlow .conflate() .map(::sendSerialized) .launchIn(this) while (true) { emit(receiveDeserialized<List<MessageDto>>()) } } } }.retryWithBackoff() .flowOn(coroutineDispatcher) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,34 @@ inline val defaultExceptions: Set<Class<out Exception>> get() = setOf( ClosedReceiveChannelException::class.java, ConnectException::class.java, UnresolvedAddressException::class.java ) /** * Retries a `Flow<T>` indefinitely with exponential backoff * * @param delay -- delay in seconds between retries * @param delayAtMost -- maximum delay in seconds after which this value is used to delay */ public fun <T> Flow<T>.retryWithBackoff( delay: Int = 1, delayAtMost: Int = 10, exceptions: Set<Class<out Exception>> = defaultExceptions ): Flow<T> { var _delay = delay return retryWhen { cause, attempt -> val shouldRetry = exceptions.any { (cause::class.java == it) } if (shouldRetry) { delay(_delay.seconds.inWholeMilliseconds) val next = _delay * 2 _delay = if (next < delayAtMost) { next } else delayAtMost true } else false } }