Created
March 28, 2017 20:47
-
-
Save philmadden83/09b2df23d94e5fbbc2c5fb0f04130774 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.vivid.mq.sqs; | |
import com.amazonaws.services.sqs.model.Message; | |
import com.vivid.micro.GenericsUtil; | |
import com.vivid.micro.GsonMarshaller; | |
import com.vivid.mq.MessageHandler; | |
import com.vivid.mq.MessageQueueConsumer; | |
import com.vivid.mq.exception.HandlerException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.lang.reflect.ParameterizedType; | |
import java.lang.reflect.Type; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
public class SqsMessageConsumer<T extends SqsMessageWrapper> implements MessageQueueConsumer { | |
private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageConsumer.class); | |
private final ScheduledExecutorService executorService; | |
private final GsonMarshaller<T> marshaller; | |
private final MessageHandler<T> messageHandler; | |
private final SqsMessageQueue messageQueue; | |
private final ThreadPoolExecutor threadPoolExecutor; | |
private int availableSlots; | |
public SqsMessageConsumer(SqsMessageQueue messageQueue, Class payloadType, MessageHandler<T> messageHandler, | |
ThreadPoolExecutor threadPoolExecutor, ScheduledExecutorService executorService) { | |
this.messageQueue = messageQueue; | |
this.messageHandler = messageHandler; | |
this.threadPoolExecutor = threadPoolExecutor; | |
this.executorService = executorService; | |
this.marshaller = new GsonMarshaller(GenericsUtil.getType(SqsMessageWrapper.class, payloadType)); | |
} | |
@Override | |
public void start(int initialDelay, int period) { | |
executorService.scheduleAtFixedRate(new Runnable() { | |
@Override | |
public void run() { | |
try { | |
consume(); | |
} catch (Throwable e) { | |
LOGGER.error(e.getMessage(), e); | |
} | |
} | |
}, initialDelay, period, TimeUnit.SECONDS); | |
} | |
@Override | |
public void stop() { | |
executorService.shutdown(); | |
messageQueue.close(); | |
} | |
private void consume() { | |
availableSlots = threadPoolExecutor.getCorePoolSize() - threadPoolExecutor.getActiveCount(); | |
if (availableSlots > 0) { | |
List<Message> messages = messageQueue.getMessages(availableSlots); | |
for (Message message : messages) { | |
dispatch(message); | |
} | |
} | |
} | |
private void dispatch(final Message message) { | |
threadPoolExecutor.submit(new Callable<Void>() { | |
@Override | |
public Void call() throws Exception { | |
try { | |
T obj = marshaller.marshall(message.getBody()); | |
if (messageQueue.getConfig().getMessageConfiguration().isAcceptedVersion(obj.getVersion())) { | |
try { | |
obj.setSqsMessage(message); | |
messageHandler.onMessage(obj); | |
delete(obj); | |
} catch (Throwable t) { | |
throw new HandlerException(obj, t); | |
} | |
} else { | |
LOGGER.info(String.format("Message version \"%s\" is not accepted by this consumer. Accepted versions are: ", | |
obj.getVersion(), | |
Arrays.deepToString(messageQueue.getConfig().getMessageConfiguration().getAcceptedVersions().toArray()))); | |
} | |
} catch (HandlerException e) { | |
messageHandler.onException((T) e.getMessageWrapper(), e); | |
} catch (Throwable e) { | |
messageHandler.onException((T) SqsMessageHelper.wrapSqsMessage(message), e); | |
} | |
return null; | |
} | |
}); | |
} | |
public void delete(final T obj) throws Throwable { | |
beforeDelete(obj); | |
messageQueue.deleteMessage(obj.getSqsMessage()); | |
afterDelete(obj); | |
} | |
public void beforeDelete(SqsMessageWrapper<T> wrapper) throws Throwable { | |
//To be overridden if required | |
} | |
public void afterDelete(SqsMessageWrapper<T> wrapper) throws Throwable { | |
//To be overridden if required | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment