Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  csp_ping_pong.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

//! Example for usage of the Chat Server Protocol state machine, doing a real handshake with the
//! chat server and an exemplary payload flow loop.
#![expect(unused_crate_dependencies, reason = "Example triggered false positive")]
#![expect(
    clippy::integer_division_remainder_used,
    reason = "Some internal of tokio::select triggers this"
)]

use core::time::Duration;
use std::io;

use anyhow::bail;
use clap::Parser;
use libthreema::{
    cli::{FullIdentityConfig, FullIdentityConfigOptions},
    csp::{
        CspProtocol, CspProtocolContext, CspStateUpdate,
        payload::{EchoPayload, IncomingPayload, OutgoingFrame, OutgoingPayload},
    },
    https::cli::https_client_builder,
    utils::logging::init_stderr_logging,
};
use tokio::{
    io::{AsyncReadExt as _, AsyncWriteExt as _},
    net::TcpStream,
    signal,
    sync::mpsc,
    time::{self, Instant},
};
use tracing::{Level, debug, error, info, trace, warn};

#[derive(Parser)]
#[command()]
struct CspPingPongCommand {
    #[command(flatten)]
    config: FullIdentityConfigOptions,
}

/// Payload queues for the main process
struct PayloadQueuesForCspPingPong {
    incoming: mpsc::Receiver<IncomingPayload>,
    outgoing: mpsc::Sender<OutgoingPayload>,
}

/// Payload queues for the protocol flow runner
struct PayloadQueuesForCsp {
    incoming: mpsc::Sender<IncomingPayload>,
    outgoing: mpsc::Receiver<OutgoingPayload>,
}

struct CspProtocolRunner {
    /// The TCP stream
    stream: TcpStream,

    /// An instance of the [`CspProtocol`] state machine
    protocol: CspProtocol,
}
impl CspProtocolRunner {
    /// Initiate a CSP protocol connection and hand out the initial `client_hello` message
    #[tracing::instrument(skip_all)]
    async fn new(
        server_address: Vec<(String, u16)>,
        context: CspProtocolContext,
    ) -> anyhow::Result<(Self, OutgoingFrame)> {
        // Connect via TCP
        debug!(?server_address, "Establishing TCP connection to chat server",);
        let tcp_stream = TcpStream::connect(
            server_address
                .first()
                .expect("CSP config should have at least one address"),
        )
        .await?;

        // Create the protocol
        let (csp_protocol, client_hello) = CspProtocol::new(context);
        Ok((
            Self {
                stream: tcp_stream,
                protocol: csp_protocol,
            },
            client_hello,
        ))
    }

    /// Do the handshake with the chat server by exchanging the following messages:
    ///
    /// ```txt
    /// C -- client-hello -> S
    /// C <- server-hello -- S
    /// C ---- login ---- -> S
    /// C <-- login-ack ---- S
    /// ```
    #[tracing::instrument(skip_all)]
    async fn run_handshake_flow(&mut self, client_hello: OutgoingFrame) -> anyhow::Result<()> {
        // Send the client hello
        debug!(length = client_hello.0.len(), "Sending client hello");
        self.send(&client_hello.0).await?;

        // Handshake by polling the CSP state
        for iteration in 1_usize.. {
            trace!("Iteration #{iteration}");

            // Receive required bytes and add them
            let bytes = self.receive_required().await?;
            self.protocol.add_chunks(&[&bytes])?;

            // Handle instruction
            let Some(instruction) = self.protocol.poll()? else {
                continue;
            };

            // We do not expect an incoming payload at this stage
            if let Some(incoming_payload) = instruction.incoming_payload {
                let message = "Unexpected incoming payload during handshake";
                error!(?incoming_payload, message);
                bail!(message)
            }

            // Send any outgoing frame
            if let Some(frame) = instruction.outgoing_frame {
                self.send(&frame.0).await?;
            }

            // Check if we've completed the handshake
            if let Some(CspStateUpdate::PostHandshake(login_ack_data)) = instruction.state_update {
                info!(?login_ack_data, "Handshake complete");
                break;
            }
        }

        Ok(())
    }

    /// Run the payload exchange flow until stopped.
    #[tracing::instrument(skip_all)]
    async fn run_payload_flow(&mut self, mut queues: PayloadQueuesForCsp) -> anyhow::Result<()> {
        let mut read_buffer = [0_u8; 8192];

        for iteration in 1_usize.. {
            trace!("Payload flow iteration #{iteration}");

            // Poll for any pending instruction
            let mut instruction = self.protocol.poll()?;
            if instruction.is_none() {
                // No pending instruction left, wait for more input
                instruction = tokio::select! {
                    // Forward any incoming chunks from the TCP stream
                    _ = self.stream.readable() => {
                        let length = self.try_receive(&mut read_buffer)?;

                        // Add chunks (poll in the next iteration)
                        self.protocol
                            .add_chunks(&[read_buffer.get(..length)
                            .expect("Amount of read bytes should be available")])?;
                        None
                    },

                    // Forward any outgoing payloads
                    Some(outgoing_payload) = queues.outgoing.recv() => {
                        debug!(?outgoing_payload, "Sending payload");
                        Some(self.protocol.create_payload(&outgoing_payload)?)
                    }
                }
            }
            let Some(instruction) = instruction else {
                continue;
            };

            // We do not expect any state updates at this stage
            if let Some(state_update) = instruction.state_update {
                let message = "Unexpected state update after handshake";
                error!(?state_update, message);
                bail!(message)
            }

            // Forward any incoming payload
            if let Some(incoming_payload) = instruction.incoming_payload {
                debug!(?incoming_payload, "Received payload");
                queues.incoming.send(incoming_payload).await?;
            }

            // Send any outgoing frame
            if let Some(frame) = instruction.outgoing_frame {
                self.send(&frame.0).await?;
            }
        }

        Ok(())
    }

