1. Concept of Thread Synchronization
Condition Variable: A thread that holds exclusive access to shared data may find that it cannot proceed until another thread changes the state of that data. For example, a thread that finds a queue empty can only wait until another thread adds a node to the queue. A condition variable lets the thread sleep until it is notified of such a change.
Thread Synchronization: Refers to the coordination of multiple threads’ execution order in multithreaded programming through specific mechanisms, ensuring safe and orderly access to shared resources (such as memory, files, hardware, etc.). The core goal is to prevent data corruption, logical errors, or resource conflicts caused by concurrent access.
2. Why is Thread Synchronization Necessary?
2.1 Preventing Data Races
Problem: When multiple threads read and write shared data simultaneously, the execution order is uncertain, which may lead to data inconsistency.
int balance = 100; // Shared variable
// Thread A executes: deposit 200
balance += 200;
// Thread B executes: withdraw 150
balance -= 150;
Without synchronization: balance += 200 and balance -= 150 are each a read-modify-write. If they run one after the other, the result is 150 in either order (100+200-150 or 100-150+200). If both threads read balance=100 before either writes back, one update is lost: the final value is 300 when A writes last, or -50 when B writes last.
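To make the fix concrete, here is a minimal sketch that guards the balance with a pthread mutex so the two updates cannot interleave. The names Account, Apply, Deposit, Withdraw and RunTransfers are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical account type for illustration; not part of the article's code.
struct Account {
    int balance = 100;
    pthread_mutex_t mutex;
    Account() { pthread_mutex_init(&mutex, nullptr); }
    ~Account() { pthread_mutex_destroy(&mutex); }
};

// Apply a delta under the lock so the read-modify-write cannot interleave.
void Apply(Account *acc, int delta) {
    pthread_mutex_lock(&acc->mutex);
    acc->balance += delta;
    pthread_mutex_unlock(&acc->mutex);
}

void *Deposit(void *arg) {   // Thread A: deposit 200
    Apply(static_cast<Account *>(arg), 200);
    return nullptr;
}

void *Withdraw(void *arg) {  // Thread B: withdraw 150
    Apply(static_cast<Account *>(arg), -150);
    return nullptr;
}

// Runs thread A and thread B concurrently and returns the final balance.
int RunTransfers() {
    Account acc;
    pthread_t a, b;
    int rc = pthread_create(&a, nullptr, Deposit, &acc);
    assert(rc == 0);
    rc = pthread_create(&b, nullptr, Withdraw, &acc);
    assert(rc == 0);
    (void)rc;
    pthread_join(a, nullptr);
    pthread_join(b, nullptr);
    return acc.balance;
}
```

Whichever thread gets the lock first, the other sees its finished update, so RunTransfers() returns 150.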
2.2 Ensuring Atomicity of Operations
Problem: A single operation (like i++) may correspond to multiple machine instructions at the lower level, and thread switching can cause the operation to be interrupted before completion.
; One possible x86 instruction sequence for i++:
mov eax, [i]    ; Read i into register
inc eax         ; Increment register
mov [i], eax    ; Write back to memory
If thread A is switched out after inc eax but before the write-back, and thread B updates i in the meantime, then when A resumes it stores its stale result over B's update, so B's change is lost.
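One way to make the increment indivisible, sketched here on the assumption that C++11 std::atomic is available, is to replace the plain int with an atomic counter. The names g_counter, kIncrementsPerThread, Increment and CountWithThreads are made up for this example.

```cpp
#include <atomic>
#include <cassert>
#include <pthread.h>

// Hypothetical shared counter for illustration.
static std::atomic<int> g_counter{0};
static const int kIncrementsPerThread = 100000;

void *Increment(void *) {
    for (int n = 0; n < kIncrementsPerThread; ++n) {
        // fetch_add is a single indivisible read-modify-write,
        // so no increment can be lost to a thread switch.
        g_counter.fetch_add(1);
    }
    return nullptr;
}

// Starts `threads` workers (at most 16) and returns the final counter value.
int CountWithThreads(int threads) {
    assert(threads > 0 && threads <= 16);
    g_counter.store(0);
    pthread_t tids[16];
    for (int t = 0; t < threads; ++t) {
        int rc = pthread_create(&tids[t], nullptr, Increment, nullptr);
        assert(rc == 0);
        (void)rc;
    }
    for (int t = 0; t < threads; ++t) {
        pthread_join(tids[t], nullptr);
    }
    return g_counter.load();
}
```

With a plain int and i++, the same test would usually come up short. A mutex around the increment would also work; the atomic is simply the lighter tool when only a single variable is involved.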
2.3 Coordinating Execution Order Between Threads
Scenario: Certain tasks require threads to execute in a specific order.
Producer-Consumer Model: The consumer thread must wait for the producer to generate data before reading it.
Task Dependency: Thread B must execute after thread A completes initialization.
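The task-dependency case can be sketched with a flag, a mutex and a condition variable: thread B waits until thread A reports that initialization is done. InitState, ThreadA, ThreadB and RunInOrder are hypothetical names used only here.

```cpp
#include <cassert>
#include <pthread.h>
#include <string>

// Hypothetical shared state for illustration.
struct InitState {
    pthread_mutex_t mutex;
    pthread_cond_t ready_cond;
    bool ready = false;  // Set by thread A when initialization is done
    std::string log;     // Records the order of events
    InitState() {
        pthread_mutex_init(&mutex, nullptr);
        pthread_cond_init(&ready_cond, nullptr);
    }
    ~InitState() {
        pthread_cond_destroy(&ready_cond);
        pthread_mutex_destroy(&mutex);
    }
};

void *ThreadA(void *arg) {
    InitState *s = static_cast<InitState *>(arg);
    pthread_mutex_lock(&s->mutex);
    s->log += "A:init;";
    s->ready = true;
    pthread_cond_signal(&s->ready_cond);  // Wake B if it is already waiting
    pthread_mutex_unlock(&s->mutex);
    return nullptr;
}

void *ThreadB(void *arg) {
    InitState *s = static_cast<InitState *>(arg);
    pthread_mutex_lock(&s->mutex);
    while (!s->ready) {  // Re-check the flag after every wakeup
        pthread_cond_wait(&s->ready_cond, &s->mutex);
    }
    s->log += "B:work;";
    pthread_mutex_unlock(&s->mutex);
    return nullptr;
}

// Starts B before A on purpose; B still runs its work after A's init.
std::string RunInOrder() {
    InitState s;
    pthread_t a, b;
    int rc = pthread_create(&b, nullptr, ThreadB, &s);
    assert(rc == 0);
    rc = pthread_create(&a, nullptr, ThreadA, &s);
    assert(rc == 0);
    (void)rc;
    pthread_join(a, nullptr);
    pthread_join(b, nullptr);
    return s.log;
}
```

Because B only proceeds once ready is true, the log is always "A:init;B:work;" no matter which thread is scheduled first.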
2.4 Avoiding Resource Contention (such as files, network connections)
Problem: Multiple threads writing to the same file or occupying the same network port simultaneously can lead to data corruption or program crashes.
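As a rough sketch of serializing access to such a resource, the example below lets several threads append lines to one shared sink under a mutex. A std::ostringstream stands in for the file so the example stays self-contained; SharedLog, WriteLine, Writer and CountIntactLines are hypothetical names.

```cpp
#include <cassert>
#include <pthread.h>
#include <sstream>
#include <string>

// Hypothetical shared sink standing in for a file.
struct SharedLog {
    std::ostringstream out;
    pthread_mutex_t mutex;
    SharedLog() { pthread_mutex_init(&mutex, nullptr); }
    ~SharedLog() { pthread_mutex_destroy(&mutex); }
};

// Write one whole line under the lock so lines never interleave.
void WriteLine(SharedLog *log, const std::string &line) {
    pthread_mutex_lock(&log->mutex);
    log->out << line << '\n';
    pthread_mutex_unlock(&log->mutex);
}

void *Writer(void *arg) {
    SharedLog *log = static_cast<SharedLog *>(arg);
    for (int n = 0; n < 1000; ++n) {
        WriteLine(log, "record-ok");
    }
    return nullptr;
}

// Runs `threads` writers (at most 8) and counts the lines that arrived intact.
int CountIntactLines(int threads) {
    assert(threads > 0 && threads <= 8);
    SharedLog log;
    pthread_t tids[8];
    for (int t = 0; t < threads; ++t) {
        int rc = pthread_create(&tids[t], nullptr, Writer, &log);
        assert(rc == 0);
        (void)rc;
    }
    for (int t = 0; t < threads; ++t) {
        pthread_join(tids[t], nullptr);
    }
    std::istringstream in(log.out.str());
    std::string line;
    int intact = 0;
    while (std::getline(in, line)) {
        if (line == "record-ok") ++intact;
    }
    return intact;
}
```

Every writer contributes 1000 lines, so four writers yield 4000 intact lines. Without the lock, concurrent writes to the same stream object would be a data race.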
Common Means of Thread Synchronization
Severe Consequences of Synchronization Issues
Data Inconsistency: The program produces wrong results, such as an incorrect bank account balance.
Program Crashes: Multiple threads freeing the same memory simultaneously can cause a double free.
Deadlock: Threads wait for each other to release locks and block permanently.
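The article does not prescribe a cure for deadlock, but one common technique is to always take locks in a fixed global order. The sketch below assumes each account carries a unique id that defines that order; LockedAccount, Transfer, TransferJob, TransferLoop and RunOpposingTransfers are hypothetical names.

```cpp
#include <cassert>
#include <pthread.h>
#include <utility>

// Hypothetical account with its own lock; id defines the locking order.
struct LockedAccount {
    int id;
    int balance;
    pthread_mutex_t mutex;
    LockedAccount(int i, int b) : id(i), balance(b) {
        pthread_mutex_init(&mutex, nullptr);
    }
    ~LockedAccount() { pthread_mutex_destroy(&mutex); }
};

// Always lock the account with the smaller id first, whatever the direction
// of the transfer, so two threads can never hold one lock each and wait.
void Transfer(LockedAccount *from, LockedAccount *to, int amount) {
    LockedAccount *first = from;
    LockedAccount *second = to;
    if (first->id > second->id) std::swap(first, second);
    pthread_mutex_lock(&first->mutex);
    pthread_mutex_lock(&second->mutex);
    from->balance -= amount;
    to->balance += amount;
    pthread_mutex_unlock(&second->mutex);
    pthread_mutex_unlock(&first->mutex);
}

struct TransferJob {
    LockedAccount *from;
    LockedAccount *to;
};

void *TransferLoop(void *arg) {
    TransferJob *job = static_cast<TransferJob *>(arg);
    for (int n = 0; n < 10000; ++n) {
        Transfer(job->from, job->to, 1);
    }
    return nullptr;
}

// Two threads transfer in opposite directions; returns the combined balance.
int RunOpposingTransfers() {
    LockedAccount x(1, 1000), y(2, 1000);
    TransferJob xy{&x, &y};
    TransferJob yx{&y, &x};
    pthread_t t1, t2;
    int rc = pthread_create(&t1, nullptr, TransferLoop, &xy);
    assert(rc == 0);
    rc = pthread_create(&t2, nullptr, TransferLoop, &yx);
    assert(rc == 0);
    (void)rc;
    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);
    return x.balance + y.balance;
}
```

If Transfer instead locked from first and to second, the two threads could each grab one mutex and wait forever for the other. With the fixed order the run completes and the combined balance stays at 2000.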
In short, thread synchronization coordinates when threads run and how they access shared data. The producer-consumer model is a classic scenario that needs both mutual exclusion and ordering.
3. What is the Producer-Consumer Model?
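3.1 Three Relationships
Producer and producer: mutual exclusion
Consumer and consumer: mutual exclusion
Producer and consumer: mutual exclusion and synchronization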
3.2 Two Roles
Producer: the party that provides data
Consumer: the party that takes data
3.3 A Scenario
All data passes through an intermediate buffer, the “supermarket”: producers put data there and consumers take it from there, so neither side deals with the other directly.
4. Simulating the Multi-Producer Consumer Model with a Blocking Queue
The following diagram shows the basic flow of a multi-producer, multi-consumer model built on a blocking queue.
There are two kinds of threads (producers and consumers); they put data into and take data from the same place, the blocking queue (BlockQueue).
4.1 Member Variables
We wrap the pthread library primitives in a class and use a std::queue as the critical resource.
private:
    std::queue<T> _q;               // Container for storing data, critical resource
    int _cap;                       // Maximum capacity of bq
    pthread_mutex_t _mutex;         // Mutex
    pthread_cond_t _productor_cond; // Producer condition variable
    pthread_cond_t _consumer_cond;  // Consumer condition variable
    int _cwait_num;                 // Current number of waiting consumers
    int _pwait_num;                 // Current number of waiting producers
};
4.2 Constructor and Destructor
BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0)
{
    pthread_mutex_init(&_mutex, nullptr);         // Create mutex
    pthread_cond_init(&_productor_cond, nullptr); // Producer condition variable
    pthread_cond_init(&_consumer_cond, nullptr);  // Consumer condition variable
}
~BlockQueue()
{
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_productor_cond);
    pthread_cond_destroy(&_consumer_cond);
}
4.3 Simulated Producer and Consumer
void Equeue(const T &in) // Producer
{
    pthread_mutex_lock(&_mutex);
    // Data cannot always be pushed: producing requires free space.
    // The condition is checked while holding the lock, so the wait
    // necessarily starts inside the critical section.
    while (IsFull()) // Use while, not if, to re-check after spurious wakeups
    {
        std::cout << "Producer entering wait..." << std::endl;
        _pwait_num++;
        // pthread_cond_wait must be called with _mutex held. It atomically
        // releases _mutex while blocked and re-acquires it before returning,
        // so the thread wakes up inside the critical section.
        pthread_cond_wait(&_productor_cond, &_mutex);
        _pwait_num--;
        std::cout << "Producer awakened..." << std::endl;
    }
    // Here the queue is guaranteed not to be full
    _q.push(in); // Produce
    // There must be data now
    if (_cwait_num)
    {
        std::cout << "Waking up consumers" << std::endl;
        pthread_cond_signal(&_consumer_cond);
    }
    pthread_mutex_unlock(&_mutex);
}
void Pop(T *out) // Consumer
{
    pthread_mutex_lock(&_mutex);
    while (IsEmpty()) // Use while to guard against spurious wakeups
    {
        std::cout << "Consumer entering wait..." << std::endl;
        _cwait_num++;
        pthread_cond_wait(&_consumer_cond, &_mutex);
        _cwait_num--;
        std::cout << "Consumer awakened..." << std::endl;
    }
    // Here the queue is guaranteed not to be empty
    *out = _q.front();
    _q.pop();
    // There must be space now
    if (_pwait_num)
    {
        std::cout << "Waking up producers" << std::endl;
        pthread_cond_signal(&_productor_cond);
    }
    pthread_mutex_unlock(&_mutex);
}
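For comparison, the same wait-and-signal logic can be sketched with the C++11 standard library instead of raw pthreads. This is an alternative under the assumption that C++11 is available, not the article's implementation; StdBlockQueue, Push and SumThroughQueue are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical class name; a sketch, not the article's BlockQueue.
template <typename T>
class StdBlockQueue {
public:
    explicit StdBlockQueue(std::size_t cap) : _cap(cap) { assert(cap > 0); }

    void Push(const T &in) {  // Producer side
        std::unique_lock<std::mutex> lock(_mutex);
        // wait(lock, pred) releases the lock while blocked and loops until
        // pred() is true, which also covers spurious wakeups.
        _not_full.wait(lock, [this] { return _q.size() < _cap; });
        _q.push(in);
        _not_empty.notify_one();
    }

    T Pop() {  // Consumer side
        std::unique_lock<std::mutex> lock(_mutex);
        _not_empty.wait(lock, [this] { return !_q.empty(); });
        T out = _q.front();
        _q.pop();
        _not_full.notify_one();
        return out;
    }

private:
    std::queue<T> _q;                    // Critical resource
    std::size_t _cap;                    // Maximum capacity
    std::mutex _mutex;
    std::condition_variable _not_full;   // Producers wait here
    std::condition_variable _not_empty;  // Consumers wait here
};

// Pushes 1..count from one thread and sums what another thread pops.
long SumThroughQueue(int count) {
    StdBlockQueue<int> bq(2);  // Small capacity so both sides have to wait
    long sum = 0;
    std::thread producer([&] {
        for (int i = 1; i <= count; ++i) bq.Push(i);
    });
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) sum += bq.Pop();
    });
    producer.join();
    consumer.join();
    return sum;
}
```

The predicate form of wait plays the role of the while loop around pthread_cond_wait. This sketch always notifies rather than tracking waiter counts, which costs a little but keeps the code shorter.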
4.4 Simulating the Producer and Consumer Process
We make production slower than consumption: each producer sleeps for 2 seconds before producing an item, while the consumers are always ready, so they spend most of their time waiting for the producers.
void *Consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int data;
        // 1. Get data from bq
        bq->Pop(&data);
        // 2. Process
        printf("Consumer, consumed a data: %d\n", data);
    }
}
void *Productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
    int data = 10;
    while (true)
    {
        sleep(2);
        // 1. Get data from external source
        // data = 10; // Is there data???
        // 2. Produce to bq
        bq->Equeue(data);
        printf("Producer produced a data: %d\n", data);
        data++;
    }
}
5. Overall Code
5.1 BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>

namespace BlockQueueModule
{
    static const int gcap = 10;

    template <typename T>
    class BlockQueue
    {
    private:
        // Cast avoids a signed/unsigned comparison between size() and _cap
        bool IsFull() { return static_cast<int>(_q.size()) == _cap; }
        bool IsEmpty() { return _q.empty(); }

    public:
        BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0)
        {
            pthread_mutex_init(&_mutex, nullptr);         // Initialize mutex
            pthread_cond_init(&_productor_cond, nullptr); // Producer condition variable
            pthread_cond_init(&_consumer_cond, nullptr);  // Consumer condition variable
        }
        void Equeue(const T &in) // Producer
        {
            pthread_mutex_lock(&_mutex);
            // Data cannot always be pushed: producing requires free space.
            // The condition is checked while holding the lock, so the wait
            // necessarily starts inside the critical section.
            while (IsFull()) // Use while, not if, to re-check after spurious wakeups
            {
                std::cout << "Producer entering wait..." << std::endl;
                _pwait_num++;
                // pthread_cond_wait must be called with _mutex held. It atomically
                // releases _mutex while blocked and re-acquires it before returning,
                // so the thread wakes up inside the critical section.
                pthread_cond_wait(&_productor_cond, &_mutex);
                _pwait_num--;
                std::cout << "Producer awakened..." << std::endl;
            }
            // Here the queue is guaranteed not to be full
            _q.push(in); // Produce
            // There must be data now
            if (_cwait_num)
            {
                std::cout << "Waking up consumers" << std::endl;
                pthread_cond_signal(&_consumer_cond);
            }
            pthread_mutex_unlock(&_mutex);
        }
        void Pop(T *out) // Consumer
        {
            pthread_mutex_lock(&_mutex);
            while (IsEmpty()) // Use while to guard against spurious wakeups
            {
                std::cout << "Consumer entering wait..." << std::endl;
                _cwait_num++;
                pthread_cond_wait(&_consumer_cond, &_mutex);
                _cwait_num--;
                std::cout << "Consumer awakened..." << std::endl;
            }
            // Here the queue is guaranteed not to be empty
            *out = _q.front();
            _q.pop();
            // There must be space now
            if (_pwait_num)
            {
                std::cout << "Waking up producers" << std::endl;
                pthread_cond_signal(&_productor_cond);
            }
            pthread_mutex_unlock(&_mutex);
        }
        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_productor_cond);
            pthread_cond_destroy(&_consumer_cond);
        }

    private:
        std::queue<T> _q;               // Container for storing data, critical resource
        int _cap;                       // Maximum capacity of bq
        pthread_mutex_t _mutex;         // Mutex
        pthread_cond_t _productor_cond; // Producer condition variable
        pthread_cond_t _consumer_cond;  // Consumer condition variable
        int _cwait_num;                 // Current number of waiting consumers
        int _pwait_num;                 // Current number of waiting producers
    };
}
5.2 Main.cc
#include "BlockQueue.hpp"
#include <cstdio>
#include <pthread.h>
#include <unistd.h>

using namespace BlockQueueModule;

void *Consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int data;
        // 1. Get data from bq
        bq->Pop(&data);
        // 2. Process
        printf("Consumer, consumed a data: %d\n", data);
    }
}
void *Productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
    int data = 10;
    while (true)
    {
        sleep(2);
        // 1. Get data from external source
        // data = 10; // Is there data???
        // 2. Produce to bq
        bq->Equeue(data);
        printf("Producer produced a data: %d\n", data);
        data++;
    }
}
int main()
{
    // The trading place can carry not only plain data but also tasks (objects)
    BlockQueue<int> *bq = new BlockQueue<int>(5); // Shared resource -> Critical resource
    // Multiple producers, multiple consumers
    pthread_t c1, p1, c2, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq);
    pthread_create(&c2, nullptr, Consumer, bq);
    pthread_create(&p1, nullptr, Productor, bq);
    pthread_create(&p2, nullptr, Productor, bq);
    pthread_create(&p3, nullptr, Productor, bq);
    // The worker threads loop forever, so the joins below never return;
    // stop the demo with Ctrl+C.
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);
    delete bq;
    return 0;
}
5.3 Makefile
bin=bq
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)

$(bin):$(obj)
	$(cc) -o $@ $^ -lpthread
%.o:%.cc
	$(cc) -c $< -std=c++17

.PHONY:clean
clean:
	rm -f $(bin) $(obj)

.PHONY:test
test:
	echo $(src)
	echo $(obj)
Conclusion
This article introduced the principles of thread synchronization and simulated the multi-producer, multi-consumer model with a blocking queue. I hope it has been helpful to you!