Skip to content

Instantly share code, notes, and snippets.

@martenc
Created March 14, 2023 20:56
Show Gist options
  • Save martenc/d5dba51debd9275ead7f08474232af2a to your computer and use it in GitHub Desktop.
Save martenc/d5dba51debd9275ead7f08474232af2a to your computer and use it in GitHub Desktop.
This example listens for messages from the specified Kafka topic and submits a transaction to the Hyperledger Fabric network with the received message as an argument.
// kafka-node exposes its classes (KafkaClient, Consumer, ...) directly on the
// module object; it has no named `Kafka` export. Destructuring `{ Kafka }`
// therefore yields `undefined` and `Kafka.KafkaClient` throws at load time.
// Binding the whole module to `Kafka` keeps every `Kafka.*` reference valid.
const Kafka = require('kafka-node');
const { FileSystemWallet, Gateway } = require('fabric-network');
const path = require('path');
const fs = require('fs');

// Load the connection profile (read synchronously once at startup; a missing
// or malformed file aborts the script here).
const ccpPath = path.resolve(__dirname, '..', 'your_connection_profile.json');
const ccpJSON = fs.readFileSync(ccpPath, 'utf8');
const ccp = JSON.parse(ccpJSON);

// Wallet directory, resolved against the current working directory (not the
// script location), so the script must be launched from the directory that
// contains `wallet/`.
const walletPath = path.join(process.cwd(), 'wallet');
const wallet = new FileSystemWallet(walletPath);

// Kafka configuration
const kafkaClientOptions = {
  kafkaHost: 'localhost:9092', // Change this to your Kafka server address and port
};
const kafkaClient = new Kafka.KafkaClient(kafkaClientOptions);
const kafkaTopic = 'my-topic'; // Change this to your desired Kafka topic

// Hyperledger Fabric configuration
const fabricUser = 'User1'; // Wallet label of the identity used to connect
const fabricChannel = 'mychannel'; // Channel the chaincode is deployed to
const fabricChaincode = 'mycc'; // Chaincode (contract) name
/**
 * Connects to the Fabric gateway and then starts a Kafka consumer that
 * submits every received message value as the single argument of a
 * chaincode transaction.
 *
 * Fabric is connected BEFORE the consumer is created so that no message can
 * be emitted while there is still no contract (and no listener) to handle it.
 *
 * @returns {Promise<void>} Resolves once the consumer and its listeners are
 *   set up.
 * @throws {Error} If the configured identity is not in the wallet, or if the
 *   gateway/network connection fails. In both cases the Kafka client is
 *   closed so that the process is not kept alive by an idle connection.
 */
async function main() {
  // Check if user is already enrolled
  const userExists = await wallet.exists(fabricUser);
  if (!userExists) {
    console.log(`An identity for the user ${fabricUser} does not exist in the wallet`);
    console.log('Run the registerUser.js application before retrying');
    kafkaClient.close();
    // Reject instead of returning so the caller does not report a successful start.
    throw new Error(`Identity "${fabricUser}" not found in wallet`);
  }

  // Connect to the gateway and resolve the contract
  const gateway = new Gateway();
  let contract;
  try {
    await gateway.connect(ccp, {
      wallet,
      identity: fabricUser,
      discovery: { enabled: true, asLocalhost: true },
    });
    // Get the network (channel) our contract is deployed to
    const network = await gateway.getNetwork(fabricChannel);
    // Get the contract (chaincode) from the network
    contract = network.getContract(fabricChaincode);
  } catch (error) {
    // Release whatever was opened before propagating the failure.
    gateway.disconnect();
    kafkaClient.close();
    throw error;
  }

  // Create Kafka consumer only now that the contract is available
  const consumer = new Kafka.Consumer(kafkaClient, [{ topic: kafkaTopic }], {
    groupId: 'kafka-to-hyperledger',
  });

  // Attach the error listener first: an 'error' event emitted with no
  // listener registered is thrown by EventEmitter and would crash the process.
  consumer.on('error', (err) => {
    console.error('Kafka consumer error:', err);
  });

  // Process messages from Kafka
  consumer.on('message', async (message) => {
    console.log(`Received message: ${message.value}`);
    try {
      // Send the message to the chaincode (invoke a transaction)
      const result = await contract.submitTransaction('your_chaincode_function', message.value);
      console.log(`Transaction result: ${result.toString('utf8')}`);
    } catch (error) {
      // A failed transaction is logged and skipped; it must not stop the consumer.
      console.error(`Failed to submit transaction: ${error}`);
    }
  });
}
// Script entry point: run main() and report whether startup succeeded.
// Any rejection from main() is logged rather than left unhandled.
(async () => {
  try {
    await main();
    console.log('Kafka to Hyperledger Fabric data sink started');
  } catch (error) {
    console.error('Error:', error);
  }
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment