public interface SimulationTickable
{
    /**
     * If true, then {@link #doOnTick()} will be called during the
     * world tick from the server thread. This is generally only checked
     * at setup, so the result should not be dynamic.
     */
    public default boolean doesUpdateOnTick() { return false; }

    /**
     * See {@link #doesUpdateOnTick()}
     */
    public default void doOnTick() {}

    /**
     * If true, then {@link #doOffTick(ScatterGatherThreadPool)} will be called once
     * per server tick from the simulation thread pool. This is generally only checked
     * at setup, so the result should not be dynamic.
     */
    public default boolean doesUpdateOffTick() { return false; }

    /**
     * See {@link #doesUpdateOffTick()}
     */
    public default void doOffTick(ScatterGatherThreadPool pool) {}
}
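
/*
 * Illustrative sketch, not part of the original gist: a simulation component that
 * skips the on-tick hook and scatters its per-cell work across the pool off-tick.
 * The Cell type and the threshold/batch values are invented for the example.
 */
class ExampleSimulator implements SimulationTickable
{
    static class Cell { void update() { /* simulation work would go here */ } }

    private final Cell[] cells = new Cell[4096];

    @Override
    public boolean doesUpdateOffTick() { return true; }

    @Override
    public void doOffTick(ScatterGatherThreadPool pool)
    {
        // Blocks until all cells are processed. With a concurrency threshold of 64,
        // arrays smaller than that are simply handled on the calling thread.
        pool.completeTask(cells, 64, Cell::update, 128);
    }
}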

/**
 * Thread pool optimized for scatter-gather processing patterns with an array, list or
 * other linearly-addressable data structure. Performance is equivalent to a Java fork-join
 * pool for large workloads, and it seems to have somewhat lower overhead and lower latency for small batches.<p>
 *
 * The main motivation is simplicity: it is much easier (for the author, at least) to understand and debug
 * than a custom counted-completer fork-join task. (Based on actual experience creating same.)
 * It's also easier to use and requires less code for its intended use cases.<p>
 *
 * The pool does not have a queue, and all calls to the various flavors of completeTask() are blocking.
 * This design is consistent with the scatter-gather patterns for which this pool is used - the intention
 * is to complete the entire task <em>now</em>, as quickly as possible, and then move on with another
 * task that may depend on those results.<p>
 *
 * Calls into the pool for execution are not thread-safe! (Again, no queue - it can only do one thing at a time.)
 * While usage could be externally synchronized, the intended usage pattern is to call into the pool
 * from a consumer thread that generates tasks dynamically and/or drains a queue of tasks into the pool.<p>
 *
 * The size of the pool is always the system parallelism level, less one, because the calling thread is
 * recruited to do some of the work.<p>
 *
 * Without a queue there is no work stealing; however, tasks are apportioned incrementally, with worker threads
 * claiming work only as they get scheduled. Generally it will not be worthwhile to use the pool
 * unless a submitted task has enough work to keep all threads occupied. Some execution methods include
 * concurrency thresholds that, if not met, cause the entire task to execute on the calling thread so that
 * special-case logic isn't needed in the calling code.<p>
 *
 * A perfectly efficient pool would always have all threads finishing at the same time.
 * Even with dynamic work assignment, some thread will always finish some finite amount of time after
 * the other threads finish. This waste can be minimized by slicing the work into smaller batches, but
 * this comes at the price of increased overhead because shared state must be updated with each new batch.
 * Some execution parameters can be used to tune the batch size for a particular workload.<p>
 *
 * Note this pool is <em>NOT</em> suitable as a generic thread pool for tasks that cannot be shared across
 * multiple cores and/or that are meant to be completed asynchronously. For those, the common ForkJoin pool,
 * a fixed thread pool, or dedicated threads will all be better choices.
 */
public interface ScatterGatherThreadPool {
    /**
     * Applies the given operation to every in-range element of the array. If the number of elements to be
     * processed is less than the given concurrency threshold, the operations will happen on the calling thread.
     * In either case, will block until all elements are processed.<p>
     *
     * Use larger batch sizes (and larger thresholds) for fast operations on many elements. Use smaller values
     * for long-running operations.
     */
    <V> void completeTask(V[] inputs, int startIndex, int count, int concurrencyThreshold, Consumer<V> operation, int batchSize);

    <V> void completeTask(V[] inputs, int concurrencyThreshold, Consumer<V> operation, int batchSize);

    <V> void completeTask(V[] inputs, int concurrencyThreshold, Consumer<V> operation);

    <V> void completeTask(V[] inputs, Consumer<V> operation, int batchSize);

    <V> void completeTask(V[] inputs, Consumer<V> operation);

    <V> void completeTask(V[] inputs, int startIndex, int count, Consumer<V> operation, int batchSize);

    <V> void completeTask(V[] inputs, int startIndex, int count, Consumer<V> operation);

    <V> void completeTask(V[] inputs, int startIndex, int count, int concurrencyThreshold, Consumer<V> operation);

    /**
     * Like {@link #completeTask(Object[], int, int, int, Consumer, int)} but with a mapping consumer.
     */
    <T, V> void completeTask(final T[] inputs, final int startIndex, final int count, final int concurrencyThreshold, final ArrayMappingConsumer<T, V> operation, int batchSize);

    <T, V> void completeTask(T[] inputs, int concurrencyThreshold, final ArrayMappingConsumer<T, V> operation, int batchSize);

    <T, V> void completeTask(T[] inputs, int concurrencyThreshold, final ArrayMappingConsumer<T, V> operation);

    <T, V> void completeTask(T[] inputs, final ArrayMappingConsumer<T, V> operation, int batchSize);

    <T, V> void completeTask(T[] inputs, final ArrayMappingConsumer<T, V> operation);

    <T, V> void completeTask(T[] inputs, int startIndex, int count, final ArrayMappingConsumer<T, V> operation, int batchSize);

    <T, V> void completeTask(T[] inputs, int startIndex, int count, final ArrayMappingConsumer<T, V> operation);

    <T, V> void completeTask(T[] inputs, int startIndex, int count, int concurrencyThreshold, ArrayMappingConsumer<T, V> operation);

    /**
     * Processes a specialized task. Will always attempt to use the pool, because no information is
     * provided that would allow evaluation of fitness for concurrency. Blocks until all work is done.
     */
    void completeTask(SharableTask task);

    /**
     * Convenient when your work happens to be in a SimpleConcurrentList.
     */
    <V> void completeTask(SimpleConcurrentList<V> list, Consumer<V> operation);

    <V> void completeTask(SimpleConcurrentList<V> list, int concurrencyThreshold, Consumer<V> operation);
}
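
/*
 * Usage sketch, invented for illustration: process only the first half of an array
 * with the full-control overload, falling back to the calling thread when fewer
 * than 128 elements are in range, and claiming work in batches of 32. The method
 * name and values are hypothetical.
 */
class PoolExample
{
    static void printFirstHalf(ScatterGatherThreadPool pool, String[] labels)
    {
        int count = labels.length / 2;
        // Blocks until every element in [0, count) has been consumed.
        pool.completeTask(labels, 0, count, 128, s -> System.out.println(s), 32);
    }
}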


/**
 * Similar to a Collector in a Java stream - accumulates results from the mapping function in each thread
 * and then dumps them into a collector after all batches are completed.<p>
 *
 * The right half of the BiConsumer (another consumer) provides access to the in-thread sink for map outputs.
 * It's not represented as a map function in order to support functions that might not be 1:1 maps.
 */
public class ArrayMappingConsumer<T, V>
{
    private final BiConsumer<T, Consumer<V>> operation;
    private final Consumer<AbstractUnorderedArrayList<V>> collector;

    protected final ThreadLocal<WorkerState> workerStates = new ThreadLocal<WorkerState>()
    {
        @Override
        protected ArrayMappingConsumer<T, V>.WorkerState initialValue()
        {
            return new WorkerState();
        }
    };

    /**
     * For custom collectors - the collector provided must accept an AbstractUnorderedArrayList and will be
     * called in each thread where work was done after all batches are complete.<p>
     *
     * The collector MUST be thread-safe.
     */
    public ArrayMappingConsumer(BiConsumer<T, Consumer<V>> operation, Consumer<AbstractUnorderedArrayList<V>> collector)
    {
        this.operation = operation;
        this.collector = collector;
    }

    /**
     * The easy way - provide a SimpleConcurrentList as the collector. Note that
     * this implementation does not clear the list between runs. If a consumer is reused,
     * this will need to be handled externally if necessary.
     */
    public ArrayMappingConsumer(BiConsumer<T, Consumer<V>> operation, SimpleConcurrentList<V> target)
    {
        this.operation = operation;
        this.collector = (r) -> { if (!r.isEmpty()) target.addAll(r); };
    }

    /**
     * Holds the per-thread results and provides access to the mapping function.
     */
    private class WorkerState extends AbstractUnorderedArrayList<V> implements Consumer<T>
    {
        @Override
        public final void accept(@SuppressWarnings("null") T t)
        {
            operation.accept(t, v -> this.add(v));
        }

        /**
         * Called in each thread after all batches (for that thread) are complete.
         */
        protected final void completeThread()
        {
            collector.accept(this);
            this.clear();
        }
    }

    /**
     * Gets the mapping function for this thread. Using the function will collect output
     * in the calling thread for later consolidation via {@link #completeThread()}.
     */
    protected final Consumer<T> getWorkerConsumer()
    {
        return workerStates.get();
    }

    /**
     * Signals the worker state to perform result consolidation for this thread.
     */
    protected final void completeThread()
    {
        workerStates.get().completeThread();
    }
}
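
/*
 * Usage sketch, invented for illustration: map an array of records to derived
 * values across the pool, gathering results into a SimpleConcurrentList via the
 * second constructor. The string-length "mapping" is a stand-in for real work,
 * and a no-arg SimpleConcurrentList constructor is assumed here.
 */
class MappingExample
{
    static SimpleConcurrentList<Integer> lengths(ScatterGatherThreadPool pool, String[] records)
    {
        SimpleConcurrentList<Integer> results = new SimpleConcurrentList<>();

        // Each worker thread accumulates outputs locally; after its batches are
        // complete, its results are appended to the shared list in one call.
        ArrayMappingConsumer<String, Integer> mapper =
                new ArrayMappingConsumer<>((s, sink) -> sink.accept(s.length()), results);

        // Blocks until every record has been mapped and gathered.
        pool.completeTask(records, mapper);
        return results;
    }
}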