Skip to content

Instantly share code, notes, and snippets.

@sphrak
Last active January 6, 2023 14:03
Show Gist options
  • Save sphrak/d3a60f33b3ba8f51a77be5fb77bf0e27 to your computer and use it in GitHub Desktop.
Save sphrak/d3a60f33b3ba8f51a77be5fb77bf0e27 to your computer and use it in GitHub Desktop.

Revisions

  1. sphrak revised this gist Jan 6, 2023. 1 changed file with 5 additions and 4 deletions.
    9 changes: 5 additions & 4 deletions MessageService.kt
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,3 @@

    @Serialized
    data class MessageDto(
    @SerialName(value = "id")
    @@ -7,15 +6,17 @@ data class MessageDto(
    val message: String
    )

    class MessageService constructor(private val httpClient: HttpClient) {
    class MessageService constructor(
    private val httpClient: HttpClient
    ) {

    private val sendMessageFlow: Flow<MessageDto> = MutableSharedFlow()

    fun sendMessage(messageDto) {
    sendMessageFlow.emit(messageDto)
    }

    fun messages(): Flow<List<MessageDto>> = flow {
    fun messages(): Flow<MessageDto> = flow {
    httpClient.wss(
    method = HttpMethod.Get,
    host = getHostname(),
    @@ -28,7 +29,7 @@ class MessageService constructor(private val httpClient: HttpClient) {
    .map(::sendSerialized)
    .launchIn(this)
    while (true) {
    emit(receiveDeserialized<List<MessageDto>>())
    emit(receiveDeserialized<MessageDto>())
    }
    }
    }
  2. sphrak created this gist Jan 6, 2023.
    37 changes: 37 additions & 0 deletions MessageService.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,37 @@

    @Serialized
    data class MessageDto(
    @SerialName(value = "id")
    val id: String,
    @SerialName(value = "message")
    val message: String
    )

    class MessageService constructor(private val httpClient: HttpClient) {

    private val sendMessageFlow: Flow<MessageDto> = MutableSharedFlow()

    fun sendMessage(messageDto) {
    sendMessageFlow.emit(messageDto)
    }

    fun messages(): Flow<List<MessageDto>> = flow {
    httpClient.wss(
    method = HttpMethod.Get,
    host = getHostname(),
    port = getPort(),
    path = "/v1/messages"
    ) {
    coroutineScope {
    sendMessageFlow
    .conflate()
    .map(::sendSerialized)
    .launchIn(this)
    while (true) {
    emit(receiveDeserialized<List<MessageDto>>())
    }
    }
    }
    }.retryWithBackoff()
    .flowOn(coroutineDispatcher)
    }
    34 changes: 34 additions & 0 deletions retry.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,34 @@
    inline val defaultExceptions: Set<Class<out Exception>>
    get() = setOf(
    ClosedReceiveChannelException::class.java,
    ConnectException::class.java,
    UnresolvedAddressException::class.java
    )
    /**
    * Retries a `Flow<T>` indefinitely with exponential backoff
    *
    * @param delay -- delay in seconds between retries
    * @param delayAtMost -- maximum delay in seconds after which this value is used to delay
    */
    public fun <T> Flow<T>.retryWithBackoff(
    delay: Int = 1,
    delayAtMost: Int = 10,
    exceptions: Set<Class<out Exception>> = defaultExceptions
    ): Flow<T> {

    var _delay = delay

    return retryWhen { cause, attempt ->

    val shouldRetry = exceptions.any { (cause::class.java == it) }

    if (shouldRetry) {
    delay(_delay.seconds.inWholeMilliseconds)
    val next = _delay * 2
    _delay = if (next < delayAtMost) {
    next
    } else delayAtMost
    true
    } else false
    }
    }