Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save loihd/d2c0329a31880705b6a446efd92a722a to your computer and use it in GitHub Desktop.
Save loihd/d2c0329a31880705b6a446efd92a722a to your computer and use it in GitHub Desktop.

Revisions

  1. @naturalwarren naturalwarren revised this gist Jan 28, 2019. No changes.
  2. @naturalwarren naturalwarren revised this gist Jan 24, 2019. 1 changed file with 14 additions and 1 deletion.
    15 changes: 14 additions & 1 deletion NetworkResponseRxJava2CallAdapter.kt
    Original file line number Diff line number Diff line change
    @@ -49,7 +49,20 @@ internal class KotlinRxJava2CallAdapter<T : Any, U : Any>(
    val errorBody = when {
    error == null -> null
    error.contentLength() == 0L -> null
    else -> errorConverter.convert(error)
    else -> {
    try {
    errorConverter.convert(error)
    } catch (e: Exception) {
    return@Function Observable.just(
    NetworkResponse.NetworkError(
    IOException(
    "Couldn't deserialize error body: ${error.string()}",
    e
    )
    )
    )
    }
    }
    }
    val serverError = NetworkResponse.ServerError(
    errorBody,
  3. @naturalwarren naturalwarren revised this gist Jan 23, 2019. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion NetworkResponse.kt
    Original file line number Diff line number Diff line change
    @@ -23,7 +23,6 @@ import java.io.IOException
    *
    * @param T success body type for 2xx response.
    * @param U error body type for non-2xx response.
    *
    */
    sealed class NetworkResponse<out T : Any, out U : Any> {

  4. @naturalwarren naturalwarren revised this gist Jan 23, 2019. 6 changed files with 147 additions and 244 deletions.
    40 changes: 0 additions & 40 deletions CoinbaseResponse.kt
    Original file line number Diff line number Diff line change
    @@ -1,40 +0,0 @@
    /**
    * Copyright 2019 Coinbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied. See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */

    package com.coinbase.network.adapter

    import java.io.IOException

    /**
    * A response from the backend.
    *
    * In the event that a 2xx is returned [isSuccessful] will be set to true and [body] will be set.
    * In the event that a non-2xx response is returned [isSuccessful] will be set to false and
    * [errorBody] will be set.
    * In the event that a request did not result in a response from the monorail [networkError] will
    * be set.
    */
    class CoinbaseResponse<T : Any, U : Any>(
    response: Response<T>?,
    val errorBody: U?,
    val networkError: IOException?
    ) {

    val body: T? = response?.body()
    val isSuccessful = response?.isSuccessful ?: false
    val code: Int? = response?.code()
    }
    77 changes: 0 additions & 77 deletions CoinbaseRxJava2CallAdapter.kt
    Original file line number Diff line number Diff line change
    @@ -1,77 +0,0 @@
    /**
    * Copyright 2019 Coinbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied. See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */

    package com.coinbase.network.adapter

    import io.reactivex.Observable
    import io.reactivex.functions.Function
    import okhttp3.ResponseBody
    import retrofit2.Call
    import retrofit2.CallAdapter
    import retrofit2.Converter
    import java.io.IOException
    import java.lang.reflect.Type

    /**
    * Decorates stream emissions from [delegateAdapter] in [CoinbaseResponse].
    */
    internal class CoinbaseRxJava2CallAdapter(
    private val successBodyType: Type,
    private val delegateAdapter: CallAdapter<Any, Any>,
    private val errorConverter: Converter<ResponseBody, Any?>,
    private val supportedType: SupportedType<Any, Any>
    ) : CallAdapter<Any, Any> {

    override fun adapt(call: Call<Any>): Any {
    val stream = delegateAdapter.adapt(call)
    val coinbaseStream = supportedType.toObservable(stream)
    .map { response ->
    when {
    response.isSuccessful -> CoinbaseResponse<Any, Any>(
    response,
    response.errorBody(),
    null
    )
    else -> {
    val error = response.errorBody()
    val errorBody = when {
    error == null -> null
    error.contentLength() == 0L -> null
    else -> errorConverter.convert(error)
    }
    CoinbaseResponse(
    response,
    errorBody,
    null
    )
    }
    }
    }.onErrorResumeNext(
    Function<Throwable, Observable<CoinbaseResponse<Any, Any>>> { throwable ->
    val response: CoinbaseResponse<Any, Any> = CoinbaseResponse(
    null,
    null,
    throwable as IOException
    )
    Observable.just(response)
    })

    return supportedType.fromObservable(coinbaseStream)
    }

    override fun responseType(): Type = successBodyType
    }
    54 changes: 22 additions & 32 deletions CoinbaseRxJava2CallAdapterFactory.kt
    Original file line number Diff line number Diff line change
    @@ -14,43 +14,33 @@
    * specific language governing permissions and limitations
    * under the License.
    */

    package com.coinbase.network.adapter

    import com.squareup.moshi.Types
    import io.reactivex.Completable
    import io.reactivex.Flowable
    import io.reactivex.Maybe
    import io.reactivex.Observable
    import io.reactivex.Single
    import retrofit2.CallAdapter
    import retrofit2.Response
    import retrofit2.Retrofit
    import java.lang.reflect.ParameterizedType
    import java.lang.reflect.Type

    /**
    * A [CallAdapter.Factory] which allows [CoinbaseResponse] objects to be returned from RxJava
    * A [CallAdapter.Factory] which allows [NetworkResponse] objects to be returned from RxJava
    * streams.
    *
    * Adding this class to [Retrofit] allows you to return [Observable], [Flowable], [Single], or
    * [Maybe] types parameterized with [CoinbaseResponse] from service methods. This adapter must be
    * registered before an adapter that is capable of adapting RxJava streams.
    *
    * For the type [Observable<CoinbaseResponse<SuccessBody, ErrorBody>>], the following semantics are
    * provided:
    * [Maybe] types parameterized with [NetworkResponse] from service methods.
    *
    * 1. 2xx responses call onNext with the deserialized body set as [CoinbaseResponse.body]
    * 2. non-2xx responses call onNext with the deserialized error body set as
    * [CoinbaseResponse.errorBody]
    * 3. Calls that fail due to network issues call onNext with network errors set as
    * [CoinbaseResponse.networkError]
    * Note: This adapter must be registered before an adapter that is capable of adapting RxJava
    * streams.
    */
    class CoinbaseRxJava2CallAdapterFactory private constructor() : CallAdapter.Factory() {
    class KotlinRxJava2CallAdapterFactory private constructor() : CallAdapter.Factory() {

    companion object {
    @JvmStatic
    fun create() = CoinbaseRxJava2CallAdapterFactory()
    fun create() = KotlinRxJava2CallAdapterFactory()
    }

    override fun get(
    @@ -60,15 +50,13 @@ class CoinbaseRxJava2CallAdapterFactory private constructor() : CallAdapter.Fact
    ): CallAdapter<*, *>? {
    val rawType = getRawType(returnType)

    if (rawType == Completable::class.java) {
    // This type can't be parameterized with CoinbaseResponse, let another adapter
    // handle it.
    val isFlowable = rawType === Flowable::class.java
    val isSingle = rawType === Single::class.java
    val isMaybe = rawType === Maybe::class.java
    if (rawType !== Observable::class.java && !isFlowable && !isSingle && !isMaybe) {
    return null
    }

    // Check if we support this type, if not let another adapter handle it.
    val supportedType = SupportedType.create(rawType) ?: return null

    if (returnType !is ParameterizedType) {
    throw IllegalStateException(
    "${rawType.simpleName} return type must be parameterized as " +
    @@ -77,20 +65,20 @@ class CoinbaseRxJava2CallAdapterFactory private constructor() : CallAdapter.Fact
    }

    val observableEmissionType = getParameterUpperBound(0, returnType)
    if (getRawType(observableEmissionType) != CoinbaseResponse::class.java) {
    if (getRawType(observableEmissionType) != NetworkResponse::class.java) {
    return null
    }

    if (observableEmissionType !is ParameterizedType) {
    throw IllegalStateException(
    "CoinbaseResponse must be parameterized as CoinbaseResponse<SuccessBody, ErrorBody>"
    "NetworkResponse must be parameterized as NetworkResponse<SuccessBody, ErrorBody>"
    )
    }

    val successBodyType = getParameterUpperBound(0, observableEmissionType)
    val delegateType = Types.newParameterizedType(
    getRawType(returnType),
    Types.newParameterizedType(Response::class.java, successBodyType)
    Observable::class.java,
    successBodyType
    )
    val delegateAdapter = retrofit.nextCallAdapter(
    this,
    @@ -99,18 +87,20 @@ class CoinbaseRxJava2CallAdapterFactory private constructor() : CallAdapter.Fact
    )

    val errorBodyType = getParameterUpperBound(1, observableEmissionType)
    val errorBodyConverter = retrofit.nextResponseBodyConverter<Any?>(
    val errorBodyConverter = retrofit.nextResponseBodyConverter<Any>(
    null,
    errorBodyType,
    annotations
    )

    @Suppress("UNCHECKED_CAST") // CallAdapter type is not known at compile time.
    return CoinbaseRxJava2CallAdapter(
    @Suppress("UNCHECKED_CAST") // Type of delegateAdapter is not known at compile time.
    return KotlinRxJava2CallAdapter(
    successBodyType,
    delegateAdapter as CallAdapter<Any, Any>,
    delegateAdapter as CallAdapter<Any, Observable<Any>>,
    errorBodyConverter,
    supportedType as SupportedType<Any, Any>
    isFlowable,
    isSingle,
    isMaybe
    )
    }
    }
    }
    44 changes: 44 additions & 0 deletions NetworkResponse.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,44 @@
    /**
    * Copyright 2019 Coinbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied. See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package com.coinbase.network.adapter

    import java.io.IOException

    /**
    * Represents the result of making a network request.
    *
    * @param T success body type for 2xx response.
    * @param U error body type for non-2xx response.
    *
    */
    sealed class NetworkResponse<out T : Any, out U : Any> {

    /**
    * A request that resulted in a response with a 2xx status code that has a body.
    */
    data class Success<T : Any>(val body: T) : NetworkResponse<T, Nothing>()

    /**
    * A request that resulted in a response with a non-2xx status code.
    */
    data class ServerError<U : Any>(val body: U?, val code: Int) : NetworkResponse<Nothing, U>()

    /**
    * A request that didn't result in a response.
    */
    data class NetworkError(val error: IOException) : NetworkResponse<Nothing, Nothing>()
    }
    81 changes: 81 additions & 0 deletions NetworkResponseRxJava2CallAdapter.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,81 @@
    /**
    * Copyright 2019 Coinbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied. See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package com.coinbase.network.adapter

    import io.reactivex.BackpressureStrategy
    import io.reactivex.Observable
    import io.reactivex.functions.Function
    import okhttp3.ResponseBody
    import retrofit2.Call
    import retrofit2.CallAdapter
    import retrofit2.Converter
    import retrofit2.HttpException
    import java.io.IOException
    import java.lang.reflect.Type

    internal class KotlinRxJava2CallAdapter<T : Any, U : Any>(
    private val successBodyType: Type,
    private val delegateAdapter: CallAdapter<T, Observable<T>>,
    private val errorConverter: Converter<ResponseBody, U>,
    private val isFlowable: Boolean,
    private val isSingle: Boolean,
    private val isMaybe: Boolean
    ) : CallAdapter<T, Any> {

    override fun adapt(call: Call<T>): Any =
    delegateAdapter.adapt(call)
    .flatMap {
    Observable.just<NetworkResponse<T, U>>(NetworkResponse.Success(it))
    }
    .onErrorResumeNext(
    Function<Throwable, Observable<NetworkResponse<T, U>>> { throwable ->
    when (throwable) {
    is HttpException -> {
    val error = throwable.response().errorBody()
    val errorBody = when {
    error == null -> null
    error.contentLength() == 0L -> null
    else -> errorConverter.convert(error)
    }
    val serverError = NetworkResponse.ServerError(
    errorBody,
    throwable.response().code()
    )
    Observable.just(serverError)
    }
    is IOException -> {
    Observable.just(
    NetworkResponse.NetworkError(
    throwable
    )
    )
    }
    else -> {
    throw throwable
    }
    }
    }).run {
    when {
    isFlowable -> this.toFlowable(BackpressureStrategy.LATEST)
    isSingle -> this.singleOrError()
    isMaybe -> this.singleElement()
    else -> this
    }
    }

    override fun responseType(): Type = successBodyType
    }
    95 changes: 0 additions & 95 deletions SupportedType.kt
    Original file line number Diff line number Diff line change
    @@ -1,95 +0,0 @@
    /**
    * Copyright 2019 Coinbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied. See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */

    package com.coinbase.network.adapter

    import io.reactivex.BackpressureStrategy
    import io.reactivex.Flowable
    import io.reactivex.Maybe
    import io.reactivex.Observable
    import io.reactivex.Single
    import retrofit2.Response

    /**
    * Types that [CoinbaseRxJava2CallAdapterFactory] supports. Performs conversions between reactive
    * types.
    *
    * @param I type that our delegate adapter hands us, ie Single<SuccessBody>
    * @param O type that our caller requested, ie Single<CoinbaseResponse<SuccessBody,ErrorBody>>
    */
    internal sealed class SupportedType<I, O> {

    abstract fun toObservable(retrofitStream: I): Observable<Response<Any>>

    abstract fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>): O

    companion object Factory {

    fun create(clazz: Class<*>): SupportedType<*, *>? {
    return when (clazz) {
    Observable::class.java -> ObservableType
    Single::class.java -> SingleType
    Maybe::class.java -> MaybeType
    Flowable::class.java -> FlowableType
    else -> null
    }
    }
    }
    }

    internal object ObservableType :
    SupportedType<Observable<Response<Any>>, Observable<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Observable<Response<Any>>):
    Observable<Response<Any>> = retrofitStream

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>)
    : Observable<CoinbaseResponse<Any, Any>> = observable
    }

    internal object SingleType :
    SupportedType<Single<Response<Any>>, Single<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Single<Response<Any>>): Observable<Response<Any>> =
    retrofitStream.toObservable()

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>):
    Single<CoinbaseResponse<Any, Any>> =
    observable.singleOrError()
    }

    internal object MaybeType :
    SupportedType<Maybe<Response<Any>>, Maybe<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Maybe<Response<Any>>): Observable<Response<Any>> =
    retrofitStream.toObservable()

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>)
    : Maybe<CoinbaseResponse<Any, Any>> =
    observable.singleElement()
    }

    internal object FlowableType :
    SupportedType<Flowable<Response<Any>>, Flowable<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Flowable<Response<Any>>): Observable<Response<Any>> =
    retrofitStream.toObservable()

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>)
    : Flowable<CoinbaseResponse<Any, Any>> =
    observable.toFlowable(BackpressureStrategy.LATEST)
    }
  5. @naturalwarren naturalwarren revised this gist Jan 9, 2019. No changes.
  6. @naturalwarren naturalwarren revised this gist Jan 9, 2019. 4 changed files with 63 additions and 54 deletions.
    12 changes: 8 additions & 4 deletions CoinbaseResponse.kt
    Original file line number Diff line number Diff line change
    @@ -28,9 +28,13 @@ import java.io.IOException
    * In the event that a request did not result in a response from the monorail [networkError] will
    * be set.
    */
    data class CoinbaseResponse<T : Any, U : Any>(
    val isSuccessful: Boolean,
    val body: T?,
    class CoinbaseResponse<T : Any, U : Any>(
    response: Response<T>?,
    val errorBody: U?,
    val networkError: IOException?
    )
    ) {

    val body: T? = response?.body()
    val isSuccessful = response?.isSuccessful ?: false
    val code: Int? = response?.code()
    }
    73 changes: 34 additions & 39 deletions CoinbaseRxJava2CallAdapter.kt
    Original file line number Diff line number Diff line change
    @@ -23,60 +23,55 @@ import okhttp3.ResponseBody
    import retrofit2.Call
    import retrofit2.CallAdapter
    import retrofit2.Converter
    import retrofit2.HttpException
    import java.io.IOException
    import java.lang.reflect.Type

    /**
    * Decorates stream emissions from [delegateAdapter] in [CoinbaseResponse].
    */
    internal class CoinbaseRxJava2CallAdapter(
    private val successBodyType: Type,
    private val delegateAdapter: CallAdapter<Any, Any>,
    private val errorConverterFactory: Converter<ResponseBody, Any?>,
    private val errorConverter: Converter<ResponseBody, Any?>,
    private val supportedType: SupportedType<Any, Any>
    ) : CallAdapter<Any, Any> {

    override fun adapt(call: Call<Any>): Any {
    val retrofitStream = delegateAdapter.adapt(call)

    val coinbaseStream = supportedType.toObservable(retrofitStream)
    .map {
    CoinbaseResponse<Any, Any>(true, it, null, null)
    }
    .onErrorResumeNext(
    Function<Throwable, Observable<CoinbaseResponse<Any, Any>>> { throwable ->
    when (throwable) {
    is IOException -> {
    val response: CoinbaseResponse<Any, Any> = CoinbaseResponse(
    false,
    null,
    null,
    throwable
    )
    Observable.just(response)
    }
    is HttpException -> {
    val error = throwable.response().errorBody()
    val errorBody = when {
    error == null -> null
    error.contentLength() == 0L -> null
    else -> errorConverterFactory.convert(error)
    }

    val response: CoinbaseResponse<Any, Any> = CoinbaseResponse(
    false,
    null,
    errorBody,
    null
    )
    Observable.just(response)
    }
    else -> {
    throw IllegalStateException("Unrecognized exception.")
    val stream = delegateAdapter.adapt(call)
    val coinbaseStream = supportedType.toObservable(stream)
    .map { response ->
    when {
    response.isSuccessful -> CoinbaseResponse<Any, Any>(
    response,
    response.errorBody(),
    null
    )
    else -> {
    val error = response.errorBody()
    val errorBody = when {
    error == null -> null
    error.contentLength() == 0L -> null
    else -> errorConverter.convert(error)
    }
    CoinbaseResponse(
    response,
    errorBody,
    null
    )
    }
    }
    }.onErrorResumeNext(
    Function<Throwable, Observable<CoinbaseResponse<Any, Any>>> { throwable ->
    val response: CoinbaseResponse<Any, Any> = CoinbaseResponse(
    null,
    null,
    throwable as IOException
    )
    Observable.just(response)
    })

    return supportedType.fromObservable(coinbaseStream)
    }

    override fun responseType(): Type = successBodyType
    }
    }
    8 changes: 6 additions & 2 deletions CoinbaseRxJava2CallAdapterFactory.kt
    Original file line number Diff line number Diff line change
    @@ -24,6 +24,7 @@ import io.reactivex.Maybe
    import io.reactivex.Observable
    import io.reactivex.Single
    import retrofit2.CallAdapter
    import retrofit2.Response
    import retrofit2.Retrofit
    import java.lang.reflect.ParameterizedType
    import java.lang.reflect.Type
    @@ -87,7 +88,10 @@ class CoinbaseRxJava2CallAdapterFactory private constructor() : CallAdapter.Fact
    }

    val successBodyType = getParameterUpperBound(0, observableEmissionType)
    val delegateType = Types.newParameterizedType(getRawType(returnType), successBodyType)
    val delegateType = Types.newParameterizedType(
    getRawType(returnType),
    Types.newParameterizedType(Response::class.java, successBodyType)
    )
    val delegateAdapter = retrofit.nextCallAdapter(
    this,
    delegateType,
    @@ -109,4 +113,4 @@ class CoinbaseRxJava2CallAdapterFactory private constructor() : CallAdapter.Fact
    supportedType as SupportedType<Any, Any>
    )
    }
    }
    }
    24 changes: 15 additions & 9 deletions SupportedType.kt
    Original file line number Diff line number Diff line change
    @@ -22,6 +22,7 @@ import io.reactivex.Flowable
    import io.reactivex.Maybe
    import io.reactivex.Observable
    import io.reactivex.Single
    import retrofit2.Response

    /**
    * Types that [CoinbaseRxJava2CallAdapterFactory] supports. Performs conversions between reactive
    @@ -32,7 +33,7 @@ import io.reactivex.Single
    */
    internal sealed class SupportedType<I, O> {

    abstract fun toObservable(retrofitStream: I): Observable<Any>
    abstract fun toObservable(retrofitStream: I): Observable<Response<Any>>

    abstract fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>): O

    @@ -50,37 +51,42 @@ internal sealed class SupportedType<I, O> {
    }
    }

    internal object ObservableType : SupportedType<Observable<Any>, Observable<CoinbaseResponse<Any, Any>>>() {
    internal object ObservableType :
    SupportedType<Observable<Response<Any>>, Observable<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Observable<Any>): Observable<Any> = retrofitStream
    override fun toObservable(retrofitStream: Observable<Response<Any>>):
    Observable<Response<Any>> = retrofitStream

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>)
    : Observable<CoinbaseResponse<Any, Any>> = observable
    }

    internal object SingleType : SupportedType<Single<Any>, Single<CoinbaseResponse<Any, Any>>>() {
    internal object SingleType :
    SupportedType<Single<Response<Any>>, Single<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Single<Any>): Observable<Any> =
    override fun toObservable(retrofitStream: Single<Response<Any>>): Observable<Response<Any>> =
    retrofitStream.toObservable()

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>):
    Single<CoinbaseResponse<Any, Any>> =
    observable.singleOrError()
    }

    internal object MaybeType : SupportedType<Maybe<Any>, Maybe<CoinbaseResponse<Any, Any>>>() {
    internal object MaybeType :
    SupportedType<Maybe<Response<Any>>, Maybe<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Maybe<Any>): Observable<Any> =
    override fun toObservable(retrofitStream: Maybe<Response<Any>>): Observable<Response<Any>> =
    retrofitStream.toObservable()

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>)
    : Maybe<CoinbaseResponse<Any, Any>> =
    observable.singleElement()
    }

    internal object FlowableType : SupportedType<Flowable<Any>, Flowable<CoinbaseResponse<Any, Any>>>() {
    internal object FlowableType :
    SupportedType<Flowable<Response<Any>>, Flowable<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Flowable<Any>): Observable<Any> =
    override fun toObservable(retrofitStream: Flowable<Response<Any>>): Observable<Response<Any>> =
    retrofitStream.toObservable()

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>)
  7. @naturalwarren naturalwarren revised this gist Jan 9, 2019. 4 changed files with 69 additions and 1 deletion.
    19 changes: 18 additions & 1 deletion CoinbaseResponse.kt
    Original file line number Diff line number Diff line change
    @@ -1,9 +1,26 @@
    /**
    * Copyright 2019 Coinbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied. See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */

    package com.coinbase.network.adapter

    import java.io.IOException

    /**
    * A response from the monorail.
    * A response from the backend.
    *
    * In the event that a 2xx is returned [isSuccessful] will be set to true and [body] will be set.
    * In the event that a non-2xx response is returned [isSuccessful] will be set to false and
    17 changes: 17 additions & 0 deletions CoinbaseRxJava2CallAdapter.kt
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,20 @@
    /**
    * Copyright 2019 Coinbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied. See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */

    package com.coinbase.network.adapter

    import io.reactivex.Observable
    17 changes: 17 additions & 0 deletions CoinbaseRxJava2CallAdapterFactory.kt
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,20 @@
    /**
    * Copyright 2019 Coinbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied. See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */

    package com.coinbase.network.adapter

    import com.squareup.moshi.Types
    17 changes: 17 additions & 0 deletions SupportedType.kt
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,20 @@
    /**
    * Copyright 2019 Coinbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied. See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */

    package com.coinbase.network.adapter

    import io.reactivex.BackpressureStrategy
  8. @naturalwarren naturalwarren revised this gist Dec 10, 2018. 3 changed files with 28 additions and 16 deletions.
    2 changes: 1 addition & 1 deletion CoinbaseResponse.kt
    Original file line number Diff line number Diff line change
    @@ -16,4 +16,4 @@ data class CoinbaseResponse<T : Any, U : Any>(
    val body: T?,
    val errorBody: U?,
    val networkError: IOException?
    )
    )
    3 changes: 1 addition & 2 deletions CoinbaseRxJava2CallAdapter.kt
    Original file line number Diff line number Diff line change
    @@ -25,8 +25,7 @@ internal class CoinbaseRxJava2CallAdapter(
    CoinbaseResponse<Any, Any>(true, it, null, null)
    }
    .onErrorResumeNext(
    Function<Throwable, Observable<CoinbaseResponse<Any, Any>>>
    { throwable ->
    Function<Throwable, Observable<CoinbaseResponse<Any, Any>>> { throwable ->
    when (throwable) {
    is IOException -> {
    val response: CoinbaseResponse<Any, Any> = CoinbaseResponse(
    39 changes: 26 additions & 13 deletions CoinbaseRxJava2CallAdapterFactory.kt
    Original file line number Diff line number Diff line change
    @@ -28,7 +28,12 @@ import java.lang.reflect.Type
    * 3. Calls that fail due to network issues call onNext with network errors set as
    * [CoinbaseResponse.networkError]
    */
    class CoinbaseRxJava2CallAdapterFactory : CallAdapter.Factory() {
    class CoinbaseRxJava2CallAdapterFactory private constructor() : CallAdapter.Factory() {

    companion object {
    @JvmStatic
    fun create() = CoinbaseRxJava2CallAdapterFactory()
    }

    override fun get(
    returnType: Type,
    @@ -44,20 +49,28 @@ class CoinbaseRxJava2CallAdapterFactory : CallAdapter.Factory() {
    }

    // Check if we support this type, if not let another adapter handle it.
    // Will throw an exception if the Rx type used or CoinbaseResponse is not parameterized.
    val supportedType = SupportedType.create(rawType) ?: return null

    val observableEmissionType = getParameterUpperBound(
    0,
    returnType as ParameterizedType
    )
    val successBodyType = getParameterUpperBound(
    0, observableEmissionType as ParameterizedType
    )
    val delegateType = Types.newParameterizedType(
    getRawType(returnType),
    successBodyType
    )
    if (returnType !is ParameterizedType) {
    throw IllegalStateException(
    "${rawType.simpleName} return type must be parameterized as " +
    "${rawType.simpleName}<Foo> or ${rawType.simpleName}<? extends Foo>"
    )
    }

    val observableEmissionType = getParameterUpperBound(0, returnType)
    if (getRawType(observableEmissionType) != CoinbaseResponse::class.java) {
    return null
    }

    if (observableEmissionType !is ParameterizedType) {
    throw IllegalStateException(
    "CoinbaseResponse must be parameterized as CoinbaseResponse<SuccessBody, ErrorBody>"
    )
    }

    val successBodyType = getParameterUpperBound(0, observableEmissionType)
    val delegateType = Types.newParameterizedType(getRawType(returnType), successBodyType)
    val delegateAdapter = retrofit.nextCallAdapter(
    this,
    delegateType,
  9. @naturalwarren naturalwarren revised this gist Dec 10, 2018. 3 changed files with 220 additions and 0 deletions.
    66 changes: 66 additions & 0 deletions CoinbaseRxJava2CallAdapter.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,66 @@
    package com.coinbase.network.adapter

    import io.reactivex.Observable
    import io.reactivex.functions.Function
    import okhttp3.ResponseBody
    import retrofit2.Call
    import retrofit2.CallAdapter
    import retrofit2.Converter
    import retrofit2.HttpException
    import java.io.IOException
    import java.lang.reflect.Type

    internal class CoinbaseRxJava2CallAdapter(
    private val successBodyType: Type,
    private val delegateAdapter: CallAdapter<Any, Any>,
    private val errorConverterFactory: Converter<ResponseBody, Any?>,
    private val supportedType: SupportedType<Any, Any>
    ) : CallAdapter<Any, Any> {

    override fun adapt(call: Call<Any>): Any {
    val retrofitStream = delegateAdapter.adapt(call)

    val coinbaseStream = supportedType.toObservable(retrofitStream)
    .map {
    CoinbaseResponse<Any, Any>(true, it, null, null)
    }
    .onErrorResumeNext(
    Function<Throwable, Observable<CoinbaseResponse<Any, Any>>>
    { throwable ->
    when (throwable) {
    is IOException -> {
    val response: CoinbaseResponse<Any, Any> = CoinbaseResponse(
    false,
    null,
    null,
    throwable
    )
    Observable.just(response)
    }
    is HttpException -> {
    val error = throwable.response().errorBody()
    val errorBody = when {
    error == null -> null
    error.contentLength() == 0L -> null
    else -> errorConverterFactory.convert(error)
    }

    val response: CoinbaseResponse<Any, Any> = CoinbaseResponse(
    false,
    null,
    errorBody,
    null
    )
    Observable.just(response)
    }
    else -> {
    throw IllegalStateException("Unrecognized exception.")
    }
    }
    })

    return supportedType.fromObservable(coinbaseStream)
    }

    override fun responseType(): Type = successBodyType
    }
    82 changes: 82 additions & 0 deletions CoinbaseRxJava2CallAdapterFactory.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,82 @@
    package com.coinbase.network.adapter

    import com.squareup.moshi.Types
    import io.reactivex.Completable
    import io.reactivex.Flowable
    import io.reactivex.Maybe
    import io.reactivex.Observable
    import io.reactivex.Single
    import retrofit2.CallAdapter
    import retrofit2.Retrofit
    import java.lang.reflect.ParameterizedType
    import java.lang.reflect.Type

    /**
    * A [CallAdapter.Factory] which allows [CoinbaseResponse] objects to be returned from RxJava
    * streams.
    *
    * Adding this class to [Retrofit] allows you to return [Observable], [Flowable], [Single], or
    * [Maybe] types parameterized with [CoinbaseResponse] from service methods. This adapter must be
    * registered before an adapter that is capable of adapting RxJava streams.
    *
    * For the type [Observable<CoinbaseResponse<SuccessBody, ErrorBody>>], the following semantics are
    * provided:
    *
    * 1. 2xx responses call onNext with the deserialized body set as [CoinbaseResponse.body]
    * 2. non-2xx responses call onNext with the deserialized error body set as
    * [CoinbaseResponse.errorBody]
    * 3. Calls that fail due to network issues call onNext with network errors set as
    * [CoinbaseResponse.networkError]
    */
    class CoinbaseRxJava2CallAdapterFactory : CallAdapter.Factory() {

    override fun get(
    returnType: Type,
    annotations: Array<Annotation>,
    retrofit: Retrofit
    ): CallAdapter<*, *>? {
    val rawType = getRawType(returnType)

    if (rawType == Completable::class.java) {
    // This type can't be parameterized with CoinbaseResponse, let another adapter
    // handle it.
    return null
    }

    // Check if we support this type, if not let another adapter handle it.
    // Will throw an exception if the Rx type used or CoinbaseResponse is not parameterized.
    val supportedType = SupportedType.create(rawType) ?: return null

    val observableEmissionType = getParameterUpperBound(
    0,
    returnType as ParameterizedType
    )
    val successBodyType = getParameterUpperBound(
    0, observableEmissionType as ParameterizedType
    )
    val delegateType = Types.newParameterizedType(
    getRawType(returnType),
    successBodyType
    )
    val delegateAdapter = retrofit.nextCallAdapter(
    this,
    delegateType,
    annotations
    )

    val errorBodyType = getParameterUpperBound(1, observableEmissionType)
    val errorBodyConverter = retrofit.nextResponseBodyConverter<Any?>(
    null,
    errorBodyType,
    annotations
    )

    @Suppress("UNCHECKED_CAST") // CallAdapter type is not known at compile time.
    return CoinbaseRxJava2CallAdapter(
    successBodyType,
    delegateAdapter as CallAdapter<Any, Any>,
    errorBodyConverter,
    supportedType as SupportedType<Any, Any>
    )
    }
    }
    72 changes: 72 additions & 0 deletions SupportedType.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,72 @@
    package com.coinbase.network.adapter

    import io.reactivex.BackpressureStrategy
    import io.reactivex.Flowable
    import io.reactivex.Maybe
    import io.reactivex.Observable
    import io.reactivex.Single

    /**
    * Types that [CoinbaseRxJava2CallAdapterFactory] supports. Performs conversions between reactive
    * types.
    *
    * @param I type that our delegate adapter hands us, ie Single<SuccessBody>
    * @param O type that our caller requested, ie Single<CoinbaseResponse<SuccessBody,ErrorBody>>
    */
    internal sealed class SupportedType<I, O> {

    abstract fun toObservable(retrofitStream: I): Observable<Any>

    abstract fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>): O

    companion object Factory {

    fun create(clazz: Class<*>): SupportedType<*, *>? {
    return when (clazz) {
    Observable::class.java -> ObservableType
    Single::class.java -> SingleType
    Maybe::class.java -> MaybeType
    Flowable::class.java -> FlowableType
    else -> null
    }
    }
    }
    }

    internal object ObservableType : SupportedType<Observable<Any>, Observable<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Observable<Any>): Observable<Any> = retrofitStream

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>)
    : Observable<CoinbaseResponse<Any, Any>> = observable
    }

    internal object SingleType : SupportedType<Single<Any>, Single<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Single<Any>): Observable<Any> =
    retrofitStream.toObservable()

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>):
    Single<CoinbaseResponse<Any, Any>> =
    observable.singleOrError()
    }

    internal object MaybeType : SupportedType<Maybe<Any>, Maybe<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Maybe<Any>): Observable<Any> =
    retrofitStream.toObservable()

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>)
    : Maybe<CoinbaseResponse<Any, Any>> =
    observable.singleElement()
    }

    internal object FlowableType : SupportedType<Flowable<Any>, Flowable<CoinbaseResponse<Any, Any>>>() {

    override fun toObservable(retrofitStream: Flowable<Any>): Observable<Any> =
    retrofitStream.toObservable()

    override fun fromObservable(observable: Observable<CoinbaseResponse<Any, Any>>)
    : Flowable<CoinbaseResponse<Any, Any>> =
    observable.toFlowable(BackpressureStrategy.LATEST)
    }
  10. @naturalwarren naturalwarren created this gist Dec 10, 2018.
    19 changes: 19 additions & 0 deletions CoinbaseResponse.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,19 @@
    package com.coinbase.network.adapter

    import java.io.IOException

    /**
    * A response from the monorail.
    *
    * In the event that a 2xx is returned [isSuccessful] will be set to true and [body] will be set.
    * In the event that a non-2xx response is returned [isSuccessful] will be set to false and
    * [errorBody] will be set.
    * In the event that a request did not result in a response from the monorail [networkError] will
    * be set.
    */
    data class CoinbaseResponse<T : Any, U : Any>(
    val isSuccessful: Boolean,
    val body: T?,
    val errorBody: U?,
    val networkError: IOException?
    )