Understanding Linux Synchronization Principles: Simulating the BlockQueue Producer-Consumer Model for Efficient Concurrent Programming!

Linux | Red Hat Certified | IT Technology | Operations Engineer

👇 Join our technical exchange QQ group with the note 【Public Account】 for faster approval

Understanding Linux Synchronization Principles: Simulating the BlockQueue Producer-Consumer Model for Efficient Concurrent Programming!

1. Concept of Thread Synchronization

Condition Variable: When a thread accesses a variable exclusively, it may find that it cannot proceed until other threads change the state. For example, when a thread accesses a queue and finds it empty, it can only wait until another thread adds a node to the queue. This situation requires the use of a condition variable.

Thread Synchronization: Refers to the coordination of multiple threads’ execution order in multithreaded programming through specific mechanisms, ensuring safe and orderly access to shared resources (such as memory, files, hardware, etc.). The core goal is to prevent data corruption, logical errors, or resource conflicts caused by concurrent access.

2. Why is Thread Synchronization Necessary?

2.1 Preventing Data Races (Data Race)

Problem: When multiple threads read and write shared data simultaneously, the execution order is uncertain, which may lead to data inconsistency.

int balance = 100;  // Shared variable// Thread A executes: deposit 200balance += 200;  // Thread B executes: withdraw 150balance -= 150;

When not synchronized: If threads A and B read balance=100 simultaneously, the final result could be 100+200-150=150 (correct should be 150) or 100-150+200=150, but if operations are interleaved (e.g., A reads then B writes), an incorrect value may be obtained (e.g., -50).

2.2 Ensuring Atomicity of Operations

Problem: A single operation (like i++) may correspond to multiple machine instructions at the lower level, and thread switching can cause the operation to be interrupted before completion.

; x86's i++ actual steps:mov eax, [i]  ; Read i into registerinc eax       ; Increment registermov [i], eax  ; Write back to memory

If thread A executes up to inc eax and is switched out, and thread B modifies i, when thread A resumes, it will write back the old value, leading to an incorrect result.

2.3 Coordinating Execution Order Between Threads

Scenario: Certain tasks require threads to execute in a specific order.

Producer-Consumer Model: The consumer thread must wait for the producer to generate data before reading it.

Task Dependency: Thread B must execute after thread A completes initialization.

2.4 Avoiding Resource Contention (such as files, network connections)

Problem: Multiple threads writing to the same file or occupying the same network port simultaneously can lead to data corruption or program crashes.

Common Means of Thread Synchronization

Understanding Linux Synchronization Principles: Simulating the BlockQueue Producer-Consumer Model for Efficient Concurrent Programming!

Severe Consequences of Synchronization Issues

Data Inconsistency: Program outputs errors, such as abnormal bank account balances.

Program Crashes: Multiple threads freeing memory simultaneously can lead to double freeing.

Deadlock: Threads waiting for each other to release locks, leading to permanent blocking.

In short, thread synchronization is a mechanism for unified management of the producer-consumer model.

3. What is the Producer-Consumer Model?

Understanding Linux Synchronization Principles: Simulating the BlockQueue Producer-Consumer Model for Efficient Concurrent Programming!

3.1 Three Relationships

Understanding Linux Synchronization Principles: Simulating the BlockQueue Producer-Consumer Model for Efficient Concurrent Programming!

3.2 Two Roles

Producer, simulating the party that provides data

Consumer, the party that takes data

3.3 A Scenario

All data used for production and consumption is processed in the intermediate “supermarket”.

Understanding Linux Synchronization Principles: Simulating the BlockQueue Producer-Consumer Model for Efficient Concurrent Programming!

4. Simulating the Multi-Producer Consumer Model with a Blocking Queue

The following diagram simulates the basic process of a multi-producer consumer model using a blocking queue.

There are two types of threads (producers and consumers), which put and take data from the same scene (blockqueue).

Understanding Linux Synchronization Principles: Simulating the BlockQueue Producer-Consumer Model for Efficient Concurrent Programming!

4.1 Member Variables

We encapsulate the environment using existing library functions and simulate a critical resource with a queue.

private:        std::queue<T> _q;               // Container for storing data, critical resource        int _cap;                       // Maximum capacity of bq        pthread_mutex_t _mutex;         // Mutex        pthread_cond_t _productor_cond; // Producer condition variable        pthread_cond_t _consumer_cond;  // Consumer condition variable        int _cwait_num;                 // Current number of waiting consumers        int _pwait_num;                 // Current number of waiting producers    };

4.2 Constructor and Destructor

BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0)        {            pthread_mutex_init(&_mutex, nullptr);         // Create mutex            pthread_cond_init(&_productor_cond, nullptr); // Producer condition variable            pthread_cond_init(&_consumer_cond, nullptr);  // Consumer condition variable        }        ~BlockQueue()        {            pthread_mutex_destroy(&_mutex);            pthread_cond_destroy(&_productor_cond);            pthread_cond_destroy(&_consumer_cond);        }

4.3 Simulated Producer and Consumer

void Equeue(const T &in) // Producer        {            pthread_mutex_lock(&_mutex);            // Can you just put data? Producing data has conditions!            // Conclusion 1: Waiting in the critical section is inevitable (currently)            while (IsFull()) // 5. Check conditions, to prevent spurious wakeups, we usually use while for checking!            {                std::cout << "Producer entering wait..." << std::endl;                // 2. Wait, release _mutex                _pwait_num++;                pthread_cond_wait(&_productor_cond, &_mutex); // Must hold the lock when waiting!! This is problematic!                _pwait_num--;                // 3. Return, the thread is awakened && re-acquires and holds the lock (it will wake up in the critical section!)                std::cout << "Producer awakened..." << std::endl;            }            // 4. if(IsFull()) not satisfied || thread awakened            _q.push(in); // Produce            // There must be data!            if(_cwait_num)            {                std::cout << "Waking up consumers" << std::endl;                pthread_cond_signal(&_consumer_cond);            }            pthread_mutex_unlock(&_mutex);        }        void Pop(T *out) // Consumer        {            pthread_mutex_lock(&_mutex);            while(IsEmpty())            {                std::cout << "Consumer entering wait..." << std::endl;                _cwait_num++;                pthread_cond_wait(&_consumer_cond, &_mutex); // Spurious wakeup                _cwait_num--;                std::cout << "Consumer awakened..." << std::endl;            }            // 4. if(IsEmpty()) not satisfied || thread awakened            *out = _q.front();            _q.pop();            // There must be space            if(_pwait_num)            {                std::cout << "Waking up producers" << std::endl;                pthread_cond_signal(&_productor_cond);            }            pthread_mutex_unlock(&_mutex);        }

4.4 Simulating the Producer and Consumer Process

We assume the production speed is less than the consumption speed, meaning we need to spend some time after producing an object, but the consumer is always ready and must wait for the producer to produce it.

void *Consumer(void *args){    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);    while(true)    {        int data;        // 1. Get data from bq        bq->Pop(&data);        // 2. Process        printf("Consumer, consumed a data: %d\n", data);    }}void *Productor(void *args){    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);    int data = 10;    while (true)    {        sleep(2);        // 1. Get data from external source        // data = 10; // Is there data???        // 2. Produce to bq        bq->Equeue(data);        printf("Producer produced a data: %d\n", data);        data++;    }}

5. Overall Code

5.1 BlockQueue.hpp

#pragma once#include <iostream>#include <queue>#include <pthread.h>namespace BlockQueueModule{    static const int gcap = 10;    template <typename T>    class BlockQueue    {    private:        bool IsFull() { return _q.size() == _cap; }        bool IsEmpty() { return _q.empty(); }    public:        BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0)        {            pthread_mutex_init(&_mutex, nullptr);         // Create mutex            pthread_cond_init(&_productor_cond, nullptr); // Producer condition variable            pthread_cond_init(&_consumer_cond, nullptr);  // Consumer condition variable        }        void Equeue(const T &in) // Producer        {            pthread_mutex_lock(&_mutex);            // Can you just put data? Producing data has conditions!            // Conclusion 1: Waiting in the critical section is inevitable (currently)            while (IsFull()) // 5. Check conditions, to prevent spurious wakeups, we usually use while for checking!            {                std::cout << "Producer entering wait..." << std::endl;                // 2. Wait, release _mutex                _pwait_num++;                pthread_cond_wait(&_productor_cond, &_mutex); // Must hold the lock when waiting!! This is problematic!                _pwait_num--;                // 3. Return, the thread is awakened && re-acquires and holds the lock (it will wake up in the critical section!)                std::cout << "Producer awakened..." << std::endl;            }            // 4. if(IsFull()) not satisfied || thread awakened            _q.push(in); // Produce            // There must be data!            if(_cwait_num)            {                std::cout << "Waking up consumers" << std::endl;                pthread_cond_signal(&_consumer_cond);            }            pthread_mutex_unlock(&_mutex);        }        void Pop(T *out) // Consumer        {            pthread_mutex_lock(&_mutex);            while(IsEmpty())            {                std::cout << "Consumer entering wait..." << std::endl;                _cwait_num++;                pthread_cond_wait(&_consumer_cond, &_mutex); // Spurious wakeup                _cwait_num--;                std::cout << "Consumer awakened..." << std::endl;            }            // 4. if(IsEmpty()) not satisfied || thread awakened            *out = _q.front();            _q.pop();            // There must be space            if(_pwait_num)            {                std::cout << "Waking up producers" << std::endl;                pthread_cond_signal(&_productor_cond);            }            pthread_mutex_unlock(&_mutex);        }        ~BlockQueue()        {            pthread_mutex_destroy(&_mutex);            pthread_cond_destroy(&_productor_cond);            pthread_cond_destroy(&_consumer_cond);        }    private:        std::queue<T> _q;               // Container for storing data, critical resource        int _cap;                       // Maximum capacity of bq        pthread_mutex_t _mutex;         // Mutex        pthread_cond_t _productor_cond; // Producer condition variable        pthread_cond_t _consumer_cond;  // Consumer condition variable        int _cwait_num;                 // Current number of waiting consumers        int _pwait_num;                 // Current number of waiting producers    };}

5.2 Main.cc

#include "BlockQueue.hpp"#include <pthread.h>#include <unistd.h>using namespace BlockQueueModule;void *Consumer(void *args){    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);    while(true)    {        int data;        // 1. Get data from bq        bq->Pop(&data);        // 2. Process        printf("Consumer, consumed a data: %d\n", data);    }}void *Productor(void *args){    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);    int data = 10;    while (true)    {        sleep(2);        // 1. Get data from external source        // data = 10; // Is there data???        // 2. Produce to bq        bq->Equeue(data);        printf("Producer produced a data: %d\n", data);        data++;    }}int main(){    // Trading place, not only can be used to transfer data    // Transfer tasks!!!v1: object v2    BlockQueue<int> *bq = new BlockQueue<int>(5); // Shared resource -> Critical resource    // Single producer, single consumer    pthread_t c1, p1, c2, p2, p3;    pthread_create(&c1, nullptr, Consumer, bq);    pthread_create(&c2, nullptr, Consumer, bq);    pthread_create(&p1, nullptr, Productor, bq);    pthread_create(&p2, nullptr, Productor, bq);    pthread_create(&p3, nullptr, Productor, bq);    pthread_join(c1, nullptr);    pthread_join(c2, nullptr);    pthread_join(p1, nullptr);    pthread_join(p2, nullptr);    pthread_join(p3, nullptr);    delete bq;    return 0;}

5.3 Makefile

bin=bqcc=g++src=$(wildcard *.cc)obj=$(src:.cc=.o)$(bin):$(obj)    $(cc) -o $@ $^ -lpthread%.o:%.cc    $(cc) -c $< -std=c++17.PHONY:cleanclean:    rm -f $(bin) $(obj).PHONY:testtest:    echo $(src)    echo $(obj)

Conclusion

This article provides a detailed introduction to the analysis of synchronization principles and the simulation of the multi-consumer model. I hope it has been helpful to you!

For course inquiries, add: HCIE666CCIE

↓ Or scan the QR code below ↓

Understanding Linux Synchronization Principles: Simulating the BlockQueue Producer-Consumer Model for Efficient Concurrent Programming!

If you have any technical points or content you would like to see,

please leave a message below to let me know!

Leave a Comment