concurrentqueue.h
1// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
2// An overview, including benchmark results, is provided here:
3// http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
4// The full design is also described in excruciating detail at:
5// http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
6
7// Simplified BSD license:
8// Copyright (c) 2013-2020, Cameron Desrochers.
9// All rights reserved.
10//
11// Redistribution and use in source and binary forms, with or without modification,
12// are permitted provided that the following conditions are met:
13//
14// - Redistributions of source code must retain the above copyright notice, this list of
15// conditions and the following disclaimer.
16// - Redistributions in binary form must reproduce the above copyright notice, this list of
17// conditions and the following disclaimer in the documentation and/or other materials
18// provided with the distribution.
19//
20// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
21// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
23// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
25// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
28// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30// Also dual-licensed under the Boost Software License (see LICENSE.md)
31
32#pragma once
33
34#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
35// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
36// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
37// upon assigning any computed values)
38#pragma GCC diagnostic push
39#pragma GCC diagnostic ignored "-Wconversion"
40
41#ifdef MCDBGQ_USE_RELACY
42#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
43#endif
44#endif
45
46#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
47// VS2019 with /W4 warns about constant conditional expressions, but `if constexpr` is not available unless
48// /std:c++17 or higher is used, so we have no choice but to simply disable the warning
49#pragma warning(push)
50#pragma warning(disable : 4127) // conditional expression is constant
51#endif
52
53#if defined(__APPLE__)
54#include "TargetConditionals.h"
55#endif
56
57#ifdef MCDBGQ_USE_RELACY
58#include "relacy/relacy_std.hpp"
59#include "relacy_shims.h"
60// We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
61// We'll override the default trait malloc ourselves without a macro.
62#undef new
63#undef delete
64#undef malloc
65#undef free
66#else
67#include <atomic> // Requires C++11. Sorry VS2010.
68#include <cassert>
69#endif
70#include <cstddef> // for max_align_t
71#include <cstdint>
72#include <cstdlib>
73#include <type_traits>
74#include <algorithm>
75#include <utility>
76#include <limits>
77#include <climits> // for CHAR_BIT
78#include <array>
79#include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
80#include <mutex> // used for thread exit synchronization
81
82// Platform-specific definitions of a numeric thread ID type and an invalid value
83namespace moodycamel {
84namespace details {
85template <typename thread_id_t> struct thread_id_converter {
86 typedef thread_id_t thread_id_numeric_size_t;
87 typedef thread_id_t thread_id_hash_t;
88 static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
89};
90} // namespace details
91} // namespace moodycamel
92#if defined(MCDBGQ_USE_RELACY)
93namespace moodycamel {
94namespace details {
95typedef std::uint32_t thread_id_t;
96static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
97static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
98static inline thread_id_t thread_id()
99{
100 return rl::thread_index();
101}
102} // namespace details
103} // namespace moodycamel
104#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
105// No sense pulling in windows.h in a header, we'll manually declare the function
106// we use and rely on backwards-compatibility for this not to break
107extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
108namespace moodycamel {
109namespace details {
110static_assert(sizeof(unsigned long) == sizeof(std::uint32_t),
111 "Expected size of unsigned long to be 32 bits on Windows");
112typedef std::uint32_t thread_id_t;
113static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
114static const thread_id_t invalid_thread_id2 =
115 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread
116 // IDs are presently multiples of 4.
117static inline thread_id_t thread_id()
118{
119 return static_cast<thread_id_t>(::GetCurrentThreadId());
120}
121} // namespace details
122} // namespace moodycamel
123#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE) || \
124 defined(MOODYCAMEL_NO_THREAD_LOCAL)
125namespace moodycamel {
126namespace details {
127static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8,
128 "std::thread::id is expected to be either 4 or 8 bytes");
129
130typedef std::thread::id thread_id_t;
131static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
132
133// Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
134// only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
135// be.
136static inline thread_id_t thread_id()
137{
138 return std::this_thread::get_id();
139}
140
141template <std::size_t> struct thread_id_size {};
142template <> struct thread_id_size<4> {
143 typedef std::uint32_t numeric_t;
144};
145template <> struct thread_id_size<8> {
146 typedef std::uint64_t numeric_t;
147};
148
149template <> struct thread_id_converter<thread_id_t> {
150 typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
151#ifndef __APPLE__
152 typedef std::size_t thread_id_hash_t;
153#else
154 typedef thread_id_numeric_size_t thread_id_hash_t;
155#endif
156
157 static thread_id_hash_t prehash(thread_id_t const& x)
158 {
159#ifndef __APPLE__
160 return std::hash<std::thread::id>()(x);
161#else
162 return *reinterpret_cast<thread_id_hash_t const*>(&x);
163#endif
164 }
165};
166}
167}
168#else
169// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
170// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
171// static variable's address as a thread identifier :-)
172#if defined(__GNUC__) || defined(__INTEL_COMPILER)
173#define MOODYCAMEL_THREADLOCAL __thread
174#elif defined(_MSC_VER)
175#define MOODYCAMEL_THREADLOCAL __declspec(thread)
176#else
177// Assume C++11 compliant compiler
178#define MOODYCAMEL_THREADLOCAL thread_local
179#endif
180namespace moodycamel {
181namespace details {
182typedef std::uintptr_t thread_id_t;
183static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
184static const thread_id_t invalid_thread_id2 =
185 1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
186inline thread_id_t thread_id()
187{
188 static MOODYCAMEL_THREADLOCAL int x;
189 return reinterpret_cast<thread_id_t>(&x);
190}
191}
192}
193#endif
194
195// Constexpr if
196#ifndef MOODYCAMEL_CONSTEXPR_IF
197#if (defined(_MSC_VER) && defined(_HAS_CXX17) && _HAS_CXX17) || __cplusplus > 201402L
198#define MOODYCAMEL_CONSTEXPR_IF if constexpr
199#define MOODYCAMEL_MAYBE_UNUSED [[maybe_unused]]
200#else
201#define MOODYCAMEL_CONSTEXPR_IF if
202#define MOODYCAMEL_MAYBE_UNUSED
203#endif
204#endif
205
206// Exceptions
207#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
208#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || \
209 (!defined(_MSC_VER) && !defined(__GNUC__))
210#define MOODYCAMEL_EXCEPTIONS_ENABLED
211#endif
212#endif
213#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
214#define MOODYCAMEL_TRY try
215#define MOODYCAMEL_CATCH(...) catch (__VA_ARGS__)
216#define MOODYCAMEL_RETHROW throw
217#define MOODYCAMEL_THROW(expr) throw(expr)
218#else
219#define MOODYCAMEL_TRY MOODYCAMEL_CONSTEXPR_IF(true)
220#define MOODYCAMEL_CATCH(...) else MOODYCAMEL_CONSTEXPR_IF(false)
221#define MOODYCAMEL_RETHROW
222#define MOODYCAMEL_THROW(expr)
223#endif
224
225#ifndef MOODYCAMEL_NOEXCEPT
226#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
227#define MOODYCAMEL_NOEXCEPT
228#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
229#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
230#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
231// VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
232// We have to assume *all* non-trivial constructors may throw on VS2012!
233#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
234#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) \
235 (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value \
236 ? std::is_trivially_move_constructible<type>::value \
237 : std::is_trivially_copy_constructible<type>::value)
238#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) \
239 ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value \
240 ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value \
241 : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && \
242 MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
243#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
244#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
245#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) \
246 (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value \
247 ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value \
248 : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
249#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) \
250 ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value \
251 ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value \
252 : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && \
253 MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
254#else
255#define MOODYCAMEL_NOEXCEPT noexcept
256#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
257#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
258#endif
259#endif
260
261#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
262#ifdef MCDBGQ_USE_RELACY
263#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
264#else
265// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug:
266// http://sourceforge.net/p/mingw-w64/bugs/445. g++ <= 4.7 doesn't support thread_local either. Finally, iOS/ARM
267// doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work.
268#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && \
269 (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && \
270 (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && \
271 (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
272// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
273#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // tentatively enabled for now; years ago several users report having
274 // problems with it on
275#endif
276#endif
277#endif
278
279// VS2012 doesn't support deleted functions.
280// In this case, we declare the function normally but don't define it. A link error will be generated if the function is
281// called.
282#ifndef MOODYCAMEL_DELETE_FUNCTION
283#if defined(_MSC_VER) && _MSC_VER < 1800
284#define MOODYCAMEL_DELETE_FUNCTION
285#else
286#define MOODYCAMEL_DELETE_FUNCTION = delete
287#endif
288#endif
289
290namespace moodycamel {
291namespace details {
292#ifndef MOODYCAMEL_ALIGNAS
293// VS2013 doesn't support alignas or alignof, and align() requires a constant literal
294#if defined(_MSC_VER) && _MSC_VER <= 1800
295#define MOODYCAMEL_ALIGNAS(alignment) __declspec(align(alignment))
296#define MOODYCAMEL_ALIGNOF(obj) __alignof(obj)
297#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) typename details::Vs2013Aligned<std::alignment_of<obj>::value, T>::type
298template <int Align, typename T> struct Vs2013Aligned {}; // default, unsupported alignment
299template <typename T> struct Vs2013Aligned<1, T> {
300 typedef __declspec(align(1)) T type;
301};
302template <typename T> struct Vs2013Aligned<2, T> {
303 typedef __declspec(align(2)) T type;
304};
305template <typename T> struct Vs2013Aligned<4, T> {
306 typedef __declspec(align(4)) T type;
307};
308template <typename T> struct Vs2013Aligned<8, T> {
309 typedef __declspec(align(8)) T type;
310};
311template <typename T> struct Vs2013Aligned<16, T> {
312 typedef __declspec(align(16)) T type;
313};
314template <typename T> struct Vs2013Aligned<32, T> {
315 typedef __declspec(align(32)) T type;
316};
317template <typename T> struct Vs2013Aligned<64, T> {
318 typedef __declspec(align(64)) T type;
319};
320template <typename T> struct Vs2013Aligned<128, T> {
321 typedef __declspec(align(128)) T type;
322};
323template <typename T> struct Vs2013Aligned<256, T> {
324 typedef __declspec(align(256)) T type;
325};
326#else
327template <typename T> struct identity {
328 typedef T type;
329};
330#define MOODYCAMEL_ALIGNAS(alignment) alignas(alignment)
331#define MOODYCAMEL_ALIGNOF(obj) alignof(obj)
332#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) alignas(alignof(obj)) typename details::identity<T>::type
333#endif
334#endif
335} // namespace details
336} // namespace moodycamel
337
338// TSAN can falsely report races in lock-free code. To enable TSAN to be used from projects that use this one,
339// we can apply per-function compile-time suppression.
340// See https://clang.llvm.org/docs/ThreadSanitizer.html#has-feature-thread-sanitizer
341#define MOODYCAMEL_NO_TSAN
342#if defined(__has_feature)
343#if __has_feature(thread_sanitizer)
344#undef MOODYCAMEL_NO_TSAN
345#define MOODYCAMEL_NO_TSAN __attribute__((no_sanitize("thread")))
346#endif // TSAN
347#endif // TSAN
348
349// Compiler-specific likely/unlikely hints
350namespace moodycamel {
351namespace details {
352#if defined(__GNUC__)
353static inline bool(likely)(bool x)
354{
355 return __builtin_expect((x), true);
356}
357static inline bool(unlikely)(bool x)
358{
359 return __builtin_expect((x), false);
360}
361#else
362static inline bool(likely)(bool x)
363{
364 return x;
365}
366static inline bool(unlikely)(bool x)
367{
368 return x;
369}
370#endif
371} // namespace details
372} // namespace moodycamel
373
374#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
375#include "internal/concurrentqueue_internal_debug.h"
376#endif
377
378namespace moodycamel {
379namespace details {
380template <typename T> struct const_numeric_max {
381 static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
382 static const T value = std::numeric_limits<T>::is_signed
383 ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
384 : static_cast<T>(-1);
385};
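
// For illustration of the expression above: const_numeric_max<std::uint32_t>::value is 0xFFFFFFFF
// (static_cast of -1 sets all bits), while const_numeric_max<std::int32_t>::value is 0x7FFFFFFF
// ((1 << 31) - 1).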
386
387#if defined(__GLIBCXX__)
388typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
389#else
390typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
391#endif
392
393// Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
394// 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
395typedef union {
396 std_max_align_t x;
397 long long y;
398 void* z;
399} max_align_t;
400} // namespace details
401
402// Default traits for the ConcurrentQueue. To change some of the
403// traits without re-implementing all of them, inherit from this
404// struct and shadow the declarations you wish to be different;
405// since the traits are used as a template type parameter, the
406// shadowed declarations will be used where defined, and the defaults
406// otherwise. (An illustrative sketch of overriding a trait follows the struct below.)
407
408struct ConcurrentQueueDefaultTraits {
409 // General-purpose size type. std::size_t is strongly recommended.
410 typedef std::size_t size_t;
411
412 // The type used for the enqueue and dequeue indices. Must be at least as
413 // large as size_t. Should be significantly larger than the number of elements
414 // you expect to hold at once, especially if you have a high turnover rate;
415 // for example, on 32-bit x86, if you expect to have over a hundred million
416 // elements or pump several million elements through your queue in a very
417 // short space of time, using a 32-bit type *may* trigger a race condition.
418 // A 64-bit int type is recommended in that case, and in practice will
419 // prevent a race condition no matter the usage of the queue. Note that
420 // whether the queue is lock-free with a 64-bit int type depends on whether
421 // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
422 typedef std::size_t index_t;
423
424 // Internally, all elements are enqueued and dequeued from multi-element
425 // blocks; this is the smallest controllable unit. If you expect few elements
426 // but many producers, a smaller block size should be favoured. For few producers
427 // and/or many elements, a larger block size is preferred. A sane default
428 // is provided. Must be a power of 2.
429 static const size_t BLOCK_SIZE = 32;
430
431 // For explicit producers (i.e. when using a producer token), the block is
432 // checked for being empty by iterating through a list of flags, one per element.
433 // For large block sizes, this is too inefficient, and switching to an atomic
434 // counter-based approach is faster. The switch is made for block sizes strictly
435 // larger than this threshold.
436 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
437
438 // How many full blocks can be expected for a single explicit producer? This should
439 // reflect that number's maximum for optimal performance. Must be a power of 2.
440 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
441
442 // How many full blocks can be expected for a single implicit producer? This should
443 // reflect that number's maximum for optimal performance. Must be a power of 2.
444 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
445
446 // The initial size of the hash table mapping thread IDs to implicit producers.
447 // Note that the hash is resized every time it becomes half full.
448 // Must be a power of two, or 0. If 0, implicit production
449 // (using the enqueue methods without an explicit producer token) is disabled.
450 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
451
452 // Controls the number of items that an explicit consumer (i.e. one with a token)
453 // must consume before it causes all consumers to rotate and move on to the next
454 // internal queue.
455 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
456
457 // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
458 // Enqueue operations that would cause this limit to be surpassed will fail. Note
459 // that this limit is enforced at the block level (for performance reasons), i.e.
460 // it's rounded up to the nearest block size.
461 static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
462
463 // The number of times to spin before sleeping when waiting on a semaphore.
464 // Recommended values are on the order of 1000-10000 unless the number of
465 // consumer threads exceeds the number of idle cores (in which case try 0-100).
466 // Only affects instances of the BlockingConcurrentQueue.
467 static const int MAX_SEMA_SPINS = 10000;
468
469 // Whether to recycle dynamically-allocated blocks into an internal free list or
470 // not. If false, only pre-allocated blocks (controlled by the constructor
471 // arguments) will be recycled, and all others will be `free`d back to the heap.
472 // Note that blocks consumed by explicit producers are only freed on destruction
473 // of the queue (not following destruction of the token) regardless of this trait.
474 static const bool RECYCLE_ALLOCATED_BLOCKS = false;
475
476#ifndef MCDBGQ_USE_RELACY
477 // Memory allocation can be customized if needed.
478 // malloc should return nullptr on failure, and handle alignment like std::malloc.
479#if defined(malloc) || defined(free)
480 // Gah, this is 2015, stop defining macros that break standard code already!
481 // Work around malloc/free being special macros:
482 static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
483 static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
484 static inline void*(malloc)(size_t size) { return WORKAROUND_malloc(size); }
485 static inline void(free)(void* ptr) { return WORKAROUND_free(ptr); }
486#else
487 static inline void* malloc(size_t size) { return std::malloc(size); }
488 static inline void free(void* ptr) { return std::free(ptr); }
489#endif
490#else
491 // Debug versions when running under the Relacy race detector (ignore
492 // these in user code)
493 static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
494 static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
495#endif
496};
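
// Illustrative sketch, not part of the library: to customize a trait, inherit from ConcurrentQueueDefaultTraits
// and shadow only the members you want to change, then pass your struct as the queue's second template argument.
// The names MyTraits and BigBlockQueue below are arbitrary examples.
//
//   struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits {
//       static const size_t BLOCK_SIZE = 256; // must be a power of 2
//   };
//   typedef moodycamel::ConcurrentQueue<int, MyTraits> BigBlockQueue;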
497
498// When producing or consuming many elements, the most efficient way is to:
499// 1) Use one of the bulk-operation methods of the queue with a token
500// 2) Failing that, use the bulk-operation methods without a token
501// 3) Failing that, create a token and use that with the single-item methods
502// 4) Failing that, use the single-parameter methods of the queue
503// Having said that, don't create tokens willy-nilly -- ideally there should be
504// a maximum of one token per thread (of each kind).
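//
// Illustrative sketch, not part of the library, of the token + bulk pattern recommended above
// (the names q, ptok, ctok, in and out are arbitrary):
//
//   moodycamel::ConcurrentQueue<int> q;
//   // Producer thread:
//   moodycamel::ProducerToken ptok(q);
//   int in[64] = {};
//   q.enqueue_bulk(ptok, in, 64);
//   // Consumer thread:
//   moodycamel::ConsumerToken ctok(q);
//   int out[64];
//   std::size_t n = q.try_dequeue_bulk(ctok, out, 64);
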
505struct ProducerToken;
506struct ConsumerToken;
507
508template <typename T, typename Traits> class ConcurrentQueue;
509template <typename T, typename Traits> class BlockingConcurrentQueue;
510class ConcurrentQueueTests;
511
512namespace details {
513struct ConcurrentQueueProducerTypelessBase {
514 ConcurrentQueueProducerTypelessBase* next;
515 std::atomic<bool> inactive;
516 ProducerToken* token;
517
518 ConcurrentQueueProducerTypelessBase()
519 : next(nullptr)
520 , inactive(false)
521 , token(nullptr)
522 {}
523};
524
525template <bool use32> struct _hash_32_or_64 {
526 static inline std::uint32_t hash(std::uint32_t h)
527 {
528 // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
529 // Since the thread ID is already unique, all we really want to do is propagate that
530 // uniqueness evenly across all the bits, so that we can use a subset of the bits while
531 // reducing collisions significantly
532 h ^= h >> 16;
533 h *= 0x85ebca6b;
534 h ^= h >> 13;
535 h *= 0xc2b2ae35;
536 return h ^ (h >> 16);
537 }
538};
539template <> struct _hash_32_or_64<1> {
540 static inline std::uint64_t hash(std::uint64_t h)
541 {
542 h ^= h >> 33;
543 h *= 0xff51afd7ed558ccd;
544 h ^= h >> 33;
545 h *= 0xc4ceb9fe1a85ec53;
546 return h ^ (h >> 33);
547 }
548};
549template <std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> {};
550
551static inline size_t hash_thread_id(thread_id_t id)
552{
553 static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
554 return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
555 thread_id_converter<thread_id_t>::prehash(id)));
556}
557
558template <typename T> static inline bool circular_less_than(T a, T b)
559{
560 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
561 "circular_less_than is intended to be used only with unsigned integer types");
562 return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << (static_cast<T>(sizeof(T) * CHAR_BIT - 1)));
563 // Note: the extra parens around the rhs of operator<< are due to an MSVC bug:
564 // https://developercommunity2.visualstudio.com/t/C4554-triggers-when-both-lhs-and-rhs-is/10034931
565 // silencing the bug requires #pragma warning(disable: 4554) around the calling code and has no effect when
566 // done here.
567}
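
// For illustration (using an 8-bit unsigned type): circular_less_than<std::uint8_t>(250, 5) is true, because
// static_cast<std::uint8_t>(250 - 5) == 245 > 128, i.e. index 250 precedes index 5 once the counter wraps;
// the reverse call, circular_less_than<std::uint8_t>(5, 250), is false.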
568
569template <typename U> static inline char* align_for(char* ptr)
570{
571 const std::size_t alignment = std::alignment_of<U>::value;
572 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
573}
574
575template <typename T> static inline T ceil_to_pow_2(T x)
576{
577 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
578 "ceil_to_pow_2 is intended to be used only with unsigned integer types");
579
580 // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
581 --x;
582 x |= x >> 1;
583 x |= x >> 2;
584 x |= x >> 4;
585 for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
586 x |= x >> (i << 3);
587 }
588 ++x;
589 return x;
590}
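
// For illustration: ceil_to_pow_2<std::uint32_t>(17) yields 32, while values that are already powers of two
// (e.g. 16) are returned unchanged.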
591
592template <typename T> static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
593{
594 T temp = std::move(left.load(std::memory_order_relaxed));
595 left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
596 right.store(std::move(temp), std::memory_order_relaxed);
597}
598
599template <typename T> static inline T const& nomove(T const& x)
600{
601 return x;
602}
603
604template <bool Enable> struct nomove_if {
605 template <typename T> static inline T const& eval(T const& x) { return x; }
606};
607
608template <> struct nomove_if<false> {
609 template <typename U> static inline auto eval(U&& x) -> decltype(std::forward<U>(x)) { return std::forward<U>(x); }
610};
611
612template <typename It> static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT->decltype(*it)
613{
614 return *it;
615}
616
617#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
618template <typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> {};
619#else
620template <typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> {};
621#endif
622
623#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
624#ifdef MCDBGQ_USE_RELACY
625typedef RelacyThreadExitListener ThreadExitListener;
626typedef RelacyThreadExitNotifier ThreadExitNotifier;
627#else
628class ThreadExitNotifier;
629
630struct ThreadExitListener {
631 typedef void (*callback_t)(void*);
632 callback_t callback;
633 void* userData;
634
635 ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
636 ThreadExitNotifier* chain; // reserved for use by the ThreadExitNotifier
637};
638
639class ThreadExitNotifier {
640 public:
641 static void subscribe(ThreadExitListener* listener)
642 {
643 auto& tlsInst = instance();
644 std::lock_guard<std::mutex> guard(mutex());
645 listener->next = tlsInst.tail;
646 listener->chain = &tlsInst;
647 tlsInst.tail = listener;
648 }
649
650 static void unsubscribe(ThreadExitListener* listener)
651 {
652 std::lock_guard<std::mutex> guard(mutex());
653 if (!listener->chain) {
654 return; // race with ~ThreadExitNotifier
655 }
656 auto& tlsInst = *listener->chain;
657 listener->chain = nullptr;
658 ThreadExitListener** prev = &tlsInst.tail;
659 for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
660 if (ptr == listener) {
661 *prev = ptr->next;
662 break;
663 }
664 prev = &ptr->next;
665 }
666 }
667
668 private:
669 ThreadExitNotifier()
670 : tail(nullptr)
671 {}
672 ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
673 ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
674
675 ~ThreadExitNotifier()
676 {
677 // This thread is about to exit, let everyone know!
678 assert(this == &instance() &&
679 "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that "
680 "MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
681 std::lock_guard<std::mutex> guard(mutex());
682 for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
683 ptr->chain = nullptr;
684 ptr->callback(ptr->userData);
685 }
686 }
687
688 // Thread-local
689 static inline ThreadExitNotifier& instance()
690 {
691 static thread_local ThreadExitNotifier notifier;
692 return notifier;
693 }
694
695 static inline std::mutex& mutex()
696 {
697 // Must be static because the ThreadExitNotifier could be destroyed while unsubscribe is called
698 static std::mutex mutex;
699 return mutex;
700 }
701
702 private:
703 ThreadExitListener* tail;
704};
705#endif
706#endif
707
708template <typename T> struct static_is_lock_free_num {
709 enum { value = 0 };
710};
711template <> struct static_is_lock_free_num<signed char> {
712 enum { value = ATOMIC_CHAR_LOCK_FREE };
713};
714template <> struct static_is_lock_free_num<short> {
715 enum { value = ATOMIC_SHORT_LOCK_FREE };
716};
717template <> struct static_is_lock_free_num<int> {
718 enum { value = ATOMIC_INT_LOCK_FREE };
719};
720template <> struct static_is_lock_free_num<long> {
721 enum { value = ATOMIC_LONG_LOCK_FREE };
722};
723template <> struct static_is_lock_free_num<long long> {
724 enum { value = ATOMIC_LLONG_LOCK_FREE };
725};
726template <typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> {};
727template <> struct static_is_lock_free<bool> {
728 enum { value = ATOMIC_BOOL_LOCK_FREE };
729};
730template <typename U> struct static_is_lock_free<U*> {
731 enum { value = ATOMIC_POINTER_LOCK_FREE };
732};
733} // namespace details
734
735struct ProducerToken {
736 template <typename T, typename Traits> explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
737
738 template <typename T, typename Traits> explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
739
740 ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT : producer(other.producer)
741 {
742 other.producer = nullptr;
743 if (producer != nullptr) {
744 producer->token = this;
745 }
746 }
747
748 inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
749 {
750 swap(other);
751 return *this;
752 }
753
754 void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
755 {
756 std::swap(producer, other.producer);
757 if (producer != nullptr) {
758 producer->token = this;
759 }
760 if (other.producer != nullptr) {
761 other.producer->token = &other;
762 }
763 }
764
765 // A token is always valid unless:
766 // 1) Memory allocation failed during construction
767 // 2) It was moved via the move constructor
768 // (Note: assignment does a swap, leaving both potentially valid)
769 // 3) The associated queue was destroyed
770 // Note that if valid() returns true, that only indicates
771 // that the token is valid for use with a specific queue,
772 // but not which one; that's up to the user to track.
773 inline bool valid() const { return producer != nullptr; }
774
775 ~ProducerToken()
776 {
777 if (producer != nullptr) {
778 producer->token = nullptr;
779 producer->inactive.store(true, std::memory_order_release);
780 }
781 }
782
783 // Disable copying and assignment
784 ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
785 ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
786
787 private:
788 template <typename T, typename Traits> friend class ConcurrentQueue;
789 friend class ConcurrentQueueTests;
790
791 protected:
792 details::ConcurrentQueueProducerTypelessBase* producer;
793};
794
795struct ConsumerToken {
796 template <typename T, typename Traits> explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
797
798 template <typename T, typename Traits> explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
799
800 ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT : initialOffset(other.initialOffset),
801 lastKnownGlobalOffset(other.lastKnownGlobalOffset),
802 itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
803 currentProducer(other.currentProducer),
804 desiredProducer(other.desiredProducer)
805 {}
806
807 inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
808 {
809 swap(other);
810 return *this;
811 }
812
813 void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
814 {
815 std::swap(initialOffset, other.initialOffset);
816 std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
817 std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
818 std::swap(currentProducer, other.currentProducer);
819 std::swap(desiredProducer, other.desiredProducer);
820 }
821
822 // Disable copying and assignment
823 ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
824 ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
825
826 private:
827 template <typename T, typename Traits> friend class ConcurrentQueue;
828 friend class ConcurrentQueueTests;
829
830 private: // but shared with ConcurrentQueue
831 std::uint32_t initialOffset;
832 std::uint32_t lastKnownGlobalOffset;
833 std::uint32_t itemsConsumedFromCurrent;
834 details::ConcurrentQueueProducerTypelessBase* currentProducer;
835 details::ConcurrentQueueProducerTypelessBase* desiredProducer;
836};
837
838// Need to forward-declare this swap because it's in a namespace.
839// See
840// http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
841template <typename T, typename Traits>
842inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a,
843 typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
844
845template <typename T, typename Traits = ConcurrentQueueDefaultTraits> class ConcurrentQueue {
846 public:
847 typedef ::moodycamel::ProducerToken producer_token_t;
848 typedef ::moodycamel::ConsumerToken consumer_token_t;
849
850 typedef typename Traits::index_t index_t;
851 typedef typename Traits::size_t size_t;
852
853 static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
854 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD =
855 static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
856 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
857 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
858 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE =
859 static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
860 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE =
861 static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
862#ifdef _MSC_VER
863#pragma warning(push)
864#pragma warning(disable : 4307) // + integral constant overflow (that's what the ternary expression is for!)
865#pragma warning(disable : 4309) // static_cast: Truncation of constant value
866#endif
867 static const size_t MAX_SUBQUEUE_SIZE =
868 (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE)
869 ? details::const_numeric_max<size_t>::value
870 : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
871#ifdef _MSC_VER
872#pragma warning(pop)
873#endif
874
875 static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value,
876 "Traits::size_t must be an unsigned integral type");
877 static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value,
878 "Traits::index_t must be an unsigned integral type");
879 static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
880 static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)),
881 "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
882 static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) &&
883 !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)),
884 "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
885 static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) &&
886 !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)),
887 "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
888 static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) &&
889 !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)),
890 "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
891 static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) ||
892 !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)),
893 "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
894 static_assert(
895 INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1,
896 "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
897
898 public:
899 // Creates a queue with at least `capacity` element slots; note that the
900 // actual number of elements that can be inserted without additional memory
901 // allocation depends on the number of producers and the block size (e.g. if
902 // the block size is equal to `capacity`, only a single block will be allocated
903 // up-front, which means only a single producer will be able to enqueue elements
904 // without an extra allocation -- blocks aren't shared between producers).
905 // This method is not thread safe -- it is up to the user to ensure that the
906 // queue is fully constructed before it starts being used by other threads (this
907 // includes making the memory effects of construction visible, possibly with a
908 // memory barrier).
909 explicit ConcurrentQueue(size_t capacity = 32 * BLOCK_SIZE)
910 : producerListTail(nullptr)
911 , producerCount(0)
912 , initialBlockPoolIndex(0)
913 , nextExplicitConsumerId(0)
914 , globalExplicitConsumerOffset(0)
915 {
916 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
917 populate_initial_implicit_producer_hash();
918 populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
919
920#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
921 // Track all the producers using a fully-resolved typed list for
922 // each kind; this makes it possible to debug them starting from
923 // the root queue object (otherwise wacky casts are needed that
924 // don't compile in the debugger's expression evaluator).
925 explicitProducers.store(nullptr, std::memory_order_relaxed);
926 implicitProducers.store(nullptr, std::memory_order_relaxed);
927#endif
928 }
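
    // For illustration of the sizing note above: with the default BLOCK_SIZE of 32, ConcurrentQueue<int> q(100)
    // pre-allocates 100 / 32 + 1 == 4 blocks, since 100 is not a multiple of the block size.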
929
930 // Computes the correct number of pre-allocated blocks for you based
931 // on the minimum number of elements you want available at any given
932 // time, and the maximum concurrent number of each type of producer.
933 ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
934 : producerListTail(nullptr)
935 , producerCount(0)
936 , initialBlockPoolIndex(0)
937 , nextExplicitConsumerId(0)
938 , globalExplicitConsumerOffset(0)
939 {
940 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
941 populate_initial_implicit_producer_hash();
942 size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) +
943 2 * (maxExplicitProducers + maxImplicitProducers);
944 populate_initial_block_list(blocks);
945
946#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
947 explicitProducers.store(nullptr, std::memory_order_relaxed);
948 implicitProducers.store(nullptr, std::memory_order_relaxed);
949#endif
950 }
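
    // For illustration of the formula above: with the default BLOCK_SIZE of 32, ConcurrentQueue<int> q(256, 2, 4)
    // pre-allocates ((256 / 32) - 1) * (2 + 1) + 2 * (2 + 4) == 33 blocks.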
951
952 // Note: The queue should not be accessed concurrently while it's
953 // being deleted. It's up to the user to synchronize this.
954 // This method is not thread safe.
955 ~ConcurrentQueue()
956 {
957 // Destroy producers
958 auto ptr = producerListTail.load(std::memory_order_relaxed);
959 while (ptr != nullptr) {
960 auto next = ptr->next_prod();
961 if (ptr->token != nullptr) {
962 ptr->token->producer = nullptr;
963 }
964 destroy(ptr);
965 ptr = next;
966 }
967
968 // Destroy implicit producer hash tables
969 MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0)
970 {
971 auto hash = implicitProducerHash.load(std::memory_order_relaxed);
972 while (hash != nullptr) {
973 auto prev = hash->prev;
974 if (prev != nullptr) { // The last hash is part of this object and was not allocated dynamically
975 for (size_t i = 0; i != hash->capacity; ++i) {
976 hash->entries[i].~ImplicitProducerKVP();
977 }
978 hash->~ImplicitProducerHash();
979 (Traits::free)(hash);
980 }
981 hash = prev;
982 }
983 }
984
985 // Destroy global free list
986 auto block = freeList.head_unsafe();
987 while (block != nullptr) {
988 auto next = block->freeListNext.load(std::memory_order_relaxed);
989 if (block->dynamicallyAllocated) {
990 destroy(block);
991 }
992 block = next;
993 }
994
995 // Destroy initial free list
996 destroy_array(initialBlockPool, initialBlockPoolSize);
997 }
998
999 // Disable copying and copy assignment
1000 ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
1001 ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
1002
1003 // Moving is supported, but note that it is *not* a thread-safe operation.
1004 // Nobody can use the queue while it's being moved, and the memory effects
1005 // of that move must be propagated to other threads before they can use it.
1006 // Note: When a queue is moved, its tokens are still valid but can only be
1007 // used with the destination queue (i.e. semantically they are moved along
1008 // with the queue itself).
1009 ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
1010 : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
1011 producerCount(other.producerCount.load(std::memory_order_relaxed)),
1012 initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
1013 initialBlockPool(other.initialBlockPool),
1014 initialBlockPoolSize(other.initialBlockPoolSize),
1015 freeList(std::move(other.freeList)),
1016 nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
1017 globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
1018 {
1019 // Move the other one into this, and leave the other one as an empty queue
1020 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
1021 populate_initial_implicit_producer_hash();
1022 swap_implicit_producer_hashes(other);
1023
1024 other.producerListTail.store(nullptr, std::memory_order_relaxed);
1025 other.producerCount.store(0, std::memory_order_relaxed);
1026 other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
1027 other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
1028
1029#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1030 explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
1031 other.explicitProducers.store(nullptr, std::memory_order_relaxed);
1032 implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
1033 other.implicitProducers.store(nullptr, std::memory_order_relaxed);
1034#endif
1035
1036 other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
1037 other.initialBlockPoolSize = 0;
1038 other.initialBlockPool = nullptr;
1039
1040 reown_producers();
1041 }
1042
1043 inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT { return swap_internal(other); }
1044
1045 // Swaps this queue's state with the other's. Not thread-safe.
1046 // Swapping two queues does not invalidate their tokens, however
1047 // the tokens that were created for one queue must be used with
1048 // only the swapped queue (i.e. the tokens are tied to the
1049 // queue's movable state, not the object itself).
1050 inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT { swap_internal(other); }
1051
1052 private:
1053 ConcurrentQueue& swap_internal(ConcurrentQueue& other)
1054 {
1055 if (this == &other) {
1056 return *this;
1057 }
1058
1059 details::swap_relaxed(producerListTail, other.producerListTail);
1060 details::swap_relaxed(producerCount, other.producerCount);
1061 details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
1062 std::swap(initialBlockPool, other.initialBlockPool);
1063 std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
1064 freeList.swap(other.freeList);
1065 details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
1066 details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
1067
1068 swap_implicit_producer_hashes(other);
1069
1070 reown_producers();
1071 other.reown_producers();
1072
1073#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1074 details::swap_relaxed(explicitProducers, other.explicitProducers);
1075 details::swap_relaxed(implicitProducers, other.implicitProducers);
1076#endif
1077
1078 return *this;
1079 }
1080
1081 public:
1082 // Enqueues a single item (by copying it).
1083 // Allocates memory if required. Only fails if memory allocation fails (or implicit
1084 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
1085 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1086 // Thread-safe.
1087 inline bool enqueue(T const& item)
1088 {
1089 MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1090 else return inner_enqueue<CanAlloc>(item);
1091 }
1092
1093 // Enqueues a single item (by moving it, if possible).
1094 // Allocates memory if required. Only fails if memory allocation fails (or implicit
1095 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
1096 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1097 // Thread-safe.
1098 inline bool enqueue(T&& item)
1099 {
1100 MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1101 else return inner_enqueue<CanAlloc>(std::move(item));
1102 }
1103
1104 // Enqueues a single item (by copying it) using an explicit producer token.
1105 // Allocates memory if required. Only fails if memory allocation fails (or
1106 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1107 // Thread-safe.
1108 inline bool enqueue(producer_token_t const& token, T const& item) { return inner_enqueue<CanAlloc>(token, item); }
1109
1110 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1111 // Allocates memory if required. Only fails if memory allocation fails (or
1112 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1113 // Thread-safe.
1114 inline bool enqueue(producer_token_t const& token, T&& item)
1115 {
1116 return inner_enqueue<CanAlloc>(token, std::move(item));
1117 }
1118
1119 // Enqueues several items.
1120 // Allocates memory if required. Only fails if memory allocation fails (or
1121 // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1122 // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1123 // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
1124 // Thread-safe.
1125 template <typename It> bool enqueue_bulk(It itemFirst, size_t count)
1126 {
1127 MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1128 else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
1129 }
1130
1131 // Enqueues several items using an explicit producer token.
1132 // Allocates memory if required. Only fails if memory allocation fails
1133 // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1134 // Note: Use std::make_move_iterator if the elements should be moved
1135 // instead of copied.
1136 // Thread-safe.
1137 template <typename It> bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1138 {
1139 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
1140 }
1141
1142 // Enqueues a single item (by copying it).
1143 // Does not allocate memory. Fails if not enough room to enqueue (or implicit
1144 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1145 // is 0).
1146 // Thread-safe.
1147 inline bool try_enqueue(T const& item)
1148 {
1149 MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1150 else return inner_enqueue<CannotAlloc>(item);
1151 }
1152
1153 // Enqueues a single item (by moving it, if possible).
1154 // Does not allocate memory (except for one-time implicit producer).
1155 // Fails if not enough room to enqueue (or implicit production is
1156 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1157 // Thread-safe.
1158 inline bool try_enqueue(T&& item)
1159 {
1160 MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1161 else return inner_enqueue<CannotAlloc>(std::move(item));
1162 }
1163
1164 // Enqueues a single item (by copying it) using an explicit producer token.
1165 // Does not allocate memory. Fails if not enough room to enqueue.
1166 // Thread-safe.
1167 inline bool try_enqueue(producer_token_t const& token, T const& item)
1168 {
1169 return inner_enqueue<CannotAlloc>(token, item);
1170 }
1171
1172 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1173 // Does not allocate memory. Fails if not enough room to enqueue.
1174 // Thread-safe.
1175 inline bool try_enqueue(producer_token_t const& token, T&& item)
1176 {
1177 return inner_enqueue<CannotAlloc>(token, std::move(item));
1178 }
1179
1180 // Enqueues several items.
1181 // Does not allocate memory (except for one-time implicit producer).
1182 // Fails if not enough room to enqueue (or implicit production is
1183 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1184 // Note: Use std::make_move_iterator if the elements should be moved
1185 // instead of copied.
1186 // Thread-safe.
1187 template <typename It> bool try_enqueue_bulk(It itemFirst, size_t count)
1188 {
1189 MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1190 else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1191 }
1192
1193 // Enqueues several items using an explicit producer token.
1194 // Does not allocate memory. Fails if not enough room to enqueue.
1195 // Note: Use std::make_move_iterator if the elements should be moved
1196 // instead of copied.
1197 // Thread-safe.
1198 template <typename It> bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1199 {
1200 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1201 }
1202
1203 // Attempts to dequeue from the queue.
1204 // Returns false if all producer streams appeared empty at the time they
1205 // were checked (so, the queue is likely but not guaranteed to be empty).
1206 // Never allocates. Thread-safe.
1207 template <typename U> bool try_dequeue(U& item)
1208 {
1209 // Instead of simply trying each producer in turn (which could cause needless contention on the first
1210 // producer), we score them heuristically.
1211 size_t nonEmptyCount = 0;
1212 ProducerBase* best = nullptr;
1213 size_t bestSize = 0;
1214 for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr;
1215 ptr = ptr->next_prod()) {
1216 auto size = ptr->size_approx();
1217 if (size > 0) {
1218 if (size > bestSize) {
1219 bestSize = size;
1220 best = ptr;
1221 }
1222 ++nonEmptyCount;
1223 }
1224 }
1225
1226 // If there was at least one non-empty queue but it appears empty at the time
1227 // we try to dequeue from it, we need to make sure every queue's been tried
1228 if (nonEmptyCount > 0) {
1229 if ((details::likely)(best->dequeue(item))) {
1230 return true;
1231 }
1232 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1233 if (ptr != best && ptr->dequeue(item)) {
1234 return true;
1235 }
1236 }
1237 }
1238 return false;
1239 }
1240
1241 // Attempts to dequeue from the queue.
1242 // Returns false if all producer streams appeared empty at the time they
1243 // were checked (so, the queue is likely but not guaranteed to be empty).
1244 // This differs from the try_dequeue(item) method in that this one does
1245 // not attempt to reduce contention by interleaving the order that producer
1246 // streams are dequeued from. So, using this method can reduce overall throughput
1247 // under contention, but will give more predictable results in single-threaded
1248 // consumer scenarios. This is mostly only useful for internal unit tests.
1249 // Never allocates. Thread-safe.
1250 template <typename U> bool try_dequeue_non_interleaved(U& item)
1251 {
1252 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1253 if (ptr->dequeue(item)) {
1254 return true;
1255 }
1256 }
1257 return false;
1258 }
1259
1260 // Attempts to dequeue from the queue using an explicit consumer token.
1261 // Returns false if all producer streams appeared empty at the time they
1262 // were checked (so, the queue is likely but not guaranteed to be empty).
1263 // Never allocates. Thread-safe.
1264 template <typename U> bool try_dequeue(consumer_token_t& token, U& item)
1265 {
1266 // The idea is roughly as follows:
1267 // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the
1268 // highest-efficiency consumer dictates the rotation speed of everyone else, more or less. If you see that
1269 // the global offset has changed, you must reset your consumption counter and move to your designated place.
1270 // If there are no items where you're supposed to be, keep moving until you find a producer with some items.
1271 // If the global offset has not changed but you've run out of items to consume, move over from your current
1272 // position until you find a producer with something in it.
1273
1274 if (token.desiredProducer == nullptr ||
1275 token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1276 if (!update_current_producer_after_rotation(token)) {
1277 return false;
1278 }
1279 }
1280
1281 // If there was at least one non-empty queue but it appears empty at the time
1282 // we try to dequeue from it, we need to make sure every queue's been tried
1283 if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
1284 if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1285 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1286 }
1287 return true;
1288 }
1289
1290 auto tail = producerListTail.load(std::memory_order_acquire);
1291 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1292 if (ptr == nullptr) {
1293 ptr = tail;
1294 }
1295 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1296 if (ptr->dequeue(item)) {
1297 token.currentProducer = ptr;
1298 token.itemsConsumedFromCurrent = 1;
1299 return true;
1300 }
1301 ptr = ptr->next_prod();
1302 if (ptr == nullptr) {
1303 ptr = tail;
1304 }
1305 }
1306 return false;
1307 }
1308
1309 // Attempts to dequeue several elements from the queue.
1310 // Returns the number of items actually dequeued.
1311 // Returns 0 if all producer streams appeared empty at the time they
1312 // were checked (so, the queue is likely but not guaranteed to be empty).
1313 // Never allocates. Thread-safe.
1314 template <typename It> size_t try_dequeue_bulk(It itemFirst, size_t max)
1315 {
1316 size_t count = 0;
1317 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1318 count += ptr->dequeue_bulk(itemFirst, max - count);
1319 if (count == max) {
1320 break;
1321 }
1322 }
1323 return count;
1324 }
1325
1326 // Attempts to dequeue several elements from the queue using an explicit consumer token.
1327 // Returns the number of items actually dequeued.
1328 // Returns 0 if all producer streams appeared empty at the time they
1329 // were checked (so, the queue is likely but not guaranteed to be empty).
1330 // Never allocates. Thread-safe.
1331 template <typename It> size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
1332 {
1333 if (token.desiredProducer == nullptr ||
1334 token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1335 if (!update_current_producer_after_rotation(token)) {
1336 return 0;
1337 }
1338 }
1339
1340 size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1341 if (count == max) {
1342 if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >=
1343 EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1344 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1345 }
1346 return max;
1347 }
1348 token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1349 max -= count;
1350
1351 auto tail = producerListTail.load(std::memory_order_acquire);
1352 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1353 if (ptr == nullptr) {
1354 ptr = tail;
1355 }
1356 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1357 auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1358 count += dequeued;
1359 if (dequeued != 0) {
1360 token.currentProducer = ptr;
1361 token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1362 }
1363 if (dequeued == max) {
1364 break;
1365 }
1366 max -= dequeued;
1367 ptr = ptr->next_prod();
1368 if (ptr == nullptr) {
1369 ptr = tail;
1370 }
1371 }
1372 return count;
1373 }
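    // Usage sketch (illustrative only): the token variant is the usual choice for a dedicated consumer
    // thread that drains in batches:
    //
    //     moodycamel::ConcurrentQueue<int> q;
    //     moodycamel::ConsumerToken ctok(q);
    //     int buffer[64];
    //     while (size_t got = q.try_dequeue_bulk(ctok, buffer, 64)) {
    //         // process buffer[0..got)
    //     }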
1374
1375 // Attempts to dequeue from a specific producer's inner queue.
1376 // If you happen to know which producer you want to dequeue from, this
1377 // is significantly faster than using the general-case try_dequeue methods.
1378 // Returns false if the producer's queue appeared empty at the time it
1379 // was checked (so, the queue is likely but not guaranteed to be empty).
1380 // Never allocates. Thread-safe.
1381 template <typename U> inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
1382 {
1383 return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
1384 }
1385
1386 // Attempts to dequeue several elements from a specific producer's inner queue.
1387 // Returns the number of items actually dequeued.
1388 // If you happen to know which producer you want to dequeue from, this
1389 // is significantly faster than using the general-case try_dequeue methods.
1390 // Returns 0 if the producer's queue appeared empty at the time it
1391 // was checked (so, the queue is likely but not guaranteed to be empty).
1392 // Never allocates. Thread-safe.
1393 template <typename It>
1394 inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
1395 {
1396 return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
1397 }
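    // Usage sketch (illustrative only): pairing one producer token with one consumer effectively gives an
    // SPSC channel inside the MPMC queue, covering both of the *_from_producer overloads above:
    //
    //     moodycamel::ConcurrentQueue<int> q;
    //     moodycamel::ProducerToken ptok(q);
    //     q.enqueue(ptok, 7);
    //     int item;
    //     bool ok = q.try_dequeue_from_producer(ptok, item);               // single element
    //     int buffer[16];
    //     size_t got = q.try_dequeue_bulk_from_producer(ptok, buffer, 16); // batch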
1398
1399 // Returns an estimate of the total number of elements currently in the queue. This
1400 // estimate is only accurate if the queue has completely stabilized before it is called
1401 // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1402 // visible on the calling thread, and no further operations start while this method is
1403 // being called).
1404 // Thread-safe.
1405 size_t size_approx() const
1406 {
1407 size_t size = 0;
1408 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1409 size += ptr->size_approx();
1410 }
1411 return size;
1412 }
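    // Usage sketch (illustrative only): treat size_approx() as a hint (e.g. for sizing a scratch buffer or
    // reporting metrics), never as a synchronization mechanism:
    //
    //     std::vector<int> scratch;
    //     scratch.reserve(q.size_approx()); // may over- or under-estimate while other threads are active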
1413
1414 // Returns true if the underlying atomic variables used by
1415 // the queue are lock-free (they should be on most platforms).
1416 // Thread-safe.
1417 static constexpr bool is_lock_free()
1418 {
1419        return details::static_is_lock_free<bool>::value == 2 && details::static_is_lock_free<size_t>::value == 2 &&
1420               details::static_is_lock_free<std::uint32_t>::value == 2 &&
1421               details::static_is_lock_free<index_t>::value == 2 &&
1422               details::static_is_lock_free<void*>::value == 2 && details::static_is_lock_free<
1423                   typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
1424 }
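    // Usage sketch (illustrative only): since this is constexpr, callers that require lock-freedom can
    // assert it at compile time on their target platform:
    //
    //     static_assert(moodycamel::ConcurrentQueue<int>::is_lock_free(),
    //                   "expected lock-free atomics on this platform");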
1425
1426 private:
1427 friend struct ProducerToken;
1428 friend struct ConsumerToken;
1429 struct ExplicitProducer;
1430 friend struct ExplicitProducer;
1431 struct ImplicitProducer;
1432 friend struct ImplicitProducer;
1433 friend class ConcurrentQueueTests;
1434
1435 enum AllocationMode { CanAlloc, CannotAlloc };
1436
1438 // Queue methods
1440
1441 template <AllocationMode canAlloc, typename U> inline bool inner_enqueue(producer_token_t const& token, U&& element)
1442 {
1443 return static_cast<ExplicitProducer*>(token.producer)
1444 ->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1445 }
1446
1447 template <AllocationMode canAlloc, typename U> inline bool inner_enqueue(U&& element)
1448 {
1449 auto producer = get_or_add_implicit_producer();
1450 return producer == nullptr
1451 ? false
1452 : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1453 }
1454
1455 template <AllocationMode canAlloc, typename It>
1456 inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1457 {
1458 return static_cast<ExplicitProducer*>(token.producer)
1459 ->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1460 }
1461
1462 template <AllocationMode canAlloc, typename It> inline bool inner_enqueue_bulk(It itemFirst, size_t count)
1463 {
1464 auto producer = get_or_add_implicit_producer();
1465 return producer == nullptr
1466 ? false
1467 : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1468 }
1469
1470 inline bool update_current_producer_after_rotation(consumer_token_t& token)
1471 {
1472 // Ah, there's been a rotation, figure out where we should be!
1473 auto tail = producerListTail.load(std::memory_order_acquire);
1474 if (token.desiredProducer == nullptr && tail == nullptr) {
1475 return false;
1476 }
1477 auto prodCount = producerCount.load(std::memory_order_relaxed);
1478 auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1479 if ((details::unlikely)(token.desiredProducer == nullptr)) {
1480 // Aha, first time we're dequeueing anything.
1481 // Figure out our local position
1482 // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1483 std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1484 token.desiredProducer = tail;
1485 for (std::uint32_t i = 0; i != offset; ++i) {
1486 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1487 if (token.desiredProducer == nullptr) {
1488 token.desiredProducer = tail;
1489 }
1490 }
1491 }
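            // Worked example (added commentary): with prodCount == 4 and initialOffset == 6, the token's
            // position counted from the start of the list is 6 % 4 == 2, so starting from the tail (the most
            // recently added producer) we hop 4 - 1 - 2 == 1 producer to reach it.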
1492
1493 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1494 if (delta >= prodCount) {
1495 delta = delta % prodCount;
1496 }
1497 for (std::uint32_t i = 0; i != delta; ++i) {
1498 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1499 if (token.desiredProducer == nullptr) {
1500 token.desiredProducer = tail;
1501 }
1502 }
1503
1504 token.lastKnownGlobalOffset = globalOffset;
1505 token.currentProducer = token.desiredProducer;
1506 token.itemsConsumedFromCurrent = 0;
1507 return true;
1508 }
1509
1511 // Free list
1513
1514 template <typename N> struct FreeListNode {
1515 FreeListNode()
1516 : freeListRefs(0)
1517 , freeListNext(nullptr)
1518 {}
1519
1520 std::atomic<std::uint32_t> freeListRefs;
1521 std::atomic<N*> freeListNext;
1522 };
1523
1524 // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1525 // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1526 // speedy under low contention.
1527 template <typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
1528 struct FreeList {
1529 FreeList()
1530 : freeListHead(nullptr)
1531 {}
1532 FreeList(FreeList&& other)
1533 : freeListHead(other.freeListHead.load(std::memory_order_relaxed))
1534 {
1535 other.freeListHead.store(nullptr, std::memory_order_relaxed);
1536 }
1537 void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
1538
1539 FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1540 FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1541
1542 inline void add(N* node)
1543 {
1544#ifdef MCDBGQ_NOLOCKFREE_FREELIST
1545 debug::DebugLock lock(mutex);
1546#endif
1547 // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1548 // set it using a fetch_add
1549 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1550 // Oh look! We were the last ones referencing this node, and we know
1551 // we want to add it to the free list, so let's do it!
1552 add_knowing_refcount_is_zero(node);
1553 }
1554 }
1555
1556 inline N* try_get()
1557 {
1558#ifdef MCDBGQ_NOLOCKFREE_FREELIST
1559 debug::DebugLock lock(mutex);
1560#endif
1561 auto head = freeListHead.load(std::memory_order_acquire);
1562 while (head != nullptr) {
1563 auto prevHead = head;
1564 auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1565 if ((refs & REFS_MASK) == 0 ||
1566 !head->freeListRefs.compare_exchange_strong(
1567 refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
1568 head = freeListHead.load(std::memory_order_acquire);
1569 continue;
1570 }
1571
1572 // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1573 // next and not worry about it changing between now and the time we do the CAS
1574 auto next = head->freeListNext.load(std::memory_order_relaxed);
1575 if (freeListHead.compare_exchange_strong(
1576 head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1577 // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1578 // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put
1579 // back on).
1580 assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1581
1582 // Decrease refcount twice, once for our ref, and once for the list's ref
1583 head->freeListRefs.fetch_sub(2, std::memory_order_release);
1584 return head;
1585 }
1586
1587 // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1588 // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1589 // count decrement happens-after the CAS on the head.
1590 refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1591 if (refs == SHOULD_BE_ON_FREELIST + 1) {
1592 add_knowing_refcount_is_zero(prevHead);
1593 }
1594 }
1595
1596 return nullptr;
1597 }
1598
1599 // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1600 N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1601
1602 private:
1603 inline void add_knowing_refcount_is_zero(N* node)
1604 {
1605 // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1606 // only one copy of this method per node at a time, i.e. the single thread case), then we know
1607 // we can safely change the next pointer of the node; however, once the refcount is back above
1608 // zero, then other threads could increase it (happens under heavy contention, when the refcount
1609 // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1610 // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1611 // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1612 // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1613 auto head = freeListHead.load(std::memory_order_relaxed);
1614 while (true) {
1615 node->freeListNext.store(head, std::memory_order_relaxed);
1616 node->freeListRefs.store(1, std::memory_order_release);
1617 if (!freeListHead.compare_exchange_strong(
1618 head, node, std::memory_order_release, std::memory_order_relaxed)) {
1619 // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1620 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
1621 continue;
1622 }
1623 }
1624 return;
1625 }
1626 }
1627
1628 private:
1629 // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under
1630 // contention)
1631 std::atomic<N*> freeListHead;
1632
1633 static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1634 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
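        // Added commentary: freeListRefs packs a 31-bit reference count (masked by REFS_MASK) together with
        // the SHOULD_BE_ON_FREELIST flag in the top bit. When add() races with readers in try_get(), whichever
        // thread later drops the reference count back to zero while the flag is set takes responsibility for
        // the deferred insertion via add_knowing_refcount_is_zero().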
1635
1636#ifdef MCDBGQ_NOLOCKFREE_FREELIST
1637 debug::DebugMutex mutex;
1638#endif
1639 };
1640
1642 // Block
1644
1645 enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
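    // Added commentary: Block tracks emptiness in one of two ways depending on the context below. In the
    // explicit context with a small enough BLOCK_SIZE (<= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) it uses the
    // per-slot emptyFlags array; otherwise (implicit context, or larger blocks) it uses the shared
    // elementsCompletelyDequeued counter.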
1646
1647 struct Block {
1648 Block()
1649 : next(nullptr)
1650 , elementsCompletelyDequeued(0)
1651 , freeListRefs(0)
1652 , freeListNext(nullptr)
1653 , dynamicallyAllocated(true)
1654 {
1655#ifdef MCDBGQ_TRACKMEM
1656 owner = nullptr;
1657#endif
1658 }
1659
1660 template <InnerQueueContext context> inline bool is_empty() const
1661 {
1662 MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1663 {
1664 // Check flags
1665 for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1666 if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1667 return false;
1668 }
1669 }
1670
1671 // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1672 std::atomic_thread_fence(std::memory_order_acquire);
1673 return true;
1674 }
1675 else
1676 {
1677 // Check counter
1678 if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1679 std::atomic_thread_fence(std::memory_order_acquire);
1680 return true;
1681 }
1682 assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1683 return false;
1684 }
1685 }
1686
1687 // Returns true if the block is now empty (does not apply in explicit context)
1688 template <InnerQueueContext context> inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i)
1689 {
1690 MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1691 {
1692 // Set flag
1693 assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(
1694 std::memory_order_relaxed));
1695 emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(
1696 true, std::memory_order_release);
1697 return false;
1698 }
1699 else
1700 {
1701 // Increment counter
1702 auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1703 assert(prevVal < BLOCK_SIZE);
1704 return prevVal == BLOCK_SIZE - 1;
1705 }
1706 }
1707
1708 // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1709 // Returns true if the block is now empty (does not apply in explicit context).
1710 template <InnerQueueContext context> inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count)
1711 {
1712 MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1713 {
1714 // Set flags
1715 std::atomic_thread_fence(std::memory_order_release);
1716 i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
1717 for (size_t j = 0; j != count; ++j) {
1718 assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1719 emptyFlags[i + j].store(true, std::memory_order_relaxed);
1720 }
1721 return false;
1722 }
1723 else
1724 {
1725 // Increment counter
1726 auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1727 assert(prevVal + count <= BLOCK_SIZE);
1728 return prevVal + count == BLOCK_SIZE;
1729 }
1730 }
1731
1732 template <InnerQueueContext context> inline void set_all_empty()
1733 {
1734 MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1735 {
1736 // Set all flags
1737 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1738 emptyFlags[i].store(true, std::memory_order_relaxed);
1739 }
1740 }
1741 else
1742 {
1743 // Reset counter
1744 elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1745 }
1746 }
1747
1748 template <InnerQueueContext context> inline void reset_empty()
1749 {
1750 MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1751 {
1752 // Reset flags
1753 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1754 emptyFlags[i].store(false, std::memory_order_relaxed);
1755 }
1756 }
1757 else
1758 {
1759 // Reset counter
1760 elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1761 }
1762 }
1763
1764 inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT
1765 {
1766 return static_cast<T*>(static_cast<void*>(elements)) +
1767 static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1768 }
1769 inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT
1770 {
1771 return static_cast<T const*>(static_cast<void const*>(elements)) +
1772 static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1773 }
1774
1775 private:
1776 static_assert(std::alignment_of<T>::value <= sizeof(T),
1777 "The queue does not support types with an alignment greater than their size at this time");
1778 MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * BLOCK_SIZE], T) elements;
1779
1780 public:
1781 Block* next;
1782 std::atomic<size_t> elementsCompletelyDequeued;
1783 std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1784
1785 public:
1786 std::atomic<std::uint32_t> freeListRefs;
1787 std::atomic<Block*> freeListNext;
1788 bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1789
1790#ifdef MCDBGQ_TRACKMEM
1791 void* owner;
1792#endif
1793 };
1794 static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value,
1795 "Internal error: Blocks must be at least as aligned as the type they are wrapping");
1796
1797#ifdef MCDBGQ_TRACKMEM
1798 public:
1799 struct MemStats;
1800
1801 private:
1802#endif
1803
1805 // Producer base
1807
1808 struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase {
1809 ProducerBase(ConcurrentQueue* parent_, bool isExplicit_)
1810 : tailIndex(0)
1811 , headIndex(0)
1812 , dequeueOptimisticCount(0)
1813 , dequeueOvercommit(0)
1814 , tailBlock(nullptr)
1815 , isExplicit(isExplicit_)
1816 , parent(parent_)
1817 {}
1818
1819 virtual ~ProducerBase() {}
1820
1821 template <typename U> inline bool dequeue(U& element)
1822 {
1823 if (isExplicit) {
1824 return static_cast<ExplicitProducer*>(this)->dequeue(element);
1825 } else {
1826 return static_cast<ImplicitProducer*>(this)->dequeue(element);
1827 }
1828 }
1829
1830 template <typename It> inline size_t dequeue_bulk(It& itemFirst, size_t max)
1831 {
1832 if (isExplicit) {
1833 return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1834 } else {
1835 return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1836 }
1837 }
1838
1839 inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
1840
1841 inline size_t size_approx() const
1842 {
1843 auto tail = tailIndex.load(std::memory_order_relaxed);
1844 auto head = headIndex.load(std::memory_order_relaxed);
1845 return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1846 }
1847
1848 inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1849
1850 protected:
1851 std::atomic<index_t> tailIndex; // Where to enqueue to next
1852 std::atomic<index_t> headIndex; // Where to dequeue from next
1853
1854 std::atomic<index_t> dequeueOptimisticCount;
1855 std::atomic<index_t> dequeueOvercommit;
1856
1857 Block* tailBlock;
1858
1859 public:
1860 bool isExplicit;
1861 ConcurrentQueue* parent;
1862
1863 protected:
1864#ifdef MCDBGQ_TRACKMEM
1865 friend struct MemStats;
1866#endif
1867 };
1868
1870 // Explicit queue
1872
1873 struct ExplicitProducer : public ProducerBase {
1874 explicit ExplicitProducer(ConcurrentQueue* parent_)
1875 : ProducerBase(parent_, true)
1876 , blockIndex(nullptr)
1877 , pr_blockIndexSlotsUsed(0)
1878 , pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1)
1879 , pr_blockIndexFront(0)
1880 , pr_blockIndexEntries(nullptr)
1881 , pr_blockIndexRaw(nullptr)
1882 {
1883 size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
1884 if (poolBasedIndexSize > pr_blockIndexSize) {
1885 pr_blockIndexSize = poolBasedIndexSize;
1886 }
1887
1888 new_block_index(
1889 0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1890 }
1891
1892 ~ExplicitProducer()
1893 {
1894 // Destruct any elements not yet dequeued.
1895 // Since we're in the destructor, we can assume all elements
1896 // are either completely dequeued or completely not (no halfways).
1897 if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1898 // First find the block that's partially dequeued, if any
1899 Block* halfDequeuedBlock = nullptr;
1900 if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1901 // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1902 // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a
1903 // boundary)
1904 size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
1905 while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE,
1906 this->headIndex.load(std::memory_order_relaxed))) {
1907 i = (i + 1) & (pr_blockIndexSize - 1);
1908 }
1909 assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base,
1910 this->headIndex.load(std::memory_order_relaxed)));
1911 halfDequeuedBlock = pr_blockIndexEntries[i].block;
1912 }
1913
1914 // Start at the head block (note the first line in the loop gives us the head from the tail on the first
1915 // iteration)
1916 auto block = this->tailBlock;
1917 do {
1918 block = block->next;
1919 if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1920 continue;
1921 }
1922
1923 size_t i = 0; // Offset into block
1924 if (block == halfDequeuedBlock) {
1925 i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) &
1926 static_cast<index_t>(BLOCK_SIZE - 1));
1927 }
1928
1929 // Walk through all the items in the block; if this is the tail block, we need to stop when we reach
1930 // the tail index
1931 auto lastValidIndex =
1932 (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0
1933 ? BLOCK_SIZE
1934 : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) &
1935 static_cast<index_t>(BLOCK_SIZE - 1));
1936 while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
1937 (*block)[i++]->~T();
1938 }
1939 } while (block != this->tailBlock);
1940 }
1941
1942 // Destroy all blocks that we own
1943 if (this->tailBlock != nullptr) {
1944 auto block = this->tailBlock;
1945 do {
1946 auto nextBlock = block->next;
1947 this->parent->add_block_to_free_list(block);
1948 block = nextBlock;
1949 } while (block != this->tailBlock);
1950 }
1951
1952 // Destroy the block indices
1953 auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
1954 while (header != nullptr) {
1955 auto prev = static_cast<BlockIndexHeader*>(header->prev);
1956 header->~BlockIndexHeader();
1957 (Traits::free)(header);
1958 header = prev;
1959 }
1960 }
1961
1962 template <AllocationMode allocMode, typename U> inline bool enqueue(U&& element)
1963 {
1964 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1965 index_t newTailIndex = 1 + currentTailIndex;
1966 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1967 // We reached the end of a block, start a new one
1968 auto startBlock = this->tailBlock;
1969 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1970 if (this->tailBlock != nullptr &&
1971 this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1972 // We can re-use the block ahead of us, it's empty!
1973 this->tailBlock = this->tailBlock->next;
1974 this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1975
1976 // We'll put the block on the block index (guaranteed to be room since we're conceptually removing
1977 // the last block from it first -- except instead of removing then adding, we can just overwrite).
1978 // Note that there must be a valid block index here, since even if allocation failed in the ctor,
1979 // it would have been re-attempted when adding the first block to the queue; since there is such
1980 // a block, a block index must have been successfully allocated.
1981 } else {
1982 // Whatever head value we see here is >= the last value we saw here (relatively),
1983 // and <= its current value. Since we have the most recent tail, the head must be
1984 // <= to it.
1985 auto head = this->headIndex.load(std::memory_order_relaxed);
1986 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1987 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
1988 (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
1989 (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
1990 // We can't enqueue in another block because there's not enough leeway -- the
1991 // tail could surpass the head by the time the block fills up! (Or we'll exceed
1992 // the size limit, if the second part of the condition was true.)
1993 return false;
1994 }
1995 // We're going to need a new block; check that the block index has room
1996 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
1997 // Hmm, the circular block index is already full -- we'll need
1998 // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
1999 // the initial allocation failed in the constructor.
2000
2001 MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc)
2002 {
2003 return false;
2004 }
2005 else if (!new_block_index(pr_blockIndexSlotsUsed))
2006 {
2007 return false;
2008 }
2009 }
2010
2011 // Insert a new block in the circular linked list
2012 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2013 if (newBlock == nullptr) {
2014 return false;
2015 }
2016#ifdef MCDBGQ_TRACKMEM
2017 newBlock->owner = this;
2018#endif
2019 newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2020 if (this->tailBlock == nullptr) {
2021 newBlock->next = newBlock;
2022 } else {
2023 newBlock->next = this->tailBlock->next;
2024 this->tailBlock->next = newBlock;
2025 }
2026 this->tailBlock = newBlock;
2027 ++pr_blockIndexSlotsUsed;
2028 }
2029
2030 MOODYCAMEL_CONSTEXPR_IF(
2031 !MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
2032 {
2033 // The constructor may throw. We want the element not to appear in the queue in
2034 // that case (without corrupting the queue):
2035 MOODYCAMEL_TRY
2036 {
2037 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2038 }
2039 MOODYCAMEL_CATCH(...)
2040 {
2041 // Revert change to the current block, but leave the new block available
2042 // for next time
2043 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2044 this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
2045 MOODYCAMEL_RETHROW;
2046 }
2047 }
2048 else
2049 {
2050 (void)startBlock;
2051 (void)originalBlockIndexSlotsUsed;
2052 }
2053
2054 // Add block to block index
2055 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2056 entry.base = currentTailIndex;
2057 entry.block = this->tailBlock;
2058 blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
2059 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2060
2061 MOODYCAMEL_CONSTEXPR_IF(
2062 !MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
2063 {
2064 this->tailIndex.store(newTailIndex, std::memory_order_release);
2065 return true;
2066 }
2067 }
2068
2069 // Enqueue
2070 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2071
2072 this->tailIndex.store(newTailIndex, std::memory_order_release);
2073 return true;
2074 }
2075
2076 template <typename U> bool dequeue(U& element)
2077 {
2078 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2079 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2080 if (details::circular_less_than<index_t>(
2081 this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2082 // Might be something to dequeue, let's give it a try
2083
2084 // Note that this if is purely for performance purposes in the common case when the queue is
2085 // empty and the values are eventually consistent -- we may enter here spuriously.
2086
2087 // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
2088 // change them) and must be the same value at this point (inside the if) as when the if condition was
2089 // evaluated.
2090
2091 // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit
2092 // below. This ensures that whatever the value we got loaded into overcommit, the load of
2093                // dequeueOptimisticCount in the fetch_add below will result in a value at least as recent as that (and
2094 // therefore at least as large). Note that I believe a compiler (signal) fence here would be sufficient
2095 // due to the nature of fetch_add (all read-modify-write operations are guaranteed to work on the latest
2096 // value in the modification order), but unfortunately that can't be shown to be correct using only the
2097 // C++11 standard. See
2098 // http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
2099 std::atomic_thread_fence(std::memory_order_acquire);
2100
2101 // Increment optimistic counter, then check if it went over the boundary
2102 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2103
2104 // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is
2105 // only ever incremented after dequeueOptimisticCount -- this is enforced in the `else` block below),
2106 // and since we now have a version of dequeueOptimisticCount that is at least as recent as overcommit
2107 // (due to the release upon incrementing dequeueOvercommit and the acquire above that synchronizes with
2108 // it), overcommit <= myDequeueCount. However, we can't assert this since both dequeueOptimisticCount
2109 // and dequeueOvercommit may (independently) overflow; in such a case, though, the logic still holds
2110 // since the difference between the two is maintained.
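                // In short (added summary of the above): dequeueOptimisticCount counts optimistic dequeue
                // attempts, dequeueOvercommit counts the attempts that turned out to find nothing, so their
                // difference is the number of elements actually claimed, and headIndex is only advanced for
                // successful claims.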
2111
2112 // Note that we reload tail here in case it changed; it will be the same value as before or greater,
2113 // since this load is sequenced after (happens after) the earlier load above. This is supported by
2114 // read-read coherency (as defined in the standard), explained here:
2115 // http://en.cppreference.com/w/cpp/atomic/memory_order
2116 tail = this->tailIndex.load(std::memory_order_acquire);
2117 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2118 // Guaranteed to be at least one element to dequeue!
2119
2120 // Get the index. Note that since there's guaranteed to be at least one element, this
2121 // will never exceed tail. We need to do an acquire-release fence here since it's possible
2122 // that whatever condition got us to this point was for an earlier enqueued element (that
2123 // we already see the memory effects for), but that by the time we increment somebody else
2124                // has incremented it, and we need to see the memory effects for *that* element, which
2125                // in such a case is necessarily visible on the thread that incremented it in the first
2126 // place with the more current condition (they must have acquired a tail that is at least
2127 // as recent).
2128 auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2129
2130 // Determine which block the element is in
2131
2132 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2133 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2134
2135 // We need to be careful here about subtracting and dividing because of index wrap-around.
2136 // When an index wraps, we need to preserve the sign of the offset when dividing it by the
2137 // block size (in order to get a correct signed block count offset in all cases):
2138 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2139 auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
2140 auto offset = static_cast<size_t>(
2141 static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) /
2142 static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
2143 auto block =
2144 localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
2145
2146 // Dequeue
2147 auto& el = *((*block)[index]);
2148 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2149 // Make sure the element is still fully dequeued and destroyed even if the assignment
2150 // throws
2151 struct Guard {
2152 Block* block;
2153 index_t index;
2154
2155 ~Guard()
2156 {
2157 (*block)[index]->~T();
2158 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2159 }
2160 } guard = { block, index };
2161
2162 element = std::move(el); // NOLINT
2163 } else {
2164 element = std::move(el); // NOLINT
2165 el.~T(); // NOLINT
2166 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2167 }
2168
2169 return true;
2170 } else {
2171 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2172 this->dequeueOvercommit.fetch_add(
2173 1, std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is
2174 // guaranteed to happen before this write
2175 }
2176 }
2177
2178 return false;
2179 }
2180
2181 template <AllocationMode allocMode, typename It>
2182 bool MOODYCAMEL_NO_TSAN enqueue_bulk(It itemFirst, size_t count)
2183 {
2184 // First, we need to make sure we have enough room to enqueue all of the elements;
2185 // this means pre-allocating blocks and putting them in the block index (but only if
2186 // all the allocations succeeded).
2187 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2188 auto startBlock = this->tailBlock;
2189 auto originalBlockIndexFront = pr_blockIndexFront;
2190 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2191
2192 Block* firstAllocatedBlock = nullptr;
2193
2194 // Figure out how many blocks we'll need to allocate, and do so
2195 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
2196 ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2197 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2198 if (blockBaseDiff > 0) {
2199 // Allocate as many blocks as possible from ahead
2200 while (blockBaseDiff > 0 && this->tailBlock != nullptr &&
2201 this->tailBlock->next != firstAllocatedBlock &&
2202 this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2203 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2204 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2205
2206 this->tailBlock = this->tailBlock->next;
2207 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2208
2209 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2210 entry.base = currentTailIndex;
2211 entry.block = this->tailBlock;
2212 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2213 }
2214
2215 // Now allocate as many blocks as necessary from the block pool
2216 while (blockBaseDiff > 0) {
2217 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2218 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2219
2220 auto head = this->headIndex.load(std::memory_order_relaxed);
2221 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2222 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2223 (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2224 (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2225 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2226 MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc)
2227 {
2228 // Failed to allocate, undo changes (but keep injected blocks)
2229 pr_blockIndexFront = originalBlockIndexFront;
2230 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2231 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2232 return false;
2233 }
2234 else if (full || !new_block_index(originalBlockIndexSlotsUsed))
2235 {
2236 // Failed to allocate, undo changes (but keep injected blocks)
2237 pr_blockIndexFront = originalBlockIndexFront;
2238 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2239 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2240 return false;
2241 }
2242
2243 // pr_blockIndexFront is updated inside new_block_index, so we need to
2244 // update our fallback value too (since we keep the new index even if we
2245 // later fail)
2246 originalBlockIndexFront = originalBlockIndexSlotsUsed;
2247 }
2248
2249 // Insert a new block in the circular linked list
2250 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2251 if (newBlock == nullptr) {
2252 pr_blockIndexFront = originalBlockIndexFront;
2253 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2254 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2255 return false;
2256 }
2257
2258#ifdef MCDBGQ_TRACKMEM
2259 newBlock->owner = this;
2260#endif
2261 newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2262 if (this->tailBlock == nullptr) {
2263 newBlock->next = newBlock;
2264 } else {
2265 newBlock->next = this->tailBlock->next;
2266 this->tailBlock->next = newBlock;
2267 }
2268 this->tailBlock = newBlock;
2269 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2270
2271 ++pr_blockIndexSlotsUsed;
2272
2273 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2274 entry.base = currentTailIndex;
2275 entry.block = this->tailBlock;
2276 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2277 }
2278
2279 // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2280 // publish the new block index front
2281 auto block = firstAllocatedBlock;
2282 while (true) {
2283 block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2284 if (block == this->tailBlock) {
2285 break;
2286 }
2287 block = block->next;
2288 }
2289
2290 MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(
2291 T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
2292 {
2293 blockIndex.load(std::memory_order_relaxed)
2294 ->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2295 }
2296 }
2297
2298 // Enqueue, one block at a time
2299 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2300 currentTailIndex = startTailIndex;
2301 auto endBlock = this->tailBlock;
2302 this->tailBlock = startBlock;
2303 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr ||
2304 count == 0);
2305 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2306 this->tailBlock = firstAllocatedBlock;
2307 }
2308 while (true) {
2309 index_t stopIndex =
2310 (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2311 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2312 stopIndex = newTailIndex;
2313 }
2314 MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(
2315 T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
2316 {
2317 while (currentTailIndex != stopIndex) {
2318 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2319 }
2320 }
2321 else
2322 {
2323 MOODYCAMEL_TRY
2324 {
2325 while (currentTailIndex != stopIndex) {
2326 // Must use copy constructor even if move constructor is available
2327 // because we may have to revert if there's an exception.
2328 // Sorry about the horrible templated next line, but it was the only way
2329 // to disable moving *at compile time*, which is important because a type
2330 // may only define a (noexcept) move constructor, and so calls to the
2331 // cctor will not compile, even if they are in an if branch that will never
2332 // be executed
2333 new ((*this->tailBlock)[currentTailIndex])
2334 T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(
2335 T,
2336 decltype(*itemFirst),
2337 new (static_cast<T*>(nullptr))
2338 T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2339 ++currentTailIndex;
2340 ++itemFirst;
2341 }
2342 }
2343 MOODYCAMEL_CATCH(...)
2344 {
2345 // Oh dear, an exception's been thrown -- destroy the elements that
2346 // were enqueued so far and revert the entire bulk operation (we'll keep
2347 // any allocated blocks in our linked list for later, though).
2348 auto constructedStopIndex = currentTailIndex;
2349 auto lastBlockEnqueued = this->tailBlock;
2350
2351 pr_blockIndexFront = originalBlockIndexFront;
2352 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2353 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2354
2355                        if (!details::is_trivially_destructible<T>::value) {
2356                            auto block = startBlock;
2357 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2358 block = firstAllocatedBlock;
2359 }
2360 currentTailIndex = startTailIndex;
2361 while (true) {
2362 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2363 static_cast<index_t>(BLOCK_SIZE);
2364 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2365 stopIndex = constructedStopIndex;
2366 }
2367 while (currentTailIndex != stopIndex) {
2368 (*block)[currentTailIndex++]->~T();
2369 }
2370 if (block == lastBlockEnqueued) {
2371 break;
2372 }
2373 block = block->next;
2374 }
2375 }
2376 MOODYCAMEL_RETHROW;
2377 }
2378 }
2379
2380 if (this->tailBlock == endBlock) {
2381 assert(currentTailIndex == newTailIndex);
2382 break;
2383 }
2384 this->tailBlock = this->tailBlock->next;
2385 }
2386
2387 MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(
2388 T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
2389 {
2390 if (firstAllocatedBlock != nullptr)
2391 blockIndex.load(std::memory_order_relaxed)
2392 ->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2393 }
2394
2395 this->tailIndex.store(newTailIndex, std::memory_order_release);
2396 return true;
2397 }
2398
2399 template <typename It> size_t dequeue_bulk(It& itemFirst, size_t max)
2400 {
2401 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2402 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2403 auto desiredCount =
2404 static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2405 if (details::circular_less_than<size_t>(0, desiredCount)) {
2406 desiredCount = desiredCount < max ? desiredCount : max;
2407 std::atomic_thread_fence(std::memory_order_acquire);
2408
2409 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2410
2411 tail = this->tailIndex.load(std::memory_order_acquire);
2412 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2413 if (details::circular_less_than<size_t>(0, actualCount)) {
2414 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2415 if (actualCount < desiredCount) {
2416 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2417 }
2418
2419 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2420 // will never exceed tail.
2421 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2422
2423 // Determine which block the first element is in
2424 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2425 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2426
2427 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2428 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2429 auto offset = static_cast<size_t>(
2430 static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) /
2431 static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
2432 auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2433
2434 // Iterate the blocks and dequeue
2435 auto index = firstIndex;
2436 do {
2437 auto firstIndexInBlock = index;
2438 index_t endIndex =
2439 (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2440 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount),
2441 endIndex)
2442 ? firstIndex + static_cast<index_t>(actualCount)
2443 : endIndex;
2444 auto block = localBlockIndex->entries[indexIndex].block;
2445 if (MOODYCAMEL_NOEXCEPT_ASSIGN(
2446 T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2447 while (index != endIndex) {
2448 auto& el = *((*block)[index]);
2449 *itemFirst++ = std::move(el);
2450 el.~T();
2451 ++index;
2452 }
2453 } else {
2454 MOODYCAMEL_TRY
2455 {
2456 while (index != endIndex) {
2457 auto& el = *((*block)[index]);
2458 *itemFirst = std::move(el);
2459 ++itemFirst;
2460 el.~T();
2461 ++index;
2462 }
2463 }
2464 MOODYCAMEL_CATCH(...)
2465 {
2466 // It's too late to revert the dequeue, but we can make sure that all
2467 // the dequeued objects are properly destroyed and the block index
2468 // (and empty count) are properly updated before we propagate the exception
2469 do {
2470 block = localBlockIndex->entries[indexIndex].block;
2471 while (index != endIndex) {
2472 (*block)[index++]->~T();
2473 }
2474 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(
2475 firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2476 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2477
2478 firstIndexInBlock = index;
2479 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2480 static_cast<index_t>(BLOCK_SIZE);
2481 endIndex = details::circular_less_than<index_t>(
2482 firstIndex + static_cast<index_t>(actualCount), endIndex)
2483 ? firstIndex + static_cast<index_t>(actualCount)
2484 : endIndex;
2485 } while (index != firstIndex + actualCount);
2486
2487 MOODYCAMEL_RETHROW;
2488 }
2489 }
2490 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(
2491 firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2492 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2493 } while (index != firstIndex + actualCount);
2494
2495 return actualCount;
2496 } else {
2497 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2498 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2499 }
2500 }
2501
2502 return 0;
2503 }
2504
2505 private:
2506 struct BlockIndexEntry {
2507 index_t base;
2508 Block* block;
2509 };
2510
2511 struct BlockIndexHeader {
2512 size_t size;
2513 std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
2514 BlockIndexEntry* entries;
2515 void* prev;
2516 };
2517
2518 bool new_block_index(size_t numberOfFilledSlotsToExpose)
2519 {
2520 auto prevBlockSizeMask = pr_blockIndexSize - 1;
2521
2522 // Create the new block
2523 pr_blockIndexSize <<= 1;
2524 auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) +
2525 std::alignment_of<BlockIndexEntry>::value - 1 +
2526 sizeof(BlockIndexEntry) * pr_blockIndexSize));
2527 if (newRawPtr == nullptr) {
2528 pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2529 return false;
2530 }
2531
2532 auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(
2533 details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
2534
2535 // Copy in all the old indices, if any
2536 size_t j = 0;
2537 if (pr_blockIndexSlotsUsed != 0) {
2538 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2539 do {
2540 newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2541 i = (i + 1) & prevBlockSizeMask;
2542 } while (i != pr_blockIndexFront);
2543 }
2544
2545 // Update everything
2546 auto header = new (newRawPtr) BlockIndexHeader;
2547 header->size = pr_blockIndexSize;
2548 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2549 header->entries = newBlockIndexEntries;
2550 header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
2551
2552 pr_blockIndexFront = j;
2553 pr_blockIndexEntries = newBlockIndexEntries;
2554 pr_blockIndexRaw = newRawPtr;
2555 blockIndex.store(header, std::memory_order_release);
2556
2557 return true;
2558 }
2559
2560 private:
2561 std::atomic<BlockIndexHeader*> blockIndex;
2562
2563        // To be used by producer only -- consumer must use the ones referenced by blockIndex
2564 size_t pr_blockIndexSlotsUsed;
2565 size_t pr_blockIndexSize;
2566 size_t pr_blockIndexFront; // Next slot (not current)
2567 BlockIndexEntry* pr_blockIndexEntries;
2568 void* pr_blockIndexRaw;
2569
2570#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2571 public:
2572 ExplicitProducer* nextExplicitProducer;
2573
2574 private:
2575#endif
2576
2577#ifdef MCDBGQ_TRACKMEM
2578 friend struct MemStats;
2579#endif
2580 };
2581
2583 // Implicit queue
2585
2586 struct ImplicitProducer : public ProducerBase {
2587 ImplicitProducer(ConcurrentQueue* parent_)
2588 : ProducerBase(parent_, false)
2589 , nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE)
2590 , blockIndex(nullptr)
2591 {
2592 new_block_index();
2593 }
2594
2595 ~ImplicitProducer()
2596 {
2597 // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2598 // completed already; this means that all undequeued elements are placed contiguously across
2599 // contiguous blocks, and that only the first and last remaining blocks can be only partially
2600 // empty (all other remaining blocks must be completely full).
2601
2602#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2603 // Unregister ourselves for thread termination notification
2604 if (!this->inactive.load(std::memory_order_relaxed)) {
2605 details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2606 }
2607#endif
2608
2609 // Destroy all remaining elements!
2610 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2611 auto index = this->headIndex.load(std::memory_order_relaxed);
2612 Block* block = nullptr;
2613 assert(index == tail || details::circular_less_than(index, tail));
2614 bool forceFreeLastBlock =
2615 index != tail; // If we enter the loop, then the last (tail) block will not be freed
2616 while (index != tail) {
2617 if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2618 if (block != nullptr) {
2619 // Free the old block
2620 this->parent->add_block_to_free_list(block);
2621 }
2622
2623 block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2624 }
2625
2626 ((*block)[index])->~T();
2627 ++index;
2628 }
2629 // Even if the queue is empty, there's still one block that's not on the free list
2630 // (unless the head index reached the end of it, in which case the tail will be poised
2631 // to create a new block).
2632 if (this->tailBlock != nullptr &&
2633 (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2634 this->parent->add_block_to_free_list(this->tailBlock);
2635 }
2636
2637 // Destroy block index
2638 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2639 if (localBlockIndex != nullptr) {
2640 for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2641 localBlockIndex->index[i]->~BlockIndexEntry();
2642 }
2643 do {
2644 auto prev = localBlockIndex->prev;
2645 localBlockIndex->~BlockIndexHeader();
2646 (Traits::free)(localBlockIndex);
2647 localBlockIndex = prev;
2648 } while (localBlockIndex != nullptr);
2649 }
2650 }
2651
2652 template <AllocationMode allocMode, typename U> inline bool enqueue(U&& element)
2653 {
2654 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2655 index_t newTailIndex = 1 + currentTailIndex;
2656 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2657 // We reached the end of a block, start a new one
2658 auto head = this->headIndex.load(std::memory_order_relaxed);
2659 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2660 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2661 (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2662 (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2663 return false;
2664 }
2665#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2666 debug::DebugLock lock(mutex);
2667#endif
2668 // Find out where we'll be inserting this block in the block index
2669 BlockIndexEntry* idxEntry;
2670 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2671 return false;
2672 }
2673
2674 // Get ahold of a new block
2675 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2676 if (newBlock == nullptr) {
2677 rewind_block_index_tail();
2678 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2679 return false;
2680 }
2681#ifdef MCDBGQ_TRACKMEM
2682 newBlock->owner = this;
2683#endif
2684 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2685
2686 MOODYCAMEL_CONSTEXPR_IF(
2687 !MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
2688 {
2689 // May throw, try to insert now before we publish the fact that we have this new block
2690 MOODYCAMEL_TRY
2691 {
2692 new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2693 }
2694 MOODYCAMEL_CATCH(...)
2695 {
2696 rewind_block_index_tail();
2697 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2698 this->parent->add_block_to_free_list(newBlock);
2699 MOODYCAMEL_RETHROW;
2700 }
2701 }
2702
2703 // Insert the new block into the index
2704 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2705
2706 this->tailBlock = newBlock;
2707
2708 MOODYCAMEL_CONSTEXPR_IF(
2709 !MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
2710 {
2711 this->tailIndex.store(newTailIndex, std::memory_order_release);
2712 return true;
2713 }
2714 }
2715
2716 // Enqueue
2717 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2718
2719 this->tailIndex.store(newTailIndex, std::memory_order_release);
2720 return true;
2721 }
2722
2723 template <typename U> bool dequeue(U& element)
2724 {
2725 // See ExplicitProducer::dequeue for rationale and explanation
2726 index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2727 index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2728 if (details::circular_less_than<index_t>(
2729 this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2730 std::atomic_thread_fence(std::memory_order_acquire);
2731
2732 index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2733 tail = this->tailIndex.load(std::memory_order_acquire);
2734 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2735 index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2736
2737 // Determine which block the element is in
2738 auto entry = get_block_index_entry_for_index(index);
2739
2740 // Dequeue
2741 auto block = entry->value.load(std::memory_order_relaxed);
2742 auto& el = *((*block)[index]);
2743
2744 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2745#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2746 // Note: Acquiring the mutex with every dequeue instead of only when a block
2747 // is released is very sub-optimal, but it is, after all, purely debug code.
2748 debug::DebugLock lock(producer->mutex);
2749#endif
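// When the move assignment below may throw, this guard guarantees that the element's destructor
// still runs and that the block is marked empty (and returned to the free list once fully
// drained) even if an exception propagates out of the assignment.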
2750 struct Guard {
2751 Block* block;
2752 index_t index;
2753 BlockIndexEntry* entry;
2754 ConcurrentQueue* parent;
2755
2756 ~Guard()
2757 {
2758 (*block)[index]->~T();
2759 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2760 entry->value.store(nullptr, std::memory_order_relaxed);
2761 parent->add_block_to_free_list(block);
2762 }
2763 }
2764 } guard = { block, index, entry, this->parent };
2765
2766 element = std::move(el); // NOLINT
2767 } else {
2768 element = std::move(el); // NOLINT
2769 el.~T(); // NOLINT
2770
2771 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2772 {
2773#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2774 debug::DebugLock lock(mutex);
2775#endif
2776 // Add the block back into the global free pool (and remove from block index)
2777 entry->value.store(nullptr, std::memory_order_relaxed);
2778 }
2779 this->parent->add_block_to_free_list(block); // releases the above store
2780 }
2781 }
2782
2783 return true;
2784 } else {
2785 this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2786 }
2787 }
2788
2789 return false;
2790 }
2791
2792#ifdef _MSC_VER
2793#pragma warning(push)
2794#pragma warning(disable : 4706) // assignment within conditional expression
2795#endif
2796 template <AllocationMode allocMode, typename It> bool enqueue_bulk(It itemFirst, size_t count)
2797 {
2798 // First, we need to make sure we have enough room to enqueue all of the elements;
2799 // this means pre-allocating blocks and putting them in the block index (but only if
2800 // all the allocations succeeded).
2801
2802 // Note that the tailBlock we start off with may not be owned by us any more;
2803 // this happens if it was filled up exactly to the top (setting tailIndex to
2804 // the first index of the next block which is not yet allocated), then dequeued
2805 // completely (putting it on the free list) before we enqueue again.
2806
2807 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2808 auto startBlock = this->tailBlock;
2809 Block* firstAllocatedBlock = nullptr;
2810 auto endBlock = this->tailBlock;
2811
2812 // Figure out how many blocks we'll need to allocate, and do so
2813 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
2814 ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2815 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
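// e.g. with BLOCK_SIZE == 32, startTailIndex == 40 and count == 100, the last new element lands
// at index 139, so blockBaseDiff == 128 - 32 == 96: three additional 32-slot blocks are required
// beyond the block that already contains index 40.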
2816 if (blockBaseDiff > 0) {
2817#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2818 debug::DebugLock lock(mutex);
2819#endif
2820 do {
2821 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2822 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2823
2824 // Find out where we'll be inserting this block in the block index
2825 BlockIndexEntry* idxEntry =
2826 nullptr; // initialization here unnecessary but compiler can't always tell
2827 Block* newBlock;
2828 bool indexInserted = false;
2829 auto head = this->headIndex.load(std::memory_order_relaxed);
2830 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2831 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2832 (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2833 (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2834
2835 if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) ||
2836 (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) ==
2837 nullptr) {
2838 // Index allocation or block allocation failed; revert any other allocations
2839 // and index insertions done so far for this operation
2840 if (indexInserted) {
2841 rewind_block_index_tail();
2842 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2843 }
2844 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2845 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2846 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2847 idxEntry = get_block_index_entry_for_index(currentTailIndex);
2848 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2849 rewind_block_index_tail();
2850 }
2851 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2852 this->tailBlock = startBlock;
2853
2854 return false;
2855 }
2856
2857#ifdef MCDBGQ_TRACKMEM
2858 newBlock->owner = this;
2859#endif
2860 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2861 newBlock->next = nullptr;
2862
2863 // Insert the new block into the index
2864 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2865
2866 // Store the chain of blocks so that we can undo if later allocations fail,
2867 // and so that we can find the blocks when we do the actual enqueueing
2868 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2869 firstAllocatedBlock != nullptr) {
2870 assert(this->tailBlock != nullptr);
2871 this->tailBlock->next = newBlock;
2872 }
2873 this->tailBlock = newBlock;
2874 endBlock = newBlock;
2875 firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2876 } while (blockBaseDiff > 0);
2877 }
2878
2879 // Enqueue, one block at a time
2880 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2881 currentTailIndex = startTailIndex;
2882 this->tailBlock = startBlock;
2883 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr ||
2884 count == 0);
2885 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2886 this->tailBlock = firstAllocatedBlock;
2887 }
2888 while (true) {
2889 index_t stopIndex =
2890 (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2891 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2892 stopIndex = newTailIndex;
2893 }
2894 MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(
2895 T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
2896 {
2897 while (currentTailIndex != stopIndex) {
2898 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2899 }
2900 }
2901 else
2902 {
2903 MOODYCAMEL_TRY
2904 {
2905 while (currentTailIndex != stopIndex) {
2906 new ((*this->tailBlock)[currentTailIndex])
2907 T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(
2908 T,
2909 decltype(*itemFirst),
2910 new (static_cast<T*>(nullptr))
2911 T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2912 ++currentTailIndex;
2913 ++itemFirst;
2914 }
2915 }
2916 MOODYCAMEL_CATCH(...)
2917 {
2918 auto constructedStopIndex = currentTailIndex;
2919 auto lastBlockEnqueued = this->tailBlock;
2920
2921 if (!details::is_trivially_destructible<T>::value) {
2922 auto block = startBlock;
2923 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2924 block = firstAllocatedBlock;
2925 }
2926 currentTailIndex = startTailIndex;
2927 while (true) {
2928 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2929 static_cast<index_t>(BLOCK_SIZE);
2930 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2931 stopIndex = constructedStopIndex;
2932 }
2933 while (currentTailIndex != stopIndex) {
2934 (*block)[currentTailIndex++]->~T();
2935 }
2936 if (block == lastBlockEnqueued) {
2937 break;
2938 }
2939 block = block->next;
2940 }
2941 }
2942
2943 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2944 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2945 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2946 auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2947 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2948 rewind_block_index_tail();
2949 }
2950 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2951 this->tailBlock = startBlock;
2952 MOODYCAMEL_RETHROW;
2953 }
2954 }
2955
2956 if (this->tailBlock == endBlock) {
2957 assert(currentTailIndex == newTailIndex);
2958 break;
2959 }
2960 this->tailBlock = this->tailBlock->next;
2961 }
2962 this->tailIndex.store(newTailIndex, std::memory_order_release);
2963 return true;
2964 }
2965#ifdef _MSC_VER
2966#pragma warning(pop)
2967#endif
2968
2969 template <typename It> size_t dequeue_bulk(It& itemFirst, size_t max)
2970 {
2971 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2972 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2973 auto desiredCount =
2974 static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2975 if (details::circular_less_than<size_t>(0, desiredCount)) {
2976 desiredCount = desiredCount < max ? desiredCount : max;
2977 std::atomic_thread_fence(std::memory_order_acquire);
2978
2979 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2980
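// Reload tail (acquire) and recompute how many of the claimed elements actually exist; any
// over-claim is handed back through dequeueOvercommit below.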
2981 tail = this->tailIndex.load(std::memory_order_acquire);
2982 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2983 if (details::circular_less_than<size_t>(0, actualCount)) {
2984 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2985 if (actualCount < desiredCount) {
2986 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2987 }
2988
2989 // Get the first index. Note that since there are guaranteed to be at least actualCount elements, this
2990 // will never exceed tail.
2991 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2992
2993 // Iterate the blocks and dequeue
2994 auto index = firstIndex;
2995 BlockIndexHeader* localBlockIndex;
2996 auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2997 do {
2998 auto blockStartIndex = index;
2999 index_t endIndex =
3000 (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
3001 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount),
3002 endIndex)
3003 ? firstIndex + static_cast<index_t>(actualCount)
3004 : endIndex;
3005
3006 auto entry = localBlockIndex->index[indexIndex];
3007 auto block = entry->value.load(std::memory_order_relaxed);
3008 if (MOODYCAMEL_NOEXCEPT_ASSIGN(
3009 T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
3010 while (index != endIndex) {
3011 auto& el = *((*block)[index]);
3012 *itemFirst++ = std::move(el);
3013 el.~T();
3014 ++index;
3015 }
3016 } else {
3017 MOODYCAMEL_TRY
3018 {
3019 while (index != endIndex) {
3020 auto& el = *((*block)[index]);
3021 *itemFirst = std::move(el);
3022 ++itemFirst;
3023 el.~T();
3024 ++index;
3025 }
3026 }
3027 MOODYCAMEL_CATCH(...)
3028 {
3029 do {
3030 entry = localBlockIndex->index[indexIndex];
3031 block = entry->value.load(std::memory_order_relaxed);
3032 while (index != endIndex) {
3033 (*block)[index++]->~T();
3034 }
3035
3036 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(
3037 blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
3038#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3039 debug::DebugLock lock(mutex);
3040#endif
3041 entry->value.store(nullptr, std::memory_order_relaxed);
3042 this->parent->add_block_to_free_list(block);
3043 }
3044 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
3045
3046 blockStartIndex = index;
3047 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
3048 static_cast<index_t>(BLOCK_SIZE);
3049 endIndex = details::circular_less_than<index_t>(
3050 firstIndex + static_cast<index_t>(actualCount), endIndex)
3051 ? firstIndex + static_cast<index_t>(actualCount)
3052 : endIndex;
3053 } while (index != firstIndex + actualCount);
3054
3055 MOODYCAMEL_RETHROW;
3056 }
3057 }
3058 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(
3059 blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
3060 {
3061#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3062 debug::DebugLock lock(mutex);
3063#endif
3064 // Note that the set_many_empty above did a release, meaning that anybody who acquires
3065 // the block we're about to free can use it safely since our writes (and reads!) will
3066 // have happened-before then.
3067 entry->value.store(nullptr, std::memory_order_relaxed);
3068 }
3069 this->parent->add_block_to_free_list(block); // releases the above store
3070 }
3071 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
3072 } while (index != firstIndex + actualCount);
3073
3074 return actualCount;
3075 } else {
3076 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
3077 }
3078 }
3079
3080 return 0;
3081 }
3082
3083 private:
3084 // The block size must be > 1, so any number with the low bit set is an invalid block base index
3085 static const index_t INVALID_BLOCK_BASE = 1;
3086
3087 struct BlockIndexEntry {
3088 std::atomic<index_t> key;
3089 std::atomic<Block*> value;
3090 };
3091
3092 struct BlockIndexHeader {
3093 size_t capacity;
3094 std::atomic<size_t> tail;
3095 BlockIndexEntry* entries;
3096 BlockIndexEntry** index;
3097 BlockIndexHeader* prev;
3098 };
3099
3100 template <AllocationMode allocMode>
3101 inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
3102 {
3103 auto localBlockIndex =
3104 blockIndex.load(std::memory_order_relaxed); // We're the only writer thread, relaxed is OK
3105 if (localBlockIndex == nullptr) {
3106 return false; // this can happen if new_block_index failed in the constructor
3107 }
3108 size_t newTail =
3109 (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
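// The block index is a circular buffer; the slot just past tail is usable if it was never
// populated or if its block has since been released (its value reset to nullptr).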
3110 idxEntry = localBlockIndex->index[newTail];
3111 if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
3112 idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
3113
3114 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
3115 localBlockIndex->tail.store(newTail, std::memory_order_release);
3116 return true;
3117 }
3118
3119 // No room in the old block index, try to allocate another one!
3120 MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc)
3121 {
3122 return false;
3123 }
3124 else if (!new_block_index())
3125 {
3126 return false;
3127 }
3128 else
3129 {
3130 localBlockIndex = blockIndex.load(std::memory_order_relaxed);
3131 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
3132 idxEntry = localBlockIndex->index[newTail];
3133 assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
3134 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
3135 localBlockIndex->tail.store(newTail, std::memory_order_release);
3136 return true;
3137 }
3138 }
3139
3140 inline void rewind_block_index_tail()
3141 {
3142 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
3143 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) &
3144 (localBlockIndex->capacity - 1),
3145 std::memory_order_relaxed);
3146 }
3147
3148 inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
3149 {
3150 BlockIndexHeader* localBlockIndex;
3151 auto idx = get_block_index_index_for_index(index, localBlockIndex);
3152 return localBlockIndex->index[idx];
3153 }
3154
3155 inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
3156 {
3157#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3158 debug::DebugLock lock(mutex);
3159#endif
3160 index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
3161 localBlockIndex = blockIndex.load(std::memory_order_acquire);
3162 auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
3163 auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
3164 assert(tailBase != INVALID_BLOCK_BASE);
3165 // Note: Must use division instead of shift because the index may wrap around, causing a negative
3166 // offset, whose negativity we want to preserve
3167 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) /
3168 static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
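// e.g. with BLOCK_SIZE == 32, tailBase == 128 and a requested block base of 64, the signed
// difference is -64 and offset is -2, i.e. two slots before tail (modulo capacity).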
3169 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
3170 assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index &&
3171 localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
3172 return idx;
3173 }
3174
3175 bool new_block_index()
3176 {
3177 auto prev = blockIndex.load(std::memory_order_relaxed);
3178 size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
3179 auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
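// The single allocation is carved into the header, an aligned array of entryCount fresh entries,
// and a pointer index with nextBlockIndexCapacity slots; pointers to the previous index's entries
// are copied over below so blocks that are already registered remain reachable.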
3180 auto raw = static_cast<char*>(
3181 (Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 +
3182 sizeof(BlockIndexEntry) * entryCount + std::alignment_of<BlockIndexEntry*>::value - 1 +
3183 sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
3184 if (raw == nullptr) {
3185 return false;
3186 }
3187
3188 auto header = new (raw) BlockIndexHeader;
3189 auto entries =
3190 reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
3191 auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(
3192 reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
3193 if (prev != nullptr) {
3194 auto prevTail = prev->tail.load(std::memory_order_relaxed);
3195 auto prevPos = prevTail;
3196 size_t i = 0;
3197 do {
3198 prevPos = (prevPos + 1) & (prev->capacity - 1);
3199 index[i++] = prev->index[prevPos];
3200 } while (prevPos != prevTail);
3201 assert(i == prevCapacity);
3202 }
3203 for (size_t i = 0; i != entryCount; ++i) {
3204 new (entries + i) BlockIndexEntry;
3205 entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
3206 index[prevCapacity + i] = entries + i;
3207 }
3208 header->prev = prev;
3209 header->entries = entries;
3210 header->index = index;
3211 header->capacity = nextBlockIndexCapacity;
3212 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
3213
3214 blockIndex.store(header, std::memory_order_release);
3215
3216 nextBlockIndexCapacity <<= 1;
3217
3218 return true;
3219 }
3220
3221 private:
3222 size_t nextBlockIndexCapacity;
3223 std::atomic<BlockIndexHeader*> blockIndex;
3224
3225#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3226 public:
3227 details::ThreadExitListener threadExitListener;
3228
3229 private:
3230#endif
3231
3232#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3233 public:
3234 ImplicitProducer* nextImplicitProducer;
3235
3236 private:
3237#endif
3238
3239#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3240 mutable debug::DebugMutex mutex;
3241#endif
3242#ifdef MCDBGQ_TRACKMEM
3243 friend struct MemStats;
3244#endif
3245 };
3246
3247 //////////////////////////////////
3248 // Block pool manipulation
3249 //////////////////////////////////
3250
3251 void populate_initial_block_list(size_t blockCount)
3252 {
3253 initialBlockPoolSize = blockCount;
3254 if (initialBlockPoolSize == 0) {
3255 initialBlockPool = nullptr;
3256 return;
3257 }
3258
3259 initialBlockPool = create_array<Block>(blockCount);
3260 if (initialBlockPool == nullptr) {
3261 initialBlockPoolSize = 0;
3262 }
3263 for (size_t i = 0; i < initialBlockPoolSize; ++i) {
3264 initialBlockPool[i].dynamicallyAllocated = false;
3265 }
3266 }
3267
3268 inline Block* try_get_block_from_initial_pool()
3269 {
3270 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
3271 return nullptr;
3272 }
3273
3274 auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
3275
3276 return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
3277 }
3278
3279 inline void add_block_to_free_list(Block* block)
3280 {
3281#ifdef MCDBGQ_TRACKMEM
3282 block->owner = nullptr;
3283#endif
3284 if (!Traits::RECYCLE_ALLOCATED_BLOCKS && block->dynamicallyAllocated) {
3285 destroy(block);
3286 } else {
3287 freeList.add(block);
3288 }
3289 }
3290
3291 inline void add_blocks_to_free_list(Block* block)
3292 {
3293 while (block != nullptr) {
3294 auto next = block->next;
3295 add_block_to_free_list(block);
3296 block = next;
3297 }
3298 }
3299
3300 inline Block* try_get_block_from_free_list() { return freeList.try_get(); }
3301
3302 // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3303 template <AllocationMode canAlloc> Block* requisition_block()
3304 {
3305 auto block = try_get_block_from_initial_pool();
3306 if (block != nullptr) {
3307 return block;
3308 }
3309
3310 block = try_get_block_from_free_list();
3311 if (block != nullptr) {
3312 return block;
3313 }
3314
3315 MOODYCAMEL_CONSTEXPR_IF(canAlloc == CanAlloc)
3316 {
3317 return create<Block>();
3318 }
3319 else
3320 {
3321 return nullptr;
3322 }
3323 }
3324
3325#ifdef MCDBGQ_TRACKMEM
3326 public:
3327 struct MemStats {
3328 size_t allocatedBlocks;
3329 size_t usedBlocks;
3330 size_t freeBlocks;
3331 size_t ownedBlocksExplicit;
3332 size_t ownedBlocksImplicit;
3333 size_t implicitProducers;
3334 size_t explicitProducers;
3335 size_t elementsEnqueued;
3336 size_t blockClassBytes;
3337 size_t queueClassBytes;
3338 size_t implicitBlockIndexBytes;
3339 size_t explicitBlockIndexBytes;
3340
3341 friend class ConcurrentQueue;
3342
3343 private:
3344 static MemStats getFor(ConcurrentQueue* q)
3345 {
3346 MemStats stats = { 0 };
3347
3348 stats.elementsEnqueued = q->size_approx();
3349
3350 auto block = q->freeList.head_unsafe();
3351 while (block != nullptr) {
3352 ++stats.allocatedBlocks;
3353 ++stats.freeBlocks;
3354 block = block->freeListNext.load(std::memory_order_relaxed);
3355 }
3356
3357 for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr;
3358 ptr = ptr->next_prod()) {
3359 bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3360 stats.implicitProducers += implicit ? 1 : 0;
3361 stats.explicitProducers += implicit ? 0 : 1;
3362
3363 if (implicit) {
3364 auto prod = static_cast<ImplicitProducer*>(ptr);
3365 stats.queueClassBytes += sizeof(ImplicitProducer);
3366 auto head = prod->headIndex.load(std::memory_order_relaxed);
3367 auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3368 auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3369 if (hash != nullptr) {
3370 for (size_t i = 0; i != hash->capacity; ++i) {
3371 if (hash->index[i]->key.load(std::memory_order_relaxed) !=
3372 ImplicitProducer::INVALID_BLOCK_BASE &&
3373 hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3374 ++stats.allocatedBlocks;
3375 ++stats.ownedBlocksImplicit;
3376 }
3377 }
3378 stats.implicitBlockIndexBytes +=
3379 hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3380 for (; hash != nullptr; hash = hash->prev) {
3381 stats.implicitBlockIndexBytes +=
3382 sizeof(typename ImplicitProducer::BlockIndexHeader) +
3383 hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3384 }
3385 }
3386 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3387 // auto block = prod->get_block_index_entry_for_index(head);
3388 ++stats.usedBlocks;
3389 }
3390 } else {
3391 auto prod = static_cast<ExplicitProducer*>(ptr);
3392 stats.queueClassBytes += sizeof(ExplicitProducer);
3393 auto tailBlock = prod->tailBlock;
3394 bool wasNonEmpty = false;
3395 if (tailBlock != nullptr) {
3396 auto block = tailBlock;
3397 do {
3398 ++stats.allocatedBlocks;
3399 if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3400 ++stats.usedBlocks;
3401 wasNonEmpty = wasNonEmpty || block != tailBlock;
3402 }
3403 ++stats.ownedBlocksExplicit;
3404 block = block->next;
3405 } while (block != tailBlock);
3406 }
3407 auto index = prod->blockIndex.load(std::memory_order_relaxed);
3408 while (index != nullptr) {
3409 stats.explicitBlockIndexBytes +=
3410 sizeof(typename ExplicitProducer::BlockIndexHeader) +
3411 index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3412 index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3413 }
3414 }
3415 }
3416
3417 auto freeOnInitialPool =
3418 q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize
3419 ? 0
3420 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3421 stats.allocatedBlocks += freeOnInitialPool;
3422 stats.freeBlocks += freeOnInitialPool;
3423
3424 stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3425 stats.queueClassBytes += sizeof(ConcurrentQueue);
3426
3427 return stats;
3428 }
3429 };
3430
3431 // For debugging only. Not thread-safe.
3432 MemStats getMemStats() { return MemStats::getFor(this); }
3433
3434 private:
3435 friend struct MemStats;
3436#endif
3437
3438 //////////////////////////////////
3439 // Producer list manipulation
3440 //////////////////////////////////
3441
3442 ProducerBase* recycle_or_create_producer(bool isExplicit)
3443 {
3444#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3445 debug::DebugLock lock(implicitProdMutex);
3446#endif
3447 // Try to re-use one first
3448 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3449 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3450 bool expected = true;
3451 if (ptr->inactive.compare_exchange_strong(
3452 expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
3453 // We caught one! It's been marked as activated, the caller can have it
3454 return ptr;
3455 }
3456 }
3457 }
3458
3459 return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this))
3460 : create<ImplicitProducer>(this));
3461 }
3462
3463 ProducerBase* add_producer(ProducerBase* producer)
3464 {
3465 // Handle failed memory allocation
3466 if (producer == nullptr) {
3467 return nullptr;
3468 }
3469
3470 producerCount.fetch_add(1, std::memory_order_relaxed);
3471
3472 // Add it to the lock-free list
3473 auto prevTail = producerListTail.load(std::memory_order_relaxed);
3474 do {
3475 producer->next = prevTail;
3476 } while (!producerListTail.compare_exchange_weak(
3477 prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
3478
3479#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3480 if (producer->isExplicit) {
3481 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3482 do {
3483 static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3484 } while (!explicitProducers.compare_exchange_weak(prevTailExplicit,
3485 static_cast<ExplicitProducer*>(producer),
3486 std::memory_order_release,
3487 std::memory_order_relaxed));
3488 } else {
3489 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3490 do {
3491 static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3492 } while (!implicitProducers.compare_exchange_weak(prevTailImplicit,
3493 static_cast<ImplicitProducer*>(producer),
3494 std::memory_order_release,
3495 std::memory_order_relaxed));
3496 }
3497#endif
3498
3499 return producer;
3500 }
3501
3502 void reown_producers()
3503 {
3504 // After another instance is moved-into/swapped-with this one, all the
3505 // producers we stole still think their parents are the other queue.
3506 // So fix them up!
3507 for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
3508 ptr->parent = this;
3509 }
3510 }
3511
3512 //////////////////////////////////
3513 // Implicit producer hash
3514 //////////////////////////////////
3515
3516 struct ImplicitProducerKVP {
3517 std::atomic<details::thread_id_t> key;
3518 ImplicitProducer*
3519 value; // No need for atomicity since it's only read by the thread that sets it in the first place
3520
3521 ImplicitProducerKVP()
3522 : value(nullptr)
3523 {}
3524
3525 ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3526 {
3527 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3528 value = other.value;
3529 }
3530
3531 inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3532 {
3533 swap(other);
3534 return *this;
3535 }
3536
3537 inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
3538 {
3539 if (this != &other) {
3540 details::swap_relaxed(key, other.key);
3541 std::swap(value, other.value);
3542 }
3543 }
3544 };
3545
3546 template <typename XT, typename XTraits>
3547 friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&,
3548 typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;
3549
3550 struct ImplicitProducerHash {
3551 size_t capacity;
3552 ImplicitProducerKVP* entries;
3553 ImplicitProducerHash* prev;
3554 };
3555
3556 inline void populate_initial_implicit_producer_hash()
3557 {
3558 MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
3559 {
3560 return;
3561 }
3562 else
3563 {
3564 implicitProducerHashCount.store(0, std::memory_order_relaxed);
3565 auto hash = &initialImplicitProducerHash;
3566 hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3567 hash->entries = &initialImplicitProducerHashEntries[0];
3568 for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3569 initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3570 }
3571 hash->prev = nullptr;
3572 implicitProducerHash.store(hash, std::memory_order_relaxed);
3573 }
3574 }
3575
3576 void swap_implicit_producer_hashes(ConcurrentQueue& other)
3577 {
3578 MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
3579 {
3580 return;
3581 }
3582 else
3583 {
3584 // Swap (assumes our implicit producer hash is initialized)
3585 initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3586 initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3587 other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3588
3589 details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3590
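// The embedded initial hash tables cannot be exchanged by pointer, so after swapping the head
// pointers, any chain still linking to the other queue's embedded table is re-pointed at ours
// (and vice versa).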
3591 details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3592 if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
3593 implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3594 } else {
3595 ImplicitProducerHash* hash;
3596 for (hash = implicitProducerHash.load(std::memory_order_relaxed);
3597 hash->prev != &other.initialImplicitProducerHash;
3598 hash = hash->prev) {
3599 continue;
3600 }
3601 hash->prev = &initialImplicitProducerHash;
3602 }
3603 if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
3604 other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
3605 } else {
3606 ImplicitProducerHash* hash;
3607 for (hash = other.implicitProducerHash.load(std::memory_order_relaxed);
3608 hash->prev != &initialImplicitProducerHash;
3609 hash = hash->prev) {
3610 continue;
3611 }
3612 hash->prev = &other.initialImplicitProducerHash;
3613 }
3614 }
3615 }
3616
3617 // Only fails (returns nullptr) if memory allocation fails
3618 ImplicitProducer* get_or_add_implicit_producer()
3619 {
3620 // Note that since the data is essentially thread-local (key is thread ID),
3621 // there's a reduced need for fences (memory ordering is already consistent
3622 // for any individual thread), except for the current table itself.
3623
3624 // Start by looking for the thread ID in the current and all previous hash tables.
3625 // If it's not found, it must not be in there yet, since this same thread would
3626 // have added it previously to one of the tables that we traversed.
3627
3628 // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3629
3630#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3631 debug::DebugLock lock(implicitProdMutex);
3632#endif
3633
3634 auto id = details::thread_id();
3635 auto hashedId = details::hash_thread_id(id);
3636
3637 auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3638 assert(mainHash != nullptr); // silence clang-tidy and MSVC warnings (hash cannot be null)
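// Open addressing with linear probing; capacities are powers of two, so `index &= capacity - 1u`
// implements the wrap-around.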
3639 for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3640 // Look for the id in this hash
3641 auto index = hashedId;
3642 while (true) { // Not an infinite loop because at least one slot is free in the hash table
3643 index &= hash->capacity - 1u;
3644
3645 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3646 if (probedKey == id) {
3647 // Found it! If we had to search several hashes deep, though, we should lazily add it
3648 // to the current main hash table to avoid the extended search next time.
3649 // Note there's guaranteed to be room in the current hash table since every subsequent
3650 // table implicitly reserves space for all previous tables (there's only one
3651 // implicitProducerHashCount).
3652 auto value = hash->entries[index].value;
3653 if (hash != mainHash) {
3654 index = hashedId;
3655 while (true) {
3656 index &= mainHash->capacity - 1u;
3657 auto empty = details::invalid_thread_id;
3658#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3659 auto reusable = details::invalid_thread_id2;
3660 if (mainHash->entries[index].key.compare_exchange_strong(
3661 empty, id, std::memory_order_seq_cst, std::memory_order_relaxed) ||
3662 mainHash->entries[index].key.compare_exchange_strong(
3663 reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3664#else
3665 if (mainHash->entries[index].key.compare_exchange_strong(
3666 empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3667#endif
3668 mainHash->entries[index].value = value;
3669 break;
3670 }
3671 ++index;
3672 }
3673 }
3674
3675 return value;
3676 }
3677 if (probedKey == details::invalid_thread_id) {
3678 break; // Not in this hash table
3679 }
3680 ++index;
3681 }
3682 }
3683
3684 // Insert!
3685 auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3686 while (true) {
3687 // NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
3688 if (newCount >= (mainHash->capacity >> 1) &&
3689 !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3690 // We've acquired the resize lock, try to allocate a bigger hash table.
3691 // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
3692 // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
3693 // locked block).
3694 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3695 if (newCount >= (mainHash->capacity >> 1)) {
3696 size_t newCapacity = mainHash->capacity << 1;
3697 while (newCount >= (newCapacity >> 1)) {
3698 newCapacity <<= 1;
3699 }
3700 auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) +
3701 std::alignment_of<ImplicitProducerKVP>::value - 1 +
3702 sizeof(ImplicitProducerKVP) * newCapacity));
3703 if (raw == nullptr) {
3704 // Allocation failed
3705 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3706 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3707 return nullptr;
3708 }
3709
3710 auto newHash = new (raw) ImplicitProducerHash;
3711 newHash->capacity = static_cast<size_t>(newCapacity);
3712 newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(
3713 details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
3714 for (size_t i = 0; i != newCapacity; ++i) {
3715 new (newHash->entries + i) ImplicitProducerKVP;
3716 newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3717 }
3718 newHash->prev = mainHash;
3719 implicitProducerHash.store(newHash, std::memory_order_release);
3720 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3721 mainHash = newHash;
3722 } else {
3723 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3724 }
3725 }
3726
3727 // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
3728 // to finish being allocated by another thread (and if we just finished allocating above, the condition will
3729 // always be true)
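// (capacity >> 1) + (capacity >> 2) is the three-quarters-full threshold.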
3730 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3731 auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false));
3732 if (producer == nullptr) {
3733 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3734 return nullptr;
3735 }
3736
3737#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3738 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3739 producer->threadExitListener.userData = producer;
3740 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3741#endif
3742
3743 auto index = hashedId;
3744 while (true) {
3745 index &= mainHash->capacity - 1u;
3746 auto empty = details::invalid_thread_id;
3747#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3748 auto reusable = details::invalid_thread_id2;
3749 if (mainHash->entries[index].key.compare_exchange_strong(
3750 reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3751 implicitProducerHashCount.fetch_sub(
3752 1, std::memory_order_relaxed); // already counted as a used slot
3753 mainHash->entries[index].value = producer;
3754 break;
3755 }
3756#endif
3757 if (mainHash->entries[index].key.compare_exchange_strong(
3758 empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3759 mainHash->entries[index].value = producer;
3760 break;
3761 }
3762 ++index;
3763 }
3764 return producer;
3765 }
3766
3767 // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
3768 // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
3769 // we try to allocate ourselves).
3770 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3771 }
3772 }
3773
3774#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3775 void implicit_producer_thread_exited(ImplicitProducer* producer)
3776 {
3777 // Remove from hash
3778#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3779 debug::DebugLock lock(implicitProdMutex);
3780#endif
3781 auto hash = implicitProducerHash.load(std::memory_order_acquire);
3782 assert(hash !=
3783 nullptr); // The thread exit listener is only registered if we were added to a hash in the first place
3784 auto id = details::thread_id();
3785 auto hashedId = details::hash_thread_id(id);
3786 details::thread_id_t probedKey;
3787
3788 // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
3789 // trying to add an entry thinking there's a free slot (because they reused a producer)
3790 for (; hash != nullptr; hash = hash->prev) {
3791 auto index = hashedId;
3792 do {
3793 index &= hash->capacity - 1u;
3794 probedKey = id;
3795 if (hash->entries[index].key.compare_exchange_strong(
3796 probedKey, details::invalid_thread_id2, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3797 break;
3798 }
3799 ++index;
3800 } while (probedKey !=
3801 details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it
3802 // yet, or if we weren't added to this hash in the first place
3803 }
3804
3805 // Mark the queue as being recyclable
3806 producer->inactive.store(true, std::memory_order_release);
3807 }
3808
3809 static void implicit_producer_thread_exited_callback(void* userData)
3810 {
3811 auto producer = static_cast<ImplicitProducer*>(userData);
3812 auto queue = producer->parent;
3813 queue->implicit_producer_thread_exited(producer);
3814 }
3815#endif
3816
3817 //////////////////////////////////
3818 // Utility functions
3819 //////////////////////////////////
3820
3821 template <typename TAlign> static inline void* aligned_malloc(size_t size)
3822 {
3823 MOODYCAMEL_CONSTEXPR_IF(std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
3824 return (Traits::malloc)(size);
3825 else
3826 {
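// Over-allocate by (alignment - 1) bytes plus room for one pointer, and stash the original
// malloc'd pointer immediately before the aligned address so that aligned_free can recover it.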
3827 size_t alignment = std::alignment_of<TAlign>::value;
3828 void* raw = (Traits::malloc)(size + alignment - 1 + sizeof(void*));
3829 if (!raw)
3830 return nullptr;
3831 char* ptr = details::align_for<TAlign>(reinterpret_cast<char*>(raw) + sizeof(void*));
3832 *(reinterpret_cast<void**>(ptr) - 1) = raw;
3833 return ptr;
3834 }
3835 }
3836
3837 template <typename TAlign> static inline void aligned_free(void* ptr)
3838 {
3839 MOODYCAMEL_CONSTEXPR_IF(std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
3840 return (Traits::free)(ptr);
3841 else(Traits::free)(ptr ? *(reinterpret_cast<void**>(ptr) - 1) : nullptr);
3842 }
3843
3844 template <typename U> static inline U* create_array(size_t count)
3845 {
3846 assert(count > 0);
3847 U* p = static_cast<U*>(aligned_malloc<U>(sizeof(U) * count));
3848 if (p == nullptr)
3849 return nullptr;
3850
3851 for (size_t i = 0; i != count; ++i)
3852 new (p + i) U();
3853 return p;
3854 }
3855
3856 template <typename U> static inline void destroy_array(U* p, size_t count)
3857 {
3858 if (p != nullptr) {
3859 assert(count > 0);
3860 for (size_t i = count; i != 0;)
3861 (p + --i)->~U();
3862 }
3863 aligned_free<U>(p);
3864 }
3865
3866 template <typename U> static inline U* create()
3867 {
3868 void* p = aligned_malloc<U>(sizeof(U));
3869 return p != nullptr ? new (p) U : nullptr;
3870 }
3871
3872 template <typename U, typename A1> static inline U* create(A1&& a1)
3873 {
3874 void* p = aligned_malloc<U>(sizeof(U));
3875 return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
3876 }
3877
3878 template <typename U> static inline void destroy(U* p)
3879 {
3880 if (p != nullptr)
3881 p->~U();
3882 aligned_free<U>(p);
3883 }
3884
3885 private:
3886 std::atomic<ProducerBase*> producerListTail;
3887 std::atomic<std::uint32_t> producerCount;
3888
3889 std::atomic<size_t> initialBlockPoolIndex;
3890 Block* initialBlockPool;
3891 size_t initialBlockPoolSize;
3892
3893#ifndef MCDBGQ_USEDEBUGFREELIST
3894 FreeList<Block> freeList;
3895#else
3896 debug::DebugFreeList<Block> freeList;
3897#endif
3898
3899 std::atomic<ImplicitProducerHash*> implicitProducerHash;
3900 std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3901 ImplicitProducerHash initialImplicitProducerHash;
3902 std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3903 std::atomic_flag implicitProducerHashResizeInProgress;
3904
3905 std::atomic<std::uint32_t> nextExplicitConsumerId;
3906 std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3907
3908#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3909 debug::DebugMutex implicitProdMutex;
3910#endif
3911
3912#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3913 std::atomic<ExplicitProducer*> explicitProducers;
3914 std::atomic<ImplicitProducer*> implicitProducers;
3915#endif
3916};
3917
3918template <typename T, typename Traits>
3919ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
3920 : producer(queue.recycle_or_create_producer(true))
3921{
3922 if (producer != nullptr) {
3923 producer->token = this;
3924 }
3925}
3926
3927template <typename T, typename Traits>
3928ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
3929 : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
3930{
3931 if (producer != nullptr) {
3932 producer->token = this;
3933 }
3934}
3935
3936template <typename T, typename Traits>
3937ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
3938 : itemsConsumedFromCurrent(0)
3939 , currentProducer(nullptr)
3940 , desiredProducer(nullptr)
3941{
3942 initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3943 lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
3944}
3945
3946template <typename T, typename Traits>
3947ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
3948 : itemsConsumedFromCurrent(0)
3949 , currentProducer(nullptr)
3950 , desiredProducer(nullptr)
3951{
3952 initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(
3953 1, std::memory_order_release);
3954 lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
3955}
3956
3957template <typename T, typename Traits>
3958inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
3959{
3960 a.swap(b);
3961}
3962
3963inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT
3964{
3965 a.swap(b);
3966}
3967
3968inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT
3969{
3970 a.swap(b);
3971}
3972
3973template <typename T, typename Traits>
3974inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a,
3975 typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
3976{
3977 a.swap(b);
3978}
3979
3980} // namespace moodycamel
3981
3982#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
3983#pragma warning(pop)
3984#endif
3985
3986#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
3987#pragma GCC diagnostic pop
3988#endif
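// A minimal usage sketch (illustrative only, not part of the original header), assuming the public
// enqueue / try_dequeue overloads declared earlier in this file:
//
//     #include "concurrentqueue.h"
//
//     moodycamel::ConcurrentQueue<int> q;
//
//     // Implicit-producer path: each enqueuing thread gets a producer keyed by its thread ID.
//     q.enqueue(42);
//
//     // Token-based path: tokens bind to the explicit producer/consumer structures above.
//     moodycamel::ProducerToken ptok(q);
//     moodycamel::ConsumerToken ctok(q);
//     q.enqueue(ptok, 43);
//
//     int item;
//     while (q.try_dequeue(ctok, item)) {
//         // process item
//     }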