Skip to content

Instantly share code, notes, and snippets.

@athlan
Created July 11, 2019 14:58
Show Gist options
  • Save athlan/b357b718a28a9ea83973ed9ff25eb77d to your computer and use it in GitHub Desktop.
Save athlan/b357b718a28a9ea83973ed9ff25eb77d to your computer and use it in GitHub Desktop.

Revisions

  1. athlan created this gist Jul 11, 2019.
    81 changes: 81 additions & 0 deletions FlyweightConcurrentSupplier.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,81 @@
    package pl.athlan.common.concurrent;

    import static java.util.Objects.requireNonNull;

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.function.Supplier;

    /**
    * Thread-safe Supplier implementation of accessing decorated {@link Supplier}
    * in a way that only one thread can access it at the time and value obtained from decorated Supplier
    * is shared across waiting threads.
    *
    * <p>If no threads are waiting after execution, shared value is not stored.
    *
    * <p>Use case: When computation of value is memory consuming and result between obtaining it
    * within t0 and t+1 does not matter (it can be cached/shared). Crucial is to free up a memory after execution.
    *
    * @param <T> the type of results supplied by this supplier
    */
    public final class FlyweightConcurrentSupplier<T> implements Supplier<T> {


    private final Supplier<T> delegate;
    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
    private final AtomicInteger queueLength = new AtomicInteger();

    private T sharedValue;
    private volatile boolean sharedValueAvailable;

    public FlyweightConcurrentSupplier(Supplier<T> delegate) {
    this.delegate = requireNonNull(delegate, "delegate cannot be null");
    }

    @Override
    public T get() {
    queueLength.getAndIncrement();
    rwl.readLock().lock();

    if (!sharedValueAvailable) {
    // Must release read lock before acquiring write lock
    rwl.readLock().unlock();
    rwl.writeLock().lock();
    try {
    // Recheck state because another thread might have
    // acquired write lock and changed state before we did.
    if (!sharedValueAvailable) {
    sharedValue = delegate.get();
    sharedValueAvailable = true;
    }
    // Downgrade by acquiring read lock before releasing write lock
    rwl.readLock().lock();
    } finally {
    rwl.writeLock().unlock(); // Unlock write, still hold read
    }
    }

    try {
    T result = sharedValue;

    // No thread is waiting for shared value, so can be cleaned up.
    if (queueLength.decrementAndGet() == 0) {
    rwl.readLock().unlock();
    rwl.writeLock().lock();
    // Recheck if no thread joined at the meantime
    // and would potentially get null.
    if (queueLength.compareAndSet(0, 0)) {
    sharedValue = null;
    sharedValueAvailable = false;
    }
    rwl.readLock().lock();
    rwl.writeLock().unlock();
    }

    return result;
    } finally {
    rwl.readLock().unlock();
    }
    }
    }