Created
August 22, 2024 08:55
-
-
Save MdGolam-Kibria/dcfc8e8a8e797f98680e28f016d7ba87 to your computer and use it in GitHub Desktop.
Push data to a RabbitMQ queue using Java with a publishing guarantee (publisher confirms)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void produceBatchesDataToQueue(List<String> tranIDs, Integer eventId, String actionPoint) throws IOException, TimeoutException { | |
for (int i = 0; i < tranIDs.size(); i++) { | |
LOGGER_I.info("Going to bind actual request data"); | |
JBCollectionPolicyModelRequest request = buildAarongApiCallbackModel(tranIDs.get(i), actionPoint); | |
request.setEventId(eventId); | |
String message = gson.toJson(request); | |
LOGGER_I.info("Successfully bind data and prepare message :{}", message); | |
try { | |
LOGGER_I.info("Going to send rejected data to queue :{}",statusQueue); | |
//Now send data to queue | |
Channel publishChannel = ConnectionManager.getConnection(port, username, password, host).createChannel(); | |
publishChannel.queueBind(statusQueue, exchangeName, batchRouting); | |
publishChannel.confirmSelect(); | |
//crete messageId | |
long messageId = System.nanoTime(); | |
// Create a CountDownLatch with a count of 1 to wait for acknowledgments | |
CountDownLatch latch = new CountDownLatch(1); | |
// Add a ConfirmListener for the channel | |
publishChannel.addConfirmListener(new ConfirmListener() { | |
@Override | |
public void handleAck(long deliveryTag, boolean multiple) throws IOException { | |
// This callback is called when a message is successfully delivered | |
System.out.println("Message published successfully with deliveryTag: " + deliveryTag); | |
// You can use your own logic here to correlate the deliveryTag with your messages. | |
latch.countDown(); // Release the latch | |
} | |
@Override | |
public void handleNack(long deliveryTag, boolean multiple) throws IOException { | |
// This callback is called when a message fails to be delivered | |
//TODO We can set alert from here for [Message delivery failed with deliveryTag]. | |
LOGGER_I.info("Message delivery failed with deliveryTag: {} and messageId :{}" , deliveryTag,messageId); | |
// You can use your own logic here to correlate the deliveryTag with your messages. | |
latch.countDown(); // Release the latch | |
} | |
}); | |
// Publish the message with the deliveryTag as a message property | |
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() | |
.deliveryMode(2) // Persistent message to ensure if the server stops or crashes then after restart the server the message will still available | |
.messageId(String.valueOf(messageId)) | |
.build(); | |
publishChannel.basicPublish(exchangeName, batchRouting, properties, message.getBytes(StandardCharsets.UTF_8)); | |
// Wait for the ConfirmListener callbacks or a specified timeout | |
/** | |
* We're waiting 500ms for the callback otherwise we treat this delivery as false. | |
* check the latch count is 0 or still 1 if 0 the condition will return true otherwise. | |
*/ | |
if (latch.await(5000, TimeUnit.MILLISECONDS)) { | |
System.out.println("All messages were confirmed."); | |
} else { | |
//TODO we can set alert from here for [One or more messages failed to be confirmed within the timeout.].... | |
System.out.println("One or more messages failed to be confirmed within the timeout."); | |
} | |
// Close the channel after acknowledgments or timeout | |
publishChannel.close(); | |
//LOGGER_I.info("Successfully send rejected data to queue :{}",statusQueue); | |
} catch (Exception e) { | |
//@TODO add alert during mq operation.. | |
LOGGER_I.info("Queue Data Push time error :{}", e.getMessage(),e); | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment