Skip to content

Instantly share code, notes, and snippets.

@n1chre
Last active June 28, 2017 18:31
Show Gist options
  • Save n1chre/efc8da727a3ca623821ae4d470f82a7c to your computer and use it in GitHub Desktop.
Save n1chre/efc8da727a3ca623821ae4d470f82a7c to your computer and use it in GitHub Desktop.

Revisions

  1. Filip Hrenić revised this gist Jun 28, 2017. 1 changed file with 5 additions and 0 deletions.
    5 changes: 5 additions & 0 deletions ProxyConsumer.java
    Original file line number Diff line number Diff line change
    @@ -13,6 +13,7 @@
    * To use this class, simply wrap your Consumer into this class.
    *
    * !!! TODO this class will leave a thread waiting if null isn't consumed after consuming relevant objects.
    * Or call finishConsuming() when done.
    *
    * Created by fhrenic on 28/06/2017.
    */
    @@ -57,6 +58,10 @@ public synchronized void accept(T element) {
    }
    notify();
    }

    public void finishConsuming() {
    noMoreElements.set(true);
    }

    private synchronized void startProcessing() {
    while (true) {
  2. Filip Hrenić revised this gist Jun 28, 2017. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions ProxyConsumer.java
    Original file line number Diff line number Diff line change
    @@ -11,6 +11,8 @@
    * much slower than produce(). That's why elements are queued and processed when they can be processed.
    *
    * To use this class, simply wrap your Consumer into this class.
    *
    * !!! TODO this class will leave a thread waiting if null isn't consumed after consuming relevant objects.
    *
    * Created by fhrenic on 28/06/2017.
    */
  3. Filip Hrenić created this gist Jun 28, 2017.
    90 changes: 90 additions & 0 deletions ProxyConsumer.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,90 @@
    package hrucc;

    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Consumer;

    /**
    * This class should be used when some object is producing stuff for some consumer and consume() is
    * much slower than produce(). That's why elements are queued and processed when they can be processed.
    *
    * To use this class, simply wrap your Consumer into this class.
    *
    * Created by fhrenic on 28/06/2017.
    */
    public class ProxyConsumer<T> implements Consumer<T> {

    public static void main(String[] args) throws InterruptedException {
    ProxyConsumer<Integer> proxy = new ProxyConsumer<>(number -> {
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    System.out.println("can't sleep");
    return;
    }
    System.out.println("got number " + number);
    });

    for (int i = 0; i < 10; i++) {
    proxy.accept(i);
    }
    TimeUnit.SECONDS.sleep(20); // wait here
    proxy.accept(420);
    TimeUnit.SECONDS.sleep(5);
    proxy.accept(null);
    }

    private final Consumer<T> consumer;
    private final Queue<T> elements;
    private AtomicBoolean noMoreElements;

    public ProxyConsumer(Consumer<T> consumer) {
    this.consumer = consumer;
    elements = new LinkedList<>();
    noMoreElements = new AtomicBoolean(false);
    new Thread(this::startProcessing).start();
    }

    public synchronized void accept(T element) {
    if (element != null) {
    add(element);
    } else {
    noMoreElements.set(true);
    }
    notify();
    }

    private synchronized void startProcessing() {
    while (true) {
    while (hasElements()) {
    consumer.accept(take());
    }
    if (noMoreElements.get()) {
    return; // notify but no elements added
    }
    try {
    wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    return;
    }
    }
    }

    // queue operations

    private synchronized void add(T element) {
    elements.add(element);
    }

    private synchronized boolean hasElements() {
    return !elements.isEmpty();
    }

    private synchronized T take() {
    return elements.poll();
    }

    }