Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save philmadden83/09b2df23d94e5fbbc2c5fb0f04130774 to your computer and use it in GitHub Desktop.
Save philmadden83/09b2df23d94e5fbbc2c5fb0f04130774 to your computer and use it in GitHub Desktop.
package com.vivid.mq.sqs;
import com.amazonaws.services.sqs.model.Message;
import com.vivid.micro.GenericsUtil;
import com.vivid.micro.GsonMarshaller;
import com.vivid.mq.MessageHandler;
import com.vivid.mq.MessageQueueConsumer;
import com.vivid.mq.exception.HandlerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Polls an {@link SqsMessageQueue} on a fixed schedule and hands every received message to a
 * {@link MessageHandler}, running each hand-off as its own task on a {@link ThreadPoolExecutor}.
 *
 * <p>Per message, the worker task unmarshalls the body, checks the payload version against the
 * queue's message configuration, invokes the handler, and deletes the message from the queue once
 * the handler returned normally. Any failure is routed to
 * {@code MessageHandler.onException}.
 *
 * @param <T> wrapper type the message body is unmarshalled into
 */
public class SqsMessageConsumer<T extends SqsMessageWrapper> implements MessageQueueConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageConsumer.class);

    /** Runs the periodic polling task scheduled by {@link #start(int, int)}. */
    private final ScheduledExecutorService executorService;

    /** Converts a raw message body into a {@code T}. */
    private final GsonMarshaller<T> marshaller;

    /** Receives unmarshalled messages and failure notifications. */
    private final MessageHandler<T> messageHandler;

    /** Source of messages; also used to delete handled messages. */
    private final SqsMessageQueue messageQueue;

    /** Worker pool on which individual messages are processed. */
    private final ThreadPoolExecutor threadPoolExecutor;

    /**
     * Creates a consumer for the given queue.
     *
     * @param messageQueue       queue to read messages from and delete them on
     * @param payloadType        payload class combined with {@code SqsMessageWrapper} via
     *                           {@code GenericsUtil.getType} to build the marshaller's target type
     * @param messageHandler     callback receiving messages and failures
     * @param threadPoolExecutor pool that runs one task per received message
     * @param executorService    scheduler that drives the periodic polling
     */
    // The raw Class parameter and raw GsonMarshaller construction are part of the existing
    // contract with project helpers whose generic signatures are not visible here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public SqsMessageConsumer(SqsMessageQueue messageQueue, Class payloadType, MessageHandler<T> messageHandler,
                              ThreadPoolExecutor threadPoolExecutor, ScheduledExecutorService executorService) {
        this.messageQueue = messageQueue;
        this.messageHandler = messageHandler;
        this.threadPoolExecutor = threadPoolExecutor;
        this.executorService = executorService;
        this.marshaller = new GsonMarshaller(GenericsUtil.getType(SqsMessageWrapper.class, payloadType));
    }

    /**
     * Schedules the polling task at a fixed rate.
     *
     * @param initialDelay delay before the first poll, in seconds
     * @param period       interval between successive polls, in seconds
     */
    @Override
    public void start(int initialDelay, int period) {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    consume();
                } catch (Throwable e) {
                    // scheduleAtFixedRate suppresses all further runs once an execution throws,
                    // so every failure is logged here instead of being allowed to escape.
                    LOGGER.error(e.getMessage(), e);
                }
            }
        }, initialDelay, period, TimeUnit.SECONDS);
    }

    /**
     * Initiates shutdown of the polling scheduler and closes the message queue.
     * The worker {@link ThreadPoolExecutor} is not touched by this method.
     */
    @Override
    public void stop() {
        executorService.shutdown();
        messageQueue.close();
    }

    /**
     * Performs one polling cycle: works out how many worker slots are free, asks the queue for
     * that many messages, and dispatches each returned message to the worker pool.
     */
    private void consume() {
        // getActiveCount() is documented as an approximation, so this is a best-effort figure.
        int availableSlots = threadPoolExecutor.getCorePoolSize() - threadPoolExecutor.getActiveCount();
        if (availableSlots <= 0) {
            // Nothing free right now; wait for the next scheduled poll.
            return;
        }
        List<Message> messages = messageQueue.getMessages(availableSlots);
        for (Message message : messages) {
            dispatch(message);
        }
    }

    /**
     * Submits a task to the worker pool that unmarshalls, version-checks, handles and deletes the
     * given message. The returned {@code Future} is not retained.
     *
     * @param message raw SQS message to process
     */
    private void dispatch(final Message message) {
        threadPoolExecutor.submit(new Callable<Void>() {
            @Override
            @SuppressWarnings("unchecked") // casts to T below cannot be checked at runtime
            public Void call() throws Exception {
                try {
                    T obj = marshaller.marshall(message.getBody());
                    if (messageQueue.getConfig().getMessageConfiguration().isAcceptedVersion(obj.getVersion())) {
                        try {
                            obj.setSqsMessage(message);
                            messageHandler.onMessage(obj);
                            // Reached only when the handler returned normally.
                            delete(obj);
                        } catch (Throwable t) {
                            // Attach the unmarshalled wrapper so the handler gets it with the failure.
                            throw new HandlerException(obj, t);
                        }
                    } else {
                        // The message is neither handled nor deleted in this branch.
                        LOGGER.info("Message version \"{}\" is not accepted by this consumer. Accepted versions are: {}",
                                obj.getVersion(),
                                Arrays.deepToString(
                                        messageQueue.getConfig().getMessageConfiguration().getAcceptedVersions().toArray()));
                    }
                } catch (HandlerException e) {
                    messageHandler.onException((T) e.getMessageWrapper(), e);
                } catch (Throwable e) {
                    // Failure before a wrapper was available (e.g. while unmarshalling):
                    // report it with a wrapper built directly from the raw message.
                    messageHandler.onException((T) SqsMessageHelper.wrapSqsMessage(message), e);
                }
                return null;
            }
        });
    }

    /**
     * Deletes the SQS message carried by {@code obj} from the queue, invoking
     * {@link #beforeDelete} first and {@link #afterDelete} afterwards.
     *
     * @param obj wrapper whose underlying SQS message is deleted
     * @throws Throwable anything thrown by the hooks or by the queue's delete call
     */
    public void delete(final T obj) throws Throwable {
        beforeDelete(obj);
        messageQueue.deleteMessage(obj.getSqsMessage());
        afterDelete(obj);
    }

    /**
     * Hook invoked by {@link #delete} immediately before the message is deleted.
     * Does nothing by default.
     *
     * @param wrapper the message about to be deleted
     * @throws Throwable if an overriding implementation fails; this propagates out of {@link #delete}
     */
    public void beforeDelete(SqsMessageWrapper<T> wrapper) throws Throwable {
        //To be overridden if required
    }

    /**
     * Hook invoked by {@link #delete} immediately after the message has been deleted.
     * Does nothing by default.
     *
     * @param wrapper the message that was deleted
     * @throws Throwable if an overriding implementation fails; this propagates out of {@link #delete}
     */
    public void afterDelete(SqsMessageWrapper<T> wrapper) throws Throwable {
        //To be overridden if required
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment