package com.anjlab.spelling.web.services.workers; import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import javax.inject.Inject; import org.apache.tapestry5.ioc.Messages; import org.apache.tapestry5.ioc.ReloadAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.anjlab.spelling.web.entities.Task; import com.anjlab.spelling.web.services.atmosphere.WebSocketProgressMonitor; import com.anjlab.spelling.web.services.managers.TaskManager; public abstract class AbstractWorker implements Worker, ReloadAware { @Inject private volatile Messages messages; private volatile WorkerStatus status = WorkerStatus.IDLE; private volatile boolean paused; protected volatile boolean taskCanceled; protected volatile Task currentTask; @Inject protected volatile TaskManager taskManager; protected volatile WebSocketProgressMonitor progressMonitor; private final BlockingQueue queue = new ArrayBlockingQueue(1); private WorkerRoutine routine; public void setRoutine(WorkerRoutine routine) { this.routine = routine; } public boolean shutdownImplementationForReload() { // Support live class reloading during development cycles if (owningThread != null) { routine.workerReloading(); owningThread.interrupt(); } return true; } private volatile Thread owningThread; private static final Logger logger = LoggerFactory.getLogger(AbstractWorker.class); public WorkerStatus getStatus() { return status; } public WebSocketProgressMonitor getProgressMonitor() { return progressMonitor; } private volatile boolean shuttingDown = false; public void shutdown() { shuttingDown = true; if (owningThread != null) { owningThread.interrupt(); } } public boolean isShuttingDown() { return shuttingDown; } public void checkForNewTask() { try { queue.offer(new Object(), 100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.error("Interrupted exception", e); } } public boolean togglePaused() { paused = !paused; return paused; } public boolean cancelTask(int taskId) { Task task = currentTask; boolean cancelingCurrentTask = task != null && task.getId() == taskId; if (cancelingCurrentTask && this.taskCanceled) { internalCancelTask(); return true; } if (!cancelingCurrentTask) { return false; } internalCancelTask(); return true; } private void internalCancelTask() { taskCanceled = true; if (owningThread != null) { owningThread.interrupt(); } } public boolean isTaskCancelled() { return taskCanceled; } public Task currentTask() { return currentTask; } public void cleanup() { // Clear interrupted state Thread.interrupted(); taskCanceled = false; progressMonitor = null; currentTask = null; if (owningThread == null) { owningThread = Thread.currentThread(); owningThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { logger.error("Uncaugth exception in worker", e); } }); } } public void run() throws InterruptedException, RuntimeException { while (paused || (!taskCanceled && (currentTask = findTask()) == null)) { status = paused ? WorkerStatus.PAUSED : WorkerStatus.IDLE; // Ignore result. Queue only used for synchronization purposes queue.take(); } if (taskCanceled) { return; } status = WorkerStatus.RUNNING; try { progressMonitor = new WebSocketProgressMonitor(currentTask, messages, taskManager); progressMonitor.pending(); runTask(); taskManager.deleteTask(currentTask.getId()); } catch (Throwable e) { if (e instanceof InterruptedException) { if (currentTask != null) { taskManager.deleteTask(currentTask.getId()); } throw (InterruptedException) e; } if (currentTask != null) { taskManager.markAsError(currentTask.getId(), e); } throw new RuntimeException(e); } finally { if (progressMonitor != null) { progressMonitor.destroy(); } } } protected abstract void runTask() throws InterruptedException; private Task findTask() { // OptimisticLockException may be thrown on commit? return taskManager.findTaskForWorker(getTaskType()); } public void onExit() { this.status = WorkerStatus.DOWN; } }