Created
June 15, 2017 12:28
-
-
Save hogmoru/a8c68747d2ca7b720db93fb2472f4c59 to your computer and use it in GitHub Desktop.
Revisions
-
hogmoru created this gist
Jun 15, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,159 @@ import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.LinkedBlockingQueue; import static java.util.concurrent.TimeUnit.SECONDS; public class MessageProcessor { private static final long CLEANUP_PERIOD_S = 10; private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>(); private final ExecutorService executorService; public MessageProcessor(int nbThreads) { executorService = Executors.newFixedThreadPool(nbThreads); ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1); cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS); } public void addMessageToProcess(Message message) { ConvoQueue queue = getQueue(message.getConversationId()); queue.addMessage(message); } private ConvoQueue getQueue(Long convoId) { synchronized (queuesByConvo) { return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService)); } } private void removeEmptyQueues() { synchronized (queuesByConvo) { queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty()); } } } class ConvoQueue { private Queue<MessageTask> queue; private MessageTask activeTask; private ExecutorService executorService; ConvoQueue(ExecutorService executorService) { this.executorService = executorService; this.queue = new LinkedBlockingQueue<>(); } private void runNextIfPossible() { synchronized(this) { if (activeTask == null) { activeTask = queue.poll(); if (activeTask != null) { executorService.submit(activeTask); } } } } void complete(MessageTask task) { synchronized(this) { if (task == activeTask) { activeTask = null; runNextIfPossible(); } else { throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task); } } } boolean isEmpty() { return queue.isEmpty(); } void addMessage(Message message) { add(new MessageTask(this, message)); } private void add(MessageTask task) { synchronized(this) { queue.add(task); runNextIfPossible(); } } } public class MessageTask implements Runnable { private ConvoQueue convoQueue; private Message message; MessageTask(ConvoQueue convoQueue, Message message) { this.convoQueue = convoQueue; this.message = message; } @Override public void run() { try { processMessage(); } finally { convoQueue.complete(this); } } private void processMessage() { // Dummy processing with random delay to observe reordered messages & preserved convo order try { Thread.sleep((long) (50*Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(message); } } class Message { private long id; private long conversationId; private String data; Message(long id, long conversationId, String someData) { this.id = id; this.conversationId = conversationId; this.data = someData; } long getConversationId() { return conversationId; } String getData() { return data; } public String toString() { return "Message{" + id + "," + conversationId + "," + data + "}"; } } public class MessageProcessorTest { public static void main(String[] args) { MessageProcessor test = new MessageProcessor(2); for (int i=1; i<100; i++) { test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i)); } // Kill after 4 seconds for online test try {Thread.sleep(4000);} catch(Exception e) {} System.exit(0); } }