Multithreading and concurrency
Table of Contents
- 1. Multithreading and concurrency
- 1.1. Fundamental Concepts
- 1.2. Concepts Reference Map
- 1.3. Standard Library Reference
- 1.4. Papers from WG21 Working Group
- 1.5. Class std::thread
- 1.6. Functions of namespace std::this_thread
- 1.7. Thread - std::thread usage and synchronization primitives
- 1.8. Thread - Returning values from std::thread
- 1.9. Thread Local Variables and TLS - Thread Local Storage
- 1.10. Task-based APIs => Futures and Promises
- 1.11. Condition Variables and Producer Consumer Problem
- 1.12. Messaging Passing Concurrency and Message Queue
- 1.13. Thread Pools
1 Multithreading and concurrency
1.1 Fundamental Concepts
Processes and Threads
Processes:
- A process is a running program with its own virtual memory, address space, unique process identifier ID and context (CPU registers - IP - Instruction Pointer and SP - Stack Pointer)
- A single CPU core is only capable of executing a single process at a time. However, users have the illusion that multiple processes are being run simultaneously because the operating system's scheduler multiplexes the CPU execution time between all processes. As a result, each process is run sequentially by a single CPU core during a short time slice (time sharing). When the scheduler switches to another process, it saves the process' state (current directory, CPU registers, …) and loads the state of the next process (context switching).
Threads:
- A thread is an independent flow of execution, or task, within a single process. Threads allow a process to execute multiple independent tasks at the same time, such as running the user interface dispatch thread, handling socket connections, performing a download in a different thread and so on.
- For an operating system, a thread is a lightweight, stripped-down process with its own stack, local data and CPU registers (especially IP - Instruction Pointer and SP - Stack Pointer), but without its own address space or virtual memory. Instead, it can only access the virtual memory of the process it belongs to.
- Threads are also known as (aka):
- Lightweight process
- Native thread
- Kernel-thread
- Operating system thread or OS thread
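As a minimal sketch of what sharing the process address space means in practice, the code below starts one thread that writes into a variable living on the caller's stack. The function name shared_memory_demo is hypothetical and used only for illustration.

```cpp
#include <thread>

// Hypothetical helper: a worker thread writes into a variable that lives
// on the caller's stack. This works because both threads share the same
// address space of the process.
int shared_memory_demo()
{
    int value = 0;
    std::thread worker([&value] { value = 42; });
    worker.join();   // after join() returns, the write is visible here
    return value;
}
```

A separate process could not do this without an IPC mechanism, because each process has its own virtual memory.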
Benefits of multi-threading:
- Increase application responsiveness, especially for GUI - Graphical User Interface applications.
- Take advantage of multi-core processors.
- Speed up heavy math computations. Multi-threading makes it possible to split a heavy matrix calculation across multiple threads running on different CPU cores.
- Lower system resource usage. Using multiple threads for running multiple tasks is cheaper than running one process per task. Note: before multi-threading, it was common in Unix-like operating systems to use the fork() system call to fork (copy) the current process for handling client socket connections in network server applications.
Synchronization Primitives
- Mutex
- Semaphore
- Condition variables
- Barriers
- Atomic variables
Potential Problems of Concurrency and Multi-threading
- Race condition, aka data race
- Deadlock
- Starvation
- Oversubscription
- Load balancing
- Thread exhaustion
Low Level Multithreading APIs:
Threads and processes require operating system support. Programming languages therefore do not implement threads themselves; they provide high-level wrapper APIs that call the operating system's low-level APIs. The most common of those low-level APIs are:
- pthread (Posix threads) - POSIX API (C-API)
- The POSIX thread API is implemented by most Unix-like operating systems, such as Linux, macOS, iOS, Android and BSD, and by some embedded real-time systems such as RTEMS, QNX and VxWorks.
- Windows Win32 Thread API
- Available only for Windows NT and Windows CE kernels.
Concurrency X Parallelism
- Concurrency
- => Multiple threads run sharing resources (shared memory) in the same process. It is not possible to run them safely without synchronization primitives, especially locks such as mutexes and semaphores.
- Note: concurrency is possible in a system with just one processor and one core.
- Main purpose: Increase system responsiveness.
- Avoid blocking the main thread of a user interface's event loop by running a slow computation in a new thread; handle multiple client requests of a web server in separate threads or in a thread pool, and so on.
- Better for:
- GUI - Graphical User Interfaces: increase responsiveness by avoiding freezing the UI.
- Network servers with lots of IO.
- Games and computer graphics => Tasks with long delay that could block the game loop can be executed in another thread.
- Parallel Computing
- => Multiple threads run on different processing units without any communication or shared data. Parallelism requires more than one processing unit, which can be multiple CPU cores, multiple CPU chips/sockets, multiple computers or a GPU card.
- => Thread-level parallelism is not possible with a single-core CPU; it requires multiple cores or multiple processing units.
- => Parallelism is also possible without threads, through SIMD CPU instructions (data parallelism).
- => Better for:
- HPC - High Performance Computing
- Math intensive computation
- CPU-bound tasks
- Math, Physics, Science and Engineering
- Hardware for Parallel Computing
- Multi-core CPUs
- Multi processor servers with multi-core CPUs.
- GPU
- Computer cluster (distributed systems)
- APIs - Application Programming Interfaces for Parallel Computing
- OpenMP => Useful for parallelizing existing algorithms and loops.
- MPI - Message Passing Interface.
- GPU => Cuda, OpenCL, Sycl …
- SIMD Libraries
Computation bottlenecks
- IO-bound computations
- => Most of the time of an IO-bound computation is spent waiting, reading and writing IO. The speed of the task is limited by the IO speed.
- Example: Processing big files, web servers, FTP servers …
- Solutions:
- Thread-pools + non-blocking IO based on operating system APIs:
epoll (Linux); kqueue (BSD and Mac OSX) and IOCP (Windows).
- Some libraries: Boost ASIO or LibUV used by NodeJS. …
- User-space threads (aka fibers, greenthreads or coroutines) +
Non-blocking IO such as epoll (Linux); kqueue (MacOSX and
BSD-variants) and IOCP on Windows.
- Some libraries: Boost Asio + Boost Coroutines or C++20 Coroutines.
- CPU-bound computations
- => A CPU-bound computation is CPU intensive and its speed is limited by the CPU speed. As CPU clock rates are no longer increasing as they used to, the only way to boost performance when no further optimization is possible is to use multithreading and parallel programming to take advantage of multicore CPUs.
- Example: Heavy mathematical computation; matrix computations; linear algebra; solving PDEs partial differential equations; computer vision …
- Solutions:
- Find better algorithms
- Select suitable data structures
- Cache-friendly data structures
- Parallelize algorithms by running one chunk of the work on each of N threads on different cores. APIs: C++ STL parallel algorithms; Intel TBB; OpenMP …
- Use a CPU with more cores or higher clock "if possible".
- Use a server with multiple multi-core CPUs.
- Use a cluster or distributed computer.
- IO-bound and CPU-bound.
- The speed of the task is limited both by CPU speed and time spent waiting, reading or writing IO.
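The chunk-per-thread approach listed for CPU-bound computations can be sketched with plain std::thread. This is only an illustration under simple assumptions (the helper name parallel_sum and the equal-sized chunking are not from the original text); libraries such as OpenMP or Intel TBB handle scheduling and load balancing far better.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical helper: sums `data` using `nthreads` worker threads.
// Each worker writes only to its own slot of `partial`, so no lock is needed.
long long parallel_sum(const std::vector<int>& data, unsigned nthreads)
{
    if (nthreads == 0) nthreads = 1;
    std::vector<long long> partial(nthreads, 0);
    std::vector<std::thread> workers;
    // Chunk size rounded up so that all elements are covered.
    const std::size_t chunk = (data.size() + nthreads - 1) / nthreads;

    for (unsigned t = 0; t < nthreads; ++t) {
        const std::size_t begin = std::min(data.size(), t * chunk);
        const std::size_t end   = std::min(data.size(), begin + chunk);
        workers.emplace_back([&data, &partial, t, begin, end] {
            partial[t] = std::accumulate(data.begin() + begin,
                                         data.begin() + end, 0LL);
        });
    }
    for (auto& w : workers) w.join();   // wait for all chunks
    return std::accumulate(partial.begin(), partial.end(), 0LL);
}
```

A reasonable value for nthreads is std::thread::hardware_concurrency(). Using many more threads than cores leads to oversubscription rather than extra speed.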
Hardware and Multicore
The days when manufacturers kept increasing the CPU frequency are over, as a technology limit was reached: raising the CPU clock speed increases power consumption at a roughly cubic rate, so the heat output rises dramatically and requires ever more cooling. The solution found was to design CPU chips with more cores instead of increasing the frequency further. However, this creates new challenges for developers, since the only way to boost the performance of CPU-bound applications is to take advantage of multicore processors, which means using multithreading and parallel programming techniques.
Terminology:
- Physical Processor, aka CPU (Central Processing Unit) or socket
- => Chip visible on the computer's motherboard. A single modern chip can contain multiple processing units, called CPU cores. Note: some server computers may have multiple physical processors or CPU chips.
- CPU Core and Multicore CPUs
- => A CPU core is a processing unit, the device that executes machine code instructions. Nowadays most processor chips are multicore: a single CPU chip contains multiple cores. A CPU with N cores is capable of executing at least N streams of instructions simultaneously, that is, at least N hardware threads.
- Hyper Thread
- => Hyper-threading is Intel's proprietary implementation of simultaneous multithreading (SMT), which allows a single CPU core to process multiple streams of instructions as if it were multiple processors. In other words, a single core is capable of running multiple threads in parallel.
- Total number of hardware threads or logical processors
- => NHW = TOTAL NUMBER OF LOGICAL PROCESSORS = TOTAL NUMBER OF THREADS
NHW = (Number of CPUs) * (Number of Cores per CPU) * ( Number of HW threads per core )
A server computer with 2 physical processors or CPU sockets, 4 processing cores per CPU and 2 threads per core has a total of
- NHW = 2 x 4 x 2 = 16 threads or 16 logical processors
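The NHW formula can be written as a small function and compared with what the standard library reports for the current machine. The helper names below are hypothetical; only std::thread::hardware_concurrency() is a standard facility.

```cpp
#include <thread>

// Hypothetical helper implementing the NHW formula:
// NHW = sockets * cores per socket * hardware threads per core.
constexpr unsigned logical_processors(unsigned sockets,
                                      unsigned cores_per_socket,
                                      unsigned threads_per_core)
{
    return sockets * cores_per_socket * threads_per_core;
}

// Value reported by the standard library for the current machine.
// It is only a hint and may be 0 when it cannot be determined.
unsigned detected_logical_processors()
{
    return std::thread::hardware_concurrency();
}
```

For the server described above, logical_processors(2, 4, 2) evaluates to 16.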
1.2 Concepts Reference Map
Multitasking
Ability to perform multiple actions at the same time:
- Multi processing (Multi-process)
- => Spawn or fork a new process for handling new requests or performing other tasks. Example: the POSIX system call fork. Before threads became widespread, servers handled incoming client socket requests by calling fork, which creates a child process that is a copy of the current process.
- => Processes communicate through IPC - Interprocess Communication primitives such as shared memory, message passing, synchronization primitives or sockets.
- => Advantages:
- Better robustness: the crashing or abnormal termination of any process does not affect others.
- => Disadvantage:
- Higher overhead, creating processes is more expensive than creating threads.
- More complicated communication: unlike threads, processes cannot easily share data and variables without IPC inter process communication.
- => Note: Web browsers such as Chrome and Firefox spawn separate processes for dealing with newly created tabs or windows.
- => Note: Some language implementations, such as CPython, OCaml (before version 5) and Ruby (MRI), have a GIL - Global Interpreter Lock, which allows only a single native thread to run interpreter code at a time due to design choices in the runtime. The GIL prevents the language from achieving true parallelism on multicore processors with threads alone. The usual solution is multi-processing: spawn multiple processes to harness the multicore processor capabilities.
- => Note: Python and other languages with a GIL can work around the limitation by using libraries written in native code (Rust, C, C++), such as NumPy for Python.
- Multi threading (Kernel threads)
- => Use multiple threads for dealing with concurrency and running multiple independent task control flows within a single process.
- => Advantages:
- Creating new threads is less expensive than creating processes, although spawning a kernel thread is still expensive.
- Threads can communicate through shared memory (process virtual memory) and thread synchronization primitives.
- Unlike processes, threads can easily share variables and data.
- => Disadvantages:
- Threads
- Multi threading (User Space threads)
- User-space threads (fibers, aka coroutines, green threads or lightweight threads) are similar to the native threads provided by the kernel. However, unlike kernel threads, user-space threads are emulated by a runtime, interpreter or library in user space. Note: languages without user-space threads can overcome the thread overhead by using thread pools and non-blocking IO, just like the Nginx web server and NodeJS. The problem with this approach is the unfriendly callbacks and possible "callback hell".
- Advantages:
- Faster context switch than kernel-threads.
- It is possible to run multiple user-space threads in a single kernel thread.
- Scalability: It is expensive to spawn multiple kernel threads for handling every incoming connection of some web server due to the memory and context switch overhead of kernel threads. However, it is cheap and faster to spawn hundreds or thousands of user-space threads for handling incoming connections.
- Disadvantages:
- Cooperative multithreading: All fibers or user-space threads can only use non-blocking IO and non-blocking system calls (epoll, select, kqueue, …), otherwise all other user-space threads will be stalled.
- User-space threads are not suitable for CPU-bound (CPU intensive), numerical computations and parallel computations.
- Programming Languages with user-space threads:
- Golang (Goroutines are user-space threads)
- Haskell
- Erlang
- OCaml
- Python
- Ruby
- C++20, until C++20 is not released there is the
Concurrency Primitives
- Threads (kernel threads)
- Thread-local storage
- Lock synchronization primitives
- mutex
- recursive mutex
- semaphore
- barriers
- spinlocks
- Non-lock synchronization primitives
- atomic variables
- Coordination synchronization primitives
- condition variables
Thread Operations
- fork()/spawn()
- Create a new thread.
- join()
- Wait for thread termination.
- get_id()
- Get unique thread identifier number, similar to process ID (PID).
- cancel()
- aka thread cancellation => Forcibly terminate a thread from the outside. Not supported by the C++ standard library (std::terminate, by contrast, ends the whole program, not a single thread).
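The operations above map onto std::thread roughly as follows. Since the standard library has no cancel(), the sketch uses a common workaround, cooperative cancellation through an atomic flag that the worker checks. The function name run_until_cancelled is hypothetical.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical helper: starts a worker, asks it to stop through an atomic
// flag (cooperative cancellation), waits for it and returns how many
// iterations the worker completed.
unsigned long run_until_cancelled(std::chrono::milliseconds run_time)
{
    std::atomic<bool> stop_requested{false};
    unsigned long iterations = 0;

    // spawn: the thread starts running as soon as it is constructed.
    std::thread worker([&] {
        do {
            ++iterations;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        } while (!stop_requested.load());
    });

    // get_id: identifier of the worker, distinct from the caller's id.
    const bool distinct_ids = worker.get_id() != std::this_thread::get_id();

    std::this_thread::sleep_for(run_time);
    stop_requested.store(true);   // "cancel": the worker polls this flag
    worker.join();                // join: wait for thread termination

    return distinct_ids ? iterations : 0;
}
```

The worker only stops at the points where it checks the flag, so a long blocking call inside the loop would delay the cancellation.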
Concurrency Hazards
- Data races, race conditions
- Deadlock
- Livelock
- Starvation
Concurrency Concepts
- Mutual exclusion
- locks
- race conditions (a.k.a: data races)
- Producer/consumer problems
- Thread local storage
High Level Concurrency Constructs
- Message queue (aka blocking queue or mailbox)
- Construct used for implementing message passing concurrency and coordinates producer/consumer threads without explicit use of locks and shared state.
- Example: Java's BlockingQueue
- Thread Pools
- Construct used for recycling threads and avoiding the creation of too many threads in IO-intensive applications such as web servers. A thread pool is comprised of a task queue and a set of long-running worker threads that run tasks submitted to the task queue by the client code. Thread pools can also be used for parallel computing.
- Servers such as Nginx, the NodeJS (JavaScript runtime) and Boost.Asio library use thread pools alongside non blocking IO for achieving scalability.
- Executors
- Abstraction for decoupling tasks from the execution method. It allows changing the execution mechanism without much effort. For instance, it makes it possible to run a set of tasks from a task queue (message queue) serially in the same thread, serially in another thread or in a thread pool.
- Futures (Task-based API)
- Allows the calling code to get the return value of an asynchronous computation. The calling thread is not blocked until Future.get() is called to retrieve the value from the async computation. Note: The computation wrapped by the future object can run in a new thread or in a thread pool (better).
- Note: In C++11, a task launched with std::async and the std::launch::async policy typically runs in a new thread, which has a big overhead if too many future objects are created in a short time.
- Promises (Task-based API)
- Lock-free data structures
- Thread-safe containers/data structures or collections
- Collections such as hashmaps, arrays and sets that encapsulate synchronization primitives such as locks and are safe to use from multiple threads.
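A message queue of the kind described above can be sketched with a mutex and a condition variable hidden behind push() and pop(), so that producer and consumer code never touches a lock directly. The class BlockingQueue and the function sum_through_queue are hypothetical names; the queue is unbounded and has no shutdown support, which a production version would need.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical minimal blocking queue (unbounded, no shutdown support).
template <typename T>
class BlockingQueue {
public:
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        not_empty_.notify_one();   // wake up one waiting consumer
    }

    // Blocks the calling thread until an item is available.
    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::queue<T> queue_;
};

// Usage sketch: a producer thread sends 1..n, the caller consumes them.
int sum_through_queue(int n)
{
    BlockingQueue<int> queue;
    std::thread producer([&queue, n] {
        for (int i = 1; i <= n; ++i) queue.push(i);
    });
    int sum = 0;
    for (int i = 0; i < n; ++i) sum += queue.pop();
    producer.join();
    return sum;
}
```

The predicate passed to wait() guards against spurious wake-ups: the consumer only proceeds when the queue really contains an item.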
Concurrency Models
- CSP - Communicating Sequential Processes (Tony Hoare)
- Languages with CSP: Go (aka Golang), Clojure
- Actor Model
- Languages with actor model: Erlang, Scala (Akka)
- Types:
- Active Objects
- Agents
- STM - Software Transactional Memory
- Languages with STM: Haskell.
Parallelism
- Goal: speed up computations and improve the overall performance by splitting a big computation into multiple smaller computations that run simultaneously on multiple processing units.
- Hardware: Multicore CPUs; CPU SIMD instructions (aka vector instructions); server computers with many multicore CPU chips; clusters of many computers; supercomputers with thousands of cores; GPUs - Graphics Processing Units.
- APIs:
- OpenMP (Language extension for C, C++ or Fortran + Runtime library)
- Parallelize for-loops of numerical algorithms without explicitly dealing with threads.
- MPI - Message Passing Interface
- Intel Cilk Language extension for C or C++
- Intel TBB - Threading Building Blocks
- Microsoft PPL - Parallel Patterns Library (Windows-Only)
- STL - C++17 Standard Library Parallel Algorithms
- Intel Parallel STL (Reference implementation of C++ Standard-Library parallel algorithms)
- GPU APIs:
- CUDA (proprietary, Nvidia)
- OpenCL (open standard, Khronos Group)
- SYCL (open standard, Khronos Group)
See:
User-Space Threads, Coroutines, Fibers, …
- Fiber (computer science) - Wikipedia
- Green threads explained (intro)
- Coroutines (C++20) - cppreference.com
- Coroutines - ModernesCpp.com
- c++ - What are coroutines in C++20? - Stack Overflow
- Library: Boost.Coroutine
- Library: Boost.Coroutine2
- Boost coroutine library.
- Library: Boost.Fiber
- Library: Boost.Asio - coroutines
- Coroutines support in Boost Asio network socket server/client framework
- P0158R0 - Coroutines belong in a TS
- P13564R0 - Fibers under the magnifying glass - Gor Nishanov [BEST, MUST READ]
- N4402 - Resumable Functions - Gor Nishanov
- P08066: Response to "Fibers Under Magnifying glass"
- Fibers: the Most Elegant Windows API « null program
- What are fibers and why should you care? | JRebel & XRebel
- Why you can have millions of Goroutines but only thousands of Java Threads
- Goroutines vs Threads - Seven Story Rabbit Hole
- corona - crates.io: Rust Package Registry (Rust Coroutines)
Web Browser Multi-processing:
- Chromium Blog: Multi-process Architecture
- Multi-process Architecture - The Chromium Projects
- Modern Multi-Process Browser Architecture • Helge Klein
- c - Which is better choice for browser's tab: multi-thread or multi-process? - Stack Overflow
GIL => Global Interpreter Lock:
- What is the Python Global Interpreter Lock (GIL)? – Real Python
- GlobalInterpreterLock - Python Wiki
- What is the Python Global Interpreter Lock (GIL) - GeeksforGeeks
- Understanding the Python GIL - David Beazley
Executor Design Pattern:
- The Executor Design Pattern - Decoupling Tasks from Execution - Eric Crahen - VikingPLOP 2002 - page 55
- Concurrency Patterns - Douglas C. Schmidt
- https://www.dre.vanderbilt.edu/~schmidt/POSA/POSA2/conc-patterns.html
- Note: "Other patterns in the literature that address concurrency-related issues include Master-Slave [POSA1], Producer-Consumer [Grand98], Scheduler [Lea99a], and Two-phase Termination [Grand98]."
- Executors for C++ - A long History - Struggling for a Base for Concurrency Building Block - Detlef Vollmann.
- GitHub - executors/executors: A proposal for a executor programming model for ISO C++
- A Short Detour: Executors - ModernesCpp.com
- Java Executor Framework
- WG21 - N4046 - Executors and Asynchronous Operations - Christopher Kohlhoff
- http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4046.pdf
Note: Reference implementation of executors library:
- Presentation:
- "This proposal is the asynchronous model that underpins the P0112R0 Networking Library proposal, and the proposed wording below is taken from the corresponding sections of P0112R0. In doing so, it takes design concepts from Boost.Asio, many of which have been unchanged since its inclusion in Boost, and repackages them in a way that is more suited to C++14 language facilities."
1.3 Standard Library Reference
Technical Specifications
- P0159 - Technical Specification for Concurrency
- P0024 - Technical Specification for Parallelism
- C++17 Concurrency TS (Technical Specification)
C++11/14 Thread API
- std::thread (C++11)
Task Based API
- std::promise (Microsoft)
- std::future (Microsoft)
- std::future<>, std::shared_future<>
- std::packaged_task (Microsoft)
- std::async
- Function: std::async (Microsoft)
- std::launch
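As a brief sketch of how the task-based API fits together: std::async returns a std::future whose get() blocks until the result is ready, while std::promise lets a thread set the value explicitly. The function names async_sum and promise_value are hypothetical.

```cpp
#include <cstddef>
#include <future>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical helper: computes the sum of 1..n in a task started with
// std::async. std::launch::async requests execution on another thread.
long long async_sum(int n)
{
    std::future<long long> result = std::async(std::launch::async, [n] {
        std::vector<long long> values(static_cast<std::size_t>(n));
        std::iota(values.begin(), values.end(), 1LL);
        return std::accumulate(values.begin(), values.end(), 0LL);
    });

    // The caller is free to do other work here; it only blocks in get().
    return result.get();
}

// Hypothetical helper: the producing thread sets the value through a
// std::promise, and the caller reads it through the matching future.
int promise_value()
{
    std::promise<int> promise;
    std::future<int> future = promise.get_future();
    std::thread producer([&promise] { promise.set_value(42); });
    const int value = future.get();   // blocks until set_value() is called
    producer.join();
    return value;
}
```

With std::launch::deferred instead of std::launch::async, the task would run lazily in the thread that calls get().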
Concurrency TS additions (namespace std::experimental, not merged into C++17):
- Concurrency TS (non-blocking futures (.then), executors, await)
- when_any()
- when_all()
- future::then()
- future::unwrap()
Synchronization Primitives:
- Locks
- std::mutex
- std::condition_variable
- RAII Wrappers for locks
- std::unique_lock<>
- std::lock_guard - RAII Wrapper for locks
- Atomic Operations => Header: <atomic>
- std::atomic
- std::atomic_xxx, std::atomic<>, std::atomic_thread_fence()
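For a simple shared counter, std::atomic avoids the need for a lock altogether. The sketch below is an illustration only; the helper name atomic_count is hypothetical.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical helper: `nthreads` threads each increment a shared
// std::atomic counter `increments` times, without any lock.
int atomic_count(int nthreads, int increments)
{
    std::atomic<int> counter{0};
    std::vector<std::thread> workers;
    for (int t = 0; t < nthreads; ++t) {
        workers.emplace_back([&counter, increments] {
            for (int i = 0; i < increments; ++i)
                counter.fetch_add(1, std::memory_order_relaxed);
        });
    }
    for (auto& w : workers) w.join();
    return counter.load();
}
```

Relaxed ordering is enough here because only the final total matters and join() synchronizes with the end of each worker. When the atomic is used to publish other data, the default sequentially consistent ordering is the safer choice.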
Implementations of C++11 Standard Library
- Clang LLVM
- GNU GCC/G++
- MSVC - Microsoft Visual C++ Compiler (aka Visual Studio Compiler)
- just::thread - commercial implementation by Just Software Solutions for MSVC, GNU GCC/G++ and Clang.
1.4 Papers from WG21 Working Group
C++11 Thread Local Storage
- WG21 - N2659 Thread-Local Storage (C++ Standard Proposal)
- "Proposal that defines the thread_local storage class specifier for C++11."
Executors and thread-pools
- P0761R2 - Executors Design Documentation
- "This paper is a companion to P0443R4 and describes the executors programming model it specifies. This paper is directed toward readers who want to understand in detail the mechanics of P0443’s programming model, and the rationale underpinning the choices of that model’s design."
- P0443r9 - A Unified Executors Proposal for C++
- "This paper proposes a programming model for executors, which are modular components for creating execution. The design of this proposal is described in paper P0761."
- N3562 - Executors and Schedulers, Revision 1
- "This paper is a proposal for executors, objects that can execute units of work packaged as function objects, in the form of an abstract base class and several concrete classes that inherit from it. It is based on components that are heavily used in internal Google and Microsoft code, with changes to better match the style of the C++ standard."
Task-based concurrency API: Futures, Promises, Packaged-task
- N3857 - Improvements to std::future<T> and Related APIs
- "This proposal is an evolution of the functionality of std::future/std::shared_future. It details additions which can enable wait free compositions of asynchronous operations. This document supersedes N3784. Several typos in the code samples have been fixed, and a small editorial change made in the Technical Specification section."
- N2276 - Thread Pools and Futures
- "At Oxford, the combined EWG and LWG voted to proceed with work on Thread Pools and Futures for C++0x, even though this work had previously been destined for TR2. This paper is provided to further discussions in this area. It draws heavily on N2094 and N2185."
- Note: Provides basic explanation about futures, promises, packaged_tasks and thread pools concurrency constructs.
- N2709 - Packaging Tasks for Asynchronous Execution
- "This proposal is based on the proposal for std::packaged_task in N2561 and N2627. It has been separated from the futures proposal at the request of the LWG, and revised in light of LWG comments at the Sophia-Antipolis meeting in June 2008. A std::packaged_task provides a means of binding a function to an asynchronous future values so that when the function is invoked, the asynchronous future value becomes populated, rather than the return value or exception being propagated to the caller.. …"
- N2561 - An Asynchronous Future Value
- "This is a proposal to implement the "Kona concurrency compromise", i.e. motion SP2 in N2452 and N2453: 'WG21 resolves that for this revision of the C++ standard (aka "C++0x") the scope of concurrency extensions shall be constrained as follows: Include a memory model, atomic operations, threads, locks, condition variables, and asynchronous future values. Exclude thread pools, task launching, and reader-writer locks.'"
User-space threads, fibers, coroutines
- N4024 - Distinguishing coroutines and fibers
- "The purpose of this paper is to foreshadow a forthcoming proposal to introduce fibers to the C++ standard library; to briefly describe the features in the proposed fiber library; and to contrast it with the coroutine library proposed in N3985. It is hoped that this comparison will help to clarify the feature set of the proposed coroutine library. Certain features properly belong in the coroutine library; other conceptually-related features more properly belong in the fiber library."
Parallelism and Parallel Algorithms
- N4071 - Working Draft, Technical Specification for C++ Extensions for Parallelism
- "This Technical Specification describes requirements for implementations of an interface that computer programs written in the C++ programming language may use to invoke algorithms with parallel execution. The algorithms described by this Technical Specification are realizable across a broad class of computer architectures. This Technical Specification is non-normative. Some of the functionality described by this Technical Specification may be considered for standardization in a future version of C++, but it is not currently part of any C++ standard. Some of the functionality in this Technical Specification may never be standardized, and other functionality may be standardized in a substantially changed form. The goal of this Technical Specification is to build widespread existing practice for parallelism in the C++ standard algorithms library. It gives advice on extensions to those vendors who wish to provide them."
- N4167 - Transform Reduce, an Additional Algorithm for C++ Extensions for Parallelism
- "The goal of this paper is to widen the range of problems that the C++ standards proposal N4071, A Technical Specification for C++ Extensions for Parallelism, Revision 1 may encompass. Offering a greater use case for these algorithms may help further extend the significance of this Technical Specification. This document describes and outlines the problems we have encountered with the current algorithm requirements, as specified in standards proposal N4071. The algorithm described in this document is a solution to a general set of problems that the current algorithms do not support. Transform Reduce is to be considered as an addition to the current set of algorithms in proposal N4071, not a replacement."
1.5 Class std::thread
The class std::thread is not itself a thread: it is a proxy that encapsulates and manages a native (kernel) thread, which the documentation calls a thread of execution.
Header: <thread>
Documentation:
- std::thread - cppreference
- std::thread - Microsoft, MSVC
- boost::thread - Predecessor of the standard library threads
- Thread Management - 1.71.0 (Boost docs)
Papers related to the standard library implementation:
- WG21 - N2093 - Multithreading API for C++0X - A Layered Approach - 2006-09-09
- WG21 - N2139 - Thoughts on a Thread Library for C++ - 2006-11-06
- WG21 - N2184 - Thread Launching for C++0X - 2007-03-09
- WG21 - N2497 - Multi-threading Library for Standard C++ (Revision 1) - 2008-01-07
- WG21 - N2320 - Multi-threading Library for Standard C++ - 2007-06-24
Signatures of std::thread member functions
class thread {
public:
    // types:
    class id;
    // typedef implementation-defined native_handle_type; // See [thread.native]

    // construct/copy/destroy:
    thread();
    template <class F> explicit thread(F f);
    template <class F, class ...Args> thread(F&& f, Args&&... args);
    ~thread();
    thread(const thread&) = delete;
    thread(thread&&);
    thread& operator=(const thread&) = delete;
    thread& operator=(thread&&);

    // members:
    void swap(thread&&);
    bool joinable() const;
    void join();
    void detach();
    id get_id() const;
    native_handle_type native_handle(); // See [thread.native]

    // static members:
    static unsigned hardware_concurrency();
};
Detailed Member Functions of class std::thread
- Default and move constructors
    // Default constructor - without any thread of execution
    thread() noexcept;

    // Move constructor
    thread(thread&& Other) noexcept;
- Other Constructors
- The following constructor can take as argument (type parameter Fn) any function pointer, callable object (aka "functor") or lambda expression. The thread of execution associated with the constructed object starts executing immediately.
template <class Fn, class... Args> explicit thread(Fn&& F, Args&&... A);
Example: Construct thread out of function pointer:
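    //========= Create thread out of function pointer =======//
    #include <chrono>
    #include <iostream>
    #include <thread>

    // Prints a message every 10 seconds, forever.
    void do_forever()
    {
        while (true) {
            std::this_thread::sleep_for(std::chrono::seconds(10));
            std::cout << " 10 seconds elapsed!" << std::endl;
        }
    }

    // Sleeps for N seconds, then reports that the thread woke up.
    void action_sleep(int N)
    {
        std::this_thread::sleep_for(std::chrono::seconds(N));
        std::cout << " [INFO] Thread wake up! OK" << std::endl;
    }

    // A function name decays to a function pointer, so both forms are equivalent.
    // Each thread object must later be joined or detached.
    std::thread th1 {do_forever};
    std::thread th2 {&do_forever};
    std::thread th3 {action_sleep, 10};
    std::thread th4 {&action_sleep, 1};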
Example: construct threads out of function object, aka callable objects or functors.
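    #include <chrono>
    #include <iostream>
    #include <string>
    #include <thread>

    // Functor
    struct LoopMessage
    {
        const std::string message;
        const int         delay;

        LoopMessage(std::string message, int delay)
            : message(message), delay(delay) { }

        // Function-call operator called by the thread class.
        void operator()()
        {
            while (true) {
                std::this_thread::sleep_for(std::chrono::seconds(delay));
                std::cout << " [INFO] thread id = " << std::this_thread::get_id()
                          << " ; " << message << std::endl;
            }
        }
    };

    int main()
    {
        // The thread takes a functor object, not the type name plus arguments.
        std::thread thread_messageA {LoopMessage{"Hello world", 10}};
        auto thread_messageB = std::thread{LoopMessage{"Hello world", 10}};

        // Join before main returns; otherwise std::terminate is called.
        // Both loops run forever, so these calls never return.
        thread_messageA.join();
        thread_messageB.join();
        return 0;
    }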
Example: construct thread object out of lambda expressions.
    std::thread threadA ([](){
        while (true) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            // ... action .... //
        }
    });

    auto th4 = std::thread([](int N){
        while (true) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            // .... action ... //
        }
    }, 10);
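One detail of this constructor that is easy to miss: std::thread copies (or moves) its arguments into internal storage before calling the function. To pass an object by reference, it has to be wrapped in std::ref. The following sketch illustrates this; the names append_squares and squares_in_thread are hypothetical.

```cpp
#include <functional>
#include <thread>
#include <vector>

// Hypothetical worker: appends the squares of 1..n to `out`.
void append_squares(std::vector<int>& out, int n)
{
    for (int i = 1; i <= n; ++i) out.push_back(i * i);
}

std::vector<int> squares_in_thread(int n)
{
    std::vector<int> result;
    // Without std::ref this does not compile: std::thread copies its
    // arguments, and the copy cannot bind to a non-const reference parameter.
    std::thread worker(append_squares, std::ref(result), n);
    worker.join();
    return result;
}
```

When std::ref is used, the caller is responsible for keeping the referenced object alive for as long as the thread uses it.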
Note: Before a std::thread object goes out of scope, it is necessary to call either .join(), to wait for the completion of the associated thread of execution, or .detach(), to detach the thread of execution. If neither is called, the destructor of the still-joinable std::thread calls std::terminate, which by default calls std::abort() and causes abnormal termination of the application.
Example:
    #include <iostream>
    #include <string>
    #include <thread>

    // Function run by the threads below.
    void functionPointer(int n, std::string message)
    {
        std::cout << message << " " << n << std::endl;
    }

    // Failure => The runtime will call std::terminate
    int Function_error()
    {
        std::thread threadA {&functionPointer, 10, "Hello world"};
        // Error: Missing call to .detach() or .join().
        // The destructor of threadA calls std::terminate(), causing abnormal termination!
        return 1;
    }

    // OK => Does not call std::terminate.
    int Function_join()
    {
        std::thread threadA {&functionPointer, 10, "Hello world"}; // Ok
        threadA.join(); // Wait for completion of threadA.
        return 1;
    } // threadA goes out of scope here!

    // OK => Does not call std::terminate.
    int Function_detach()
    {
        std::thread threadA {&functionPointer, 10, "Hello world"}; // Ok
        // Detach, no longer control or manage threadA.
        threadA.detach();
        return 1;
    }

    int main()
    {
        Function_error();   // Terminates the program; remove this call to reach the next ones.
        Function_join();
        Function_detach();
        return 10;
    }
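One common way to avoid forgetting join() is to let a small RAII wrapper call it in its destructor. The class JoinGuard below is a hypothetical sketch of that idea, not part of the standard library; C++20 provides std::jthread, which joins automatically in its destructor.

```cpp
#include <thread>
#include <utility>

// Hypothetical RAII wrapper: joins the owned thread on destruction, so the
// thread is never still joinable when the std::thread destructor runs.
class JoinGuard {
public:
    explicit JoinGuard(std::thread t) : thread_(std::move(t)) {}
    ~JoinGuard()
    {
        if (thread_.joinable()) thread_.join();
    }
    JoinGuard(const JoinGuard&) = delete;
    JoinGuard& operator=(const JoinGuard&) = delete;

private:
    std::thread thread_;
};

int guarded_work()
{
    int result = 0;
    {
        JoinGuard guard{std::thread([&result] { result = 7; })};
        // Leaving this scope joins the thread, even if an exception is thrown.
    }
    return result;
}
```

The guard trades flexibility for safety: the scope always waits for the thread, which is usually what is wanted when the thread refers to local variables.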
- joinable()
- => Returns true if the object is associated with a thread of execution, that is, if it was started and has neither been joined nor detached. A thread that has finished running but has not been joined yet is still joinable.
bool joinable() const noexcept;
- join()
- => Blocks the current thread waiting for the completion of the called object thread. For instance, calling threadA.join() will block the current thread waiting until the thread of execution of threadA object finishes.
void thread::join();
- detach()
- => Detaches the associated thread of execution from a given std::thread object. After this function is called, it is no longer possible to control the detached thread of execution or joining it (waiting for its completion). Then, the std::thread object no longer represents the detached execution thread.
- => A thread of execution that was detached is also called daemon thread.
void thread::detach();
- get_id()
- => Returns the unique identifier (of type std::thread::id) of the thread associated with the object.
std::thread::id thread::get_id() const noexcept;
- hardware_concurrency()
- => Returns an estimate of the number of threads that can run in parallel. The result is often equal to the number of logical CPU cores; it is 0 when the value cannot be determined.
static unsigned int thread::hardware_concurrency() noexcept;
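The member functions above can be observed with a short experiment. The sketch below is illustrative only: the struct JoinableStates and the functions inspect_joinable and ids_differ are hypothetical names introduced here, not part of the original text.

```cpp
#include <thread>

// Hypothetical record of joinable() at three points of a thread's life cycle.
struct JoinableStates
{
    bool default_constructed;
    bool after_start;
    bool after_join;
};

JoinableStates inspect_joinable()
{
    JoinableStates s{};
    std::thread t;                           // no associated thread of execution
    s.default_constructed = t.joinable();    // false
    t = std::thread([] { /* no work */ });
    s.after_start = t.joinable();            // true, even if the lambda already returned
    t.join();
    s.after_join = t.joinable();             // false: nothing is left to join
    return s;
}

// The id of a started (and not yet joined) thread differs from the id of
// the thread that created it.
bool ids_differ()
{
    std::thread t([] { });
    bool differ = (t.get_id() != std::this_thread::get_id());
    t.join();
    return differ;
}
```

A caller sizing a group of worker threads would typically also read std::thread::hardware_concurrency() and fall back to a fixed number when it returns 0.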
1.6 Functions of namespace std::this_thread
Utility functions for the current thread of execution:
Header: <thread>
Signature of functions in namespace this_thread.
// Namespace: std::this_thread.
namespace std {
  namespace this_thread {

    // Returns the id of the current thread.
    std::thread::id get_id() noexcept;

    // Provides a hint to the implementation to reschedule the
    // execution of threads, allowing other threads to run.
    void yield() noexcept;

    // Blocks the execution of the current thread for at least the specified sleep_duration.
    template< class Rep, class Period >
    void sleep_for( const std::chrono::duration<Rep, Period>& sleep_duration );

    // Blocks the execution of the current thread until specified
    // sleep_time has been reached.
    template< class Clock, class Duration >
    void sleep_until( const std::chrono::time_point<Clock,Duration>& sleep_time );
  }
}
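As a rough illustration of the difference between a relative and an absolute sleep, the sketch below measures both calls with std::chrono::steady_clock. The helper names measured_sleep_for and measured_sleep_until are assumptions made for this example.

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: sleeps with sleep_for() and returns how long the call
// actually took, measured with the monotonic steady_clock.
std::chrono::milliseconds measured_sleep_for(std::chrono::milliseconds requested)
{
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(requested);              // relative duration
    auto elapsed = std::chrono::steady_clock::now() - start;
    return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
}

// Hypothetical helper: same measurement, but with sleep_until() and an
// absolute deadline computed from the current time.
std::chrono::milliseconds measured_sleep_until(std::chrono::milliseconds delay)
{
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_until(start + delay);        // absolute time point
    auto elapsed = std::chrono::steady_clock::now() - start;
    return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
}
```

Both functions usually return a value close to the requested time or somewhat larger, because the scheduler may resume the thread later than asked.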
1.7 Thread - std::thread usage and synchronization primitives
1.7.1 Race condition
When a race condition happens, the outcome of a computation on a shared resource depends on the order in which the threads execute. Race condition bugs are hard to trace and debug. The solution to this flaw is to coordinate thread access to shared resources through synchronization primitives such as mutexes (mutual exclusion locks) and atomic variables.
Most common types of shared resources:
- Global variables or objects such as: std::cout, std::cerr, std::cin
- Variables shared between threads
- Singleton objects - classes with a unique global instance.
C++ Standard definition of a data race (referred to as a race condition throughout this section):
The execution of a program contains a data race if it contains two potentially concurrent conflicting actions, at least one of which is not atomic, and neither happens before the other, except for the special case for signal handlers described below. Any such data race results in undefined behavior.
C++ Standard about Undefined Behavior:
A conforming implementation executing a well-formed program shall produce the same observable behavior as one of the possible executions of the corresponding instance of the abstract machine with the same program and the same input. However, if any such execution contains an undefined operation, this International Standard places no requirement on the implementation executing that program with that input.
Example of a race condition:
- File: thread1.cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>

struct Worker
{
    int& acc;

    Worker(int& acc): acc(acc) { }

    void operator()(int x)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        // Unsynchronized read-modify-write of the shared variable.
        acc = acc + x * x;
    }
};

int main()
{
    int result = 0;
    std::vector<std::thread> thread_list{};

    for(int i = 1; i <= 10; i++)
    {
        thread_list.push_back( std::thread{Worker(result), i} );
    }
    for(auto& t: thread_list) { t.join(); }

    std::cout << " result = " << result << std::endl;
    return 0;
}
Building:
$ g++ thread1.cpp -o thread1.bin -std=c++1z -Wall -Wextra -O0 -g -lpthread
Running:
- The expected result is 385. However, the program sometimes yields an incorrect result due to a race condition bug (aka data race).
$ ./thread1.bin
 result = 385
$ ./thread1.bin
 result = 384
$ ./thread1.bin
 result = 385
$ ./thread1.bin
 result = 368
$ ./thread1.bin
 result = 385
$ ./thread1.bin
 result = 376
1.7.2 Debugging Race Conditions with Clang's Sanitizers
Race conditions (aka data races) are notoriously hard to reproduce, trace and debug because concurrent systems are nondeterministic: the result depends on the order in which the threads run, so multiple executions with the same inputs may lead to different outcomes.
The Clang compiler provides sanitizers and static analysis tools that help to debug undefined behavior, memory errors and race conditions.
Clang Sanitizer Tools:
- Thread Safety Analysis (Static analysis tool => compiler warning)
- Compiler switch: -Wthread-safety
- Warns about potential race conditions in the code at compile-time. The analysis relies on annotation attributes that declare which mutex guards which data.
- Thread Sanitizer
- Compiler switch: -fsanitize=thread
- Can detect:
- Race conditions or data races at runtime.
- Address Sanitizer
- Compiler switch: -fsanitize=address
- Can detect:
- Out-of-bounds accesses to heap, stack and globals
- Use-after-free
- Use-after-return (runtime flag ASAN_OPTIONS=detect_stack_use_after_return=1)
- Double-free, invalid free
- Memory leaks (experimental)
- Undefined Behavior Sanitizer
- Compiler switch: -fsanitize=undefined
- Can detect:
- Using misaligned or null pointer
- Signed integer overflow
- Conversion to, from, or between floating-point types which would overflow the destination.
- Memory Sanitizer
- Compiler switch: -fsanitize=memory
- Can detect:
- Uninitialized reads (reading of uninitialized variables)
Example: Using Clang sanitizer tools for race condition detection:
- File: flawed-program.cpp
#include <iostream> #include <thread> #include <chrono> #include <thread> #include <vector> struct Worker { int& acc; Worker(int& acc): acc(acc) { } void operator()(int x) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); acc = acc + x * x; } }; int main() { int result = 0; std::vector<std::thread> thread_list{}; for(int i = 1; i <= 10; i++) { thread_list.push_back( std::thread{Worker(result), i} ); } for(auto& t: thread_list) { t.join(); } std::cout << " result = " << result << std::endl; return 0; }
Building:
# Compiler switches:
# =>> -O0                   Disable optimization
# =>> -g                    Enable debug building
# =>> -Wthread-safety       Enable Clang compiler warning for race conditions
# =>> -fsanitize=thread     Enable thread sanitizer
# =>> -fsanitize=undefined  Enable undefined behavior sanitizer
$ clang++ flawed-program.cpp -o out.bin -O0 -g -lpthread -Wall -Wextra -pedantic \
    -Wthread-safety \
    -fsanitize=thread \
    -fsanitize=undefined
Running:
$ ./out.bin ================== WARNING: ThreadSanitizer: data race (pid=22979) Read of size 4 at 0x7ffdb2ae1b78 by thread T2: #0 Worker::operator()(int) /Users/dummy/projects/cmake-experiment/flawed-program.cpp:16:17 (out.bin+0x4b7516) #1 void std::__invoke_impl<void, Worker, int>(std::__invoke_other, Worker&&, int&&) /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/bits/invoke.h:60:14 (out.bin+0x4b7421) #2 std::__invoke_result<Worker, int>::type std::__invoke<Worker, int>(Worker&&, int&&) /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/bits/invoke.h:95:14 (out.bin+0x4b7026) #3 _ZNSt6thread8_InvokerISt5tupleIJ6WorkeriEEE9_M_invokeIJLm0ELm1EEEEDTclsr3stdE8__invokespcl10_S_declvalIXT_EEEEESt12_Index_tupleIJXspT_EEE /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:244:13 (out.bin+0x4b6eb6) #4 std::thread::_Invoker<std::tuple<Worker, int> >::operator()() /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:253:11 (out.bin+0x4b6ca8) #5 std::thread::_State_impl<std::thread::_Invoker<std::tuple<Worker, int> > >::_M_run() /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:196:13 (out.bin+0x4b5fe7) #6 <null> <null> (libstdc++.so.6+0xbf5c2) Previous write of size 4 at 0x7ffdb2ae1b78 by thread T1: #0 Worker::operator()(int) /Users/dummy/projects/cmake-experiment/flawed-program.cpp:16:15 (out.bin+0x4b75c6) #1 void std::__invoke_impl<void, Worker, int>(std::__invoke_other, Worker&&, int&&) /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/bits/invoke.h:60:14 (out.bin+0x4b7421) #2 std::__invoke_result<Worker, int>::type std::__invoke<Worker, int>(Worker&&, int&&) /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/bits/invoke.h:95:14 (out.bin+0x4b7026) #3 _ZNSt6thread8_InvokerISt5tupleIJ6WorkeriEEE9_M_invokeIJLm0ELm1EEEEDTclsr3stdE8__invokespcl10_S_declvalIXT_EEEEESt12_Index_tupleIJXspT_EEE 
/usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:244:13 (out.bin+0x4b6eb6) #4 std::thread::_Invoker<std::tuple<Worker, int> >::operator()() /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:253:11 (out.bin+0x4b6ca8) #5 std::thread::_State_impl<std::thread::_Invoker<std::tuple<Worker, int> > >::_M_run() /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:196:13 (out.bin+0x4b5fe7) #6 <null> <null> (libstdc++.so.6+0xbf5c2) As if synchronized via sleep: #0 nanosleep <null> (out.bin+0x43565e) #1 void std::this_thread::sleep_for<long, std::ratio<1l, 1000l> >(std::chrono::duration<long, std::ratio<1l, 1000l> > const&) /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:379:9 (out.bin+0x4b7798) #2 Worker::operator()(int) /Users/dummy/projects/cmake-experiment/flawed-program.cpp:15:11 (out.bin+0x4b74ef) #3 void std::__invoke_impl<void, Worker, int>(std::__invoke_other, Worker&&, int&&) /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/bits/invoke.h:60:14 (out.bin+0x4b7421) #4 std::__invoke_result<Worker, int>::type std::__invoke<Worker, int>(Worker&&, int&&) /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/bits/invoke.h:95:14 (out.bin+0x4b7026) #5 _ZNSt6thread8_InvokerISt5tupleIJ6WorkeriEEE9_M_invokeIJLm0ELm1EEEEDTclsr3stdE8__invokespcl10_S_declvalIXT_EEEEESt12_Index_tupleIJXspT_EEE /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:244:13 (out.bin+0x4b6eb6) #6 std::thread::_Invoker<std::tuple<Worker, int> >::operator()() /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:253:11 (out.bin+0x4b6ca8) #7 std::thread::_State_impl<std::thread::_Invoker<std::tuple<Worker, int> > >::_M_run() /usr/bin/../lib/gcc/x86_64-redhat-linux/8/../../../../include/c++/8/thread:196:13 (out.bin+0x4b5fe7) #8 <null> <null> (libstdc++.so.6+0xbf5c2) Location is stack of main thread. Location is global '??' 
at 0x7ffdb2ac4000 ([stack]+0x00000001db78) Thread T2 (tid=22982, running) created by main thread at: #0 pthread_create <null> (out.bin+0x425eba) #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0xbf8a8) #2 main /Users/dummy/projects/cmake-experiment/flawed-program.cpp:27:34 (out.bin+0x4b3304) Thread T1 (tid=22981, finished) created by main thread at: #0 pthread_create <null> (out.bin+0x425eba) #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0xbf8a8) #2 main /Users/dummy/projects/cmake-experiment/flawed-program.cpp:27:34 (out.bin+0x4b3304) SUMMARY: ThreadSanitizer: data race /Users/dummy/projects/cmake-experiment/flawed-program.cpp:16:17 in Worker::operator()(int) ================== result = 304 ThreadSanitizer: reported 1 warnings
The sanitizer found a data race at line 16, column 17 of the file flawed-program.cpp:
SUMMARY: ThreadSanitizer: data race /Users/... ..../flawed-program.cpp:16:17 in Worker::operator()(int)
The data race is in the following line, where a variable accessed through a reference is modified by multiple threads without any lock or mutex.
acc = acc + x * x;
1.7.3 Mutex solution
The race condition can be solved with a mutex, a mutual exclusion synchronization primitive that allows only a single thread at a time to enter the critical section, the portion of the code that accesses a shared resource.
File: thread2.cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <mutex>

using namespace std::chrono_literals;

struct Worker
{
    int& acc;
    // Requires <mutex> header
    std::mutex& m;

    Worker(int& acc, std::mutex& m): acc(acc), m(m) { }

    void operator()(int x)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        // --- Start of critical section ---- //
        m.lock();   // Current thread acquires the lock
        acc = acc + x * x;
        m.unlock(); // Current thread releases the lock
        // --- End of critical section ---- //
    }
};

int main()
{
    // Shared resource
    int result = 0;
    std::mutex m;
    std::vector<std::thread> thread_list{};

    for(int i = 1; i <= 10; i++)
    {
        thread_list.push_back( std::thread{Worker(result, m), i} );
    }
    for(auto& t: thread_list) { t.join(); }

    std::cout << " result = " << result << std::endl;
    return 0;
}
Building:
$ g++ thread2.cpp -o thread2.bin -std=c++1z -Wall -Wextra -O0 -g -lpthread
Running:
- The computation becomes reproducible and predictable because the mutex allows only a single thread at a time to access the shared resource (variable result).
$ ./thread2.bin
 result = 385
$ ./thread2.bin
 result = 385
$ ./thread2.bin
 result = 385
$ ./thread2.bin
 result = 385
$ ./thread2.bin
 result = 385
Note: The current code is not exception safe and is error prone: if an exception is thrown inside the critical section, or if the call that releases the lock is missing, the mutex stays locked and the outcome is a deadlock. It is better to use std::lock_guard, which is an RAII (Resource Acquisition Is Initialization) wrapper for locks. When the guard object is constructed, the current thread acquires the lock, and when the guard goes out of scope, the lock is released. So, by using a std::lock_guard, the code becomes:
void operator()(int x)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    // --- Start of critical section ---- //
    // Acquires lock
    std::lock_guard<std::mutex> mutex_guard(m);
    acc = acc + x * x;
    // --- End of critical section ---- //
} // Releases lock here, when the mutex_guard goes out of scope and is destroyed.
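The exception-safety argument for std::lock_guard can be checked with a small experiment. In the sketch below, the names demo_mutex, demo_value, update_or_throw and mutex_released_after_exception are hypothetical and serve only this illustration: a function throws while holding the guard, and the caller then verifies that the mutex can be acquired again.

```cpp
#include <mutex>
#include <stdexcept>

std::mutex demo_mutex;   // hypothetical shared mutex
int demo_value = 0;      // hypothetical shared resource

// May throw while holding the lock. The lock_guard destructor still runs
// during stack unwinding and releases the mutex.
void update_or_throw(int x)
{
    std::lock_guard<std::mutex> guard(demo_mutex);
    if (x < 0) { throw std::invalid_argument("negative input"); }
    demo_value += x;
}

// Returns true if the mutex is free again after update_or_throw() failed.
bool mutex_released_after_exception()
{
    try {
        update_or_throw(-1);
    } catch (const std::invalid_argument&) {
        // The guard was destroyed while the exception propagated.
    }
    // try_lock() would return false if the mutex were still held.
    // (The standard also permits rare spurious failures of try_lock.)
    bool acquired = demo_mutex.try_lock();
    if (acquired) { demo_mutex.unlock(); }
    return acquired;
}
```

With manual m.lock() and m.unlock() calls, the same exception would skip the unlock and the later try_lock() would fail.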
1.7.4 Atomic variable solution
Another way to solve the race condition (aka data race) problem is using atomic variables and atomic operations.
File: thread3.cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <atomic>

using namespace std::chrono_literals;

struct Worker
{
    // Requires <atomic> header
    std::atomic<int>& acc;

    Worker(std::atomic<int>& acc): acc(acc) { }

    void operator()(int x)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        // Atomic read-modify-write: no mutex is needed.
        acc += x * x;
    }
};

int main()
{
    // Shared resource
    std::atomic<int> result{0};
    std::vector<std::thread> thread_list{};

    for(int i = 1; i <= 10; i++)
    {
        thread_list.push_back( std::thread{Worker(result), i} );
    }
    for(auto& t: thread_list) { t.join(); }

    std::cout << " result = " << result << std::endl;
    return 0;
}
Output:
$ ./thread3.bin
 result = 385
$ ./thread3.bin
 result = 385
$ ./thread3.bin
 result = 385
$ ./thread3.bin
 result = 385
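A detail worth spelling out is that the atomic solution depends on using a single read-modify-write operation such as += or fetch_add(). The following sketch, with the hypothetical function name parallel_sum_of_squares, repeats the computation with fetch_add() and notes the variant that would still lose updates.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: computes 1^2 + 2^2 + ... + n^2 with one thread per term.
int parallel_sum_of_squares(int n)
{
    std::atomic<int> acc{0};
    std::vector<std::thread> threads;
    threads.reserve(static_cast<std::size_t>(n));

    for (int i = 1; i <= n; i++)
    {
        threads.emplace_back([&acc, i] {
            // Single atomic read-modify-write, equivalent to 'acc += i * i'.
            acc.fetch_add(i * i);
            // By contrast, 'acc.store(acc.load() + i * i)' is two separate
            // atomic operations: another thread can update acc in between,
            // so updates could still be lost.
        });
    }
    for (auto& t : threads) { t.join(); }
    return acc.load();
}
```

For n = 10 the function returns 385 on every run, matching the output above.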
1.8 Thread - Returning values from std::thread
1.8.1 Return value from thread
There is no direct way to return a value from a std::thread computation: if the function used for instantiating std::thread returns a value, that value is ignored.
Example:
double heavy_computation(double input)
{
    // do something .... heavy calculation
    return output;
}

// Return value ignored.
auto th = std::thread(&heavy_computation, 10);
// ... ... ...
th.join();
The workaround for returning a value out of std::thread is to set a variable defined outside of the thread or set a parameter passed as pointer or reference.
- Return value from thread by setting a variable defined outside of the thread.
double output = 0.0;

// The lambda captures 'output' by reference and sets it.
auto th = std::thread([&](int input) {
    // perform heavy computation
    // ....
    /* set output */
    output = .... ;
}, 10);
// ... ... ...
th.join();
printf(" output = %f", output);
- Return value from thread by setting the function parameters passed by reference or pointer.
void heavy_computation(double input, double& output1, std::string* output2)
{
    // do something .... heavy calculation
    output1 = .... ;
    *output2 = "Something else ....";
}

double output1 = 0.0;
std::string output2 = "";

// Note: std::ref is required for passing an argument by reference to std::thread.
auto th = std::thread(&heavy_computation, 10.0, std::ref(output1), &output2);
th.join();

std::cout << " output1 = " << output1 << " ; "
          << " output2 = " << output2 << "\n";
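Since the snippets above elide the actual computation, here is a complete minimal sketch of the reference and pointer technique. The function compute, the struct Outputs and the function run_compute are hypothetical, and the computation itself (doubling the input) is a placeholder.

```cpp
#include <functional>   // std::ref
#include <string>
#include <thread>

// Hypothetical worker: writes its results through a reference and a pointer.
void compute(double input, double& output1, std::string* output2)
{
    output1  = input * 2.0;   // placeholder for a heavy calculation
    *output2 = "done";
}

struct Outputs
{
    double      value;
    std::string label;
};

Outputs run_compute(double input)
{
    Outputs out{0.0, ""};
    // std::thread copies its arguments, so a reference parameter must be
    // wrapped with std::ref; the pointer is passed as-is.
    std::thread th(compute, input, std::ref(out.value), &out.label);
    th.join();   // the results are only safe to read after join()
    return out;
}
```

For example, run_compute(10.0) yields value 20.0 and label "done". Without std::ref the call does not compile, because std::thread would try to bind a temporary copy to the non-const reference parameter.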
1.8.2 Catch exceptions in threads
If an exception is thrown in a std::thread and escapes the thread function, it is not propagated to a try … catch block outside of the thread; instead, the C++ runtime calls std::terminate, causing abnormal termination.
Example: code does not work => calls std::terminate
auto do_something = [](int input) {
    // .... .... //
    if(input < 0){
        throw std::logic_error("Invalid input");
    }
    //.... ... ... ... //
};

// DOES NOT WORK =>>> C++ runtime calls std::terminate !!!
try {
    auto th = std::thread(do_something, -10);
    th.join();
} catch(std::logic_error& ex) {
    // Never reached: the exception does not cross the thread boundary.
    std::cout << " Error: " << ex.what() << "\n";
}
Example: Solution
std::exception_ptr exptr = nullptr;

auto do_something = [&](int input) {
    try {
        // .... .... //
        if(input < 0){
            throw std::logic_error("Invalid input");
        }
        //.... ... ... ... //
    // Catch all exceptions
    } catch(...) {
        exptr = std::current_exception();
    }
};

try {
    auto th = std::thread(do_something, -10);
    th.join();
    // Rethrow only if the thread stored an exception.
    if(exptr) { std::rethrow_exception(exptr); }
} catch(std::logic_error& ex) {
    std::cout << " Error: " << ex.what() << "\n";
}
1.8.3 Example: Returning values and catching exceptions
File: return-thread.cpp
// Experiment returning results from threads
//---------------------------------------------
#include <cstdio>
#include <iostream>
#include <chrono>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
#include <functional>

using namespace std::chrono_literals;
namespace cr = std::chrono;

struct Timer
{
    decltype(cr::steady_clock::now()) start;

    Timer(){ start = cr::steady_clock::now(); }

    ~Timer()
    {
        auto end = cr::steady_clock::now();
        auto duration = cr::duration_cast<cr::seconds>(end - start);
        std::cout << " [TRACE] Elapsed time: " << duration.count() << std::endl;
    }
};

double heavy_computation(int input)
{
    using namespace std::string_literals;
    std::this_thread::sleep_for(4s);
    if(input < 0) {
        throw std::runtime_error("Error: invalid input: "s + std::to_string(input));
    }
    return input * 3.815 + 5.6;
}

int main(int argc, char* argv[])
{
    std::vector<int> inputs{2, 10, 5};
    std::vector<double> results(3);
    std::vector<std::thread> threads;
    threads.reserve(3);

    // Note: A std::thread will never return anything from a function
    // passed as argument, even if it has a non-void return type.
    // The only way to return a value from a thread is to set an
    // argument passed by pointer or reference or to set a global object.
    auto thread_adapter = [&](size_t index, int input) {
        results[index] = heavy_computation(input);
    };

    std::puts("\n ===== Experiment 1 - Return value from threads (Parallel Computing) ==== \n");
    {
        // Shows elapsed time at end of this scope
        auto timer = Timer{};

        for(size_t i = 0; i < 3; i++) {
            threads.push_back( std::thread(thread_adapter, i, inputs[i]) );
        }
        std::puts(" [INFO] Waiting for thread completion");

        // Wait for the completion of all threads
        for(size_t i = 0; i < 3; i++) { threads[i].join(); }

        // Show results
        for(size_t i = 0; i < 3; i++) {
            std::printf(" Result[%zu] = %f\n", i, results[i]);
        }
    }

    ///---------------------------------------------------------------//
    std::puts("\n ===== Dealing with exceptions from threads ==== \n");

    double output = 0.0;
    std::exception_ptr exptr = nullptr;

    auto thread_adapter2 = [&](int input) {
        std::puts(" [TRACE] Inside thread_adapter2");
        // Note: if an exception is not caught,
        // it will not be propagated to the parent thread.
        // Instead, the C++ runtime will call std::terminate
        // causing abnormal termination.
        try {
            output = heavy_computation(input);
        } catch (...) {
            // Catch all exceptions from the computation that
            // runs in the new thread and store them in a shared
            // variable that is set only once by this thread.
            exptr = std::current_exception();
        }
    };

    auto th = std::thread(thread_adapter2, -10);
    std::puts(" [TRACE] Waiting thread_adapter2 thread termination.");
    th.join();

    if(!exptr) {
        std::cout << " [INFO] Result of thread_adapter2 is: " << output << "\n";
    } else {
        try {
            std::rethrow_exception(exptr);
        } catch (std::runtime_error& err) {
            std::cout << " [ERROR] " << err.what() << "\n";
        }
    }
    return 0;
}
Building:
$ g++ return-thread.cpp -o out.bin -std=c++1z -O0 -g -lpthread
Running:
$ ./out.bin

 ===== Experiment 1 - Return value from threads (Parallel Computing) ====

 [INFO] Waiting for thread completion
 Result[0] = 13.230000
 Result[1] = 43.750000
 Result[2] = 24.675000
 [TRACE] Elapsed time: 4

 ===== Dealing with exceptions from threads ====

 [TRACE] Waiting thread_adapter2 thread termination.
 [TRACE] Inside thread_adapter2
 [ERROR] Error: invalid input: -10
1.9 Thread Local Variables and TLS - Thread Local Storage
The keyword thread_local, added in C++11, ensures that a new instance of a global or static variable is created for every thread using the variable.
- A variable with this annotation has thread storage duration: its lifetime is bound to the lifetime of the thread. When declared inside a function, it is implicitly static.
- Every thread using a variable annotated as thread_local has its own copy of it, therefore any modification to the variable inside one thread has no effect on other threads.
- Allocation and memory release:
- Variables declared with the thread_local storage class specifier are allocated on a per-thread basis, instantiated when the thread is created and destroyed when the thread terminates.
- Thread local variables can be declared:
- Globally
- In namespaces
- As class static member variable
- Inside functions. It has the same effect as variables declared with the static keyword, which persist their value between function calls, except that there is one instance per thread.
- Thread local storage is implemented by the operating system and is OS-specific. The thread_local keyword provides a uniform way to use the thread local storage (TLS) capabilities of many operating systems and avoids compiler-specific language extensions for accessing the TLS feature.
- Windows API TLS: TlsSetValue(), TlsGetValue(), TlsFree() …
- MSVC Compiler language extension: __declspec(thread)
- GCC Compiler language extension for TLS: __thread int var = 0;
- If using a C++11 compiler is not possible, the Boost library provides the class boost::thread_specific_ptr, which offers portable thread local storage.
Example:
File: thread-local.cpp
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
#include <cassert>

thread_local int tls_variable = 100;
std::mutex cout_mutex;

void worker_function(int offset)
{
    tls_variable = tls_variable + offset;
    auto guard = std::lock_guard<std::mutex>(cout_mutex);
    std::cout << " [INFO] Thread ID = " << std::this_thread::get_id()
              << " ; &tls_variable = " << &tls_variable
              << " ; VALUE(tls_variable) " << tls_variable
              << std::endl;
}

// Note: 'thread_local' is similar to declaring 'static int n = 0',
// which persists the state of the variable n between function
// calls. However, unlike 'static', the thread_local annotation
// ensures that there will be a copy of the variable 'n' for
// every thread.
void log_counter(bool show_counter, const char* thread_name)
{
    // Same as declaring: 'static thread_local int n = 0;'
    thread_local int n = 0;
    n++;
    if(show_counter){
        auto guard = std::lock_guard<std::mutex>(cout_mutex);
        std::cout << " [LOGGER] Called by THREAD_ID = " << std::this_thread::get_id()
                  << " ; THREAD_NAME = " << thread_name
                  << " ; ADDRESS(n) = " << &n
                  << " ; COUNTER = " << n << "\n";
    }
}

int main(int argc, char** argv)
{
    using namespace std::chrono_literals;

    std::cout << "\n=== EXPERIMENT A ========================================\n";

    // Only modified for the main thread
    tls_variable = 25;

    std::cout << " [INFO] Before => MAIN Thread ID = " << std::this_thread::get_id()
              << " ; &tls_variable = " << &tls_variable
              << " ; VALUE(tls_variable) " << tls_variable
              << std::endl;

    auto t1 = std::thread(worker_function, 15);
    auto t2 = std::thread(worker_function, 20);
    auto t3 = std::thread(worker_function, 90);
    t1.join(); t2.join(); t3.join();

    std::cout << " [INFO] After => MAIN Thread ID = " << std::this_thread::get_id()
              << " ; &tls_variable = " << &tls_variable
              << " ; VALUE(tls_variable) " << tls_variable
              << std::endl;

    assert(tls_variable == 25);

    std::cout << "\n=== EXPERIMENT B == Thread local storage and functions ====\n";

    auto th1 = std::thread([]{
        for(int i = 0; i < 10; ++i)
            log_counter(false, "th1");
        std::this_thread::sleep_for(2s);
        log_counter(true, "th1");
    });

    auto th2 = std::thread([]{
        for(int i = 0; i < 20; ++i)
            log_counter(false, "th2");
        std::this_thread::sleep_for(1s);
        log_counter(true, "th2");
    });

    log_counter(false, "main");
    log_counter(false, "main");
    log_counter(false, "main");
    log_counter(true, "main");

    th1.join(); th2.join();
    return 0;
}
Building:
$ clang++ thread-local.cpp -o out.bin -std=c++1z -O0 -g -lpthread -Wall -Wextra
Running:
$ ./out.bin

=== EXPERIMENT A ========================================
 [INFO] Before => MAIN Thread ID = 140391161767744 ; &tls_variable = 0x7faf5d53a738 ; VALUE(tls_variable) 25
 [INFO] Thread ID = 140391143909120 ; &tls_variable = 0x7faf5c4326f8 ; VALUE(tls_variable) 115
 [INFO] Thread ID = 140391135516416 ; &tls_variable = 0x7faf5bc316f8 ; VALUE(tls_variable) 120
 [INFO] Thread ID = 140391127123712 ; &tls_variable = 0x7faf5b4306f8 ; VALUE(tls_variable) 190
 [INFO] After => MAIN Thread ID = 140391161767744 ; &tls_variable = 0x7faf5d53a738 ; VALUE(tls_variable) 25

=== EXPERIMENT B == Thread local storage and functions ====
 [LOGGER] Called by THREAD_ID = 140391161767744 ; THREAD_NAME = main ; ADDRESS(n) = 0x7faf5d53a73c ; COUNTER = 4
 [LOGGER] Called by THREAD_ID = 140391135516416 ; THREAD_NAME = th2 ; ADDRESS(n) = 0x7faf5bc316fc ; COUNTER = 21
 [LOGGER] Called by THREAD_ID = 140391127123712 ; THREAD_NAME = th1 ; ADDRESS(n) = 0x7faf5b4306fc ; COUNTER = 11
References:
- Storage class specifiers - cppreference.com
- Storage classes (C++) | Microsoft Docs
- Thread Local Storage (TLS) | Microsoft Docs
- WG21 - N2659 Thread-Local Storage (C++ Standard Proposal)
- "Proposal that defines the thread_local storage class specifier for C++11."
- TLS - Thread Local Storage - sireeshajakku
- Data that is private to a thread
- The Win32 Windows Thread Local Storage Program Example
- Chapter 8 Thread-Local Storage (Linker and Libraries Guide)
Further Reading:
- TLS performance overhead and cost on GNU/Linux
- Thread-Local-Storage Benchmark | Timj’s bits and tests
- The hidden performance cost of accessing thread-local variables |Intel® Software
- Avoiding Heap Allocations With Static Thread Locals – C++ on a Friday
- Multi Instance Thread Local Storage - CodeProject
1.10 Task-based APIs => Futures and Promises
1.10.1 Futures overview
Overview:
- The std::future class encapsulates a value that will eventually become available from an asynchronous computation.
- This class makes it easier to run functions, or anything callable that returns a value, in a new thread and to get that value without any global variables. Another benefit is that handling exceptions from another thread is much easier than with the class std::thread.
- Use cases:
- Short computations that return a value, such as a network request.
- Implementing parallel algorithms and parallel computations.
- Not good for:
- Long-running threads, threads that run in an infinite loop or worker threads.
- Problems:
- Many other implementations of futures run the tasks in a thread pool, a pre-allocated set of threads that avoids spawning too many threads, which would increase the memory footprint and reduce performance and throughput.
- The C++ standard library's std::future API does not provide a thread pool: with the policy std::launch::async, each call to std::async behaves as if the task ran in a new thread.
Other Implementations of Futures and Promises
- boost::future
- folly::Future
- stlab::future
- transwarp - A header-only C++ library for task concurrency.
- hpx::future
1.10.2 Futures Usage
The function std::async is used for creating std::future objects out of functions that return values.
// Note: simplified pseudo-signature =>
// 'Return' stands for the type returned by calling f(args...).

// Overload 1:
template<typename Callable, typename ... Args>
auto async(Callable&& f, Args&&... args) -> std::future<Return>;

// Overload 2:
template<typename Callable, typename ... Args>
auto async(std::launch policy, Callable&& f, Args&&... args) -> std::future<Return>;
Usage example:
- The object futA represents the return value, or result, of heavy_computation, which runs in a new thread without blocking the thread that called std::async.
- The launch policy std::launch::async is necessary to guarantee that the computation runs in a new thread.
double heavy_computation(int size, std::string label, bool flag)
{
    // Sleep blocking the current thread for 10 seconds.
    std::this_thread::sleep_for(10s);
    // ... ... ... ..
    return output;
}

std::future<double> futA = std::async(std::launch::async, heavy_computation, 100, "calc1", false);

auto futB = std::async(std::launch::async, [](int n) {
    // .... heavy CPU bound computation ... //
    double output = ...;
    // ... ... ...
    return output;
}, 200);
- std::future<T>::get()
- The member function .get() from std::future blocks the current thread (similar to std::thread::join) waiting for the completion of the computation and then returns its result.
// Block current thread - similar to std::thread::join()
double resultA = futA.get();
double resultB = futB.get();
std::cout << " resultA = " << resultA << " ; resultB = " << resultB << "\n";
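The effect of the launch policy can be observed by comparing thread ids. The sketch below uses a hypothetical helper, runs_on_other_thread, that launches a task with a given policy and reports whether the task ran on a thread other than the caller's.

```cpp
#include <future>
#include <thread>

// Hypothetical helper: returns true if a task launched with 'policy' was
// executed by a thread different from the one calling this function.
bool runs_on_other_thread(std::launch policy)
{
    const std::thread::id caller = std::this_thread::get_id();

    std::future<std::thread::id> fut =
        std::async(policy, [] { return std::this_thread::get_id(); });

    // With std::launch::deferred the task has not started yet; it is
    // executed right here, inside get(), on the calling thread.
    return fut.get() != caller;
}
```

runs_on_other_thread(std::launch::async) returns true, while runs_on_other_thread(std::launch::deferred) returns false because a deferred task is evaluated lazily by the thread that calls .get() or .wait().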
- std::future<T>::wait()
- The member function .wait() blocks the current thread until the result of the std::future becomes available, without retrieving it.
// Wait for the computation completion blocking current thread. futB.wait();
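When blocking indefinitely is not acceptable, std::future also provides wait_for(), which waits for at most a given duration and reports a std::future_status. The sketch below polls a future in small steps; the names poll_until_ready and slow_answer are hypothetical and the 200 ms sleep merely stands in for real work.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Hypothetical helper: polls the future with wait_for() until the result is
// ready and returns how many times the wait timed out.
int poll_until_ready(std::future<int>& fut, std::chrono::milliseconds step)
{
    int timeouts = 0;
    while (true)
    {
        std::future_status status = fut.wait_for(step);
        if (status == std::future_status::ready)    { break; }
        // A deferred task never becomes ready by waiting with a timeout;
        // it only runs when get() or wait() is called.
        if (status == std::future_status::deferred) { break; }
        ++timeouts;   // the caller could update a progress indicator here
    }
    return timeouts;
}

// Hypothetical slow task standing in for a heavy computation.
int slow_answer()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return 42;
}
```

A typical call is auto fut = std::async(std::launch::async, slow_answer); followed by poll_until_ready(fut, std::chrono::milliseconds(10)) and finally fut.get(), which then returns immediately.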
- Catching exceptions.
double computation(int input)
{
    // ... .... ... ...
    if(bad_input(input)) {
        throw std::runtime_error("Invalid input, try again ...");
    }
    // ... .... ... ...
    return result;
}

// Type: std::future<double>
auto afuture = std::async(std::launch::async, computation, 10);

try {
    // An exception thrown by the computation is rethrown here by .get().
    double result = afuture.get();
    std::cout << " Result = " << result << "\n";
} catch(std::runtime_error& ex) {
    std::cout << " [ERROR] " << ex.what() << "\n";
}
- Run computations in parallel
double computation(int input)
{
    // ... .... ... ...
    // Sleep blocking this thread for 8 seconds
    // for simulating a heavy CPU-bound computation.
    std::this_thread::sleep_for(8s);
    // ... .... ... ...
    return result;
}

std::vector<std::future<double>> futures;
futures.reserve(3);
std::vector<int> inputs {10, 20, 25};
std::vector<double> outputs;

for(auto& x: inputs){
    futures.push_back( std::async(std::launch::async, computation, x) );
}

// Block current thread waiting for all computations to finish.
// If every computation takes 8 seconds, the total time waiting
// will be 8 seconds since they are all run in parallel.
for(auto& fut: futures)
{
    double x = fut.get();
    outputs.push_back(x);
    std::cout << "Result = " << x << "\n";
}
1.10.3 Example
File: future1.cpp
#include <cstdio>
#include <iostream>
#include <chrono>
#include <queue>
#include <stdexcept>
#include <string>
#include <vector>

// --- Concurrency Headers ---- //
#include <thread>
#include <mutex>
#include <future>
#include <type_traits>
#include <optional>

namespace cr = std::chrono;
using namespace std::chrono_literals;

#define LOG_FUNCTION_ENTRY()                                        \
    std::cerr << " [TRACE-ENTRY] Function = " << __FUNCTION__       \
              << " ; threadID " << std::this_thread::get_id() << std::endl

struct TimeCounter
{
    decltype(std::chrono::steady_clock::now()) start;

    TimeCounter(){ start = cr::steady_clock::now(); }

    void report(std::string const& label)
    {
        auto end = cr::steady_clock::now();
        auto duration = cr::duration_cast<cr::seconds>(end - start);
        std::cout << " [TRACE] Elapsed time: {" << label << "} = "
                  << duration.count() << std::endl;
    }
};

std::mutex mu;

double expensive_computationA(int n)
{
    { /* --- Start of critical section --- */
        // Protect std::cerr from race condition
        std::lock_guard<std::mutex> guard(mu);
        LOG_FUNCTION_ENTRY();
    } /* --- End of critical section --- */
    std::this_thread::sleep_for(4s);
    return 4.0 * n + 5;
}

double expensive_computationB(int n)
{
    { /* --- Start of critical section --- */
        // Protect std::cerr from race condition
        std::lock_guard<std::mutex> guard(mu);
        LOG_FUNCTION_ENTRY();
    } /* --- End of critical section --- */
    std::this_thread::sleep_for(8s);
    return 2.515 * n + 15.8714;
}

std::string computation_with_exception(int n)
{
    LOG_FUNCTION_ENTRY();
    std::this_thread::sleep_for(1s);
    if(n < 0) {
        throw std::logic_error(" Error: not allowed negative N");
    }
    return "Hello world";
}

int main(int argc, char** argv)
{
    std::cout << " Number of hardware threads = "
              << std::thread::hardware_concurrency() << std::endl;

#if 1
    LOG_FUNCTION_ENTRY();

    std::puts("\n ==== EXPERIMENT 1 >> without std::future => Run Serially ==== ");
    std::puts("-----------------------------------------------------------------\n");
    {
        TimeCounter tc;
        double resultA = expensive_computationA(10);
        double resultB = expensive_computationB(10);
        std::cout << " [TRACE] Completed =>> resultA = " << resultA
                  << " ; resultB = " << resultB << std::endl;
        tc.report("EXPERIMENT1");
    }

    std::puts("\n ==== EXPERIMENT 2 >> with std::futures - Run in Parallel ===== ");
    std::puts("-----------------------------------------------------------------\n");
    {
        TimeCounter tc;

        // With std::launch::async the function starts running in another
        // thread when the future object is instantiated.
        // The default policy of std::async is (async | deferred), which
        // lets the C++ runtime decide when and where to run the task.
        std::future<double> futA = std::async(std::launch::async, &expensive_computationA, 10);

        // Same policy: expensive_computationB also starts running in its own
        // thread at this point. Only with std::launch::deferred would it be
        // postponed until the std::future<T>::get() method is called.
        auto futB = std::async(std::launch::async, expensive_computationB, 10);

        // Protect std::cout from race condition (aka data race)
        {
            std::lock_guard<std::mutex> guard(mu);
            std::cout << " [TRACE] Waiting futures results " << std::endl;
        }

        // Calls to the .get() method block the current thread and wait
        // for the completion of the computations wrapped in the future object.
        double resultB = futB.get();
        double resultA = futA.get();

        std::cout << " [TRACE] Completed =>> resultA = " << resultA
                  << " ; resultB = " << resultB << std::endl;
        tc.report("EXPERIMENT2");

        // The method .get() can be used only once!!
        // A second call is undefined behavior (implementations commonly
        // throw std::future_error).
        //---------------------------------
        // double resultAA = futA.get(); // DO NOT!
    }

    std::puts("\n ==== EXPERIMENT 3 >> Exceptions handling ====================== ");
    std::puts("-----------------------------------------------------------------\n");
    {
        std::cout << "Future Created" << "\n";
        auto fut1 = std::async(computation_with_exception, -10);
        try {
            auto result1 = fut1.get();
            std::cout << " [INFO] Result1 = " << result1 << "\n";
        } catch(std::logic_error& ex) {
            std::cerr << " [ERROR] " << ex.what() << "\n";
        }
    }
#endif
    return 0;
}
Building:
clang++ future1.cpp -o future1.bin -std=c++1z -lpthread -Wall -Wextra -O0 -g
Output:
$ ./future1.bin
Number of hardware threads = 4
==== EXPERIMENT 1 >> without std::future => Run Serially ====
-----------------------------------------------------------------
[TRACE-ENTRY] Function = main ; threadID 139662702782272
[TRACE-ENTRY] Function = expensive_computationA ; threadID 139662702782272
[TRACE-ENTRY] Function = expensive_computationB ; threadID 139662702782272
[TRACE] Completed =>> resultA = 45 ; resultB = 41.0214
[TRACE] Elapsed time: {EXPERIMENT1} = 12
==== EXPERIMENT 2 >> with std::futures - Run in Parallel =====
-----------------------------------------------------------------
[TRACE] Waiting futures results
[TRACE-ENTRY] Function = expensive_computationA ; threadID 139662684923648
[TRACE-ENTRY] Function = expensive_computationB ; threadID 139662676530944
[TRACE] Completed =>> resultA = 45 ; resultB = 41.0214
[TRACE] Elapsed time: {EXPERIMENT2} = 8
==== EXPERIMENT 3 >> Exceptions handling ======================
-----------------------------------------------------------------
Future Created
[TRACE-ENTRY] Function = computatio_with_exception ; threadID 139662684923648
[ERROR] Error: not allowed negative N
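The comments in future1.cpp mention the two launch policies and the fact that get() can be used only once. The sketch below is a minimal illustration of both points, not part of the original example: the names PolicyReport and run_with_policy are hypothetical helpers introduced here. It records on which thread the task ran and whether the future was still valid before and after get().

```cpp
#include <future>
#include <thread>

// Hypothetical helper type (not from the original example):
// summary of what happened to a task run through std::async.
struct PolicyReport {
    bool ran_on_caller_thread; // task executed on the thread that called get()
    bool valid_before_get;     // future.valid() before get()
    bool valid_after_get;      // future.valid() after get()
    int  value;                // value returned by the task
};

// Hypothetical helper: runs a trivial task with the given launch policy.
inline PolicyReport run_with_policy(std::launch policy)
{
    std::thread::id task_thread;
    std::future<int> fut = std::async(policy, [&task_thread] {
        task_thread = std::this_thread::get_id();
        return 6 * 7;
    });

    PolicyReport report{};
    report.valid_before_get = fut.valid();
    // async   : blocks until the other thread finishes the task.
    // deferred: runs the task right here, in the calling thread.
    report.value = fut.get();
    report.valid_after_get = fut.valid();
    report.ran_on_caller_thread = (task_thread == std::this_thread::get_id());
    return report;
}
```

Calling run_with_policy(std::launch::deferred) should report that the task ran on the caller's thread, while std::launch::async should report a different thread. In both cases the future is no longer valid after get(), which is why a second get() must be avoided (std::shared_future is the standard alternative when several reads are needed).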
1.11 Condition Variables and Producer Consumer Problem
- A condition variable is a synchronization primitive that allows one or more threads to wait for an event (a signal from another thread) without wasting CPU cycles.
- Mechanism: several threads wait on a condition variable until another thread notifies this synchronization primitive.
- Note: Condition variables do not provide locking by themselves, so they must always be used together with a mutex in order to avoid race conditions.
- Operations of condition variables:
- ConditionVar.wait(MutexLock)
- => Atomically releases the mutex lock and makes the current thread (waiting thread) sleep until it receives a signal. The lock is re-acquired before wait() returns.
- ConditionVar.notify_one()
- => Wakes up a single waiting thread. If there is no waiting thread, the operation does nothing.
- ConditionVar.notify_all()
- => Wakes up all waiting threads.
Condition Variable Method Signatures
class condition_variable
{
public:
    condition_variable();
    ~condition_variable();
    condition_variable(const condition_variable&) = delete;
    condition_variable& operator=(const condition_variable&) = delete;

    /** Wake up only one sleeping thread */
    void notify_one() noexcept;

    /** Wake up all sleeping threads that are waiting for this signal */
    void notify_all() noexcept;

    /** Make the thread which calls this method sleep (wait for signal)
     *  without wasting CPU cycles */
    void wait(unique_lock<mutex>& lock);

    template <class Predicate>
    void wait(unique_lock<mutex>& lock, Predicate pred);

    template <class Clock, class Duration>
    cv_status wait_until(unique_lock<mutex>& lock,
                         const chrono::time_point<Clock, Duration>& abs_time);

    template <class Clock, class Duration, class Predicate>
    bool wait_until(unique_lock<mutex>& lock,
                    const chrono::time_point<Clock, Duration>& abs_time,
                    Predicate pred);

    template <class Rep, class Period>
    cv_status wait_for(unique_lock<mutex>& lock,
                       const chrono::duration<Rep, Period>& rel_time);

    template <class Rep, class Period, class Predicate>
    bool wait_for(unique_lock<mutex>& lock,
                  const chrono::duration<Rep, Period>& rel_time,
                  Predicate pred);

    typedef implementation-defined native_handle_type;
    native_handle_type native_handle();
};
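As an illustration of the wait_for overload that takes a predicate, the sketch below wraps a mutex, a flag and a condition variable into a one-shot event. The class Event and the function signal_from_other_thread are hypothetical names introduced only for this example. The predicate guards against spurious wake-ups and against a notification sent before the waiting thread started to wait.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot event built from a mutex, a flag and a
// condition variable (illustration only).
class Event {
public:
    // Set the flag under the mutex, then wake up every waiting thread.
    void set()
    {
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            m_flag = true;
        }
        m_cond.notify_all();
    }

    // Sleep until the flag is set or the timeout expires.
    // Returns true if the flag was set, false on timeout.
    bool wait_for(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate is re-checked after every wake-up, which
        // filters spurious wake-ups.
        return m_cond.wait_for(lock, timeout, [this]{ return m_flag; });
    }

private:
    std::mutex              m_mutex;
    std::condition_variable m_cond;
    bool                    m_flag = false; // protected by m_mutex
};

// Hypothetical demo: another thread sets the event after `delay`,
// while the calling thread waits at most `timeout`.
inline bool signal_from_other_thread(std::chrono::milliseconds delay,
                                     std::chrono::milliseconds timeout)
{
    Event ev;
    std::thread notifier([&ev, delay] {
        std::this_thread::sleep_for(delay);
        ev.set();
    });
    bool signaled = ev.wait_for(timeout);
    notifier.join();
    return signaled;
}
```

Because the flag is stored next to the condition variable, a call to set() that happens before wait_for() is not lost: the predicate is already true and the wait returns immediately.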
Example: Usage of Conditions Variables in Producer/Consumer problem
The producer consumer problem is a classical synchronization problem where one thread, called producer, puts data into a shared data structure and another thread, called consumer, removes data from it. Only a single thread should be able to access the data structure at any time.
Sample implementation: https://gist.github.com/iikuy/8115191
Example:
- File: producer-consumer.cpp
#include <iostream>
#include <queue>
#include <chrono>

// --- Concurrency Headers ---- //
#include <thread>              // std::thread
#include <mutex>               // std::mutex, std::lock_guard, std::unique_lock
#include <condition_variable>  // std::condition_variable

using namespace std::chrono_literals;

int main()
{
    std::cout << " Number of hardware threads = "
              << std::thread::hardware_concurrency() << "\n\n";

    std::condition_variable cond;
    std::mutex m;
    std::queue<double> buffer;
    bool finished = false;   // Protected by the mutex m

    auto producer_thread = std::thread([&]
    {
        std::cout << " [PRODUCER] Producer thread started." << "\n";
        for(int i = 0; i < 5; i++)
        {
            std::this_thread::sleep_for(1s);
            { // -- Start of critical section ----//
                auto lock = std::lock_guard<std::mutex>{m};
                double x = 5 * i + 10;
                std::cout << "\n [PRODUCER] Send data to buffer x = " << x << "\n";
                // The mutex protects the buffer from race conditions
                buffer.push(x);
            } // --- End of critical section ---- //
            // Send signal notifying the consumer thread to proceed.
            cond.notify_all();
        }
        {
            auto lock = std::lock_guard<std::mutex>{m};
            std::cout << " [PRODUCER] End of transmission" << std::endl;
            finished = true;
        }
        // Wake up the consumer so that it can observe the finished flag
        // even if it is already waiting on an empty buffer.
        cond.notify_all();
    });

    auto consumer_thread = std::thread([&]
    {
        std::cout << " [CONSUMER] Consumer thread started." << "\n";
        while(true)
        {
            auto lock = std::unique_lock<std::mutex>{m};
            std::cout << " [CONSUMER] Waiting input " << "\n";
            // Sleep until the producer thread signals the condition
            // variable and there is data in the buffer or the
            // transmission is over. The predicate also guards against
            // spurious wake-ups.
            cond.wait(lock, [&]{ return !buffer.empty() || finished; });
            if(buffer.empty()){
                // Woken up by the end of transmission: nothing left.
                std::cout << " [CONSUMER] Stop consumer thread. Ok" << "\n";
                break;
            }
            std::cout << " [CONSUMER] Processing data ... wait" << "\n";
            // Delay for simulating processing time.
            // Note: the lock is still held here, so the producer is
            // blocked meanwhile. Real code would copy the item and
            // unlock before doing slow work.
            std::this_thread::sleep_for(5s);
            std::cout << " [CONSUMER] Received value " << buffer.front() << "\n";
            buffer.pop();
            if(finished && buffer.empty()){
                std::cout << " [CONSUMER] Stop consumer thread. Ok" << "\n";
                break;
            }
        }
    });

    std::cout << " [TRACE] Waiting thread completion" << "\n";
    producer_thread.join();
    consumer_thread.join();
    return 0;
}
Building:
$ g++ producer-consumer.cpp -o out.bin -std=c++1z -O0 -g -Wall -lpthread
Running:
$ ./out.bin
Number of hardware threads = 4
[PRODUCER] Producer thread started.
[TRACE] Waiting thread completion
[CONSUMER] Consumer thread started.
[CONSUMER] Waiting input
[PRODUCER] Send data to buffer x = 10
[CONSUMER] Processing data ... wait
[CONSUMER] Received value 10
[CONSUMER] Waiting input
[PRODUCER] Send data to buffer x = 15
[CONSUMER] Processing data ... wait
[CONSUMER] Received value 15
[CONSUMER] Waiting input
[PRODUCER] Send data to buffer x = 20
[CONSUMER] Processing data ... wait
[CONSUMER] Received value 20
[CONSUMER] Waiting input
[PRODUCER] Send data to buffer x = 25
[CONSUMER] Processing data ... wait
[CONSUMER] Received value 25
[CONSUMER] Waiting input
[PRODUCER] Send data to buffer x = 30
[PRODUCER] End of transmission
[CONSUMER] Processing data ... wait
[CONSUMER] Received value 30
[CONSUMER] Stop consumer thread. Ok
1.12 Messaging Passing Concurrency and Message Queue
1.12.1 Overview
Message Passing Concurrency
- In message passing concurrency, threads communicate by sending messages to each other through a message queue (FIFO buffer), instead of communicating through explicit locks and mutation of shared memory.
- The most fundamental construct of message passing concurrency is the message queue, which allows communication between producer (aka sender) and consumer (aka receiver) threads running at different speeds.
- Message queues are also called mailboxes, channels or concurrent queues.
- Message queues are used for implementing the following message passing design patterns:
- CSP - Communicating Sequential Processes - proposed by Tony Hoare and used in Go (Golang) and Clojure.
- Actor Model - Proposed by Carl Hewitt and used in Erlang and Scala's Akka.
- Active Object
- Thread Pools
Producer Consumer Problem:
- The most basic use case of message queues is solving the producer/consumer inter-thread communication problem, where a producer thread and a consumer thread communicate through a buffer, generally a queue. There are two scenarios: if the buffer is empty, the consumer thread gets blocked (suspended) and is only resumed when the producer puts some item in the buffer. If the queue has a limited size and is full, the producer thread gets blocked (suspended) until the consumer removes an item from the queue.
- Note: Message queues can be implemented as a class that encapsulates the queue data structure and synchronization primitives: condition variables and mutexes.
Other uses of message passing and message queues
- Message passing is not only used for inter-thread communication. This approach is also applicable to communication between processes through IPC - Inter Process Communication and to distributed systems where multiple processes communicate over the network through sockets.
Sample Implementations of message queue
- Java:
- LinkedBlockingQueue (Optionally bounded buffer)
- => Linked-list based queue, unbounded by default (capacity Integer.MAX_VALUE), that blocks/suspends the consumer thread if the queue is empty until the producer puts some element in the queue.
- ArrayBlockingQueue (Bounded buffer)
- => Fixed size queue that blocks/suspends the consumer thread (aka receiver) if the queue is empty until the producer thread (aka sender) adds an element to the queue. The producer thread gets blocked if the queue is full until the consumer thread pops an element from the queue.
- cameron314/concurrentqueue
- "A fast multi-producer, multi-consumer lock-free concurrent queue for C++11"
- Facebook's Folly - ProducerConsumerQueue (documentation) - Code: ProducerConsumerQueue.h
- Concurrent queue – C++11 | Juan's C++ blog
Example of Message Queue Behavior in Pseudocode:
// Message queue of fixed size 10
fixed_message_queue<double> buffer(10);

auto consumer_thread = thread([&]{
    // Run until the producer closes the queue
    while(!buffer.is_closed()){
        sleep(2);
        // Block current thread if the queue is empty
        double next = buffer.take();
        print("Received message x = %.3f", next);
    }
});

auto producer_thread = thread([&]{
    for(double x = 0.0; x < 100.0; x += 2.0)
    {
        sleep(10);
        double y = 4 * x + 10;
        // Block current thread if the queue is full (has 10 elements)
        buffer.put(y);
        print("Sent message y = %.3f", y);
    }
    // Indicates that messages will no longer be sent
    buffer.close();
});
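The pseudocode above can be approximated in standard C++17 with one mutex and two condition variables. The sketch below is only one possible design: the class name BoundedQueue, its put/take/close interface and the helper function transfer are assumptions made for this illustration and do not come from any of the libraries listed in this section.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-capacity blocking queue (illustration only).
template<typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity): m_capacity(capacity) { }

    // Blocks while the queue is full.
    // Returns false if the queue was closed (the value is discarded).
    bool put(T value)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_not_full.wait(lock, [this]{
            return m_closed || m_queue.size() < m_capacity; });
        if(m_closed) { return false; }
        m_queue.push(std::move(value));
        m_not_empty.notify_one();
        return true;
    }

    // Blocks while the queue is empty and still open.
    // Returns std::nullopt once the queue is closed and drained.
    std::optional<T> take()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_not_empty.wait(lock, [this]{
            return m_closed || !m_queue.empty(); });
        if(m_queue.empty()) { return std::nullopt; }
        T value = std::move(m_queue.front());
        m_queue.pop();
        m_not_full.notify_one();
        return value;
    }

    // Indicates that no more messages will be sent and wakes up
    // every blocked producer and consumer.
    void close()
    {
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            m_closed = true;
        }
        m_not_empty.notify_all();
        m_not_full.notify_all();
    }

private:
    std::size_t             m_capacity;
    std::queue<T>           m_queue;
    bool                    m_closed = false; // protected by m_mutex
    std::mutex              m_mutex;
    std::condition_variable m_not_empty;
    std::condition_variable m_not_full;
};

// Hypothetical demo: the calling thread produces `count` values
// y = 4 * i + 10 and a consumer thread collects them in order.
inline std::vector<double> transfer(int count, std::size_t capacity)
{
    BoundedQueue<double> buffer(capacity);
    std::vector<double> received;
    std::thread consumer([&]{
        while(auto item = buffer.take()) { received.push_back(*item); }
    });
    for(int i = 0; i < count; ++i) { buffer.put(4.0 * i + 10.0); }
    buffer.close();
    consumer.join();
    return received;
}
```

Returning std::optional from take() lets the consumer loop end cleanly when the queue is closed, without a separate "finished" flag that would have to be checked under its own synchronization.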
Message Passing Concurrency in other languages
- Effective Go - The Go Programming Language - Concurrency
- On Actors and Casting | Bartosz Milewski's Programming Cafe
C++ Implementations of Message Passing (Message Queue / Actor Model and CSP)
- Asynchronous Agents Library | Microsoft Docs - PPL Library
- Mindroid.ecpp: Mindroid.ecpp - is an embedded application framework inspired by Google's Android operating system
- Writing Lock-Free Code: A Corrected Queue - Herb Sutter
- c++ - What is the best practice for passing data between threads? Queues, messages or others?
- cameron314/concurrentqueue
- "A fast multi-producer, multi-consumer lock-free concurrent queue for C++11"
- Facebook's Folly - ProducerConsumerQueue (documentation) - Code: ProducerConsumerQueue.h
- Concurrent queue – C++11 | Juan's C++ blog
- Boost Synchronized Data Structure - Synchronized Queue
- concurrent_queue Class | Microsoft Docs
- c++ - C++11 Blocking Queue learning exercise - Code Review Stack Exchange
- C++11 blocking queue using the standard library. · GitHub
- GitHub - Balnian/ChannelsCPP: C++ implementation of the Go (GoLang) Channel and Select structure
- Go channels in C++ · GitHub
1.12.2 Producer/consumer problem with message queue
The producer consumer synchronization problem can be simplified with a message queue (aka concurrent blocking queue), which encapsulates the data structure implementation and all the synchronization primitives.
Requirements:
- The producer thread puts an element in the queue and signals the condition variable, which wakes up the consumer thread.
- When the queue is empty, the consumer thread gets in sleep state and waits for some element to be put into the queue.
- The access to the queue must be thread-safe.
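Parts of standard library used: std::queue, std::thread, std::mutex, std::lock_guard, std::unique_lock and std::condition_variable.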
Example: Simple Blocking Queue Implementation for consumer/producer problem.
File: concurrent_queue.cpp
#include <iostream>
#include <queue>
#include <chrono>
#include <cmath>     // std::isnan
#include <limits>    // std::numeric_limits
#include <utility>   // std::move

// --- Concurrency Headers ---- //
#include <thread>              // std::thread
#include <mutex>               // std::mutex, std::lock_guard, std::unique_lock
#include <condition_variable>  // std::condition_variable

using namespace std::chrono_literals;

template<typename T>
class MessageQueue
{
    using guard = std::lock_guard<std::mutex>;
    std::queue<T>                   m_queue;
    mutable std::condition_variable m_cvar;
    mutable std::mutex              m_mutex;
public:
    MessageQueue() = default;

    // Called by producer thread
    void push(const T& value)
    {
        auto g = guard(m_mutex);
        m_queue.push(value);
        m_cvar.notify_one();
    }

    void push(T&& value)
    {
        auto g = guard(m_mutex);
        m_queue.push(std::move(value));
        m_cvar.notify_one();
    }

    // Called by consumer thread. Removes the front element.
    // It must only be called after front() has returned, which
    // ensures that the queue is not empty.
    void pop()
    {
        auto g = guard(m_mutex);
        m_queue.pop();
    }

    // Called by consumer thread. Blocks the consumer thread, making
    // it sleep, while the queue is empty. The element is copied while
    // the lock is held, so no reference into the queue escapes.
    T front() const
    {
        auto lock = std::unique_lock<std::mutex>(m_mutex);
        // The second argument of m_cvar.wait, a lambda function, is
        // passed as predicate to guard against spurious wake-ups.
        m_cvar.wait(lock, [&]{ return !m_queue.empty(); });
        return m_queue.front();
    }

    bool empty() const
    {
        auto g = guard(m_mutex);
        return m_queue.empty();
    }

    std::size_t size() const
    {
        auto g = guard(m_mutex);
        return m_queue.size();
    }
};

int main()
{
    MessageQueue<double> queue;

    // Mutex used for protecting std::cout from race condition
    std::mutex output_mutex;

    auto consumer_thread = std::thread([&]
    {
        {
            auto guard = std::lock_guard(output_mutex);
            std::cout << " [CONSUMER] Consumer thread started." << "\n";
        }
        while(true)
        {
            auto value = queue.front();
            queue.pop();
            // NaN is used as end-of-stream marker (see below)
            if(std::isnan(value)) { break; }
            {
                auto guard = std::lock_guard(output_mutex);
                std::cout << " [CONSUMER] Wait input ..." << "\n";
                std::cout << " [CONSUMER] Received value = " << value << "\n";
            }
        }
    });

    // Protect cout from race condition
    {
        auto guard = std::lock_guard(output_mutex);
        std::cout << " [TRACE] Start interactive SHELL OK." << "\n";
    }

    // The producer thread is the current one (main thread)
    for(int i = 0; i < 5; i++)
    {
        std::this_thread::sleep_for(1s);
        double x = i * i - 4 * i + 10.0;
        {
            auto guard = std::lock_guard(output_mutex);
            std::cout << " [PRODUCER] => Sending number: " << x << "\n";
        }
        queue.push(x);
    }

    // Signal the end of transmission through the queue itself with a
    // sentinel value. A separate 'finished' flag checked after pop()
    // is racy: the consumer could miss it and block forever in front().
    queue.push(std::numeric_limits<double>::quiet_NaN());

    consumer_thread.join();
    return 0;
}
Building:
$ clang++ concurrent_queue.cpp -o out.bin -std=c++1z -O0 -Wall -g -lpthread
Running:
$ ./out.bin
[TRACE] Start interactive SHELL OK.
[CONSUMER] Consumer thread started.
[PRODUCER] => Sending number: 10
[CONSUMER] Wait input ...
[CONSUMER] Received value = 10
[PRODUCER] => Sending number: 7
[CONSUMER] Wait input ...
[CONSUMER] Received value = 7
[PRODUCER] => Sending number: 6
[CONSUMER] Wait input ...
[CONSUMER] Received value = 6
[PRODUCER] => Sending number: 7
[CONSUMER] Wait input ...
[CONSUMER] Received value = 7
[PRODUCER] => Sending number: 10
[CONSUMER] Wait input ...
[CONSUMER] Received value = 10
1.12.3 References
- Chapter 10 - Concurrency - Message Passing
- Reading 22: Queues and Message-Passing - MIT
- http://web.mit.edu/6.031/www/fa17/classes/22-queues/
- Note: contains "Implementing message passing with queues"
- Two models for concurrent programming - MIT
- Android Style Message Passing - Communicating between Threads using Message Queues.
- http://blog.coldflake.com/posts/Android-style-Message-Passing/
- Coverage: Message queue, active object.
- Programming with Threads - Synchronization and Communication [PAPER]
- Producer consumer design pattern - DZone
- A Thread-Safe Message Queue
- How to use wait, notifyAll in Java - Producer Consumer Example
- 5. Message Passing
- Paper: WG21 - C++ Concurrent Queues - Lawrence Crowl, Chris Mysen
- "Concurrent queues are a fundamental structuring tool for concurrent programs. We propose a concurrent queue concept and a concrete implementation. We propose a set of communication types that enable loosely bound program components to dynamically construct and safely share concurrent queues."
- Guide to java.util.concurrent.BlockingQueue | Baeldung
- Shows how a concurrent queue works and solves the producer/consumer problem.
- Blocking queue – Vorbrodt's C++ Blog
1.13 Thread Pools
1.13.1 Overview
- A thread pool is a widely used technique for executing many short-lived tasks by reusing pre-existing long-running worker threads, instead of creating and destroying a thread for every task, which would increase the system latency and reduce its throughput.
- Use Cases:
- Run short running tasks without creating too many threads.
- Execute future/promise computations.
- Network Servers
- Web Servers
- Run parallel algorithms
- Note: NodeJS (through libuv) and the Nginx web server use thread pools behind the scenes for running blocking operations, such as file IO, without blocking the main event loop thread.
- Benefits
- Reduce the overhead of creation and destruction of too many threads.
- Reuse threads.
- More lightweight task-based concurrency (future and promises).
- Implementation Parts:
- Task Queue: (Message Queue)
- A message queue to which client code submits tasks.
- Tasks (aka jobs)
- In C++, a task could be represented by a function, a callable object (functor) or a lambda. In Java, the task could be an object implementing the Runnable interface.
- Worker Threads
- N long-running worker threads. Each thread takes a task from the task queue and gets blocked if the task queue is empty.
- Functionalities of a thread pool
- Parts
- central task queue
- N worker threads
- completion queue [optional]
- Actions
- add thread
- submit task or job
- return a future object representing the return value of the task submitted to the thread pool.
- shutdown queue => stop all threads
- Considerations:
- A suitable size for the thread pool is the number of hardware threads, that is, the number of threads that can run simultaneously on different processing units. This number is generally the number of logical CPUs, which may be larger than the number of physical cores when simultaneous multithreading is enabled. In C++, it is reported by std::thread::hardware_concurrency().
- Thread starvation => A single CPU intensive or long running task will make a thread unable to handle other pending tasks.
- The tasks should not block any worker thread; they must be short-lived. Otherwise, the blocked worker thread may not be able to handle other tasks from the task queue.
- It is wise to use timeouts in IO and socket operations that may block a worker thread.
- Usage for network and web servers:
- Thread pools scale well for handling network requests when they are used alongside non-blocking IO APIs such as epoll on Linux, IOCP on Windows and kqueue on MacOSX and BSD variants. The Nginx web server and NodeJS combine a thread pool with non-blocking IO for reaching high scalability and performance.
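The action "submit task and return a future object" listed among the thread pool functionalities can be sketched with std::packaged_task, which connects a callable to a std::future. The class below is a minimal illustration under stated assumptions: the name FuturePool and its interface are hypothetical, the pool has a fixed number of workers, and pending tasks are still executed before the destructor returns.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical minimal thread pool whose submit() returns a future.
class FuturePool {
public:
    explicit FuturePool(std::size_t n)
    {
        for(std::size_t i = 0; i < n; ++i) {
            m_workers.emplace_back([this]{ worker_loop(); });
        }
    }

    // Executes the pending tasks, then stops and joins all workers.
    ~FuturePool()
    {
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            m_stop = true;
        }
        m_cond.notify_all();
        for(auto& worker : m_workers) { worker.join(); }
    }

    // Wraps the callable in a packaged_task and returns its future.
    template<typename F>
    auto submit(F func) -> std::future<std::invoke_result_t<F&>>
    {
        using R = std::invoke_result_t<F&>;
        // shared_ptr: packaged_task is move-only, std::function needs
        // a copyable callable.
        auto task = std::make_shared<std::packaged_task<R()>>(std::move(func));
        std::future<R> result = task->get_future();
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            m_tasks.push([task]{ (*task)(); });
        }
        m_cond.notify_one();
        return result;
    }

private:
    void worker_loop()
    {
        while(true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cond.wait(lock, [this]{ return m_stop || !m_tasks.empty(); });
                if(m_tasks.empty()) { return; } // stop requested, queue drained
                job = std::move(m_tasks.front());
                m_tasks.pop();
            }
            job(); // run the task without holding the lock
        }
    }

    std::mutex                        m_mutex;
    std::condition_variable           m_cond;
    std::queue<std::function<void()>> m_tasks;
    bool                              m_stop = false; // protected by m_mutex
    std::vector<std::thread>          m_workers;
};
```

A caller would write something like auto fut = pool.submit([]{ return 6 * 7; }); and later fut.get(). An exception thrown inside the task is captured by the packaged_task and rethrown by get() in the calling thread, as with std::async.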
1.13.2 Some thread pool implementations
- How to implement Thread pool in Java | Java Code Geeks - 2019 [JAVA] [BEST]
- Java implementation: Internally it uses a LinkedBlockingQueue as a task queue (message queue) and a group of worker threads taking tasks from the queue and executing them in an infinite loop. The tasks or jobs submitted to the thread pool are represented by the Runnable interface.
- boost thread pool example | KeZunLin's Blog
- Thread pool built on top of Boost.Asio.
- A Thread Pool with Boost.Threads and Boost.Asio | Jakob's Devlog
- GitHub - yc2prime/ThreadPool
- A C++ thread pool implementation based on Boost.Asio.
1.13.3 Example: Simple thread pool implementation
File: thread-pool-simple.cpp
#include <iostream>
#include <functional>
#include <string>
#include <cstdlib>   // std::getenv, EXIT_SUCCESS, EXIT_FAILURE
#include <cassert>
#include <utility>   // std::move

// Concurrency headers
#include <deque>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

class ThreadPool
{
public:
    using Task = std::function<void ()>;

    // Create a thread pool with N worker threads
    ThreadPool(size_t n)
    {
        std::cout << " [TRACE] Created thread pool with N = " << n
                  << " worker threads" << std::endl;
        for(size_t i = 0; i < n; ++i) { this->add_worker(); }
    }

    ThreadPool(): ThreadPool(std::thread::hardware_concurrency()){ }

    ~ThreadPool()
    {
        std::cout << " [TRACE] <Destructor> Waiting for termination of all threads"
                  << std::endl;
        this->shutdown();
        std::cout << " [TRACE] <Destructor> All Worker threads were shutdown. OK"
                  << std::endl;
    }

    // Return the number of worker threads
    size_t size() const { return m_workers.size(); }

    // Submit a task to the thread pool
    void submit(Task task)
    {
        auto guard = std::lock_guard<std::mutex>{m_qmutex};
        m_queue.push(std::move(task));
        // Wake up a single worker thread signaling it.
        m_qvar.notify_one();
    }

    // Shut down the thread pool: pending tasks are still executed,
    // then all worker threads exit.
    void shutdown()
    {
        {
            // Clear the flag for indicating to worker threads that no
            // further tasks (jobs) will be submitted. The flag is changed
            // under the mutex so that no sleeping worker misses it.
            auto guard = std::lock_guard<std::mutex>{m_qmutex};
            m_is_active = false;
        }
        // Signal all threads, wake up all worker threads
        m_qvar.notify_all();
        // Wait for completion of all threads.
        for(auto& worker: m_workers) { worker.join(); }
        // Remove all worker threads
        m_workers.clear();
    }

    // Block the calling thread until the task queue is drained.
    // Note: tasks already taken by workers may still be running.
    void wait_tasks()
    {
        auto lock = std::unique_lock<std::mutex>{m_qmutex};
        m_qempty.wait(lock, [&]{ return m_queue.empty(); });
    }

    // Pop a task from the queue => blocks the calling thread while the
    // queue is empty. Returns an empty Task when the pool is shut down
    // and there is nothing left to run.
    Task pop_task()
    {
        auto lock = std::unique_lock<std::mutex>{m_qmutex};
        // Without the check of m_is_active, idle workers would sleep
        // forever and shutdown() would deadlock in join().
        m_qvar.wait(lock, [&]{ return !m_queue.empty() || !m_is_active; });
        if(m_queue.empty()){ return Task{}; }
        auto task = std::move(m_queue.front());
        m_queue.pop();
        // Wake up thread that called .wait_tasks()
        if(m_queue.empty()){ m_qempty.notify_all(); }
        return task;
    }

    // Add new worker thread
    void add_worker()
    {
        auto th = std::thread([this]{
            while(true)
            {
                auto task = pop_task();
                // An empty task means: pool shut down and queue empty
                if(!task){ return; }
                task();
            }
        });
        std::cout << " [TRACE] <add_worker> - Created worker thread ID = "
                  << th.get_id() << std::endl;
        m_workers.push_back(std::move(th));
    } // --- End of add_worker() --- //

private:
    // Flag indicating that the thread pool is active (running).
    // Protected by m_qmutex.
    bool m_is_active = true;
    // Task queue (message queue)
    std::queue<Task> m_queue;
    // Worker threads
    std::deque<std::thread> m_workers;
    // Mutex for protecting the queue
    std::mutex m_qmutex;
    // Condition variable for signaling that
    // there are tasks in the queue
    std::condition_variable m_qvar;
    // Condition variable for signaling that the task queue is empty
    std::condition_variable m_qempty;
};

int main(int argc, char** argv)
{
    if(argc < 3){
        std::cerr << " Error: invalid command line switch." << std::endl;
        return EXIT_FAILURE;
    }
    auto cmd  = std::string(argv[1]);
    auto size = std::stoi(argv[2]);
    assert(size > 0);

    if(cmd == "serial")
    {
        for(int i = 0; i < size; i++)
        {
            std::this_thread::sleep_for(std::chrono::seconds(2));
            std::cout << " Task id = " << i
                      << " thread id = " << std::this_thread::get_id()
                      << " completed OK. \n";
        }
        return EXIT_SUCCESS;
    }

    if(cmd == "parallel")
    {
        // Get thread pool size from environment variable
        const char* thsize = std::getenv("THREAD_POOL_SIZE");
        size_t n_workers = thsize != nullptr
                         ? std::stoul(thsize)
                         : std::thread::hardware_concurrency();

        // Mutex used for protecting std::cout from data race.
        // It is declared before the pool, so it outlives the worker
        // threads, which are joined by the pool destructor.
        std::mutex cout_mutex;

        ThreadPool thp(n_workers);

        for(int i = 0; i < size; i++)
        {
            thp.submit([=, &cout_mutex]{
                std::this_thread::sleep_for(std::chrono::seconds(2));
                {
                    std::lock_guard<std::mutex> guard(cout_mutex);
                    std::cout << " Task id = " << i
                              << " thread id = " << std::this_thread::get_id()
                              << " completed OK. \n";
                }
            });
        }
        {
            std::lock_guard<std::mutex> guard(cout_mutex);
            std::cout << " [TRACE] <main> Waiting tasks execution ..." << "\n";
        }
        // The destructor of thp runs here: it waits for all tasks.
    }
    return 0;
}
Building:
$ clang++ thread-pool-simple.cpp -o out.bin -std=c++1z -lpthread -O0 -g -Wall -Wextra -fsanitize=thread -fsanitize=undefined
Run 20 tasks serially: (40 seconds)
$ time ./out.bin serial 20
Task id = 0 thread id = 140386475086592 completed OK.
Task id = 1 thread id = 140386475086592 completed OK.
Task id = 2 thread id = 140386475086592 completed OK.
Task id = 3 thread id = 140386475086592 completed OK.
... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ...
Task id = 17 thread id = 140386475086592 completed OK.
Task id = 18 thread id = 140386475086592 completed OK.
Task id = 19 thread id = 140386475086592 completed OK.
real 0m40.086s
user 0m0.037s
sys 0m0.045s
Run 20 tasks with thread pool of 4 threads. (10 seconds)
$ time ./out.bin parallel 20
[TRACE] Created thread pool with N = 4 worker threads
[TRACE] <add_worker> - Created worker thread ID = 139760855148288
[TRACE] <add_worker> - Created worker thread ID = 139760846755584
[TRACE] <add_worker> - Created worker thread ID = 139760836278016
[TRACE] <add_worker> - Created worker thread ID = 139760827885312
[TRACE] <main> Waiting tasks execution ...
[TRACE] <Destructor> Waiting for termination of all threads
Task id = 0 thread id = 139760846755584 completed OK.
Task id = 3 thread id = 139760827885312 completed OK.
Task id = 1 thread id = 139760855148288 completed OK.
Task id = 2 thread id = 139760836278016 completed OK.
... ... ... ... ... ... ... ... ... ... ... ...
Task id = 17 thread id = 139760827885312 completed OK.
Task id = 18 thread id = 139760836278016 completed OK.
Task id = 19 thread id = 139760846755584 completed OK.
[TRACE] <Destructor> All Worker threads were shutdown. OK
real 0m10.103s
user 0m0.050s
sys 0m0.062s
1.13.4 Further Reading
- Thread pool - Wikipedia
- How to implement Thread pool in Java | Java Code Geeks - 2019 [BEST]
- Thread Pools in Java - GeeksforGeeks [BEST]
- Shows some potential problems of thread pools, namely, deadlock, thread leakage and resource thrashing.
- How Thread Pool design pattern works - Zhidko
- Finally Getting the Most out of the Java Thread Pool
- Designing a Thread Pool Framework Part 1: What’s the need of a Thread Pool – thispointer.com
- A Method of Worker Thread Pooling - CodeProject
- java - Design pattern for threadpooling - Stack Overflow