Skip to content

Instantly share code, notes, and snippets.

@rterbush
Forked from UnquietCode/MockSQS.java
Created October 11, 2016 14:47

Revisions

  1. @UnquietCode UnquietCode revised this gist Oct 18, 2013. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion MockSQS.java
    Original file line number Diff line number Diff line change
    @@ -74,7 +74,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws

    if (info != null) {
    final String receiptHandle = UUID.randomUUID().toString();

    Message message = new Message();
    message.setBody(info.body);
    message.setMessageId(info.id);
  2. @UnquietCode UnquietCode revised this gist Jun 5, 2013. 1 changed file with 7 additions and 4 deletions.
    11 changes: 7 additions & 4 deletions MockSQS.java
    Original file line number Diff line number Diff line change
    @@ -73,19 +73,22 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws
    final MessageInfo info = queue.poll();

    if (info != null) {
    final String receiptHandle = UUID.randomUUID().toString();

    Message message = new Message();
    message.setBody(info.body);
    message.setMessageId(info.id);
    message.setMD5OfBody(info.hash());
    message.setReceiptHandle(UUID.randomUUID().toString());
    message.setReceiptHandle(receiptHandle);
    messages.add(message);

    Runnable command = new Runnable() {
    public void run() {
    queue.add(info);
    receivedMessages.remove(receiptHandle);
    }
    };

    ScheduledMessage scheduled = new ScheduledMessage();
    scheduled.future = executor.schedule(command, visibilityTimeout, TimeUnit.SECONDS);
    scheduled.runnable = command;
    @@ -106,7 +109,7 @@ public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceExce
    if (scheduled == null) {
    throw new RuntimeException("message does not exist");
    }

    scheduled.future.cancel(true);
    }

    @@ -136,7 +139,7 @@ String hash() {
    return Hashing.md5().hashString(body).toString();
    }
    }

    private static class ScheduledMessage {
    ScheduledFuture future;
    Runnable runnable;
  3. @UnquietCode UnquietCode revised this gist Jun 5, 2013. 1 changed file with 22 additions and 23 deletions.
    45 changes: 22 additions & 23 deletions MockSQS.java
    Original file line number Diff line number Diff line change
    @@ -26,7 +26,7 @@ public class MockSQS implements AmazonSQS {
    private final Map<String, Queue<MessageInfo>> queues = new HashMap<>();
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    private int timeout = 35*60;
    private final Map<String, ScheduledFuture> receivedMessages = new WeakHashMap<>();
    private final Map<String, ScheduledMessage> receivedMessages = new HashMap<>();

    /*
    - adds a message to the correct queue
    @@ -80,13 +80,16 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws
    message.setReceiptHandle(UUID.randomUUID().toString());
    messages.add(message);

    ScheduledFuture future = executor.schedule(new Runnable() {
    Runnable command = new Runnable() {
    public void run() {
    queue.add(info);
    }
    }, visibilityTimeout, TimeUnit.SECONDS);

    receivedMessages.put(message.getReceiptHandle(), future);
    };

    ScheduledMessage scheduled = new ScheduledMessage();
    scheduled.future = executor.schedule(command, visibilityTimeout, TimeUnit.SECONDS);
    scheduled.runnable = command;
    receivedMessages.put(message.getReceiptHandle(), scheduled);
    }
    }

    @@ -99,32 +102,23 @@ public void run() {
    */
    @Override
    public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceException, AmazonClientException {
    ScheduledFuture future = receivedMessages.remove(request.getReceiptHandle());
    if (future == null) {
    ScheduledMessage scheduled = receivedMessages.remove(request.getReceiptHandle());
    if (scheduled == null) {
    throw new RuntimeException("message does not exist");
    }
    future.cancel(true);

    scheduled.future.cancel(true);
    }

    @Override
    public void changeMessageVisibility(ChangeMessageVisibilityRequest request) throws AmazonServiceException, AmazonClientException {
    final ScheduledFuture future = receivedMessages.remove(request.getReceiptHandle());
    if (future == null) {
    ScheduledMessage scheduled = receivedMessages.get(request.getReceiptHandle());
    if (scheduled == null) {
    throw new RuntimeException("message does not exist");
    }

    future.cancel(false);

    Callable task = new Callable() {
    public Object call() throws Exception {
    return future.get();
    }
    };

    // TODO closing over these repeatedly can create nasty leaking

    final ScheduledFuture newFuture = executor.schedule(task, checkNotNull(request.getVisibilityTimeout()), TimeUnit.SECONDS);
    receivedMessages.put(request.getReceiptHandle(), newFuture);
    scheduled.future.cancel(true);
    scheduled.future = executor.schedule(scheduled.runnable, checkNotNull(request.getVisibilityTimeout()), TimeUnit.SECONDS);
    }

    @Override
    @@ -142,6 +136,11 @@ String hash() {
    return Hashing.md5().hashString(body).toString();
    }
    }

    private static class ScheduledMessage {
    ScheduledFuture future;
    Runnable runnable;
    }

    private Queue<MessageInfo> getOrCreateQueue(String url) {
    Queue<MessageInfo> queue = queues.get(checkNotNull(url));
    @@ -243,4 +242,4 @@ public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClient
    public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
    throw new RuntimeException("not implemented");
    }
    }
    }
  4. @UnquietCode UnquietCode created this gist Jun 5, 2013.
    246 changes: 246 additions & 0 deletions MockSQS.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,246 @@
    package com.studyblue.test.aws;

    import com.amazonaws.AmazonClientException;
    import com.amazonaws.AmazonServiceException;
    import com.amazonaws.AmazonWebServiceRequest;
    import com.amazonaws.ResponseMetadata;
    import com.amazonaws.regions.Region;
    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.model.*;
    import com.google.common.hash.Hashing;

    import java.util.*;
    import java.util.concurrent.*;

    import static com.google.common.base.Preconditions.checkArgument;
    import static com.google.common.base.Preconditions.checkNotNull;

    /**
    * Sitting in the airport, unable to connect to the internet, this seemed
    * like a good use of my time, at least as compared with sleeping.
    *
    * @author Ben Fagin
    * @version 2013-05-28
    */
    public class MockSQS implements AmazonSQS {
    private final Map<String, Queue<MessageInfo>> queues = new HashMap<>();
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    private int timeout = 35*60;
    private final Map<String, ScheduledFuture> receivedMessages = new WeakHashMap<>();

    /*
    - adds a message to the correct queue
    - delays if required
    */
    @Override
    public SendMessageResult sendMessage(final SendMessageRequest request) throws AmazonServiceException, AmazonClientException {
    final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl());
    final MessageInfo info = new MessageInfo();
    info.body = checkNotNull(request.getMessageBody());
    info.id = UUID.randomUUID().toString();

    if (request.getDelaySeconds() == null) {
    queue.add(info);
    } else {
    Runnable task = new Runnable() {
    public void run() {
    queue.add(info);
    }
    };
    executor.schedule(task, request.getDelaySeconds(), TimeUnit.SECONDS);
    }

    return new SendMessageResult().withMessageId(info.id).withMD5OfMessageBody(info.hash());
    }

    /*
    - takes messages off the queue
    - if timeout, then they are added back
    */
    @Override
    public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws AmazonServiceException, AmazonClientException {
    final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl());
    List<Message> messages = new ArrayList<>();

    Integer max = request.getMaxNumberOfMessages();
    if (max == null) { max = 0; }
    checkArgument(max <= 10 && max > 0);

    Integer visibilityTimeout = request.getVisibilityTimeout();
    if (visibilityTimeout == null) { visibilityTimeout = timeout; }

    for (int i=0; i < max; ++i) {
    final MessageInfo info = queue.poll();

    if (info != null) {
    Message message = new Message();
    message.setBody(info.body);
    message.setMessageId(info.id);
    message.setMD5OfBody(info.hash());
    message.setReceiptHandle(UUID.randomUUID().toString());
    messages.add(message);

    ScheduledFuture future = executor.schedule(new Runnable() {
    public void run() {
    queue.add(info);
    }
    }, visibilityTimeout, TimeUnit.SECONDS);

    receivedMessages.put(message.getReceiptHandle(), future);
    }
    }

    return new ReceiveMessageResult().withMessages(messages);
    }

    /*
    - deletes the task which would have re-added a message to the queue,
    effectively deleting the message
    */
    @Override
    public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceException, AmazonClientException {
    ScheduledFuture future = receivedMessages.remove(request.getReceiptHandle());
    if (future == null) {
    throw new RuntimeException("message does not exist");
    }
    future.cancel(true);
    }

    @Override
    public void changeMessageVisibility(ChangeMessageVisibilityRequest request) throws AmazonServiceException, AmazonClientException {
    final ScheduledFuture future = receivedMessages.remove(request.getReceiptHandle());
    if (future == null) {
    throw new RuntimeException("message does not exist");
    }

    future.cancel(false);

    Callable task = new Callable() {
    public Object call() throws Exception {
    return future.get();
    }
    };

    // TODO closing over these repeatedly can create nasty leaking

    final ScheduledFuture newFuture = executor.schedule(task, checkNotNull(request.getVisibilityTimeout()), TimeUnit.SECONDS);
    receivedMessages.put(request.getReceiptHandle(), newFuture);
    }

    @Override
    public void shutdown() {
    executor.shutdown();
    receivedMessages.clear();
    queues.clear();
    }

    private static class MessageInfo {
    String body;
    String id;

    String hash() {
    return Hashing.md5().hashString(body).toString();
    }
    }

    private Queue<MessageInfo> getOrCreateQueue(String url) {
    Queue<MessageInfo> queue = queues.get(checkNotNull(url));

    if (queue == null) {
    synchronized (queues) {
    queue = queues.get(checkNotNull(url));

    if (queue == null) {
    queue = new ArrayDeque<>();
    queues.put(url, queue);
    }
    }
    }

    return queue;
    }

    /*
    - set the default timeout when receiving messages from the queue
    */
    public void setTimeout(int timeout) {
    this.timeout = timeout;
    }

    //---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---//

    @Override
    public void setEndpoint(String endpoint) throws IllegalArgumentException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public void setRegion(Region region) throws IllegalArgumentException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public void removePermission(RemovePermissionRequest removePermissionRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public void deleteQueue(DeleteQueueRequest deleteQueueRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public void addPermission(AddPermissionRequest addPermissionRequest) throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException {
    throw new RuntimeException("not implemented");
    }

    @Override
    public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
    throw new RuntimeException("not implemented");
    }
    }