Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/neqo-transport/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 10 kB image not shown  

Quelle  events.rs   Sprache: unbekannt

 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

// Collecting a list of events relevant to whoever is using the Connection.

use std::{cell::RefCell, collections::VecDeque, rc::Rc};

use neqo_common::event::Provider as EventProvider;
use neqo_crypto::ResumptionToken;

use crate::{
    connection::State,
    quic_datagrams::DatagramTracking,
    stream_id::{StreamId, StreamType},
    AppError, Stats,
};

#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
pub enum OutgoingDatagramOutcome {
    DroppedTooBig,
    DroppedQueueFull,
    Lost,
    Acked,
}

#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
pub enum ConnectionEvent {
    /// Cert authentication needed
    AuthenticationNeeded,
    /// Encrypted client hello fallback occurred.  The certificate for the
    /// public name needs to be authenticated.
    EchFallbackAuthenticationNeeded {
        public_name: String,
    },
    /// A new uni (read) or bidi stream has been opened by the peer.
    NewStream {
        stream_id: StreamId,
    },
    /// Space available in the buffer for an application write to succeed.
    SendStreamWritable {
        stream_id: StreamId,
    },
    /// New bytes available for reading.
    RecvStreamReadable {
        stream_id: StreamId,
    },
    /// Peer reset the stream.
    RecvStreamReset {
        stream_id: StreamId,
        app_error: AppError,
    },
    /// Peer has sent `STOP_SENDING`
    SendStreamStopSending {
        stream_id: StreamId,
        app_error: AppError,
    },
    /// Peer has acked everything sent on the stream.
    SendStreamComplete {
        stream_id: StreamId,
    },
    /// Peer increased `MAX_STREAMS`
    SendStreamCreatable {
        stream_type: StreamType,
    },
    /// Connection state change.
    StateChange(State),
    /// The server rejected 0-RTT.
    /// This event invalidates all state in streams that has been created.
    /// Any data written to streams needs to be written again.
    ZeroRttRejected,
    ResumptionToken(ResumptionToken),
    Datagram(Vec<u8>),
    OutgoingDatagramOutcome {
        id: u64,
        outcome: OutgoingDatagramOutcome,
    },
    IncomingDatagramDropped,
}

#[derive(Debug, Default, Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct ConnectionEvents {
    events: Rc<RefCell<VecDeque<ConnectionEvent>>>,
}

impl ConnectionEvents {
    pub fn authentication_needed(&self) {
        self.insert(ConnectionEvent::AuthenticationNeeded);
    }

    pub fn ech_fallback_authentication_needed(&self, public_name: String) {
        self.insert(ConnectionEvent::EchFallbackAuthenticationNeeded { public_name });
    }

    pub fn new_stream(&self, stream_id: StreamId) {
        self.insert(ConnectionEvent::NewStream { stream_id });
    }

    pub fn recv_stream_readable(&self, stream_id: StreamId) {
        self.insert(ConnectionEvent::RecvStreamReadable { stream_id });
    }

    pub fn recv_stream_reset(&self, stream_id: StreamId, app_error: AppError) {
        // If reset, no longer readable.
        self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64()));

        self.insert(ConnectionEvent::RecvStreamReset {
            stream_id,
            app_error,
        });
    }

    pub fn send_stream_writable(&self, stream_id: StreamId) {
        self.insert(ConnectionEvent::SendStreamWritable { stream_id });
    }

    pub fn send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError) {
        // If stopped, no longer writable.
        self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id));

        self.insert(ConnectionEvent::SendStreamStopSending {
            stream_id,
            app_error,
        });
    }

    pub fn send_stream_complete(&self, stream_id: StreamId) {
        self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id));

        self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } if *x == stream_id.as_u64()));

        self.insert(ConnectionEvent::SendStreamComplete { stream_id });
    }

    pub fn send_stream_creatable(&self, stream_type: StreamType) {
        self.insert(ConnectionEvent::SendStreamCreatable { stream_type });
    }

    pub fn connection_state_change(&self, state: State) {
        // If closing, existing events no longer relevant.
        match state {
            State::Closing { .. } | State::Closed(_) => self.events.borrow_mut().clear(),
            _ => (),
        }
        self.insert(ConnectionEvent::StateChange(state));
    }

    pub fn client_resumption_token(&self, token: ResumptionToken) {
        self.insert(ConnectionEvent::ResumptionToken(token));
    }

    pub fn client_0rtt_rejected(&self) {
        // If 0rtt rejected, must start over and existing events are no longer
        // relevant.
        self.events.borrow_mut().clear();
        self.insert(ConnectionEvent::ZeroRttRejected);
    }

    pub fn recv_stream_complete(&self, stream_id: StreamId) {
        // If stopped, no longer readable.
        self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64()));
    }

    // The number of datagrams in the events queue is limited to max_queued_datagrams.
    // This function ensure this and deletes the oldest datagrams if needed.
    fn check_datagram_queued(&self, max_queued_datagrams: usize, stats: &mut Stats) {
        let mut q = self.events.borrow_mut();
        let mut remove = None;
        if q.iter()
            .filter(|evt| matches!(evt, ConnectionEvent::Datagram(_)))
            .count()
            == max_queued_datagrams
        {
            if let Some(d) = q
                .iter()
                .rev()
                .enumerate()
                .filter(|(_, evt)| matches!(evt, ConnectionEvent::Datagram(_)))
                .take(1)
                .next()
            {
                remove = Some(d.0);
            }
        }
        if let Some(r) = remove {
            q.remove(r);
            q.push_back(ConnectionEvent::IncomingDatagramDropped);
            stats.incoming_datagram_dropped += 1;
        }
    }

    pub fn add_datagram(&self, max_queued_datagrams: usize, data: &[u8], stats: &mut Stats) {
        self.check_datagram_queued(max_queued_datagrams, stats);
        self.events
            .borrow_mut()
            .push_back(ConnectionEvent::Datagram(data.to_vec()));
    }

    pub fn datagram_outcome(
        &self,
        dgram_tracker: &DatagramTracking,
        outcome: OutgoingDatagramOutcome,
    ) {
        if let DatagramTracking::Id(id) = dgram_tracker {
            self.events
                .borrow_mut()
                .push_back(ConnectionEvent::OutgoingDatagramOutcome { id: *id, outcome });
        }
    }

    fn insert(&self, event: ConnectionEvent) {
        let mut q = self.events.borrow_mut();

        // Special-case two enums that are not strictly PartialEq equal but that
        // we wish to avoid inserting duplicates.
        let already_present = match &event {
            ConnectionEvent::SendStreamStopSending { stream_id, .. } => q.iter().any(|evt| {
                matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. }
                      if *x == *stream_id)
            }),
            ConnectionEvent::RecvStreamReset { stream_id, .. } => q.iter().any(|evt| {
                matches!(evt, ConnectionEvent::RecvStreamReset { stream_id: x, .. }
                      if *x == *stream_id)
            }),
            _ => q.contains(&event),
        };
        if !already_present {
            q.push_back(event);
        }
    }

    fn remove<F>(&self, f: F)
    where
        F: Fn(&ConnectionEvent) -> bool,
    {
        self.events.borrow_mut().retain(|evt| !f(evt));
    }
}

impl EventProvider for ConnectionEvents {
    type Event = ConnectionEvent;

    fn has_events(&self) -> bool {
        !self.events.borrow().is_empty()
    }

    fn next_event(&mut self) -> Option<Self::Event> {
        self.events.borrow_mut().pop_front()
    }
}

#[cfg(test)]
mod tests {
    use neqo_common::event::Provider;

    use crate::{CloseReason, ConnectionEvent, ConnectionEvents, Error, State, StreamId};

    #[test]
    fn event_culling() {
        let mut evts = ConnectionEvents::default();

        evts.client_0rtt_rejected();
        evts.client_0rtt_rejected();
        assert_eq!(evts.events().count(), 1);
        assert_eq!(evts.events().count(), 0);

        evts.new_stream(4.into());
        evts.new_stream(4.into());
        assert_eq!(evts.events().count(), 1);

        evts.recv_stream_readable(6.into());
        evts.recv_stream_reset(6.into(), 66);
        evts.recv_stream_reset(6.into(), 65);
        assert_eq!(evts.events().count(), 1);

        evts.send_stream_writable(8.into());
        evts.send_stream_writable(8.into());
        evts.send_stream_stop_sending(8.into(), 55);
        evts.send_stream_stop_sending(8.into(), 56);
        let events = evts.events().collect::<Vec<_>>();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0],
            ConnectionEvent::SendStreamStopSending {
                stream_id: StreamId::new(8),
                app_error: 55
            }
        );

        evts.send_stream_writable(8.into());
        evts.send_stream_writable(8.into());
        evts.send_stream_stop_sending(8.into(), 55);
        evts.send_stream_stop_sending(8.into(), 56);
        evts.send_stream_complete(8.into());
        assert_eq!(evts.events().count(), 1);

        evts.send_stream_writable(8.into());
        evts.send_stream_writable(9.into());
        evts.send_stream_stop_sending(10.into(), 55);
        evts.send_stream_stop_sending(11.into(), 56);
        evts.send_stream_complete(12.into());
        assert_eq!(evts.events().count(), 5);

        evts.send_stream_writable(8.into());
        evts.send_stream_writable(9.into());
        evts.send_stream_stop_sending(10.into(), 55);
        evts.send_stream_stop_sending(11.into(), 56);
        evts.send_stream_complete(12.into());
        evts.client_0rtt_rejected();
        assert_eq!(evts.events().count(), 1);

        evts.send_stream_writable(9.into());
        evts.send_stream_stop_sending(10.into(), 55);
        evts.connection_state_change(State::Closed(CloseReason::Transport(
            Error::StreamStateError,
        )));
        assert_eq!(evts.events().count(), 1);
    }
}

[ Dauer der Verarbeitung: 0.2 Sekunden  (vorverarbeitet)  ]