Created
July 29, 2021 15:41
-
-
Save umgefahren/4255d9ffa5623825d2b8920e4f1cf3fd to your computer and use it in GitHub Desktop.
libp2p Kademlia with async interface
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use libp2p::swarm::SwarmEvent; | |
use libp2p::NetworkBehaviour; | |
use libp2p::PeerId; | |
use libp2p::kad::{Kademlia, KademliaEvent, Quorum, Record, QueryResult}; | |
use libp2p::kad::store::MemoryStore; | |
use libp2p::mdns::{Mdns, MdnsEvent, MdnsConfig}; | |
use libp2p::kad::record::Key; | |
use libp2p::{identity, noise, Transport, mplex, Swarm}; | |
use libp2p::tcp::TcpConfig; | |
use libp2p::core::upgrade; | |
use tokio::io::AsyncBufReadExt; | |
use libp2p::futures::StreamExt; | |
#[derive(NetworkBehaviour)] | |
#[behaviour(out_event = "OutEvent", event_process = false)] | |
struct MyBehaviour { | |
kademlia: Kademlia<MemoryStore>, | |
mdns: Mdns, | |
} | |
#[derive(Debug)] | |
pub enum OutEvent { | |
Kademlia(KademliaEvent), | |
Mdns(MdnsEvent) | |
} | |
impl From<KademliaEvent> for OutEvent { | |
fn from(event: KademliaEvent) -> Self { | |
Self::Kademlia(event) | |
} | |
} | |
impl From<MdnsEvent> for OutEvent { | |
fn from(event: MdnsEvent) -> Self { | |
Self::Mdns(event) | |
} | |
} | |
async fn handle_input_line(kademlia: &mut Dht, line: String) { | |
let mut args = line.split(' '); | |
match args.next() { | |
Some("GET") => { | |
let key = { | |
match args.next() { | |
Some(key) => Key::new(&key), | |
None => { | |
eprintln!("Expected key"); | |
return; | |
} | |
} | |
}; | |
let res = kademlia.get(&key).await.unwrap(); | |
println!("Got Record {:?} {:?}", | |
std::str::from_utf8(res.key.as_ref()).unwrap(), | |
std::str::from_utf8(res.value.as_slice()).unwrap(), | |
); | |
} | |
Some("PUT") => { | |
let key = { | |
match args.next() { | |
Some(key) => Key::new(&key), | |
None => { | |
eprintln!("Expected key"); | |
return; | |
} | |
} | |
}; | |
let value = { | |
match args.next() { | |
Some(value) => value.as_bytes().to_vec(), | |
None => { | |
eprintln!("Expected value"); | |
return; | |
} | |
} | |
}; | |
let record = Record { | |
key, | |
value, | |
publisher: None, | |
expires: None, | |
}; | |
let res = kademlia.put(record).await.unwrap(); | |
println!( | |
"Successfully put record {:?}", | |
std::str::from_utf8(res.as_ref()).unwrap(), | |
); | |
}, | |
Some("PUT_PROVIDER") => { | |
let key = { | |
match args.next() { | |
Some(key ) => Key::new(&key), | |
None => { | |
eprintln!("Expected key"); | |
return; | |
} | |
} | |
}; | |
let res = kademlia.put_provider(key) | |
.await.expect("Failed to start providing key"); | |
println!( | |
"Successfully provided for key {:?}", | |
std::str::from_utf8(res.as_ref()).unwrap(), | |
); | |
} | |
Some("GET_PROVIDERS") => { | |
let key = { | |
match args.next() { | |
Some(key) => Key::new(&key), | |
None => { | |
eprintln!("Expected key"); | |
return | |
} | |
} | |
}; | |
let res = kademlia.get_providers(key.clone()) | |
.await.expect("Failed to get key provider"); | |
for peer in res { | |
println!( | |
"Peer {:?} provides key {:?}", | |
peer, | |
std::str::from_utf8(key.as_ref()).unwrap() | |
); | |
} | |
} | |
_ => { | |
eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER"); | |
} | |
} | |
} | |
struct Dht (Swarm<MyBehaviour>); | |
impl Dht { | |
pub fn new(swarm: Swarm<MyBehaviour>) -> Self { | |
Self(swarm) | |
} | |
pub async fn put(&mut self, value: Record) -> Result<Key, String> { | |
let behaviour = self.0.behaviour_mut(); | |
behaviour.kademlia.put_record(value.clone(), Quorum::One).unwrap(); | |
let res = loop { | |
if let SwarmEvent::Behaviour(OutEvent::Kademlia(KademliaEvent::OutboundQueryCompleted {result, .. })) = self.0.select_next_some().await { | |
break result; | |
} | |
}; | |
match res { | |
QueryResult::PutRecord(d) => { | |
match d { | |
Ok(dd) => { | |
Ok(dd.key) | |
} | |
Err(e) => { | |
Err(format!("{:?}", e)) | |
} | |
} | |
} | |
_ => { | |
Err("Something went wrong".to_string()) | |
} | |
} | |
} | |
pub async fn get(&mut self, key: &Key) -> Result<Record, &'static str> { | |
let behaviour = self.0.behaviour_mut(); | |
behaviour.kademlia.get_record(key, Quorum::One); | |
let res = loop { | |
if let SwarmEvent::Behaviour(OutEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) = self.0.select_next_some().await { | |
break result; | |
} | |
}; | |
match res { | |
QueryResult::GetRecord(d) => { | |
match d { | |
Ok(dd) => { | |
Ok(dd.records.get(0).unwrap().record.clone()) | |
} | |
Err(_) => { | |
Err("something went wrong again") | |
} | |
} | |
} | |
_ => { | |
Err("Something went wrong") | |
} | |
} | |
} | |
pub async fn put_provider(&mut self, key: Key) -> Result<Key, String> { | |
let behaviour = self.0.behaviour_mut(); | |
behaviour.kademlia.start_providing(key).expect("Failed to start providing key"); | |
let res = loop { | |
if let SwarmEvent::Behaviour(OutEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, .. })) = self.0.select_next_some().await { | |
break result; | |
} | |
}; | |
match res { | |
QueryResult::StartProviding(d) => { | |
match d { | |
Ok(dd) => Ok(dd.key), | |
Err(e) => Err(format!("{:?}", e)), | |
} | |
} | |
_ => { | |
Err("Something went wrong".to_string()) | |
} | |
} | |
} | |
pub async fn get_providers(&mut self, key: Key) -> Result<Vec<PeerId>, String> { | |
let behaviour = self.0.behaviour_mut(); | |
behaviour.kademlia.get_providers(key); | |
let res = loop { | |
if let SwarmEvent::Behaviour(OutEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, .. })) = self.0.select_next_some().await { | |
break result; | |
} | |
}; | |
match res { | |
QueryResult::GetProviders(d) => { | |
match d { | |
Ok(dd) => Ok(dd.closest_peers), | |
Err(e) => Err(format!("{:?}", e)), | |
} | |
} | |
_ => { | |
Err("Something went wrong".to_string()) | |
} | |
} | |
} | |
} | |
#[tokio::main] | |
async fn main() { | |
let id_keys = identity::Keypair::generate_ed25519(); | |
let peer_id = PeerId::from(id_keys.public()); | |
println!("Local peer id => {:?}", peer_id); | |
let noise_keys = noise::Keypair::<noise::X25519Spec>::new() | |
.into_authentic(&id_keys) | |
.expect("Signing libp2p-noise static DH keypair failed."); | |
let transport = TcpConfig::new() | |
.nodelay(true) | |
.upgrade(upgrade::Version::V1) | |
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) | |
.multiplex(mplex::MplexConfig::new()) | |
.boxed(); | |
let mut swarm = { | |
let store = MemoryStore::new(peer_id); | |
let kademlia = Kademlia::new(peer_id, store); | |
let mdns = Mdns::new(MdnsConfig::default()).await.unwrap(); | |
let behaviour = MyBehaviour { | |
kademlia, | |
mdns, | |
}; | |
Swarm::new(transport, behaviour, peer_id) | |
}; | |
let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines(); | |
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); | |
let mut managed_swarm = Dht::new(swarm); | |
loop { | |
tokio::select! { | |
line = stdin.next_line() => { | |
let line = line.unwrap().expect("stdin closed"); | |
handle_input_line(&mut managed_swarm, line).await; | |
} | |
event = managed_swarm.0.select_next_some() => { | |
match event { | |
SwarmEvent::NewListenAddr { address, .. } => { | |
println!("Listening on {:?}", address); | |
} | |
SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Discovered(list))) => { | |
for (peer_id, multiaddr) in list { | |
managed_swarm.0.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); | |
} | |
} | |
SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired(list))) => { | |
for (peer_id, multiaddr) in list { | |
managed_swarm.0.behaviour_mut().kademlia.remove_address(&peer_id, &multiaddr) | |
.expect("Error removing address"); | |
} | |
} | |
_ => {} | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment