@MdGolam-Kibria
Created August 22, 2024 08:55
Push data to a RabbitMQ queue using Java with a publishing guarantee
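// Imports used by this method. The enclosing class is expected to provide gson, LOGGER_I,
// ConnectionManager, and the queue settings (statusQueue, exchangeName, batchRouting, ...).
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Publishes one message per transaction ID and waits for the broker's publisher confirm.
public void produceBatchesDataToQueue(List<String> tranIDs, Integer eventId, String actionPoint) throws IOException, TimeoutException {
    for (int i = 0; i < tranIDs.size(); i++) {
        LOGGER_I.info("Going to bind actual request data");
        JBCollectionPolicyModelRequest request = buildAarongApiCallbackModel(tranIDs.get(i), actionPoint);
        request.setEventId(eventId);
        String message = gson.toJson(request);
        LOGGER_I.info("Successfully bound data and prepared message :{}", message);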
        Channel publishChannel = null;
        try {
            LOGGER_I.info("Going to send rejected data to queue :{}", statusQueue);
            // Open a channel, bind the queue, and enable publisher confirms on this channel
            publishChannel = ConnectionManager.getConnection(port, username, password, host).createChannel();
            publishChannel.queueBind(statusQueue, exchangeName, batchRouting);
            publishChannel.confirmSelect();
            // Create a messageId (note: System.nanoTime() is not guaranteed to be unique)
            final long messageId = System.nanoTime();
            // Latch with a count of 1: released when the broker acks or nacks this message
            final CountDownLatch latch = new CountDownLatch(1);
            // Records whether the broker acked (true) or nacked (false) the message
            final java.util.concurrent.atomic.AtomicBoolean acked = new java.util.concurrent.atomic.AtomicBoolean(false);
            // Add a ConfirmListener for the channel
            publishChannel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // Called when the broker confirms that it has accepted the message
                    LOGGER_I.info("Message published successfully with deliveryTag: {} and messageId :{}", deliveryTag, messageId);
                    // You can use your own logic here to correlate the deliveryTag with your messages.
                    acked.set(true);
                    latch.countDown(); // Release the latch
                }
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    // Called when the broker reports that it could not handle the message
                    // TODO: raise an alert here for [Message delivery failed with deliveryTag].
                    LOGGER_I.info("Message delivery failed with deliveryTag: {} and messageId :{}", deliveryTag, messageId);
                    // You can use your own logic here to correlate the deliveryTag with your messages.
                    latch.countDown(); // Release the latch; acked stays false
                }
            });
            // Publish the message with the messageId as a message property
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) // Persistent: the message survives a broker restart, provided the queue is durable
                    .messageId(String.valueOf(messageId))
                    .build();
            publishChannel.basicPublish(exchangeName, batchRouting, properties, message.getBytes(StandardCharsets.UTF_8));
            // Wait up to 5000 ms for the ack/nack callback. await() returns true if the latch
            // reached 0 in time and false on timeout, in which case the delivery is treated as failed.
            if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
                // TODO: raise an alert here for [Message was not confirmed within the timeout].
                LOGGER_I.info("Message with messageId :{} was not confirmed within the timeout.", messageId);
            } else if (acked.get()) {
                LOGGER_I.info("Message with messageId :{} was confirmed by the broker.", messageId);
            } else {
                LOGGER_I.info("Message with messageId :{} was nacked by the broker.", messageId);
            }
        } catch (Exception e) {
            // TODO: add an alert for failures during MQ operations.
            LOGGER_I.info("Queue data push error :{}", e.getMessage(), e);
        } finally {
            // Close the channel whether the publish was confirmed, timed out, or threw
            if (publishChannel != null && publishChannel.isOpen()) {
                try {
                    publishChannel.close();
                } catch (Exception closeError) {
                    LOGGER_I.info("Failed to close publish channel :{}", closeError.getMessage(), closeError);
                }
            }
        }
    }
}
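
The listener comments above mention correlating the deliveryTag with your own messages. The following is a minimal sketch of one way that could be done, not the author's method. It assumes each message is registered under its publish sequence number (obtainable from `channel.getNextPublishSeqNo()` just before `basicPublish`) and that `handleAck` / `handleNack` forward their `deliveryTag` and `multiple` arguments to the tracker. `ConfirmTracker` and its method names are hypothetical and are not part of the RabbitMQ client; the class uses only the Java standard library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper: remembers which message IDs are still waiting for a broker confirm.
final class ConfirmTracker {
    // Publish sequence number -> message ID, kept sorted so "multiple" confirms can be resolved
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    // Message IDs the broker nacked; candidates for a retry or an alert
    private final List<String> nacked = new CopyOnWriteArrayList<>();

    // Call just before publishing, with the sequence number the message will get.
    void register(long seqNo, String messageId) {
        outstanding.put(seqNo, messageId);
    }

    // Call from handleAck; returns the message IDs this ack confirmed.
    List<String> ack(long deliveryTag, boolean multiple) {
        return resolve(deliveryTag, multiple);
    }

    // Call from handleNack; returns the message IDs this nack rejected.
    List<String> nack(long deliveryTag, boolean multiple) {
        List<String> failed = resolve(deliveryTag, multiple);
        nacked.addAll(failed);
        return failed;
    }

    // Removes the confirmed entries. With multiple == true the confirm covers
    // every sequence number up to and including deliveryTag.
    private List<String> resolve(long deliveryTag, boolean multiple) {
        List<String> resolved = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> upTo = outstanding.headMap(deliveryTag, true);
            resolved.addAll(upTo.values());
            upTo.clear();
        } else {
            String messageId = outstanding.remove(deliveryTag);
            if (messageId != null) {
                resolved.add(messageId);
            }
        }
        return resolved;
    }

    // Number of messages still waiting for an ack or nack.
    int outstandingCount() {
        return outstanding.size();
    }

    // Snapshot of the message IDs that were nacked so far.
    List<String> nackedIds() {
        return new ArrayList<>(nacked);
    }
}
```

With a tracker like this, one channel and one listener could be reused for the whole batch instead of opening a channel per message, and anything left in `outstandingCount()` after the timeout would be treated as unconfirmed.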