    /// Shut down the TCP connection
    #[tracing::instrument(skip_all)]
    async fn shutdown(&mut self) -> anyhow::Result<()> {
        info!("Shutting down TCP connection");
        Ok(self.stream.shutdown().await?)
    }

    /// Send bytes to the server over the TCP connection
    #[tracing::instrument(skip_all, fields(bytes_length = bytes.len()))]
    async fn send(&mut self, bytes: &[u8]) -> anyhow::Result<()> {
        trace!(length = bytes.len(), "Sending bytes");
        self.stream.write_all(bytes).await?;

        Ok(())
    }

    #[tracing::instrument(skip_all)]
    async fn receive_required(&mut self) -> anyhow::Result<Vec<u8>> {
        // Get the minimum amount of bytes we'll need to receive
        let length = self.protocol.next_required_length()?;
        let mut buffer = vec![0; length];
        trace!(?length, "Reading bytes");

        // If there is nothing to read, return immediately
        if length == 0 {
            return Ok(buffer);
        }

        // Read the exact number of bytes required
        let _ = self.stream.read_exact(&mut buffer).await?;

        // Read more if available
        match self.stream.try_read_buf(&mut buffer) {
            Ok(0) => {
                // Remote shut down our reading end. But we still need to process the previously
                // read bytes.
                warn!("TCP reading end closed");
            },
            Ok(length) => {
                trace!(length, "Got additional bytes");
            },
            Err(error) if error.kind() == io::ErrorKind::WouldBlock => {
                trace!("No additional bytes available");
            },
            Err(error) => {
                return Err(error.into());
            },
        }
        debug!(length = buffer.len(), "Received bytes");

        Ok(buffer)
    }

    #[tracing::instrument(skip_all)]
    fn try_receive(&mut self, buffer: &mut [u8]) -> anyhow::Result<usize> {
        match self.stream.try_read(buffer) {
            Ok(0) => {
                // Remote shut down our reading end gracefully.
                //
                // IMPORTANT: An implementation needs to ensure that it stops gracefully by processing any
                // remaining payloads prior to stopping the protocol. This example implementation ensures this
                // by handling all pending instructions prior to polling for more data. The only case we bail
                // is therefore when our instruction queue is already dry.
                bail!("TCP reading end closed")
            },
            Ok(length) => {
                debug!(length, "Received bytes");
                Ok(length)
            },
            Err(error) if error.kind() == io::ErrorKind::WouldBlock => {
                trace!("No bytes to receive");
                Ok(0)
            },
            Err(error) => Err(error.into()),
        }
    }
}

#[tracing::instrument(skip_all)]
async fn run_ping_pong_flow(mut queues: PayloadQueuesForCspPingPong) -> anyhow::Result<()> {
    // Create the echo timer that will trigger an outgoing payload every 10s
    let mut echo_timer = time::interval_at(
        Instant::now()
            .checked_add(Duration::from_secs(10))
            .expect("Oops, apocalypse in 10s"),
        Duration::from_secs(10),
    );

    // Enter ping-pong flow loop
    loop {
        tokio::select! {
            // Send echo-request when the timer fires
            _ = echo_timer.tick() => {
                let echo_request = OutgoingPayload::EchoRequest(
                    EchoPayload("ping".as_bytes().to_owned()));
                info!(?echo_request, "Sending echo request");
                queues.outgoing.send(echo_request).await?;
            },

            // Process incoming payload
            incoming_payload = queues.incoming.recv() => {
                if let Some(incoming_payload) = incoming_payload {
                    info!(?incoming_payload, "Received payload");
                } else {
                    break
                }
            }
        };
    }

    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Configure logging
    init_stderr_logging(Level::TRACE);

    // Create HTTP client
    let http_client = https_client_builder().build()?;

    // Parse command
    let arguments = CspPingPongCommand::parse();
    let config = FullIdentityConfig::from_options(&http_client, arguments.config).await?;

    // Create payload queues
    let (csp_ping_pong_queues, csp_queues) = {
        let incoming_payload = mpsc::channel(4);
        let outgoing_payload = mpsc::channel(4);
        (
            PayloadQueuesForCspPingPong {
                incoming: incoming_payload.1,
                outgoing: outgoing_payload.0,
            },
            PayloadQueuesForCsp {
                incoming: incoming_payload.0,
                outgoing: outgoing_payload.1,
            },
        )
    };

    // Create CSP protocol and establish a connection
    let (mut csp_runner, client_hello) = CspProtocolRunner::new(
        config
            .minimal
            .common
            .config
            .chat_server_address
            .addresses(config.csp_server_group),
        config
            .csp_context_init()
            .try_into()
            .expect("Configuration should be valid"),
    )
    .await?;

    // Run the handshake flow
    csp_runner.run_handshake_flow(client_hello).await?;

    // Run the protocols
    tokio::select! {
        _ = csp_runner.run_payload_flow(csp_queues) => {},
        _ = run_ping_pong_flow(csp_ping_pong_queues) => {},
        _ = signal::ctrl_c() => {},
    };

    // Shut down
    csp_runner.shutdown().await?;
    Ok(())
}

#[test]
fn verify_cli() {
    use clap::CommandFactory;
    CspPingPongCommand::command().debug_assert();
}

[Dauer der Verarbeitung: 0.22 Sekunden, vorverarbeitet 2026-04-27]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge