Rust P2P Network Application Practical – 2 P2P Chat Program

In the previous article, "Rust P2P Network Application Practical – 1 P2P Core Concepts and Ping Program", we introduced the core concepts of P2P networks and analyzed a simple P2P Ping application built with the libp2p library. In this article, we analyze a slightly more complex P2P chat program.

Building on the Ping program, this chat program adds automatic node discovery, sends messages using a publish/subscribe model, and encrypts network traffic between nodes.

Therefore, the following protocols are required:

  • floodsub protocol: one of the pub/sub schemes in libp2p, suitable for broadcasting messages in small networks.

  • mDNS protocol: local node discovery protocol.

  • noise protocol: a protocol that encrypts and authenticates connections at the transport layer.

In libp2p, floodsub and mDNS are both network behaviors, so we define our own network behavior that combines them.
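To make the flooding idea concrete, here is a minimal sketch in plain Rust using only the standard library. It is a toy model and not libp2p's floodsub implementation: the `Network` type, the numeric node ids, and the `full_mesh` and `publish` functions are hypothetical names invented for this illustration. Each node that sees a message for the first time forwards it to every peer it knows, which is simple and robust but sends many redundant copies.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Toy network: each node id maps to the ids of the peers it knows about.
/// (Hypothetical model for illustration; not a libp2p type.)
struct Network {
    peers: HashMap<u32, Vec<u32>>,
}

impl Network {
    /// Build a fully connected network of `n` nodes with ids 0..n.
    fn full_mesh(n: u32) -> Self {
        let peers: HashMap<u32, Vec<u32>> = (0..n)
            .map(|id| {
                let others = (0..n).filter(move |other| *other != id).collect::<Vec<u32>>();
                (id, others)
            })
            .collect();
        Network { peers }
    }

    /// Flood one message from `origin`.
    /// Returns (nodes that received it, total number of transmissions).
    fn publish(&self, origin: u32) -> (HashSet<u32>, usize) {
        let mut seen = HashSet::from([origin]);
        let mut queue = VecDeque::from([origin]);
        let mut transmissions = 0;

        while let Some(node) = queue.pop_front() {
            for &peer in &self.peers[&node] {
                // In this toy, a copy is sent to every known peer,
                // even if that peer has already seen the message.
                transmissions += 1;
                if seen.insert(peer) {
                    // First time this peer sees the message: it forwards too.
                    queue.push_back(peer);
                }
            }
        }
        seen.remove(&origin);
        (seen, transmissions)
    }
}

fn main() {
    for n in [3, 10] {
        let (receivers, sent) = Network::full_mesh(n).publish(0);
        println!("{n} nodes: {} receivers, {sent} transmissions", receivers.len());
    }
}
```

In this toy full mesh of n nodes, one message costs n × (n − 1) transmissions to reach n − 1 receivers, which suggests why flooding is usually reserved for small networks.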

Add new dependencies in the Cargo.toml file:

[dependencies]
anyhow = "1"
libp2p = { version = "0.46",  features = ["tcp-tokio"] }
tokio = { version = "1.19", features = ["full"] }
futures = "0.3.1"

Create a chat.rs file in the src/bin/ directory:

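// Imports used throughout chat.rs (module paths as of libp2p 0.46).
use anyhow::Result;
use futures::StreamExt;
use libp2p::{
    core::upgrade,
    floodsub::{self, Floodsub, FloodsubEvent},
    identity,
    mdns::{Mdns, MdnsEvent},
    noise,
    swarm::{NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent},
    tcp::{GenTcpConfig, TokioTcpTransport},
    yamux, Multiaddr, NetworkBehaviour, PeerId, Transport,
};
use tokio::io::{self, AsyncBufReadExt};

// Custom network behavior, combining floodsub and mDNS.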
#[derive(NetworkBehaviour)]
#[behaviour(event_process = true)]
struct MyBehaviour {
    floodsub: Floodsub,
    mdns: Mdns,
}

impl MyBehaviour {
    // Pass in peerId to construct MyBehaviour
    async fn new(id: PeerId) -> Result<Self> {
        Ok(Self {
            // Initialize floodsub protocol
            floodsub: Floodsub::new(id),
            // Initialize mDNS protocol
            mdns: Mdns::new(Default::default()).await?,
        })
    }
}

// Process Floodsub network behavior events
impl NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour {
    // This method is called when a floodsub event occurs.
    fn inject_event(&mut self, message: FloodsubEvent) {
        // Display received message and source
        if let FloodsubEvent::Message(message) = message {
            println!(
                "Received message: '{:?}' from {:?}",
                String::from_utf8_lossy(&message.data),
                message.source
            );
        }
    }
}

// Process mDNS network behavior events
impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
    // This method is called when an mDNS event occurs.
    fn inject_event(&mut self, event: MdnsEvent) {
        match event {
            // When a new node is discovered, add it to the list of nodes propagating messages.
            MdnsEvent::Discovered(list) => {
                for (peer, _) in list {
                    self.floodsub.add_node_to_partial_view(peer);
                    println!("Added node to the network: {peer} ");
                }
            }
            // When a node expires, remove it from the list of nodes propagating messages.
            MdnsEvent::Expired(list) => {
                for (peer, _) in list {
                    if !self.mdns.has_node(&peer) {
                        self.floodsub.remove_node_from_partial_view(&peer);
                        println!("Removed node from the network: {peer} ");
                    }
                }
            }
        }
    }
}
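The `has_node` check in the `Expired` branch matters because mDNS events carry `(peer, address)` pairs, and one peer can be reachable at several addresses. The sketch below models that bookkeeping with standard-library collections only. `Discovery`, `on_discovered`, and `on_expired` are hypothetical names, peers and addresses are plain strings, the sample addresses are made up, and none of this is libp2p's actual implementation.

```rust
use std::collections::{HashMap, HashSet};

/// Toy stand-in for the mDNS + floodsub bookkeeping (hypothetical type).
#[derive(Default)]
struct Discovery {
    /// Addresses currently known for each peer (the role mDNS plays).
    known: HashMap<String, HashSet<String>>,
    /// Peers that messages are forwarded to (the role of floodsub's partial view).
    view: HashSet<String>,
}

impl Discovery {
    /// A (peer, address) pair was discovered: remember the address and
    /// add the peer to the view. Returns true if the peer is new to the view.
    fn on_discovered(&mut self, peer: &str, addr: &str) -> bool {
        self.known
            .entry(peer.to_string())
            .or_default()
            .insert(addr.to_string());
        self.view.insert(peer.to_string())
    }

    /// A (peer, address) pair expired: forget the address, and drop the peer
    /// from the view only when no other address for it remains.
    /// Returns true if the peer was removed from the view.
    fn on_expired(&mut self, peer: &str, addr: &str) -> bool {
        let now_empty = match self.known.get_mut(peer) {
            Some(addrs) => {
                addrs.remove(addr);
                addrs.is_empty()
            }
            None => false,
        };
        if now_empty {
            self.known.remove(peer);
        }
        // Plays the role of `if !self.mdns.has_node(&peer)` in the handler above.
        !self.known.contains_key(peer) && self.view.remove(peer)
    }
}

fn main() {
    let mut d = Discovery::default();
    d.on_discovered("peerA", "/ip4/127.0.0.1/tcp/4001");
    d.on_discovered("peerA", "/ip4/192.168.1.5/tcp/4001");

    // One address expires, but the peer is still reachable at the other one.
    let removed = d.on_expired("peerA", "/ip4/127.0.0.1/tcp/4001");
    println!("removed after first expiry: {removed}");

    // The last address expires, so the peer leaves the view.
    let removed = d.on_expired("peerA", "/ip4/192.168.1.5/tcp/4001");
    println!("removed after second expiry: {removed}");
}
```

A peer announced at two addresses triggers the discovery handler twice, which is one plausible reason a chat node can print the same "Added node to the network" line more than once for a single peer.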

Next, let’s improve the main function:

#[tokio::main]
async fn main() -> Result<()> {
    // Generate key pair
    let id_keys = identity::Keypair::generate_ed25519();

    // Derive the node's unique identifier (PeerId) from the public key of the key pair
    let peer_id = PeerId::from(id_keys.public());
    println!("Node ID: {peer_id}");

    // Create noise key pair
    let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys)?;

    // Create a Tokio-based TCP transport, using noise for encryption and authentication.
    // On top of the secured connection, use yamux to multiplex several streams over one TCP connection.
    let transport = TokioTcpTransport::new(GenTcpConfig::default().nodelay(true))
        .upgrade(upgrade::Version::V1)
        .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
        .multiplex(yamux::YamuxConfig::default())
        .boxed();

    // Create Floodsub topic
    let floodsub_topic = floodsub::Topic::new("chat");

    // Create Swarm to manage node network and events.
    let mut swarm = {
        let mut behaviour = MyBehaviour::new(peer_id).await?;

        // Subscribe to floodsub topic
        behaviour.floodsub.subscribe(floodsub_topic.clone());

        SwarmBuilder::new(transport, behaviour, peer_id)
            .executor(Box::new(|fut| {
                tokio::spawn(fut);
            }))
            .build()
    };

    // If an address is passed as the first command-line argument, dial that remote node manually.
    if let Some(to_dial) = std::env::args().nth(1) {
        let addr: Multiaddr = to_dial.parse()?;
        swarm.dial(addr)?;
        println!("Connecting to remote node: {to_dial}");
    }

    // Read messages from standard input
    let mut stdin = io::BufReader::new(io::stdin()).lines();

    // Listen on the port allocated by the operating system
    swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)?;

    loop {
        tokio::select! {
            line = stdin.next_line() => {
                let line = line?.expect("stdin closed");
                // After reading a message from standard input, publish it to nodes subscribed to the floodsub topic.
                swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes());
            }
            event = swarm.select_next_some() => {
                if let SwarmEvent::NewListenAddr { address, .. } = event {
                    println!("Local listening address: {address}");
                }
            }
        }
    }
}
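The listen address `/ip4/127.0.0.1/tcp/0` uses port 0, which asks the operating system to pick a free port, so each node started on the same machine ends up with its own port. The standard-library sketch below shows the same convention without libp2p. The `parse_ip4_tcp` helper is a hypothetical, deliberately narrow parser for this one address shape, not a general multiaddr parser.

```rust
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener};

/// Parse an address of the exact form `/ip4/<addr>/tcp/<port>`.
/// Anything else (other protocols, missing parts) yields `None`.
fn parse_ip4_tcp(s: &str) -> Option<SocketAddrV4> {
    let parts: Vec<&str> = s.split('/').collect();
    match parts.as_slice() {
        // The leading "/" produces an empty first segment.
        ["", "ip4", ip, "tcp", port] => {
            let ip: Ipv4Addr = ip.parse().ok()?;
            let port: u16 = port.parse().ok()?;
            Some(SocketAddrV4::new(ip, port))
        }
        _ => None,
    }
}

fn main() -> std::io::Result<()> {
    let requested = parse_ip4_tcp("/ip4/127.0.0.1/tcp/0").expect("valid address");

    // Binding to port 0 lets the operating system choose a free port.
    let listener = TcpListener::bind(requested)?;
    let actual = listener.local_addr()?;

    println!(
        "Requested port {}, listening on port {}",
        requested.port(),
        actual.port()
    );
    Ok(())
}
```

The port printed on the second half of the line differs from run to run, in the same way the chat program reports a different "Local listening address" for each node.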

Open two terminals and run:

cargo run --bin chat

Output from the first terminal:

master:libp2p-learn Justin$ cargo run --bin chat
    Finished dev [unoptimized + debuginfo] target(s) in 0.29s
     Running `target/debug/chat`
Node ID: 12D3KooWFt1xhyL5ZfJAR2SxUAQLEeb4ryTtQ5T2VtZsNGRKmB5J
Local listening address: /ip4/127.0.0.1/tcp/52318
Added node to the network: 12D3KooWLDojjNzDHYUZCpqV4AL4XTMGScvHHAe8vsJrpCpYv2pT
Added node to the network: 12D3KooWLDojjNzDHYUZCpqV4AL4XTMGScvHHAe8vsJrpCpYv2pT

Output from the second terminal:

master:libp2p-learn Justin$ cargo run --bin chat
    Finished dev [unoptimized + debuginfo] target(s) in 0.29s
     Running `target/debug/chat`
Node ID: 12D3KooWLDojjNzDHYUZCpqV4AL4XTMGScvHHAe8vsJrpCpYv2pT
Local listening address: /ip4/127.0.0.1/tcp/52320
Added node to the network: 12D3KooWFt1xhyL5ZfJAR2SxUAQLEeb4ryT
Added node to the network: 12D3KooWFt1xhyL5ZfJAR2SxUAQLEeb4ryTtQ5T2VtZsNGRKmB5J

The two nodes automatically discovered each other and joined the P2P network.

We send ‘hello’ from the second node, and the first node prints:

Received message: '"hello"' from PeerId("12D3KooWLDojjNzDHYUZCpqV4AL4XTMGScvHHAe8vsJrpCpYv2pT")

Then the first node replies ‘world’, and the second node prints:

Received message: '"world"' from PeerId("12D3KooWFt1xhyL5ZfJAR2SxUAQLEeb4ryTtQ5T2VtZsNGRKmB5J")
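The extra double quotes in `'"hello"'` and `'"world"'` come from the `{:?}` (Debug) placeholder in the handler's `println!` call, which quotes and escapes strings. A minimal standard-library sketch of the difference follows; `render_debug` and `render_display` are hypothetical helper names used only for this illustration.

```rust
/// Format a payload the way the floodsub handler does, with `{:?}` (Debug).
fn render_debug(data: &[u8]) -> String {
    format!("Received message: '{:?}'", String::from_utf8_lossy(data))
}

/// Same message with `{}` (Display), which prints the text without quotes.
fn render_display(data: &[u8]) -> String {
    format!("Received message: '{}'", String::from_utf8_lossy(data))
}

fn main() {
    println!("{}", render_debug(b"hello"));
    println!("{}", render_display(b"hello"));

    // Invalid UTF-8 bytes are replaced with U+FFFD instead of causing an error.
    println!("{}", render_display(&[b'h', b'i', 0xff]));
}
```

Switching the placeholder to `{}` would print the chat text without the inner quotes, while `from_utf8_lossy` keeps the handler from failing on payloads that are not valid UTF-8.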

You can also open three or more terminals for testing.

The next article will analyze a distributed key-value store program in detail.
