Skip to content

Instantly share code, notes, and snippets.

@hogmoru
Created June 15, 2017 12:28
Show Gist options
  • Select an option

  • Save hogmoru/a8c68747d2ca7b720db93fb2472f4c59 to your computer and use it in GitHub Desktop.

Select an option

Save hogmoru/a8c68747d2ca7b720db93fb2472f4c59 to your computer and use it in GitHub Desktop.

Revisions

  1. hogmoru created this gist Jun 15, 2017.
    159 changes: 159 additions & 0 deletions QueuePartitionerTest
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,159 @@
    import java.util.*;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;

    import static java.util.concurrent.TimeUnit.SECONDS;

    public class MessageProcessor {

    private static final long CLEANUP_PERIOD_S = 10;
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
    private final ExecutorService executorService;

    public MessageProcessor(int nbThreads) {
    executorService = Executors.newFixedThreadPool(nbThreads);
    ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
    cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

    public void addMessageToProcess(Message message) {
    ConvoQueue queue = getQueue(message.getConversationId());
    queue.addMessage(message);
    }

    private ConvoQueue getQueue(Long convoId) {
    synchronized (queuesByConvo) {
    return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
    }
    }

    private void removeEmptyQueues() {
    synchronized (queuesByConvo) {
    queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
    }
    }

    }


    class ConvoQueue {

    private Queue<MessageTask> queue;
    private MessageTask activeTask;
    private ExecutorService executorService;

    ConvoQueue(ExecutorService executorService) {
    this.executorService = executorService;
    this.queue = new LinkedBlockingQueue<>();
    }

    private void runNextIfPossible() {
    synchronized(this) {
    if (activeTask == null) {
    activeTask = queue.poll();
    if (activeTask != null) {
    executorService.submit(activeTask);
    }
    }
    }
    }

    void complete(MessageTask task) {
    synchronized(this) {
    if (task == activeTask) {
    activeTask = null;
    runNextIfPossible();
    }
    else {
    throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
    }
    }
    }

    boolean isEmpty() {
    return queue.isEmpty();
    }

    void addMessage(Message message) {
    add(new MessageTask(this, message));
    }

    private void add(MessageTask task) {
    synchronized(this) {
    queue.add(task);
    runNextIfPossible();
    }
    }

    }

    public class MessageTask implements Runnable {

    private ConvoQueue convoQueue;
    private Message message;

    MessageTask(ConvoQueue convoQueue, Message message) {
    this.convoQueue = convoQueue;
    this.message = message;
    }

    @Override
    public void run() {
    try {
    processMessage();
    }
    finally {
    convoQueue.complete(this);
    }
    }

    private void processMessage() {
    // Dummy processing with random delay to observe reordered messages & preserved convo order
    try {
    Thread.sleep((long) (50*Math.random()));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(message);
    }

    }

    class Message {

    private long id;
    private long conversationId;
    private String data;

    Message(long id, long conversationId, String someData) {
    this.id = id;
    this.conversationId = conversationId;
    this.data = someData;
    }

    long getConversationId() {
    return conversationId;
    }

    String getData() {
    return data;
    }

    public String toString() {
    return "Message{" + id + "," + conversationId + "," + data + "}";
    }
    }

    public class MessageProcessorTest {
    public static void main(String[] args) {
    MessageProcessor test = new MessageProcessor(2);
    for (int i=1; i<100; i++) {
    test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i));
    }
    // Kill after 4 seconds for online test
    try {Thread.sleep(4000);} catch(Exception e) {}
    System.exit(0);
    }
    }