Skip to content

Instantly share code, notes, and snippets.

@the-pavels
Created November 11, 2023 17:07
Show Gist options
  • Save the-pavels/87db4f11ec28125e1af3988d402af41c to your computer and use it in GitHub Desktop.
Save the-pavels/87db4f11ec28125e1af3988d402af41c to your computer and use it in GitHub Desktop.
import org.apache.pulsar.client.api.*;
/**
 * Minimal Pulsar consumer example: subscribes to a topic with a shared
 * subscription and prints every received message to standard output.
 *
 * <p>The receive loop runs until the process is killed or the client raises
 * an exception; in the latter case the consumer and the client are closed
 * before the exception leaves {@link #main(String[])}.
 */
public class PulsarConsumer {

    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC = "non-persistent://public/default/my-topic";
    private static final String SUBSCRIPTION_NAME = "my-shared-subscription";

    /**
     * Connects to the broker, subscribes and processes messages forever.
     *
     * @param args ignored
     * @throws Exception if the client cannot be built, the subscription fails,
     *                   or receiving a message fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the consumer first, then the client, when an
        // exception escapes the loop; previously both were leaked on failure.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(SERVICE_URL)
                     .build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                     .topic(TOPIC)
                     .subscriptionName(SUBSCRIPTION_NAME)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe()) {
            while (true) {
                // Blocks until a message is available.
                Message<String> msg = consumer.receive();
                try {
                    // getValue() decodes through the consumer's STRING schema rather than
                    // new String(bytes), which depends on the platform default charset.
                    System.out.printf("Message received: %s\n", msg.getValue());
                    // Tell the broker the message was processed successfully.
                    consumer.acknowledge(msg);
                } catch (Exception e) {
                    // Report the failure instead of swallowing it, then ask for redelivery.
                    System.err.printf("Failed to process message %s: %s%n", msg.getMessageId(), e);
                    consumer.negativeAcknowledge(msg);
                }
            }
        }
    }
}
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
/**
 * Minimal Pulsar producer example: publishes the same small JSON document to a
 * topic every few seconds.
 *
 * <p>The send loop runs until the process is killed, the thread is interrupted
 * while sleeping, or the client raises an exception; in the latter two cases
 * the producer and the client are closed before the exception leaves
 * {@link #main(String[])}.
 */
public class PulsarProducer {

    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC = "non-persistent://public/default/my-topic";
    /** Payload sent on every iteration; it never changes, so it is built once. */
    private static final String PAYLOAD = "{ \"foo\": \"bar\" }";
    /** Pause between two consecutive sends, in milliseconds (5 seconds). */
    private static final long SEND_INTERVAL_MILLIS = 5000L;

    /**
     * Connects to the broker and publishes {@link #PAYLOAD} in an endless loop.
     *
     * @param args ignored
     * @throws Exception if the client or producer cannot be created, a send
     *                   fails, or the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the producer first, then the client, when an
        // exception escapes the loop; previously both were leaked on failure.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(SERVICE_URL)
                     .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .topic(TOPIC)
                     .create()) {
            while (true) {
                System.out.println("Sending message...");
                producer.send(PAYLOAD);
                // An InterruptedException is deliberately left to propagate: sleep() clears
                // the interrupt flag when it throws, so the blocking close() calls issued by
                // try-with-resources are not themselves cut short by a pending interrupt.
                Thread.sleep(SEND_INTERVAL_MILLIS);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment