Skip to content

Instantly share code, notes, and snippets.

@samber
Last active October 20, 2025 11:31
Show Gist options
  • Save samber/db9ba8ea0a25f3cfce9d19e904ff2d8b to your computer and use it in GitHub Desktop.
Save samber/db9ba8ea0a25f3cfce9d19e904ff2d8b to your computer and use it in GitHub Desktop.
func MyCustomOperator[T, R any](project func(ctx context.Context, item T, index int64) (context.Context, R)) func(ro.Observable[T]) Observable[R] {
return func(source Observable[T]) Observable[R] {
return ro.NewUnsafeObservableWithContext(func(subscriberCtx context.Context, destination ro.Observer[R]) ro.Teardown {
// initialize external lib here (on subscription)
sub := source.SubscribeWithContext(
subscriberCtx,
ro.NewObserverWithContext(
func(ctx context.Context, value T) {
// handle message then pass message to the downstream operators:
// destination.Next(ctx, outputValue)
},
destination.ErrorWithContext,
destination.CompleteWithContext,
),
)
return func() {
sub.Unsubscribe()
// close lib on cancellation/error/completion
}
})
}
}
@samber
Copy link
Author

samber commented Oct 20, 2025

1- Yes, the initialization/unsubscription sticks to the main goroutine. The ObserveOn operator sends the downstream processing into a different goroutine, while SubscribeOn sends the upstream to a different goroutine.
2- In your example, you should use destination.NextWithContext(ctx, value) instead of destination.Next(value), because it breaks the context propagation.
3- In such an operator that returns the same type, you should have the following prototype: func MyCustomOperator[T any](operatorID string) func(ro.Observable[T]) ro.Observable[T]. It will allow you not to cast the value.

If you want to call runtime.LockOSThread() and runtime.UnlockOSThread() in the same goroutine, you should do this:

obs := ro.Pipe4(
    ro.Just("value-1", "value-2"),
    ro.SubscribeOn[string](2),
    ro.TapOnSubscribe(func() {
         runtime.LockOSThread()
    }),
    MyCustomOperator[string, string]("operator-1"),
    ro.TapOnFinalize(func() {
         runtime.UnlockOSThread()
    }),
    ro.SubscribeOn[string](2),
    ...
)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment