Created
March 14, 2023 20:56
-
-
Save martenc/d5dba51debd9275ead7f08474232af2a to your computer and use it in GitHub Desktop.
This example listens for messages from the specified Kafka topic and submits a transaction to the Hyperledger Fabric network with the received message as an argument.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// kafka-node exposes KafkaClient, Consumer, etc. directly on its module
// object and has no `Kafka` named export. Destructuring `{ Kafka }` would
// yield `undefined`, so the whole module is bound to `Kafka` instead; this
// keeps `Kafka.KafkaClient` and `Kafka.Consumer` resolvable.
const Kafka = require('kafka-node');
const { FileSystemWallet, Gateway } = require('fabric-network');
const path = require('path');
const fs = require('fs');

// Load the connection profile. The file is read synchronously at module load,
// from one directory above this script; a missing or malformed file throws here.
const ccpPath = path.resolve(__dirname, '..', 'your_connection_profile.json');
const ccpJSON = fs.readFileSync(ccpPath, 'utf8');
const ccp = JSON.parse(ccpJSON);

// Wallet directory, resolved against the current working directory
// (not against this script's location).
const walletPath = path.join(process.cwd(), 'wallet');
const wallet = new FileSystemWallet(walletPath);

// Kafka configuration
const kafkaClientOptions = {
  kafkaHost: 'localhost:9092', // Change this to your Kafka server address and port
};
const kafkaClient = new Kafka.KafkaClient(kafkaClientOptions);
const kafkaTopic = 'my-topic'; // Change this to your desired Kafka topic

// Hyperledger Fabric configuration
const fabricUser = 'User1'; // Identity label looked up in the wallet
const fabricChannel = 'mychannel'; // Channel the chaincode is deployed to
const fabricChaincode = 'mycc'; // Chaincode (contract) name
/**
 * Start the Kafka-to-Hyperledger-Fabric sink.
 *
 * Verifies that the configured identity exists in the wallet, connects to the
 * Fabric gateway, resolves the contract, and only then starts a Kafka consumer
 * whose messages are submitted as transactions to the chaincode.
 *
 * If the identity is missing, a hint is logged and the function returns
 * without creating a consumer.
 *
 * @returns {Promise<void>} Resolves once the consumer's listeners are
 *   registered (or after the missing-identity hint has been logged).
 * @throws Propagates any error from connecting to the gateway or resolving
 *   the channel/contract.
 */
async function main() {
  // Check the identity before touching Kafka, so that no consumer is left
  // running without handlers when we bail out early.
  const userExists = await wallet.exists(fabricUser);
  if (!userExists) {
    console.log(`An identity for the user ${fabricUser} does not exist in the wallet`);
    console.log('Run the registerUser.js application before retrying');
    return;
  }

  // Connect to the gateway
  const gateway = new Gateway();
  await gateway.connect(ccp, {
    wallet,
    identity: fabricUser,
    discovery: { enabled: true, asLocalhost: true },
  });

  let contract;
  try {
    // Get the network (channel) our contract is deployed to,
    // then the contract (chaincode) from the network.
    const network = await gateway.getNetwork(fabricChannel);
    contract = network.getContract(fabricChaincode);
  } catch (error) {
    // Release the gateway's connections before reporting the failure.
    gateway.disconnect();
    throw error;
  }

  // Create the consumer only once the contract is ready, and attach both
  // listeners synchronously right after construction: events emitted while no
  // listener is registered are lost, and an 'error' event without a listener
  // is thrown as an uncaught exception.
  const consumer = new Kafka.Consumer(kafkaClient, [{ topic: kafkaTopic }], {
    groupId: 'kafka-to-hyperledger',
  });

  consumer.on('error', (err) => {
    console.error('Kafka consumer error:', err);
  });

  // Process messages from Kafka.
  // NOTE(review): each message is handled independently, so transactions may
  // overlap and complete out of order — confirm this is acceptable.
  consumer.on('message', async (message) => {
    console.log(`Received message: ${message.value}`);
    try {
      // Send the message to the chaincode (invoke a transaction)
      const result = await contract.submitTransaction('your_chaincode_function', message.value);
      console.log(`Transaction result: ${result.toString('utf8')}`);
    } catch (error) {
      // A failed submission is logged and the consumer keeps running.
      console.error(`Failed to submit transaction: ${error}`);
    }
  });
}
// Entry point: run main(), announce a successful start, and log any failure.
(async () => {
  try {
    await main();
    console.log('Kafka to Hyperledger Fabric data sink started');
  } catch (error) {
    console.error('Error:', error);
  }
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment