lib<mpsc> 0.1.1
A C POSIX multi-thread based multiple producers, single consumer (MPSC) channel library
mpsc.h File Reference
#include <stdbool.h>
#include <stddef.h>


Data Structures

struct  mpsc_create_params_t
 

Typedefs

typedef struct mpsc_s mpsc_t
 
typedef struct mpsc_consumer_s mpsc_consumer_t
 
typedef struct mpsc_producer_s mpsc_producer_t
 
typedef void() mpsc_producer_thread_callback_t(mpsc_producer_t *producer)
 
typedef void() mpsc_consumer_callback_t(mpsc_consumer_t *consumer, void *data, size_t n, bool closed)
 
typedef void() mpsc_consumer_error_callback_t(mpsc_consumer_t *consumer)
 

Enumerations

enum  mpsc_register_producer_error_t { MPSC_REGISTER_PRODUCER_ERROR_NONE = 0 , MPSC_REGISTER_PRODUCER_ERROR_CLOSED = 1 , MPSC_REGISTER_PRODUCER_ERROR_N_MAX_PRODUCERS_REACHED = 2 , MPSC_REGISTER_PRODUCER_ERROR_EAGAIN = 3 }
 

Functions

mpsc_t * mpsc_create (mpsc_create_params_t params)
 
void mpsc_join (mpsc_t *self)
 
mpsc_register_producer_error_t mpsc_register_producer (mpsc_t *self, mpsc_producer_thread_callback_t callback, void *context)
 
mpsc_register_producer_error_t mpsc_consumer_register_producer (mpsc_consumer_t *self, mpsc_producer_thread_callback_t callback, void *context)
 
void mpsc_consumer_close (mpsc_consumer_t *self)
 
bool mpsc_producer_ping (mpsc_producer_t *self)
 
bool mpsc_producer_send (mpsc_producer_t *self, void *data, size_t n)
 
bool mpsc_producer_send_empty (mpsc_producer_t *self)
 
void * mpsc_producer_context (mpsc_producer_t *self)
 
mpsc_register_producer_error_t mpsc_producer_register_producer (mpsc_producer_t *self, mpsc_producer_thread_callback_t callback, void *context)
 

Data Structure Documentation

◆ mpsc_create_params_t

struct mpsc_create_params_t

The structure that must be passed to mpsc_create to instantiate a new mpsc_t object.

See also
mpsc_create
Data Fields
size_t buffer_size The size (in bytes) of the internal buffer used to transfer a message between a producer and the consumer.
Note
- This acts as the upper bound of a message size for the mpsc_t instance. Calling mpsc_producer_send with a message of size exceeding this value will cause the process to be terminated, regardless of error_handling_enabled's value.
- This value can be set to 0 if the application only requires empty messages, or if it does not need to send messages at all.
Warning
In this context, the argument name buffer_size should not be confused with the term buffer in the sense of a "buffered channel", where it would refer to the number of messages that can be held in an internal queue awaiting delivery to the consumer. Currently, this library only allows sending one message at a time, in a blocking manner. In other words, the mpsc_producer_send and mpsc_producer_send_empty functions are blocking functions.
mpsc_consumer_callback_t * consumer_callback The application defined consumer callback function used to receive messages for the mpsc_t instance.
mpsc_consumer_error_callback_t * consumer_error_callback An optional, application defined consumer error callback function used, when error_handling_enabled = true, to deliver potential internal errors to the application, in which case errno should be checked for more information.
Note
As explained in the mpsc_consumer_error_callback_t documentation, the only possible such error at the moment is ENOMEM , which could arise from a failed internal call to malloc when allocating the memory into which the message is copied from the internal buffer before being passed to the consumer_callback function's data argument.
bool create_and_join_thread_safety_disabled A boolean value that can be used to disable the safety feature that prevents mpsc_create and mpsc_join from being called on distinct threads for the same mpsc_t instance.
bool error_handling_enabled A boolean value indicating whether (true) error handling should be handed over to the application or whether (false) errors should be printed to stderr and the process be terminated.
Note
Not all errors can be handled, but those that can may occur at three different levels:
  1. When calling mpsc_create (the returned value could be the NULL pointer, in which case errno should be checked for details about the error, which will be related to resource exhaustion; i.e., ENOMEM or EAGAIN ).
  2. When calling mpsc_register_producer, as well as the mpsc_consumer_register_producer and mpsc_producer_register_producer aliases (which will allow the mpsc_register_producer_error_t error of type MPSC_REGISTER_PRODUCER_ERROR_EAGAIN to be returned, instead of terminating the process).
  3. As a result of the internal consumer thread not being able to allocate memory needed to hold a copy of the message temporarily stored inside the internal buffer by a producer for delivery to the consumer callback. Such an error will be communicated to the application through the consumer_error_callback when error_handling_enabled = true, else will result in the process being terminated.
size_t n_max_producers The maximum number of producers that can be registered on the mpsc_t instance.
Note
- This value must be greater than 0, else the process will be terminated.
- Calling mpsc_register_producer when this value has been reached will result in a mpsc_register_producer_error_t error of type MPSC_REGISTER_PRODUCER_ERROR_N_MAX_PRODUCERS_REACHED .
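Because a message larger than buffer_size terminates the process, one simple way to stay within the bound is to use a fixed-size message struct and set buffer_size to its sizeof. The following is a minimal sketch of that idea; struct my_message, MY_TEXT_CAPACITY, MY_BUFFER_SIZE, and my_message_init are hypothetical application-side names, not part of the library.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define MY_TEXT_CAPACITY (32) /* hypothetical capacity chosen by the application */

/* Hypothetical fixed-size message type: every message has the same size. */
struct my_message
{
    char text[MY_TEXT_CAPACITY];
};

/* The value the application would assign to buffer_size, so that
   n == sizeof(struct my_message) can never exceed it. */
#define MY_BUFFER_SIZE (sizeof(struct my_message))

/* Fills a message without ever writing past its fixed capacity.
   Returns false if the text had to be truncated. */
static bool my_message_init(struct my_message *m, size_t id)
{
    assert(m != NULL);
    memset(m, 0, sizeof(*m));
    int written = snprintf(m->text, sizeof(m->text), "Hello from producer #%zu!", id);
    return written >= 0 && (size_t)written < sizeof(m->text);
}
```

A producer would then pass &message and sizeof(struct my_message) to mpsc_producer_send, and the consumer callback can reject any n that differs from that size.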

Typedef Documentation

◆ mpsc_consumer_callback_t

typedef void() mpsc_consumer_callback_t(mpsc_consumer_t *consumer, void *data, size_t n, bool closed)

The signature of the consumer callback function, to be declared and implemented by the application, which is passed as a parameter to the mpsc_create function when instantiating a new channel.

Parameters
consumer A pointer to a mpsc_consumer_t instance for which the callback is being executed.
data A pointer to dynamically allocated memory containing the message sent by a producer to the consumer.
n The size (in bytes) of data .
closed A boolean value indicating whether the channel (i.e., the mpsc_t instance) has been marked as closed, in which case the callback won't be called again for the given mpsc_t instance.
See also
mpsc_create, mpsc_create_params_t
Warning
When n is non-zero, data refers to dynamically allocated memory that is the responsibility of the callback. In other words, as soon as data is no longer needed, it should be freed using free , else memory will be leaked.
Note
- There are two scenarios that can cause the closed argument to be true: (1) the mpsc_consumer_close function was called on consumer from inside the callback or (2) the mpsc_join function has been called on the channel object and all producer threads have returned.
- The callback should return as quickly as possible to avoid blocking the consumer thread. Blocking tasks, if required, should be offloaded to other threads.
- When n is set to 0, data will be set to NULL .
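The ownership rules above can be sketched as a callback that distinguishes the three cases (closing call, empty message, non-empty message). This is only an illustration: the my_* counters are hypothetical application state, the forward declaration stands in for including mpsc.h, and the sketch assumes, as the library's quick example does, that the closing call carries no payload to free.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Forward declaration matching the typedef in mpsc.h, so the sketch compiles
   on its own. In a real program, use #include "mpsc.h" instead. */
typedef struct mpsc_consumer_s mpsc_consumer_t;

/* Hypothetical application state updated by the callback. */
static size_t my_total_bytes = 0;
static size_t my_empty_messages = 0;
static bool my_channel_closed = false;

/* Conforms to the mpsc_consumer_callback_t signature. */
static void my_consumer_callback(mpsc_consumer_t *consumer, void *data, size_t n, bool closed)
{
    (void)consumer;
    if (closed)
    {
        /* Final call for this channel: no further calls will follow.
           Assumption: this call carries no message. */
        my_channel_closed = true;
        return;
    }
    if (n == 0)
    {
        /* Empty message: data is NULL, so there is nothing to free. */
        assert(data == NULL);
        my_empty_messages += 1;
        return;
    }
    /* Non-empty message: the callback owns data and must free it. */
    my_total_bytes += n;
    free(data);
}
```

Keeping the body this short also follows the advice to return quickly; heavier processing would be handed off to another thread before freeing or transferring data.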

◆ mpsc_consumer_error_callback_t

typedef void() mpsc_consumer_error_callback_t(mpsc_consumer_t *consumer)

The signature of an optional consumer error callback function, to be declared and implemented by the application, which is passed as a parameter to the mpsc_create function when instantiating a new channel.

See also
mpsc_create, mpsc_create_params_t
Note
- This callback is ignored when mpsc_create_params_t 's error_handling_enabled is set to false, in which case, when an error occurs, information about that error is printed to stderr and the process is terminated.
- When executed for consumer (i.e., the specific mpsc_consumer_t instance), the application should look at errno for information about the encountered error. Currently, the only possible error is ENOMEM, which arises from a failed internal call to malloc .
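A consumer error callback might look like the following sketch, which reads errno first (before any other call can overwrite it) and then reports the failure. The counter my_enomem_count is a hypothetical name, and the forward declaration stands in for including mpsc.h.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Forward declaration matching the typedef in mpsc.h, so the sketch compiles
   on its own. In a real program, use #include "mpsc.h" instead. */
typedef struct mpsc_consumer_s mpsc_consumer_t;

/* Hypothetical counter of out-of-memory reports seen by the application. */
static size_t my_enomem_count = 0;

/* Conforms to the mpsc_consumer_error_callback_t signature. */
static void my_consumer_error_callback(mpsc_consumer_t *consumer)
{
    /* Capture errno immediately: fprintf and strerror may change it. */
    int saved_errno = errno;
    (void)consumer;
    if (saved_errno == ENOMEM)
    {
        my_enomem_count += 1;
    }
    fprintf(stderr, "[consumer] internal error: %s\n", strerror(saved_errno));
}
```

The callback only takes effect when it is assigned to consumer_error_callback and error_handling_enabled is set to true in mpsc_create_params_t.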

◆ mpsc_producer_thread_callback_t

typedef void() mpsc_producer_thread_callback_t(mpsc_producer_t *producer)

The signature of the producer thread callback function, to be declared and implemented by the application, which is passed as a parameter to the mpsc_register_producer function when registering a producer for a given mpsc_t instance.

Parameters
producer A pointer to a mpsc_producer_t instance for which the callback is being executed.
See also
mpsc_register_producer, mpsc_consumer_register_producer, mpsc_producer_register_producer

◆ mpsc_t

typedef struct mpsc_s mpsc_t

An opaque data type used as a container for the MPSC channel data.

See also
mpsc_create, mpsc_join
Example:
/*
Copyright (c) 2024 BB-301 <fw3dg3@gmail.com>
[Official repository](https://github.com/BB-301/c-mpsc)
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the “Software”), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
/*
======================
Example: Quick example
======================
This is a quick "getting started" example used to illustrate
the basic structure of a program using lib<mpsc>. This example
is featured in the [repository](https://github.com/BB-301/c-mpsc)'s
README.md file.
*/
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include "mpsc.h"
#define IGNORE_UNUSED(m) ((void)(m))
#define TEXT_BUFFER_SIZE (100)
#define N_PRODUCERS (8)
static void my_consumer_callback(mpsc_consumer_t *consumer, void *data, size_t n, bool closed);
static void my_producer_thread_callback(mpsc_producer_t *producer);
struct my_message
{
    char text[TEXT_BUFFER_SIZE];
};
struct my_producer_thread_callback_context
{
    size_t id;
};
int main(void)
{
    /* With error_handling_enabled = false, the library terminates the
       process on error, so the returned pointer needs no check. */
    mpsc_t *mpsc = mpsc_create((mpsc_create_params_t){
        .buffer_size = sizeof(struct my_message),
        .n_max_producers = N_PRODUCERS,
        .consumer_callback = my_consumer_callback,
        .consumer_error_callback = NULL,
        .error_handling_enabled = false,
        .create_and_join_thread_safety_disabled = false,
    });
    struct my_producer_thread_callback_context contexts[N_PRODUCERS];
    for (size_t i = 0; i < N_PRODUCERS; i++)
    {
        contexts[i].id = i + 1;
        /* Register outside of assert() so the call is not compiled out
           when NDEBUG is defined. */
        if (mpsc_register_producer(mpsc, my_producer_thread_callback, &contexts[i]) != MPSC_REGISTER_PRODUCER_ERROR_NONE)
        {
            fprintf(stderr, "[main] Error: Could not register producer\n");
            exit(EXIT_FAILURE);
        }
    }
    /* Blocks until all producers have returned and the channel is closed. */
    mpsc_join(mpsc);
    exit(EXIT_SUCCESS);
}
static void my_consumer_callback(mpsc_consumer_t *consumer, void *data, size_t n, bool closed)
{
    IGNORE_UNUSED(consumer);
    static size_t counter = 0;
    counter += 1;
    if (closed)
    {
        fprintf(stdout, "[consumer:%zu] closed\n", counter);
        return;
    }
    if (n != sizeof(struct my_message))
    {
        fprintf(stderr, "[consumer] Error: Unexpected message size\n");
        exit(EXIT_FAILURE);
    }
    struct my_message *message = data;
    fprintf(stdout, "[consumer:%zu] %s\n", counter, message->text);
    /* The callback owns data and must free it. */
    free(data);
}
static void my_producer_thread_callback(mpsc_producer_t *producer)
{
    struct my_producer_thread_callback_context *ctx = mpsc_producer_context(producer);
    struct my_message message;
    /* snprintf never writes past the end of message.text. */
    snprintf(message.text, sizeof(message.text), "Hello from producer #%zu!", ctx->id);
    if (!mpsc_producer_send(producer, &message, sizeof(struct my_message)))
    {
        fprintf(stderr, "[producer #%zu] Error: Channel closed before the message could be sent\n", ctx->id);
        exit(EXIT_FAILURE);
    }
}

Enumeration Type Documentation

◆ mpsc_register_producer_error_t

The type returned by mpsc_register_producer (as well as by its aliases; i.e., mpsc_producer_register_producer and mpsc_consumer_register_producer ) when trying to register a new producer on a mpsc_t instance, and which indicates whether an error occurred.

See also
mpsc_register_producer, mpsc_consumer_register_producer, mpsc_producer_register_producer
Enumerator
MPSC_REGISTER_PRODUCER_ERROR_NONE 

The producer was successfully registered.

MPSC_REGISTER_PRODUCER_ERROR_CLOSED 

The producer could not be registered because the mpsc_t instance has internally been marked as closed (i.e., the channel has been closed).

MPSC_REGISTER_PRODUCER_ERROR_N_MAX_PRODUCERS_REACHED 

The producer could not be registered because the maximum number of producers allowed (i.e., n_max_producers) in the mpsc_t instance has been reached.

MPSC_REGISTER_PRODUCER_ERROR_EAGAIN 

The producer could not be registered because an EAGAIN error was observed internally while trying to create a new thread using pthread_create .

Note
When mpsc_create_params_t 's error_handling_enabled is set to false, this error will not be returned and will instead result in the process being terminated.
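For logging, an application may want to turn these values into readable text. The helper below is a hypothetical application-side function (my_register_error_str is not part of the library). So that the sketch compiles on its own, it declares a local mirror of the enumeration using the values documented above; in a real program that mirror would be replaced by including mpsc.h.

```c
#include <assert.h>
#include <string.h>

/* Local mirror of the documented enumeration, for a standalone build only.
   Remove it and use #include "mpsc.h" when building against the library. */
typedef enum
{
    MPSC_REGISTER_PRODUCER_ERROR_NONE = 0,
    MPSC_REGISTER_PRODUCER_ERROR_CLOSED = 1,
    MPSC_REGISTER_PRODUCER_ERROR_N_MAX_PRODUCERS_REACHED = 2,
    MPSC_REGISTER_PRODUCER_ERROR_EAGAIN = 3
} mpsc_register_producer_error_t;

/* Hypothetical helper: maps a registration result to a short description. */
static const char *my_register_error_str(mpsc_register_producer_error_t error)
{
    switch (error)
    {
    case MPSC_REGISTER_PRODUCER_ERROR_NONE:
        return "producer registered";
    case MPSC_REGISTER_PRODUCER_ERROR_CLOSED:
        return "channel is closed";
    case MPSC_REGISTER_PRODUCER_ERROR_N_MAX_PRODUCERS_REACHED:
        return "n_max_producers reached";
    case MPSC_REGISTER_PRODUCER_ERROR_EAGAIN:
        return "thread creation failed with EAGAIN";
    }
    /* Defensive fallback for values outside the documented set. */
    return "unknown error";
}
```

Of the three failures, only MPSC_REGISTER_PRODUCER_ERROR_EAGAIN describes a transient condition, so it is the one an application might reasonably retry after a delay.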

Function Documentation

◆ mpsc_consumer_close()

void mpsc_consumer_close ( mpsc_consumer_t * self)

A function that can be used (from inside the application defined consumer callback implementing mpsc_consumer_callback_t ) on the consumer object self to notify the channel's internal consumer thread that it should return.

Parameters
self A pointer to a mpsc_consumer_t instance whose parent object (a mpsc_t instance) should be marked as closed.
Note
- Before returning, the internal consumer thread will call the mpsc_consumer_callback_t one last time with the closed argument marked as true.
- The mpsc_consumer_callback_t could also receive a call with closed = true if all producer threads have returned and mpsc_join has been called.

◆ mpsc_consumer_register_producer()

mpsc_register_producer_error_t mpsc_consumer_register_producer ( mpsc_consumer_t * self, mpsc_producer_thread_callback_t callback, void * context )

An alias for mpsc_register_producer , but which is used on an object of type mpsc_consumer_t , to try to register a producer for self 's parent channel object.

Parameters
self A pointer to the mpsc_consumer_t object for whose parent (i.e., a mpsc_t instance) to register a new producer.
callback An application defined thread callback function, which conforms to the mpsc_producer_thread_callback_t interface, to be used by the producer.
context An application defined context object that can be retrieved from inside callback by calling the mpsc_producer_context function on the callback 's mpsc_producer_t argument.
Returns
mpsc_register_producer_error_t A value used to report a potential error with the call. Please read the documentation for mpsc_register_producer_error_t for more information about the potential errors. A successful call will return MPSC_REGISTER_PRODUCER_ERROR_NONE .
See also
mpsc_register_producer, mpsc_producer_register_producer
Note
This function exists for situations in which a consumer would like to register other producers to the same channel.

◆ mpsc_create()

mpsc_t * mpsc_create ( mpsc_create_params_t  params)

The function used to create a new channel instance (i.e., a mpsc_t instance).

Parameters
params The instance's configuration (see the documentation for mpsc_create_params_t for details).
Returns
mpsc_t* A pointer to the created mpsc_t object, or the NULL pointer if an error occurred during instantiation.
Note
If an error occurs when mpsc_create_params_t 's error_handling_enabled = false, information about the error will be printed to stderr and the process will be terminated, so the returned value need not be checked. If, on the other hand, error_handling_enabled = true, then NULL will be returned and errno will describe the error. Note, however, that not all errors can be handled by the application: some will always result in the process being terminated, regardless of error_handling_enabled's value. Currently, the only errors that can be handled by the application are those related to resource exhaustion (i.e., ENOMEM or EAGAIN ), which, internally, can occur when calling malloc , pthread_mutex_init , pthread_cond_init , or pthread_create .

◆ mpsc_join()

void mpsc_join ( mpsc_t * self)

The function that must be called on self to wait for the channel to close.

Note
- Internally, this function will join the internal consumer thread. Once joined, it will set the internal channel state to closed, and then will join all registered producer threads. Once all internal threads have been joined, self 's internal resources will be destroyed and the memory freed.
- In most applications, this function should be called on the same thread as the thread that was used to instantiate self (i.e., the mpsc_t object).
Parameters
self A pointer to the mpsc_t instance to be joined.

◆ mpsc_producer_context()

void * mpsc_producer_context ( mpsc_producer_t * self)

A function that can be used from inside the producer thread callback function to retrieve the application defined context object passed to mpsc_register_producer when the producer was registered.

Parameters
self A pointer to the mpsc_producer_t instance for which to retrieve the application defined context (i.e., the user context).
Returns
void* A pointer to arbitrary memory defined by the application, which contains the "user context object".

◆ mpsc_producer_ping()

bool mpsc_producer_ping ( mpsc_producer_t * self)

A function that can be used from inside a producer thread callback to check whether the channel to which self belongs is still open.

Parameters
self A pointer to the mpsc_producer_t instance for which to check whether the channel is still open.
Returns
bool A boolean value indicating whether the underlying channel is still open (true) or whether it has been marked as closed (false).
Note
A producer thread callback function should return when its channel has been marked as closed, to avoid making the application's call to mpsc_join hang.
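The pattern described in the note can be sketched as a producer loop that checks the channel before each unit of work and returns as soon as it is closed. The callback my_ticker_producer and the macros MY_USE_REAL_MPSC and MY_MAX_TICKS are hypothetical names. To keep the sketch runnable without the library, the #else branch provides small stand-ins that only imitate the documented return values; they are not the library's implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#ifdef MY_USE_REAL_MPSC /* hypothetical macro: define it when building against lib<mpsc> */
#include "mpsc.h"
#else
/* Stand-ins so the sketch compiles and runs on its own. NOT the library's code. */
typedef struct mpsc_producer_s
{
    size_t sends_until_close; /* the fake channel closes after this many sends */
    size_t accepted;          /* number of messages the fake channel accepted */
} mpsc_producer_t;

static bool mpsc_producer_ping(mpsc_producer_t *self)
{
    return self->sends_until_close > 0;
}

static bool mpsc_producer_send_empty(mpsc_producer_t *self)
{
    if (self->sends_until_close == 0)
    {
        return false;
    }
    self->sends_until_close -= 1;
    self->accepted += 1;
    return true;
}
#endif

#define MY_MAX_TICKS (1000) /* hypothetical upper bound on the work to perform */

/* Conforms to the mpsc_producer_thread_callback_t signature. */
static void my_ticker_producer(mpsc_producer_t *producer)
{
    for (size_t i = 0; i < MY_MAX_TICKS; i++)
    {
        /* Stop before doing more work if the channel is already closed. */
        if (!mpsc_producer_ping(producer))
        {
            return;
        }
        /* ... one unit of application work would go here ... */
        /* A false return also means the channel was closed in the meantime. */
        if (!mpsc_producer_send_empty(producer))
        {
            return;
        }
    }
}
```

Checking the return value of the send call as well as mpsc_producer_ping matters because the channel may be closed between the two calls.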
See also
mpsc_producer_send, mpsc_producer_send_empty

◆ mpsc_producer_register_producer()

mpsc_register_producer_error_t mpsc_producer_register_producer ( mpsc_producer_t * self, mpsc_producer_thread_callback_t callback, void * context )

An alias for mpsc_register_producer , but which is used on an object of type mpsc_producer_t , to try to register a producer for self 's parent channel object.

Parameters
self A pointer to the mpsc_producer_t object for whose parent (i.e., a mpsc_t instance) to register a new producer.
callback An application defined thread callback function, which conforms to the mpsc_producer_thread_callback_t interface, to be used by the producer.
context An application defined context object that can be retrieved from inside callback by calling the mpsc_producer_context function on the callback 's mpsc_producer_t argument.
Returns
mpsc_register_producer_error_t A value used to report a potential error with the call. Please read the documentation for mpsc_register_producer_error_t for more information about the potential errors. A successful call will return MPSC_REGISTER_PRODUCER_ERROR_NONE .
See also
mpsc_register_producer, mpsc_consumer_register_producer
Note
This function exists for situations in which a producer would like to register other producers to the same channel.

◆ mpsc_producer_send()

bool mpsc_producer_send ( mpsc_producer_t * self, void * data, size_t n )

The function used (from inside a producer thread callback function) to send a message to the channel's consumer.

Parameters
self A pointer to the mpsc_producer_t instance for which to send a message down the underlying channel, to be delivered to the consumer.
data A pointer to arbitrary bytes ( n bytes) to be sent to the channel's consumer.
n The message size, in bytes.
Returns
bool A boolean value indicating whether the message was accepted or not. false means that the channel has been marked as closed and that, as a consequence, the message could not be delivered. true means that the message was successfully copied to the internal buffer and will eventually be picked up and copied by the internal consumer thread for delivery to the consumer callback.
Note
- The message size (i.e., n ) should never be greater than the buffer_size value of mpsc_create_params_t (which was specified when creating the channel using mpsc_create ). If n > buffer_size, an error message will be printed to stderr and the process will be terminated.
- It is possible to send an empty message using data = NULL and n = 0, although there is a function called mpsc_producer_send_empty that can be used for that purpose.
- The n bytes from data are temporarily copied to the internal buffer. So, if the producer thread callback function implementation uses dynamic memory allocation for data , it must call free() on that memory when it's no longer needed to avoid a leak.
- In some applications, a producer will not need to send messages to the consumer, and will simply perform a job and return. Once all producer thread callback functions have returned for a particular mpsc_t object, the consumer callback (i.e., mpsc_consumer_callback_t ) will be called one last time with its closed argument set to true, and the call to mpsc_join will return.
See also
mpsc_producer_ping, mpsc_producer_send_empty

◆ mpsc_producer_send_empty()

bool mpsc_producer_send_empty ( mpsc_producer_t * self)

Similar to mpsc_producer_send , except that this function is used (from inside a producer thread callback function) to send an empty message.

Parameters
self A pointer to the mpsc_producer_t instance for which to send a message down the underlying channel, to be delivered to the consumer.
Returns
bool A boolean value indicating whether the message was accepted or not. false means that the channel has been marked as closed and that, as a consequence, the message could not be delivered. true means that the message was accepted and will eventually be picked up by the internal consumer thread for delivery to the consumer callback.
Note
In some applications, a producer will not need to send messages to the consumer, and will simply perform a job and return. Once all producer thread callback functions have returned for a particular mpsc_t object, the consumer callback (i.e., mpsc_consumer_callback_t ) will be called one last time with its closed argument set to true, and the call to mpsc_join will return.
See also
mpsc_producer_ping, mpsc_producer_send

◆ mpsc_register_producer()

mpsc_register_producer_error_t mpsc_register_producer ( mpsc_t * self, mpsc_producer_thread_callback_t callback, void * context )

The function used to register a new producer for self .

Parameters
self A pointer to the mpsc_t instance for which to register a new producer.
callback An application defined thread callback function, which conforms to the mpsc_producer_thread_callback_t interface, to be used by the producer.
context An application defined context object that can be retrieved from inside callback by calling the mpsc_producer_context function on the callback 's mpsc_producer_t argument.
Returns
mpsc_register_producer_error_t A value used to report a potential error with the call. Please read the documentation for mpsc_register_producer_error_t for more information about the potential errors. A successful call will return MPSC_REGISTER_PRODUCER_ERROR_NONE .
See also
mpsc_consumer_register_producer, mpsc_producer_register_producer