Skip to content

Instantly share code, notes, and snippets.

@umgefahren
Created July 29, 2021 15:41
Show Gist options
  • Save umgefahren/4255d9ffa5623825d2b8920e4f1cf3fd to your computer and use it in GitHub Desktop.
Save umgefahren/4255d9ffa5623825d2b8920e4f1cf3fd to your computer and use it in GitHub Desktop.
libp2p Kademlia with async interface
use libp2p::swarm::SwarmEvent;
use libp2p::NetworkBehaviour;
use libp2p::PeerId;
use libp2p::kad::{Kademlia, KademliaEvent, Quorum, Record, QueryResult};
use libp2p::kad::store::MemoryStore;
use libp2p::mdns::{Mdns, MdnsEvent, MdnsConfig};
use libp2p::kad::record::Key;
use libp2p::{identity, noise, Transport, mplex, Swarm};
use libp2p::tcp::TcpConfig;
use libp2p::core::upgrade;
use tokio::io::AsyncBufReadExt;
use libp2p::futures::StreamExt;
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
struct MyBehaviour {
kademlia: Kademlia<MemoryStore>,
mdns: Mdns,
}
#[derive(Debug)]
pub enum OutEvent {
Kademlia(KademliaEvent),
Mdns(MdnsEvent)
}
impl From<KademliaEvent> for OutEvent {
fn from(event: KademliaEvent) -> Self {
Self::Kademlia(event)
}
}
impl From<MdnsEvent> for OutEvent {
fn from(event: MdnsEvent) -> Self {
Self::Mdns(event)
}
}
async fn handle_input_line(kademlia: &mut Dht, line: String) {
let mut args = line.split(' ');
match args.next() {
Some("GET") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
};
let res = kademlia.get(&key).await.unwrap();
println!("Got Record {:?} {:?}",
std::str::from_utf8(res.key.as_ref()).unwrap(),
std::str::from_utf8(res.value.as_slice()).unwrap(),
);
}
Some("PUT") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
};
let value = {
match args.next() {
Some(value) => value.as_bytes().to_vec(),
None => {
eprintln!("Expected value");
return;
}
}
};
let record = Record {
key,
value,
publisher: None,
expires: None,
};
let res = kademlia.put(record).await.unwrap();
println!(
"Successfully put record {:?}",
std::str::from_utf8(res.as_ref()).unwrap(),
);
},
Some("PUT_PROVIDER") => {
let key = {
match args.next() {
Some(key ) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
};
let res = kademlia.put_provider(key)
.await.expect("Failed to start providing key");
println!(
"Successfully provided for key {:?}",
std::str::from_utf8(res.as_ref()).unwrap(),
);
}
Some("GET_PROVIDERS") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return
}
}
};
let res = kademlia.get_providers(key.clone())
.await.expect("Failed to get key provider");
for peer in res {
println!(
"Peer {:?} provides key {:?}",
peer,
std::str::from_utf8(key.as_ref()).unwrap()
);
}
}
_ => {
eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");
}
}
}
struct Dht (Swarm<MyBehaviour>);
impl Dht {
pub fn new(swarm: Swarm<MyBehaviour>) -> Self {
Self(swarm)
}
pub async fn put(&mut self, value: Record) -> Result<Key, String> {
let behaviour = self.0.behaviour_mut();
behaviour.kademlia.put_record(value.clone(), Quorum::One).unwrap();
let res = loop {
if let SwarmEvent::Behaviour(OutEvent::Kademlia(KademliaEvent::OutboundQueryCompleted {result, .. })) = self.0.select_next_some().await {
break result;
}
};
match res {
QueryResult::PutRecord(d) => {
match d {
Ok(dd) => {
Ok(dd.key)
}
Err(e) => {
Err(format!("{:?}", e))
}
}
}
_ => {
Err("Something went wrong".to_string())
}
}
}
pub async fn get(&mut self, key: &Key) -> Result<Record, &'static str> {
let behaviour = self.0.behaviour_mut();
behaviour.kademlia.get_record(key, Quorum::One);
let res = loop {
if let SwarmEvent::Behaviour(OutEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) = self.0.select_next_some().await {
break result;
}
};
match res {
QueryResult::GetRecord(d) => {
match d {
Ok(dd) => {
Ok(dd.records.get(0).unwrap().record.clone())
}
Err(_) => {
Err("something went wrong again")
}
}
}
_ => {
Err("Something went wrong")
}
}
}
pub async fn put_provider(&mut self, key: Key) -> Result<Key, String> {
let behaviour = self.0.behaviour_mut();
behaviour.kademlia.start_providing(key).expect("Failed to start providing key");
let res = loop {
if let SwarmEvent::Behaviour(OutEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, .. })) = self.0.select_next_some().await {
break result;
}
};
match res {
QueryResult::StartProviding(d) => {
match d {
Ok(dd) => Ok(dd.key),
Err(e) => Err(format!("{:?}", e)),
}
}
_ => {
Err("Something went wrong".to_string())
}
}
}
pub async fn get_providers(&mut self, key: Key) -> Result<Vec<PeerId>, String> {
let behaviour = self.0.behaviour_mut();
behaviour.kademlia.get_providers(key);
let res = loop {
if let SwarmEvent::Behaviour(OutEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, .. })) = self.0.select_next_some().await {
break result;
}
};
match res {
QueryResult::GetProviders(d) => {
match d {
Ok(dd) => Ok(dd.closest_peers),
Err(e) => Err(format!("{:?}", e)),
}
}
_ => {
Err("Something went wrong".to_string())
}
}
}
}
#[tokio::main]
async fn main() {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
println!("Local peer id => {:?}", peer_id);
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&id_keys)
.expect("Signing libp2p-noise static DH keypair failed.");
let transport = TcpConfig::new()
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(mplex::MplexConfig::new())
.boxed();
let mut swarm = {
let store = MemoryStore::new(peer_id);
let kademlia = Kademlia::new(peer_id, store);
let mdns = Mdns::new(MdnsConfig::default()).await.unwrap();
let behaviour = MyBehaviour {
kademlia,
mdns,
};
Swarm::new(transport, behaviour, peer_id)
};
let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines();
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
let mut managed_swarm = Dht::new(swarm);
loop {
tokio::select! {
line = stdin.next_line() => {
let line = line.unwrap().expect("stdin closed");
handle_input_line(&mut managed_swarm, line).await;
}
event = managed_swarm.0.select_next_some() => {
match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {:?}", address);
}
SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Discovered(list))) => {
for (peer_id, multiaddr) in list {
managed_swarm.0.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
}
}
SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired(list))) => {
for (peer_id, multiaddr) in list {
managed_swarm.0.behaviour_mut().kademlia.remove_address(&peer_id, &multiaddr)
.expect("Error removing address");
}
}
_ => {}
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment