Last active
October 1, 2022 04:00
-
-
Save UnquietCode/5717942 to your computer and use it in GitHub Desktop.
Revisions
-
UnquietCode revised this gist
Jan 23, 2017 . 1 changed file with 0 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,3 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.AmazonWebServiceRequest; -
UnquietCode revised this gist
Oct 18, 2013 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -74,7 +74,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws if (info != null) { final String receiptHandle = UUID.randomUUID().toString(); Message message = new Message(); message.setBody(info.body); message.setMessageId(info.id); -
UnquietCode revised this gist
Jun 5, 2013 . 1 changed file with 7 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -73,19 +73,22 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws final MessageInfo info = queue.poll(); if (info != null) { final String receiptHandle = UUID.randomUUID().toString(); Message message = new Message(); message.setBody(info.body); message.setMessageId(info.id); message.setMD5OfBody(info.hash()); message.setReceiptHandle(receiptHandle); messages.add(message); Runnable command = new Runnable() { public void run() { queue.add(info); receivedMessages.remove(receiptHandle); } }; ScheduledMessage scheduled = new ScheduledMessage(); scheduled.future = executor.schedule(command, visibilityTimeout, TimeUnit.SECONDS); scheduled.runnable = command; @@ -106,7 +109,7 @@ public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceExce if (scheduled == null) { throw new RuntimeException("message does not exist"); } scheduled.future.cancel(true); } @@ -136,7 +139,7 @@ String hash() { return Hashing.md5().hashString(body).toString(); } } private static class ScheduledMessage { ScheduledFuture future; Runnable runnable; -
UnquietCode revised this gist
Jun 5, 2013 . 1 changed file with 22 additions and 23 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -26,7 +26,7 @@ public class MockSQS implements AmazonSQS { private final Map<String, Queue<MessageInfo>> queues = new HashMap<>(); private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); private int timeout = 35*60; private final Map<String, ScheduledMessage> receivedMessages = new HashMap<>(); /* - adds a message to the correct queue @@ -80,13 +80,16 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws message.setReceiptHandle(UUID.randomUUID().toString()); messages.add(message); Runnable command = new Runnable() { public void run() { queue.add(info); } }; ScheduledMessage scheduled = new ScheduledMessage(); scheduled.future = executor.schedule(command, visibilityTimeout, TimeUnit.SECONDS); scheduled.runnable = command; receivedMessages.put(message.getReceiptHandle(), scheduled); } } @@ -99,32 +102,23 @@ public void run() { */ @Override public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceException, AmazonClientException { ScheduledMessage scheduled = receivedMessages.remove(request.getReceiptHandle()); if (scheduled == null) { throw new RuntimeException("message does not exist"); } scheduled.future.cancel(true); } @Override public void changeMessageVisibility(ChangeMessageVisibilityRequest request) throws AmazonServiceException, AmazonClientException { ScheduledMessage scheduled = receivedMessages.get(request.getReceiptHandle()); if (scheduled == null) { throw new RuntimeException("message does not exist"); } scheduled.future.cancel(true); scheduled.future = executor.schedule(scheduled.runnable, checkNotNull(request.getVisibilityTimeout()), TimeUnit.SECONDS); } @Override @@ -142,6 +136,11 @@ String hash() { return Hashing.md5().hashString(body).toString(); } } private static class ScheduledMessage { ScheduledFuture future; Runnable runnable; } private Queue<MessageInfo> getOrCreateQueue(String url) { Queue<MessageInfo> queue = queues.get(checkNotNull(url)); @@ -243,4 +242,4 @@ public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClient public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { throw new RuntimeException("not implemented"); } } -
UnquietCode created this gist
Jun 5, 2013 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,246 @@ package com.studyblue.test.aws; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.ResponseMetadata; import com.amazonaws.regions.Region; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.*; import com.google.common.hash.Hashing; import java.util.*; import java.util.concurrent.*; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; /** * Sitting in the airport, unable to connect to the internet, this seemed * like a good use of my time, at least as compared with sleeping. * * @author Ben Fagin * @version 2013-05-28 */ public class MockSQS implements AmazonSQS { private final Map<String, Queue<MessageInfo>> queues = new HashMap<>(); private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); private int timeout = 35*60; private final Map<String, ScheduledFuture> receivedMessages = new WeakHashMap<>(); /* - adds a message to the correct queue - delays if required */ @Override public SendMessageResult sendMessage(final SendMessageRequest request) throws AmazonServiceException, AmazonClientException { final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl()); final MessageInfo info = new MessageInfo(); info.body = checkNotNull(request.getMessageBody()); info.id = UUID.randomUUID().toString(); if (request.getDelaySeconds() == null) { queue.add(info); } else { Runnable task = new Runnable() { public void run() { queue.add(info); } }; executor.schedule(task, request.getDelaySeconds(), TimeUnit.SECONDS); } return new SendMessageResult().withMessageId(info.id).withMD5OfMessageBody(info.hash()); } /* - takes messages off the queue - if timeout, then they are added back */ @Override public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws AmazonServiceException, AmazonClientException { final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl()); List<Message> messages = new ArrayList<>(); Integer max = request.getMaxNumberOfMessages(); if (max == null) { max = 0; } checkArgument(max <= 10 && max > 0); Integer visibilityTimeout = request.getVisibilityTimeout(); if (visibilityTimeout == null) { visibilityTimeout = timeout; } for (int i=0; i < max; ++i) { final MessageInfo info = queue.poll(); if (info != null) { Message message = new Message(); message.setBody(info.body); message.setMessageId(info.id); message.setMD5OfBody(info.hash()); message.setReceiptHandle(UUID.randomUUID().toString()); messages.add(message); ScheduledFuture future = executor.schedule(new Runnable() { public void run() { queue.add(info); } }, visibilityTimeout, TimeUnit.SECONDS); receivedMessages.put(message.getReceiptHandle(), future); } } return new ReceiveMessageResult().withMessages(messages); } /* - deletes the task which would have re-added a message to the queue, effectively deleting the message */ @Override public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceException, AmazonClientException { ScheduledFuture future = receivedMessages.remove(request.getReceiptHandle()); if (future == null) { throw new RuntimeException("message does not exist"); } future.cancel(true); } @Override public void changeMessageVisibility(ChangeMessageVisibilityRequest request) throws AmazonServiceException, AmazonClientException { final ScheduledFuture future = receivedMessages.remove(request.getReceiptHandle()); if (future == null) { throw new RuntimeException("message does not exist"); } future.cancel(false); Callable task = new Callable() { public Object call() throws Exception { return future.get(); } }; // TODO closing over these repeatedly can create nasty leaking final ScheduledFuture newFuture = executor.schedule(task, checkNotNull(request.getVisibilityTimeout()), TimeUnit.SECONDS); receivedMessages.put(request.getReceiptHandle(), newFuture); } @Override public void shutdown() { executor.shutdown(); receivedMessages.clear(); queues.clear(); } private static class MessageInfo { String body; String id; String hash() { return Hashing.md5().hashString(body).toString(); } } private Queue<MessageInfo> getOrCreateQueue(String url) { Queue<MessageInfo> queue = queues.get(checkNotNull(url)); if (queue == null) { synchronized (queues) { queue = queues.get(checkNotNull(url)); if (queue == null) { queue = new ArrayDeque<>(); queues.put(url, queue); } } } return queue; } /* - set the default timeout when receiving messages from the queue */ public void setTimeout(int timeout) { this.timeout = timeout; } //---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---// @Override public void setEndpoint(String endpoint) throws IllegalArgumentException { throw new RuntimeException("not implemented"); } @Override public void setRegion(Region region) throws IllegalArgumentException { throw new RuntimeException("not implemented"); } @Override public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public void removePermission(RemovePermissionRequest removePermissionRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public void deleteQueue(DeleteQueueRequest deleteQueueRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public void addPermission(AddPermissionRequest addPermissionRequest) throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException { throw new RuntimeException("not implemented"); } @Override public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { throw new RuntimeException("not implemented"); } }