import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * Dispatches messages to a fixed-size thread pool while keeping the messages of any single
 * conversation strictly sequential: at most one message per conversation is handed to the pool
 * at a time, and the next one is submitted only after the previous one has completed.
 *
 * <p>Messages of different conversations may be processed concurrently and in any relative order.
 *
 * <p>NOTE(review): neither the worker pool nor the cleanup scheduler is ever shut down, so their
 * threads keep the JVM alive until {@code System.exit} is called.
 */
public class MessageProcessor {
    /** Delay before the first cleanup and period between cleanups, in seconds. */
    private static final long CLEANUP_PERIOD_S = 10;

    /** Per-conversation queues, keyed by conversation id. Guarded by its own monitor. */
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
    private final ExecutorService executorService;

    /**
     * Creates a processor backed by a fixed pool of worker threads and schedules the periodic
     * removal of idle conversation queues.
     *
     * @param nbThreads number of worker threads in the pool
     */
    public MessageProcessor(int nbThreads) {
        executorService = Executors.newFixedThreadPool(nbThreads);
        ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
        cleanupScheduler.scheduleAtFixedRate(
                this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

    /**
     * Enqueues a message behind any earlier messages of the same conversation.
     *
     * @param message the message to process
     */
    public void addMessageToProcess(Message message) {
        // Look up and enqueue under the map lock: otherwise the cleanup could evict the queue
        // between the two steps, and a later message for the same conversation would get a
        // second queue running in parallel with this one.
        synchronized (queuesByConvo) {
            getQueue(message.getConversationId()).addMessage(message);
        }
    }

    /** Returns the queue for the given conversation, creating it on first use. */
    private ConvoQueue getQueue(Long convoId) {
        synchronized (queuesByConvo) {
            return queuesByConvo.computeIfAbsent(convoId, id -> new ConvoQueue(executorService));
        }
    }

    /** Drops every queue that has neither waiting nor in-flight work. */
    private void removeEmptyQueues() {
        synchronized (queuesByConvo) {
            queuesByConvo.values().removeIf(ConvoQueue::isEmpty);
        }
    }
}

/**
 * Serializes the messages of one conversation: tasks wait in FIFO order and only the head task
 * is submitted to the shared executor; the following task is submitted when the active one
 * reports completion. All state is guarded by this object's monitor.
 */
class ConvoQueue {
    private final Queue<MessageTask> queue;
    private final ExecutorService executorService;
    /** The task currently handed to the executor, or {@code null} when none is in flight. */
    private MessageTask activeTask;

    ConvoQueue(ExecutorService executorService) {
        this.executorService = executorService;
        this.queue = new LinkedBlockingQueue<>();
    }

    /** Submits the next waiting task if no task of this conversation is currently in flight. */
    private void runNextIfPossible() {
        synchronized (this) {
            if (activeTask != null) {
                return;
            }
            activeTask = queue.poll();
            if (activeTask != null) {
                executorService.submit(activeTask);
            }
        }
    }

    /**
     * Marks the active task as finished and starts the next waiting one, if any.
     *
     * @param task the task that just finished; must be the currently active task
     * @throws IllegalStateException if {@code task} is not the active task
     */
    void complete(MessageTask task) {
        synchronized (this) {
            if (task != activeTask) {
                throw new IllegalStateException(
                        "Attempt to complete task that is not supposed to be active: " + task);
            }
            activeTask = null;
            runNextIfPossible();
        }
    }

    /**
     * Tells whether this queue is idle.
     *
     * @return {@code true} only when no task is waiting and no task is in flight; an in-flight
     *     task counts as work so that the queue is not discarded while its conversation is
     *     still being processed
     */
    boolean isEmpty() {
        synchronized (this) {
            return activeTask == null && queue.isEmpty();
        }
    }

    /** Wraps the message in a task and appends it to this conversation's queue. */
    void addMessage(Message message) {
        add(new MessageTask(this, message));
    }

    private void add(MessageTask task) {
        synchronized (this) {
            queue.add(task);
            runNextIfPossible();
        }
    }
}

/**
 * Processes a single message and then notifies its conversation queue so the next message of
 * that conversation can start. Package-private: only one public top-level class is allowed per
 * source file, and the constructor was never accessible outside the package.
 */
class MessageTask implements Runnable {
    private final ConvoQueue convoQueue;
    private final Message message;

    MessageTask(ConvoQueue convoQueue, Message message) {
        this.convoQueue = convoQueue;
        this.message = message;
    }

    @Override
    public void run() {
        try {
            processMessage();
        } finally {
            // Always release the conversation, even if processing failed, so later messages
            // of the same conversation are not blocked forever.
            convoQueue.complete(this);
        }
    }

    private void processMessage() {
        // Dummy processing with random delay to observe reordered messages & preserved convo order
        try {
            Thread.sleep((long) (50 * Math.random()));
        } catch (InterruptedException e) {
            // Restore the flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
        System.out.println(message);
    }
}

/** Immutable message belonging to a conversation. */
class Message {
    private final long id;
    private final long conversationId;
    private final String data;

    Message(long id, long conversationId, String someData) {
        this.id = id;
        this.conversationId = conversationId;
        this.data = someData;
    }

    long getConversationId() {
        return conversationId;
    }

    String getData() {
        return data;
    }

    @Override
    public String toString() {
        return "Message{" + id + "," + conversationId + "," + data + "}";
    }
}

/**
 * Manual demo: sends 99 messages spread over 7 conversations through a 2-thread processor and
 * prints them as they are processed. Package-private because a source file may declare only one
 * public top-level class; {@code main} remains launchable.
 */
class MessageProcessorTest {
    public static void main(String[] args) {
        MessageProcessor test = new MessageProcessor(2);
        for (int i = 1; i < 100; i++) {
            test.addMessageToProcess(new Message(1000 + i, i % 7, "hi " + i));
        }
        // Kill after 4 seconds for online test
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.exit(0);
    }
}