Last active
October 20, 2025 11:31
-
-
Save samber/db9ba8ea0a25f3cfce9d19e904ff2d8b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| func MyCustomOperator[T, R any](project func(ctx context.Context, item T, index int64) (context.Context, R)) func(ro.Observable[T]) Observable[R] { | |
| return func(source Observable[T]) Observable[R] { | |
| return ro.NewUnsafeObservableWithContext(func(subscriberCtx context.Context, destination ro.Observer[R]) ro.Teardown { | |
| // initialize external lib here (on subscription) | |
| sub := source.SubscribeWithContext( | |
| subscriberCtx, | |
| ro.NewObserverWithContext( | |
| func(ctx context.Context, value T) { | |
| // handle message then pass message to the downstream operators: | |
| // destination.Next(ctx, outputValue) | |
| }, | |
| destination.ErrorWithContext, | |
| destination.CompleteWithContext, | |
| ), | |
| ) | |
| return func() { | |
| sub.Unsubscribe() | |
| // close lib on cancellation/error/completion | |
| } | |
| }) | |
| } | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
1- Yes, the initialization/unsubscription sticks to the main goroutine. The ObserveOn operator sends the downstream processing into a different goroutine, while SubscribeOn sends the upstream to a different goroutine.
2- In your example, you should use
destination.NextWithContext(ctx, value)instead ofdestination.Next(value), because it breaks the context propagation.3- In such an operator that returns the same type, you should have the following prototype:
func MyCustomOperator[T any](operatorID string) func(ro.Observable[T]) ro.Observable[T]. It will allow you not to cast the value.If you want to call
runtime.LockOSThread()andruntime.UnlockOSThread()in the same goroutine, you should do this: