Created
March 13, 2021 23:47
-
-
Save hbinduni/3b8cc9892895841e0f4250c6c9af62de to your computer and use it in GitHub Desktop.
Redis publish and subscribe to stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const redis = require('redis'); | |
const {promisify} = require('util'); | |
const waitFor = (ms) => new Promise((r) => setTimeout(r, ms)); | |
const host = 'localhost'; | |
const port = 6379; | |
const password = 'xxxxx-xx-xxxx'; | |
const options = {host, port, password}; | |
const subscriber = redis.createClient(options); | |
const publisher = redis.createClient(options); | |
const xaddAsync = promisify(publisher.xadd).bind(publisher); | |
const xreadgroupAsync = promisify(subscriber.xreadgroup).bind(subscriber); | |
const xackAsync = promisify(subscriber.xack).bind(subscriber); | |
const xgroupAsync = promisify(subscriber.xgroup).bind(subscriber); | |
let timeToWait = 1000; | |
let ID = '0-0'; | |
const getRndInteger = (min, max) => { | |
return Math.floor(Math.random() * (max - min + 1) ) + min; | |
}; | |
const processMsgs = async ({messages, stream, groupName}) => { | |
for await (const msg of messages[0][1]) { | |
const id = msg[0]; | |
const key = msg[1][0]; | |
const content = msg[1][1]; | |
console.log(`id: ${id}. key: ${key} => content: ${content}`); | |
await xackAsync(stream, groupName, id); | |
console.log('send ack', id); | |
ID = id; | |
} | |
}; | |
const xgroup = async ({stream, groupName}) => { | |
await xgroupAsync('CREATE', stream, groupName, '$', 'MKSTREAM'); | |
}; | |
//! NOTE: use option MKSTREAM to auto create stream | |
// XGROUP CREATE mystream mygroup $ MKSTREAM | |
const xreadgroup = async ({ groupName, consumerName, stream }) => { | |
while (true) { | |
try { | |
console.log('reading id', ID); | |
const messages = await xreadgroupAsync('GROUP', groupName, consumerName, 'BLOCK', '2000', 'COUNT', '10', 'STREAMS', stream, ID); | |
console.log('messages', messages); | |
if (!messages) { | |
console.log(`empty messages. waiting for ${timeToWait / 1000}s`); | |
await waitFor(timeToWait); | |
timeToWait += getRndInteger(600, 1000); | |
// reset timeToWait when > 5s | |
if (timeToWait / 1000 > 5) { | |
console.log('reset timeToWait to 1s'); | |
timeToWait = 1000; | |
} | |
continue; | |
} | |
if (!messages[0][1].length) { | |
console.log('end of history. get the latest data'); | |
ID = '>'; | |
} | |
await processMsgs({messages, stream, groupName}); | |
} catch (err) { | |
console.log('err', err.message); | |
} | |
} | |
} | |
const publish = ({stream}) => { | |
let timeInterval = 1000; | |
setInterval(async () => { | |
console.log('publish', timeInterval); | |
const msg = {a: 100, b: 200, c: 'ok'}; | |
await xaddAsync(stream, '*', `key1-${Date.now()}`, JSON.stringify(msg)); | |
timeInterval += getRndInteger(600, 1000); | |
}, timeInterval); | |
}; | |
(async () => { | |
const stream = 'stream1'; | |
const groupName = 'group1'; | |
const consumerName = 'consumer1'; | |
// uncomment if you need sample publish to test | |
// publish({stream}); | |
// wait 1s before do subscribe stream | |
//! NOTE: dont do await without setTimeout, it will blocking the whole process | |
setTimeout(async () => { | |
try { | |
console.log('create group stream if none'); | |
await xgroup({stream, groupName}); | |
} catch (err) { | |
console.log('normal err', err.message); | |
} | |
await xreadgroup({groupName, consumerName, stream, id: '0-0'}); | |
}, 1000); | |
})(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment