|
|
@@ -0,0 +1,246 @@ |
|
|
package com.studyblue.test.aws; |
|
|
|
|
|
import com.amazonaws.AmazonClientException; |
|
|
import com.amazonaws.AmazonServiceException; |
|
|
import com.amazonaws.AmazonWebServiceRequest; |
|
|
import com.amazonaws.ResponseMetadata; |
|
|
import com.amazonaws.regions.Region; |
|
|
import com.amazonaws.services.sqs.AmazonSQS; |
|
|
import com.amazonaws.services.sqs.model.*; |
|
|
import com.google.common.hash.Hashing; |
|
|
|
|
|
import java.util.*; |
|
|
import java.util.concurrent.*; |
|
|
|
|
|
import static com.google.common.base.Preconditions.checkArgument; |
|
|
import static com.google.common.base.Preconditions.checkNotNull; |
|
|
|
|
|
/** |
|
|
* Sitting in the airport, unable to connect to the internet, this seemed |
|
|
* like a good use of my time, at least as compared with sleeping. |
|
|
* |
|
|
* @author Ben Fagin |
|
|
* @version 2013-05-28 |
|
|
*/ |
|
|
public class MockSQS implements AmazonSQS { |
|
|
private final Map<String, Queue<MessageInfo>> queues = new HashMap<>(); |
|
|
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); |
|
|
private int timeout = 35*60; |
|
|
private final Map<String, ScheduledFuture> receivedMessages = new WeakHashMap<>(); |
|
|
|
|
|
/* |
|
|
- adds a message to the correct queue |
|
|
- delays if required |
|
|
*/ |
|
|
@Override |
|
|
public SendMessageResult sendMessage(final SendMessageRequest request) throws AmazonServiceException, AmazonClientException { |
|
|
final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl()); |
|
|
final MessageInfo info = new MessageInfo(); |
|
|
info.body = checkNotNull(request.getMessageBody()); |
|
|
info.id = UUID.randomUUID().toString(); |
|
|
|
|
|
if (request.getDelaySeconds() == null) { |
|
|
queue.add(info); |
|
|
} else { |
|
|
Runnable task = new Runnable() { |
|
|
public void run() { |
|
|
queue.add(info); |
|
|
} |
|
|
}; |
|
|
executor.schedule(task, request.getDelaySeconds(), TimeUnit.SECONDS); |
|
|
} |
|
|
|
|
|
return new SendMessageResult().withMessageId(info.id).withMD5OfMessageBody(info.hash()); |
|
|
} |
|
|
|
|
|
/* |
|
|
- takes messages off the queue |
|
|
- if timeout, then they are added back |
|
|
*/ |
|
|
@Override |
|
|
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws AmazonServiceException, AmazonClientException { |
|
|
final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl()); |
|
|
List<Message> messages = new ArrayList<>(); |
|
|
|
|
|
Integer max = request.getMaxNumberOfMessages(); |
|
|
if (max == null) { max = 0; } |
|
|
checkArgument(max <= 10 && max > 0); |
|
|
|
|
|
Integer visibilityTimeout = request.getVisibilityTimeout(); |
|
|
if (visibilityTimeout == null) { visibilityTimeout = timeout; } |
|
|
|
|
|
for (int i=0; i < max; ++i) { |
|
|
final MessageInfo info = queue.poll(); |
|
|
|
|
|
if (info != null) { |
|
|
Message message = new Message(); |
|
|
message.setBody(info.body); |
|
|
message.setMessageId(info.id); |
|
|
message.setMD5OfBody(info.hash()); |
|
|
message.setReceiptHandle(UUID.randomUUID().toString()); |
|
|
messages.add(message); |
|
|
|
|
|
ScheduledFuture future = executor.schedule(new Runnable() { |
|
|
public void run() { |
|
|
queue.add(info); |
|
|
} |
|
|
}, visibilityTimeout, TimeUnit.SECONDS); |
|
|
|
|
|
receivedMessages.put(message.getReceiptHandle(), future); |
|
|
} |
|
|
} |
|
|
|
|
|
return new ReceiveMessageResult().withMessages(messages); |
|
|
} |
|
|
|
|
|
/* |
|
|
- deletes the task which would have re-added a message to the queue, |
|
|
effectively deleting the message |
|
|
*/ |
|
|
@Override |
|
|
public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceException, AmazonClientException { |
|
|
ScheduledFuture future = receivedMessages.remove(request.getReceiptHandle()); |
|
|
if (future == null) { |
|
|
throw new RuntimeException("message does not exist"); |
|
|
} |
|
|
future.cancel(true); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public void changeMessageVisibility(ChangeMessageVisibilityRequest request) throws AmazonServiceException, AmazonClientException { |
|
|
final ScheduledFuture future = receivedMessages.remove(request.getReceiptHandle()); |
|
|
if (future == null) { |
|
|
throw new RuntimeException("message does not exist"); |
|
|
} |
|
|
|
|
|
future.cancel(false); |
|
|
|
|
|
Callable task = new Callable() { |
|
|
public Object call() throws Exception { |
|
|
return future.get(); |
|
|
} |
|
|
}; |
|
|
|
|
|
// TODO closing over these repeatedly can create nasty leaking |
|
|
|
|
|
final ScheduledFuture newFuture = executor.schedule(task, checkNotNull(request.getVisibilityTimeout()), TimeUnit.SECONDS); |
|
|
receivedMessages.put(request.getReceiptHandle(), newFuture); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public void shutdown() { |
|
|
executor.shutdown(); |
|
|
receivedMessages.clear(); |
|
|
queues.clear(); |
|
|
} |
|
|
|
|
|
private static class MessageInfo { |
|
|
String body; |
|
|
String id; |
|
|
|
|
|
String hash() { |
|
|
return Hashing.md5().hashString(body).toString(); |
|
|
} |
|
|
} |
|
|
|
|
|
private Queue<MessageInfo> getOrCreateQueue(String url) { |
|
|
Queue<MessageInfo> queue = queues.get(checkNotNull(url)); |
|
|
|
|
|
if (queue == null) { |
|
|
synchronized (queues) { |
|
|
queue = queues.get(checkNotNull(url)); |
|
|
|
|
|
if (queue == null) { |
|
|
queue = new ArrayDeque<>(); |
|
|
queues.put(url, queue); |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
return queue; |
|
|
} |
|
|
|
|
|
/* |
|
|
- set the default timeout when receiving messages from the queue |
|
|
*/ |
|
|
public void setTimeout(int timeout) { |
|
|
this.timeout = timeout; |
|
|
} |
|
|
|
|
|
//---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---// |
|
|
|
|
|
@Override |
|
|
public void setEndpoint(String endpoint) throws IllegalArgumentException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public void setRegion(Region region) throws IllegalArgumentException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public void removePermission(RemovePermissionRequest removePermissionRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public void deleteQueue(DeleteQueueRequest deleteQueueRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public void addPermission(AddPermissionRequest addPermissionRequest) throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
|
|
|
@Override |
|
|
public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { |
|
|
throw new RuntimeException("not implemented"); |
|
|
} |
|
|
} |