Overview
MPSC_PBUF (Multi Producer Single Consumer Packet Buffer) is a circular buffer that manages data as variable-size packets. Users store and read data in whole packets. To reduce copying in memory, production is split into two steps: allocate space for a packet, then fill in the data and commit it. Consumption is likewise split into two steps: request a packet, then release it once the data has been used. The key features of MPSC are as follows:
- Allocation + commit for production
- Request + release for consumption
- The allocator ensures that the allocated space is contiguous
- When allocating space, a wait timeout can be specified
- Handling strategies when space cannot be allocated and no wait timeout is given:
  - Overwrite: discard the oldest data packet; a callback is invoked for each discarded packet
  - Do not overwrite: return failure immediately
Like an ordinary ring buffer, MPSC is built on a circular buffer; the difference is that MPSC processes data in packet units within it. Besides being first-in-first-out, it can therefore support multiple producers, because every packet has clear boundaries. In Zephyr, the log system uses MPSC: different modules act as producers generating log messages, and the log system acts as the single consumer, processing these messages in order for display.
Usage
The MPSC API is defined in the following two files:
- zephyr/include/zephyr/sys/mpsc_packet.h
- zephyr/include/zephyr/sys/mpsc_pbuf.h
Defining Packet Structure
MPSC manages packets through the first 2 bits of each packet, which are reserved as management bits. A user packet structure reserves these bits by placing MPSC_PBUF_HDR at the start of its header:
struct mpsc_sample_header {
    // Management bits reserved for MPSC
    MPSC_PBUF_HDR;
    // Remaining bits of the first word hold the packet length
    uint32_t length : 32 - MPSC_PBUF_HDR_BITS;
};
struct mpsc_sample_packet {
    struct mpsc_sample_header hdr;
    uint8_t data[];
};
// MPSC only operates on union mpsc_pbuf_generic, so a union is defined for easy conversion
union mpsc_packet {
    union mpsc_pbuf_generic hdr;
    struct mpsc_sample_packet packet;
};
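To make the bit budget concrete, here is a minimal standalone sketch of the same header layout. DEMO_HDR and DEMO_HDR_BITS are hypothetical stand-ins, assumed to mirror MPSC_PBUF_HDR (two 1-bit fields) and MPSC_PBUF_HDR_BITS, so that the snippet compiles without Zephyr headers.

```c
#include <stdint.h>

/* Hypothetical stand-ins, assumed to mirror MPSC_PBUF_HDR_BITS and MPSC_PBUF_HDR. */
#define DEMO_HDR_BITS 2
#define DEMO_HDR uint32_t valid : 1; uint32_t busy : 1

struct demo_header {
    DEMO_HDR;                             /* 2 management bits */
    uint32_t length : 32 - DEMO_HDR_BITS; /* 30 bits left for the user */
};

/* Bit-field layout is implementation-defined; this holds on common ABIs. */
_Static_assert(sizeof(struct demo_header) == sizeof(uint32_t),
               "header should occupy exactly one 32-bit word");

/* Largest value the 30-bit length field can hold. */
static inline uint32_t demo_max_length(void)
{
    return (UINT32_C(1) << (32 - DEMO_HDR_BITS)) - 1u;
}
```

Under these assumptions the header costs exactly one 32-bit word, and the length field can count up to 0x3FFFFFFF units.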
Initializing MPSC
Throughout its lifetime, each MPSC instance is represented by one struct mpsc_pbuf_buffer, whose contents are managed by MPSC itself. It is initialized from a struct mpsc_pbuf_buffer_config, defined as follows:
struct mpsc_pbuf_buffer_config {
    // The buf used by MPSC; all packets are allocated from this buf
    uint32_t *buf;
    // The size of the MPSC buf, in 32-bit words
    uint32_t size;
    // Called for each discarded packet when the overwrite strategy is used
    mpsc_pbuf_notify_drop notify_drop;
    // Called by MPSC to get the length of a packet, in 32-bit words
    mpsc_pbuf_get_wlen get_wlen;
    // Configuration flags
    uint32_t flags;
};
MPSC must be given a callback to invoke when a packet is dropped. It is called for each dropped packet, giving you a chance to pick out and handle packets that must not be lost. The example only logs a message without further processing.
static void drop ( const struct mpsc_pbuf_buffer *buffer, const union mpsc_pbuf_generic *item )
{
    // Keep the const qualifier: the callback must not modify the packet
    const struct mpsc_sample_header *t_item = ( const struct mpsc_sample_header * ) item;
    LOG_INF ( "drop packet length %d", t_item->length );
}
Aside from MPSC_PBUF_HDR, MPSC neither interprets nor accesses any data in the header, so a callback must be registered through which MPSC obtains each packet's length. MPSC expects this length in 32-bit words.
static uint32_t get_wlen ( const union mpsc_pbuf_generic *item )
{
    const struct mpsc_sample_header *t_item = ( const struct mpsc_sample_header * ) item;
    LOG_INF ( "get_wlen %d", t_item->length );
    // Length of the whole packet, in 32-bit words
    return t_item->length;
}
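To illustrate why such a callback is needed, the following standalone sketch walks a word array from header to header using only a length callback. It is a simplified model, not Zephyr's implementation: union walk_item, walk_get_wlen and walk_count_packets are hypothetical names, and wrap-around and the management bits are ignored.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical one-word item: 2 management bits + 30-bit length in words. */
union walk_item {
    struct {
        uint32_t valid : 1;
        uint32_t busy : 1;
        uint32_t length : 30;
    } hdr;
    uint32_t raw;
};

typedef uint32_t (*walk_wlen_cb)(const union walk_item *item);

/* User-supplied callback: the buffer code itself never looks at 'length'. */
static uint32_t walk_get_wlen(const union walk_item *item)
{
    return item->hdr.length;
}

/* Count the packets stored in words[0..used) by hopping over each packet. */
static size_t walk_count_packets(const union walk_item *words, size_t used,
                                 walk_wlen_cb get_wlen)
{
    size_t count = 0;
    size_t pos = 0;

    while (pos < used) {
        uint32_t wlen = get_wlen(&words[pos]);

        /* A zero or oversized length would make the walk meaningless. */
        if (wlen == 0 || wlen > used - pos) {
            break;
        }
        pos += wlen;
        count++;
    }
    return count;
}
```

Because the walker only knows packet boundaries through the callback, a wrong length in one header makes every later packet unreachable, which is why the length must be set before a packet is committed.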
Execute initialization:
// MPSC's packet buffer (128 words)
static uint32_t buf32 [128];
// MPSC manager
struct mpsc_pbuf_buffer buffer;
static int mpsc_sample_cmd ( const struct shell *shell, size_t argc, char **argv )
{
    // Initialize mpsc_buf_cfg: set the buffer and callbacks, and overwrite when space cannot be allocated
    struct mpsc_pbuf_buffer_config mpsc_buf_cfg = {
        .buf = buf32,
        .size = ARRAY_SIZE ( buf32 ),
        .notify_drop = drop,
        .get_wlen = get_wlen,
        .flags = MPSC_PBUF_MODE_OVERWRITE,
    };
    mpsc_pbuf_init ( &buffer, &mpsc_buf_cfg );
    return 0;
}
The flags used at initialization can be set by the user as follows:
- MPSC_PBUF_MODE_OVERWRITE: Discard the oldest packet when space cannot be allocated
- MPSC_PBUF_MAX_UTILIZATION: Record the maximum utilization of the MPSC buffer, generally for debugging purposes
Producer
The producer generates data in three steps:
- Request space from MPSC with mpsc_pbuf_alloc
- Fill the data packet
- Commit the data packet with mpsc_pbuf_commit
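Note that the space requested from MPSC includes the header, and that mpsc_pbuf_alloc takes the length in 32-bit words rather than bytes. To send a packet with 32 bytes of net data through MPSC, request ( 32 + sizeof ( struct mpsc_sample_header ) ) / sizeof ( uint32_t ) words of space, which is 9 words with the one-word header above. The length must also be written into the header before committing, because MPSC reads it back through get_wlen.
// Pass 32 bytes of net data; the length is in 32-bit words and includes the header
uint32_t packet_wlen = ( 32 + sizeof ( struct mpsc_sample_header ) ) / sizeof ( uint32_t );
// Request space from MPSC
union mpsc_packet *packet = ( union mpsc_packet * ) mpsc_pbuf_alloc ( &buffer, packet_wlen, K_NO_WAIT );
if ( packet ) {
    // Record the length so that get_wlen can report it
    packet->packet.hdr.length = packet_wlen;
    // Fill data
    memset ( packet->packet.data, 0x5a, 32 );
    // Commit data
    mpsc_pbuf_commit ( &buffer, &packet->hdr );
}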
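When the payload size is not a multiple of four, the byte count has to be rounded up to whole words. The helper below is a small sketch of that arithmetic; bytes_to_words and packet_words are hypothetical names, not part of the MPSC API.

```c
#include <stddef.h>
#include <stdint.h>

/* Round a byte count up to whole 32-bit words. */
static inline uint32_t bytes_to_words(size_t bytes)
{
    return (uint32_t)((bytes + sizeof(uint32_t) - 1) / sizeof(uint32_t));
}

/* Total packet length in words: header and payload are laid out back to back. */
static inline uint32_t packet_words(size_t hdr_bytes, size_t payload_bytes)
{
    return bytes_to_words(hdr_bytes + payload_bytes);
}
```

With a 4-byte header, packet_words ( 4, 32 ) gives 9 words, while a 33-byte payload already needs 10.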
mpsc_pbuf_alloc behaves as follows:
- Sufficient buffer space: the packet space is allocated successfully
- Insufficient buffer space:
  - With a wait timeout: wait for the consumer to release packet space within the timeout. If the released space is still smaller than the requested space, it waits again (so the actual wait time may be several times longer than the specified wait time)
  - No wait timeout: if MPSC_PBUF_MODE_OVERWRITE is specified, the oldest packet is discarded to free space for the new packet. If MPSC_PBUF_MODE_OVERWRITE is not specified, the request for space fails.
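The decision tree above can be summarized as a small pure function. This is only a model of the behaviour as described in this section, not Zephyr's actual implementation; enum alloc_outcome and alloc_policy are hypothetical names.

```c
#include <stdbool.h>
#include <stdint.h>

enum alloc_outcome {
    ALLOC_OK,          /* enough free space: allocated immediately */
    ALLOC_WAIT,        /* block until the consumer frees space or the timeout expires */
    ALLOC_DROP_OLDEST, /* overwrite mode: discard the oldest packet, then allocate */
    ALLOC_FAIL,        /* no space, no waiting, no overwrite */
};

/* Model of the allocation policy described in the list above. */
static enum alloc_outcome alloc_policy(uint32_t free_words, uint32_t wanted_words,
                                       bool can_wait, bool overwrite)
{
    if (wanted_words <= free_words) {
        return ALLOC_OK;
    }
    if (can_wait) {
        return ALLOC_WAIT;
    }
    return overwrite ? ALLOC_DROP_OLDEST : ALLOC_FAIL;
}
```

In this model a producer that passes K_NO_WAIT on a buffer without MPSC_PBUF_MODE_OVERWRITE is the only one that must handle an immediate failure, which is why the return value of mpsc_pbuf_alloc should be checked.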
Consumer
The consumer consumes data in three steps:
- Request a data packet from MPSC with mpsc_pbuf_claim
- Process the data
- Notify MPSC to free the data packet with mpsc_pbuf_free
// Request a data packet
const union mpsc_packet *packet_read = ( const union mpsc_packet * ) mpsc_pbuf_claim ( &buffer );
if ( packet_read ) {
    // The header length is in 32-bit words and includes the header itself
    size_t data_len = packet_read->packet.hdr.length * sizeof ( uint32_t ) - sizeof ( struct mpsc_sample_header );
    // Process the data packet
    shell_print ( shell, "Dump packet[%d]:", packet_read->packet.hdr.length );
    shell_hexdump ( shell, packet_read->packet.data, data_len );
    // Free the data packet
    mpsc_pbuf_free ( &buffer, &packet_read->hdr );
}
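On the consumer side the conversion runs the other way, from a word count back to payload bytes. A minimal sketch, with payload_capacity as a hypothetical helper name:

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Bytes available after the header in a packet of 'wlen' words.
 * Returns 0 when the packet is not longer than its header.
 */
static inline size_t payload_capacity(uint32_t wlen, size_t hdr_bytes)
{
    size_t total = (size_t)wlen * sizeof(uint32_t);

    return total > hdr_bytes ? total - hdr_bytes : 0;
}
```

This value is an upper bound: if the producer rounded the payload up to a whole word, the trailing padding bytes are included. A header that needs the exact byte count would have to carry it in an additional field.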
Other APIs
In addition to the previous 5 APIs, MPSC also provides several auxiliary APIs.
The following three APIs simplify the producer’s work by combining the request and commit steps into a single call. Note, however, that when the MPSC buffer has insufficient space and MPSC_PBUF_MODE_OVERWRITE is not specified, none of them will place the data into MPSC, and since they return void the caller is not told. Before calling these three APIs, you can first check space availability using mpsc_pbuf_get_utilization.
// Write a packet consisting of a single word (4 bytes, word.raw) to the buffer
void mpsc_pbuf_put_word ( struct mpsc_pbuf_buffer *buffer,
                          const union mpsc_pbuf_generic word );
// Write a packet consisting of a single word (word.raw) followed by a data pointer
void mpsc_pbuf_put_word_ext ( struct mpsc_pbuf_buffer *buffer,
                              const union mpsc_pbuf_generic word,
                              const void *data );
// Copy a packet of wlen words, pointed to by data, into the buffer
void mpsc_pbuf_put_data ( struct mpsc_pbuf_buffer *buffer,
                          const uint32_t *data, size_t wlen );
The following three APIs are for obtaining the status of MPSC:
// Returns whether there are packets in MPSC waiting to be consumed
bool mpsc_pbuf_is_pending ( struct mpsc_pbuf_buffer *buffer );
// MPSC buffer utilization: size receives the total buffer size and now the current usage, both in bytes
void mpsc_pbuf_get_utilization ( struct mpsc_pbuf_buffer *buffer,
                                 uint32_t *size, uint32_t *now );
// max receives the maximum utilization of the MPSC buffer. Returns -ENOTSUP if MPSC was initialized without MPSC_PBUF_MAX_UTILIZATION
int mpsc_pbuf_get_max_utilization ( struct mpsc_pbuf_buffer *buffer, uint32_t *max );
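The two values reported by mpsc_pbuf_get_utilization can be turned into a percentage or a free-space estimate. The helpers below are a sketch that assumes size and now are expressed in the same unit; utilization_percent and free_space are hypothetical names.

```c
#include <stdint.h>

/* Usage as a whole-number percentage; 0 for an empty or zero-sized buffer. */
static inline uint32_t utilization_percent(uint32_t size, uint32_t now)
{
    if (size == 0) {
        return 0;
    }
    /* Widen before multiplying so that now * 100 cannot overflow. */
    return (uint32_t)(((uint64_t)now * 100u) / size);
}

/* Remaining space in the same unit as the inputs; clamps at 0. */
static inline uint32_t free_space(uint32_t size, uint32_t now)
{
    return now < size ? size - now : 0;
}
```

The free-space figure is a necessary but not sufficient condition for a successful write: a packet needs contiguous space, and other producers may allocate between the check and the write.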
References
https://docs.zephyrproject.org/latest/kernel/data_structures/mpsc_pbuf.html