Skip to content

Instantly share code, notes, and snippets.

@goeh
Created May 25, 2015 12:10
Show Gist options
  • Save goeh/b33363550de083ef541f to your computer and use it in GitHub Desktop.
Save goeh/b33363550de083ef541f to your computer and use it in GitHub Desktop.

Revisions

  1. goeh created this gist May 25, 2015.
    61 changes: 61 additions & 0 deletions ExtendedEvents.groovy
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,61 @@
    import groovy.transform.CompileStatic
    import org.springframework.beans.factory.annotation.Autowired
    import reactor.Environment
    import reactor.bus.Event
    import reactor.bus.EventBus
    import reactor.bus.registry.Registration
    import reactor.bus.registry.Registry
    import reactor.bus.selector.Selector
    import reactor.bus.selector.Selectors
    import reactor.rx.Promise
    import reactor.rx.Promises

    import java.util.concurrent.Callable
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.ForkJoinPool
    import java.util.concurrent.TimeUnit

    /**
    * A trait that adds sendAndCollect support to Grails 3.0.1 controllers.
    */
    @CompileStatic
    trait ExtendedEvents<T> {

    @Autowired
    EventBus eventBus

    @Autowired
    Environment reactorEnv

    private ForkJoinPool forkJoinPool = new ForkJoinPool(2)

    public Promise<Collection<T>> sendAndCollect(Object key, data) {
    sendAndCollect(key, data, 10L, TimeUnit.SECONDS)
    }

    public Promise<Collection<T>> sendAndCollect(Object key, Object data, long timeout, TimeUnit timeUnit) {
    final List<T> list = Collections.synchronizedList([])
    final Registry consumerRegistry = eventBus.getConsumerRegistry()
    final int registrations = consumerRegistry.select(key).size()
    final CountDownLatch latch = new CountDownLatch(registrations)
    final Selector sel = Selectors.anonymous()
    final Registration registration = eventBus.on(sel, { Event<T> ev ->
    list.add(ev.getData())
    latch.countDown()
    });

    final Promise<Collection<T>> p = Promises.prepare()

    forkJoinPool.submit({
    latch.await(timeout, timeUnit)

    p.accept(list)

    registration.cancel()
    } as Callable)

    eventBus.send key, Event.wrap(data, sel.getObject())

    p
    }
    }