Skip to content

Instantly share code, notes, and snippets.

@Sanjdcool
Forked from piyushgarg-dev/README.md
Last active March 2, 2025 13:37
Show Gist options
  • Save Sanjdcool/3ee5659d6a9315fc1110320f4efaf1d5 to your computer and use it in GitHub Desktop.
Save Sanjdcool/3ee5659d6a9315fc1110320f4efaf1d5 to your computer and use it in GitHub Desktop.
Kafka Crash Course
# Kafka
Video Link: [Apache Kafka Crash Course | What is Kafka?](https://youtu.be/ZJJHm_bd9Zo)
## Prerequisite
- Knowledge
- Node.JS Intermediate level
- Experience with designing distributed systems
- Tools
- Node.js: [Download Node.JS](https://nodejs.org/en)
- Docker: [Download Docker](https://www.docker.com)
- VsCode: [Download VSCode](https://code.visualstudio.com)
## Commands
- Start Zookeper Container and expose PORT `2181`.
```bash
docker run -p 2181:2181 zookeeper
```
- Start Kafka Container, expose PORT `9092` and setup ENV variables.
```bash
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka
```
## Code
`client.js`
```js
const { Kafka } = require("kafkajs");
exports.kafka = new Kafka({
clientId: "my-app",
brokers: ["<PRIVATE_IP>:9092"],
});
```
`admin.js`
```js
const { kafka } = require("./client");
async function init() {
const admin = kafka.admin();
console.log("Admin connecting...");
admin.connect();
console.log("Adming Connection Success...");
console.log("Creating Topic [rider-updates]");
await admin.createTopics({
topics: [
{
topic: "rider-updates",
numPartitions: 2,
},
],
});
console.log("Topic Created Success [rider-updates]");
console.log("Disconnecting Admin..");
await admin.disconnect();
}
init();
```
`producer.js`
```js
const { kafka } = require("./client");
const readline = require("readline");
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
async function init() {
const producer = kafka.producer();
console.log("Connecting Producer");
await producer.connect();
console.log("Producer Connected Successfully");
rl.setPrompt("> ");
rl.prompt();
rl.on("line", async function (line) {
const [riderName, location] = line.split(" ");
await producer.send({
topic: "rider-updates",
messages: [
{
partition: location.toLowerCase() === "north" ? 0 : 1,
key: "location-update",
value: JSON.stringify({ name: riderName, location }),
},
],
});
}).on("close", async () => {
await producer.disconnect();
});
}
init();
```
`consumer.js`
```js
const { kafka } = require("./client");
const group = process.argv[2];
async function init() {
const consumer = kafka.consumer({ groupId: group });
await consumer.connect();
await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
console.log(
`${group}: [${topic}]: PART:${partition}:`,
message.value.toString()
);
},
});
}
init();
```
## Running Locally
- Run Multiple Consumers
```bash
node consumer.js <GROUP_NAME>
```
- Create Producer
```bash
node producer.js
```
```bash
> tony south
> tony north
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment