/* The structure of the shared state of the rings is a simple
 * circular buffer, as outlined in
 * Documentation/core-api/circular-buffers.rst. For the Rx and
 * completion ring, the kernel is the producer and user space is the
 * consumer. For the Tx and fill rings, the kernel is the consumer and
 * user space is the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
 *    STORE $data                   LOAD $data
 *    STORE.rel ->producer (B)      STORE.rel ->consumer (D)
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it protects the data from being written after
 * the producer pointer. If this barrier was missing, the consumer
 * could observe the producer pointer being set and thus load the data
 * before the producer has written the new data. The consumer would in
 * this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer actually has been read. If we do not have this
 * barrier, some architectures could load old data as speculative loads
 * are not discarded as the CPU does not know there is a dependency
 * between ->producer and data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. In case ->consumer indicates there is no
 * room in the buffer to store $data we do not. The dependency will
 * order both of the stores after the loads. So no barrier is needed.
 *
 * (D) protects the load of the data to be observed to happen after the
 * store of the consumer pointer. If we did not have this memory
 * barrier, the producer could observe the consumer pointer being set
 * and overwrite the data with a new value before the consumer got the
 * chance to read the old value. The consumer would thus miss reading
 * the old entry and very likely read the new entry twice, once right
 * now and again after circling through the ring.
 */
/* The operations on the rings are the following:
 *
 * producer                           consumer
 *
 * RESERVE entries                    PEEK in the ring for entries
 * WRITE data into the ring           READ data from the ring
 * SUBMIT entries                     RELEASE entries
 *
 * The producer reserves one or more entries in the ring. It can then
 * fill in these entries and finally submit them so that they can be
 * seen and read by the consumer.
 *
 * The consumer peeks into the ring to see if the producer has written
 * any new entries. If so, the consumer can then read these entries
 * and when it is done reading them release them back to the producer
 * so that the producer can use these slots to fill in new entries.
 *
 * The function names below reflect these operations.
 */
/* Functions that read and validate content from consumer rings. */
/* Can overflow if desc->addr < pool->tx_metadata_len */ if (check_sub_overflow(desc->addr, pool->tx_metadata_len, &addr)) returnfalse;
offset = addr & (pool->chunk_size - 1);
/* * Can't overflow: @offset is guaranteed to be < ``U32_MAX`` * (pool->chunk_size is ``u32``), @len is guaranteed * to be <= ``U32_MAX``.
*/ if (offset + len + pool->tx_metadata_len > pool->chunk_size) returnfalse;
if (addr >= pool->addrs_cnt) returnfalse;
if (xp_unused_options_set(desc->options)) returnfalse;
/* Can't overflow: @len is guaranteed to be <= ``U32_MAX`` */
len += pool->tx_metadata_len; if (len > pool->chunk_size) returnfalse;
/* Can overflow if desc->addr is close to 0 */ if (check_sub_overflow(xp_unaligned_add_offset_to_addr(desc->addr),
pool->tx_metadata_len, &addr)) returnfalse;
if (addr >= pool->addrs_cnt) returnfalse;
/* Can overflow if pool->addrs_cnt is high enough */ if (check_add_overflow(addr, len, &end) || end > pool->addrs_cnt) returnfalse;
if (xp_desc_crosses_non_contig_pg(pool, addr, len)) returnfalse;
if (xp_unused_options_set(desc->options)) returnfalse;
/* To improve performance in the xskq_cons_release functions, only update
 * local state here. Reflect this to global state when we get new entries
 * from the ring in xskq_cons_get_entries() and whenever Rx or Tx
 * processing are completed in the NAPI loop.
 */
static inline void xskq_cons_release(struct xsk_queue *q)
{
	/* Bump only the cached (local) consumer index; the shared
	 * ring->consumer pointer is published later, amortizing the
	 * cross-CPU store over many released entries.
	 */
	q->cached_cons++;
}
staticinline u32 xskq_cons_present_entries(struct xsk_queue *q)
{ /* No barriers needed since data is not accessed */ return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer);
}
/* A, matches D */
cached_prod = q->cached_prod; for (i = 0; i < nb_entries; i++)
ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
q->cached_prod = cached_prod;
}
staticinlinebool xskq_prod_is_empty(struct xsk_queue *q)
{ /* No barriers needed since data is not accessed */ return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}
/* NOTE(review): the German text below is a generic website disclaimer and
 * appears to be extraction residue, not part of this C source file —
 * confirm and remove. Preserved verbatim, comment-wrapped so the file
 * remains valid C:
 *
 * Die Informationen auf dieser Webseite wurden
 * nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
 * noch Qualität der bereit gestellten Informationen zugesichert.
 * Bemerkung:
 * Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.
 */