#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
// Traits::index_t are set to < 32 bits, causing integer promotion warnings)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"

#ifdef MCDBGQ_USE_RELACY
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#endif
#endif

#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
// Without `if constexpr` support, VS warns about constant conditional expressions
#pragma warning(push)
#pragma warning(disable : 4127) // conditional expression is constant
#endif

#if defined(__APPLE__)
#include "TargetConditionals.h"
#endif

#ifdef MCDBGQ_USE_RELACY
#include "relacy/relacy_std.hpp"
#include "relacy_shims.h"
namespace moodycamel { namespace details {
// Default converter: thread IDs are hashed as-is
template <typename thread_id_t>
struct thread_id_converter {
    typedef thread_id_t thread_id_numeric_size_t;
    typedef thread_id_t thread_id_hash_t;
    static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
};
} }
#if defined(MCDBGQ_USE_RELACY)
namespace moodycamel { namespace details {
typedef std::uint32_t thread_id_t;
static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
static inline thread_id_t thread_id() { return rl::thread_index(); }
} }
#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
// No sense pulling in windows.h in a header; manually declare the only function
// we use and rely on backwards compatibility for this not to break
extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);

namespace moodycamel { namespace details {
static_assert(sizeof(unsigned long) == sizeof(std::uint32_t),
    "Expected size of unsigned long to be 32 bits on Windows");
typedef std::uint32_t thread_id_t;
static const thread_id_t invalid_thread_id = 0;  // Windows thread IDs are never 0
static const thread_id_t invalid_thread_id2 = 1; // Win32 thread IDs are multiples of 4, so 1 is never a valid thread ID
static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
} }
#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE) || \
    defined(MOODYCAMEL_NO_THREAD_LOCAL)
namespace moodycamel { namespace details {
static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8,
    "std::thread::id is expected to be either 4 or 8 bytes");

typedef std::thread::id thread_id_t;
static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID

// Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
// only needed if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined, which it won't be here.
static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
template <std::size_t>
struct thread_id_size {};
template <>
struct thread_id_size<4> {
    typedef std::uint32_t numeric_t;
};
template <>
struct thread_id_size<8> {
    typedef std::uint64_t numeric_t;
};

template <>
struct thread_id_converter<thread_id_t> {
    typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
#ifndef __APPLE__
    typedef std::size_t thread_id_hash_t;
#else
    typedef thread_id_numeric_size_t thread_id_hash_t;
#endif

    static thread_id_hash_t prehash(thread_id_t const& x)
    {
#ifndef __APPLE__
        return std::hash<std::thread::id>()(x);
#else
        return *reinterpret_cast<thread_id_hash_t const*>(&x);
#endif
    }
};
} }
#else
// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
// static variable's address as a thread identifier :-)
#if defined(__GNUC__) || defined(__INTEL_COMPILER)
#define MOODYCAMEL_THREADLOCAL __thread
#elif defined(_MSC_VER)
#define MOODYCAMEL_THREADLOCAL __declspec(thread)
#else
// Assume C++11 compliant compiler
#define MOODYCAMEL_THREADLOCAL thread_local
#endif

namespace moodycamel { namespace details {
typedef std::uintptr_t thread_id_t;
static const thread_id_t invalid_thread_id = 0;  // Address can't be nullptr
static const thread_id_t invalid_thread_id2 = 1; // Member accesses off a null pointer are also generally invalid
inline thread_id_t thread_id()
{
    static MOODYCAMEL_THREADLOCAL int x;
    return reinterpret_cast<thread_id_t>(&x);
}
} }
#endif
#ifndef MOODYCAMEL_CONSTEXPR_IF
#if (defined(_MSC_VER) && defined(_HAS_CXX17) && _HAS_CXX17) || __cplusplus > 201402L
#define MOODYCAMEL_CONSTEXPR_IF if constexpr
#define MOODYCAMEL_MAYBE_UNUSED [[maybe_unused]]
#else
#define MOODYCAMEL_CONSTEXPR_IF if
#define MOODYCAMEL_MAYBE_UNUSED
#endif
#endif
// Exceptions
#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || \
    (!defined(_MSC_VER) && !defined(__GNUC__))
#define MOODYCAMEL_EXCEPTIONS_ENABLED
#endif
#endif

#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
#define MOODYCAMEL_TRY try
#define MOODYCAMEL_CATCH(...) catch (__VA_ARGS__)
#define MOODYCAMEL_RETHROW throw
#define MOODYCAMEL_THROW(expr) throw(expr)
#else
#define MOODYCAMEL_TRY MOODYCAMEL_CONSTEXPR_IF(true)
#define MOODYCAMEL_CATCH(...) else MOODYCAMEL_CONSTEXPR_IF(false)
#define MOODYCAMEL_RETHROW
#define MOODYCAMEL_THROW(expr)
#endif
#ifndef MOODYCAMEL_NOEXCEPT
#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
#define MOODYCAMEL_NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
// VS2012's std::is_nothrow_[move_]constructible is broken, so we have to
// assume all non-trivial constructors may throw on VS2012
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) \
    (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value \
         ? std::is_trivially_move_constructible<type>::value \
         : std::is_trivially_copy_constructible<type>::value)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) \
    ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value \
          ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value \
          : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && \
     MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) \
    (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value \
         ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value \
         : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) \
    ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value \
          ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value \
          : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && \
     MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
#else
#define MOODYCAMEL_NOEXCEPT noexcept
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
#endif
#endif
#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#else
// VS2013 doesn't support `thread_local`, MinGW-w64 with POSIX threading has a crippling
// thread_local bug, and g++ < 4.8 doesn't support it either; nor do iOS/ARM targets
#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && \
    (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && \
    (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && \
    (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#endif
#endif
#endif
// VS2012 doesn't support deleted functions.
// In this case, we declare the function normally but don't define it; a link
// error will be generated if the function is called.
#ifndef MOODYCAMEL_DELETE_FUNCTION
#if defined(_MSC_VER) && _MSC_VER < 1800
#define MOODYCAMEL_DELETE_FUNCTION
#else
#define MOODYCAMEL_DELETE_FUNCTION = delete
#endif
#endif
namespace moodycamel { namespace details {

#ifndef MOODYCAMEL_ALIGNAS
// VS2013 doesn't support alignas or alignof, and align() requires a constant literal
#if defined(_MSC_VER) && _MSC_VER <= 1800
#define MOODYCAMEL_ALIGNAS(alignment) __declspec(align(alignment))
#define MOODYCAMEL_ALIGNOF(obj) __alignof(obj)
#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) typename details::Vs2013Aligned<std::alignment_of<obj>::value, T>::type
template <int Align, typename T> struct Vs2013Aligned {}; // default, unsupported alignment
template <typename T> struct Vs2013Aligned<1, T> { typedef __declspec(align(1)) T type; };
template <typename T> struct Vs2013Aligned<2, T> { typedef __declspec(align(2)) T type; };
template <typename T> struct Vs2013Aligned<4, T> { typedef __declspec(align(4)) T type; };
template <typename T> struct Vs2013Aligned<8, T> { typedef __declspec(align(8)) T type; };
template <typename T> struct Vs2013Aligned<16, T> { typedef __declspec(align(16)) T type; };
template <typename T> struct Vs2013Aligned<32, T> { typedef __declspec(align(32)) T type; };
template <typename T> struct Vs2013Aligned<64, T> { typedef __declspec(align(64)) T type; };
template <typename T> struct Vs2013Aligned<128, T> { typedef __declspec(align(128)) T type; };
template <typename T> struct Vs2013Aligned<256, T> { typedef __declspec(align(256)) T type; };
#else
template <typename T> struct identity { typedef T type; };
#define MOODYCAMEL_ALIGNAS(alignment) alignas(alignment)
#define MOODYCAMEL_ALIGNOF(obj) alignof(obj)
#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) alignas(alignof(obj)) typename details::identity<T>::type
#endif
#endif
} }
// TSAN can false-positive on the lock-free code here; per-function compile-time
// suppression lets TSAN be used on projects that include this header
#define MOODYCAMEL_NO_TSAN
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#undef MOODYCAMEL_NO_TSAN
#define MOODYCAMEL_NO_TSAN __attribute__((no_sanitize("thread")))
#endif
#endif
// Compiler-specific likely/unlikely hints
namespace moodycamel { namespace details {
#if defined(__GNUC__)
static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
#else
static inline bool (likely)(bool x) { return x; }
static inline bool (unlikely)(bool x) { return x; }
#endif
} }
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
#include "internal/concurrentqueue_internal_debug.h"
#endif
namespace moodycamel { namespace details {
template <typename T>
struct const_numeric_max {
    static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
    static const T value = std::numeric_limits<T>::is_signed
        ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
        : static_cast<T>(-1);
};
#if defined(__GLIBCXX__)
typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
#else
typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
#endif
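// Default traits for the ConcurrentQueue. To change some of the traits without
// re-implementing all of them, inherit from this struct and shadow the members
// you wish to change; the queue is then instantiated with the derived traits type.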
struct ConcurrentQueueDefaultTraits {
    // General-purpose size type. std::size_t is strongly recommended.
    typedef std::size_t size_t;

    // The type used for the enqueue and dequeue indices. Must be at least as wide as size_t.
    typedef std::size_t index_t;

    // Internally, all elements are enqueued and dequeued from multi-element blocks;
    // this is the initial size of each block. Must be a power of 2.
    static const size_t BLOCK_SIZE = 32;

    // For explicit producers (i.e. with a token), block emptiness is tracked with one flag
    // per element up to this block size; above it, a counter is used instead. Must be a power of 2.
    static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;

    // Initial block index size for explicit producers. Must be a power of 2.
    static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;

    // Initial block index size for implicit producers. Must be a power of 2.
    static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;

    // The initial size of the hash table mapping thread IDs to implicit producers.
    // Must be a power of 2 (0 disables implicit, i.e. token-less, enqueueing).
    static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;

    // How many items an explicit consumer (i.e. one with a token) consumes from one
    // producer before all consumers are rotated to the next producer.
    static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;

    // How many times to spin before a semaphore wait blocks (used by the blocking queue).
    static const int MAX_SEMA_SPINS = 10000;

    // Whether emptied dynamically-allocated blocks are kept on an internal free
    // list for reuse instead of being returned to the heap.
    static const bool RECYCLE_ALLOCATED_BLOCKS = false;
#ifndef MCDBGQ_USE_RELACY
    // Memory allocation can be customized if needed.
    // malloc should return nullptr on failure, and handle alignment like std::malloc.
#if defined(malloc) || defined(free)
    // Gah, this everything-on-Windows-is-a-macro thing (e.g. with MinGW and mingw32-make)
    static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
    static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
    static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
    static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
#else
    static inline void* malloc(size_t size) { return std::malloc(size); }
    static inline void free(void* ptr) { return std::free(ptr); }
#endif
#else
    // Debug versions when running under the Relacy race detector (ignore
    // these in user code)
    static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
    static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
#endif
};
class ConcurrentQueueTests; // defined in the test suite; granted friend access below

namespace details {
struct ConcurrentQueueProducerTypelessBase {
    ConcurrentQueueProducerTypelessBase* next;
    std::atomic<bool> inactive;
    ProducerToken* token;

    ConcurrentQueueProducerTypelessBase() : next(nullptr), inactive(false), token(nullptr) {}
};
template <bool use32>
struct _hash_32_or_64 {
    static inline std::uint32_t hash(std::uint32_t h)
    {
        // MurmurHash3 finalizer -- since the thread ID is already unique, all we really
        // want to do is propagate that uniqueness evenly across all the bits
        h ^= h >> 16;
        h *= 0x85ebca6b;
        h ^= h >> 13;
        h *= 0xc2b2ae35;
        return h ^ (h >> 16);
    }
};
template <>
struct _hash_32_or_64<1> {
    static inline std::uint64_t hash(std::uint64_t h)
    {
        h ^= h >> 33;
        h *= 0xff51afd7ed558ccd;
        h ^= h >> 33;
        h *= 0xc4ceb9fe1a85ec53;
        return h ^ (h >> 33);
    }
};
template <std::size_t size>
struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> {};
static inline size_t hash_thread_id(thread_id_t id)
{
    static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
    return static_cast<size_t>(
        hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
            thread_id_converter<thread_id_t>::prehash(id)));
}
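// Compares two indices from a circular (wrapping) index space: `a` is considered
// "less than" `b` iff the unsigned difference a - b wraps past half the type's range.
// E.g. for 8-bit indices, circular_less_than(250, 2) is true, since 250 is only a few
// steps behind 2 once wraparound is accounted for. This is what allows the head and
// tail indices used throughout the queue to increase without bound.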
template <typename T>
static inline bool circular_less_than(T a, T b)
{
    static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
        "circular_less_than is intended to be used only with unsigned integer types");
    return static_cast<T>(a - b) >
        static_cast<T>(static_cast<T>(1) << (static_cast<T>(sizeof(T) * CHAR_BIT - 1)));
}
template <typename U>
static inline char* align_for(char* ptr)
{
    const std::size_t alignment = std::alignment_of<U>::value;
    return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
}
template <typename T>
static inline T ceil_to_pow_2(T x)
{
    static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
        "ceil_to_pow_2 is intended to be used only with unsigned integer types");

    // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
    --x;
    x |= x >> 1;
    x |= x >> 2;
    x |= x >> 4;
    for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
        x |= x >> (i << 3);
    }
    ++x;
    return x;
}
template <typename T>
static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
{
    T temp = std::move(left.load(std::memory_order_relaxed));
    left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
    right.store(std::move(temp), std::memory_order_relaxed);
}
template <typename T>
static inline T const& nomove(T const& x)
{
    return x;
}

template <bool Enable>
struct nomove_if {
    template <typename T>
    static inline T const& eval(T const& x)
    {
        return x;
    }
};

template <>
struct nomove_if<false> {
    template <typename U>
    static inline auto eval(U&& x) -> decltype(std::forward<U>(x))
    {
        return std::forward<U>(x);
    }
};
template <typename It>
static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
{
    return *it;
}

#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
template <typename T>
struct is_trivially_destructible : std::is_trivially_destructible<T> {};
#else
template <typename T>
struct is_trivially_destructible : std::has_trivial_destructor<T> {};
#endif
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
typedef RelacyThreadExitListener ThreadExitListener;
typedef RelacyThreadExitNotifier ThreadExitNotifier;
#else
class ThreadExitNotifier;
struct ThreadExitListener {
    typedef void (*callback_t)(void*);
    callback_t callback;
    void* userData;

    ThreadExitListener* next;  // reserved for use by the ThreadExitNotifier
    ThreadExitNotifier* chain; // reserved for use by the ThreadExitNotifier
};

class ThreadExitNotifier {
public:
    static void subscribe(ThreadExitListener* listener)
    {
        auto& tlsInst = instance();
        std::lock_guard<std::mutex> guard(mutex());
        listener->next = tlsInst.tail;
        listener->chain = &tlsInst;
        tlsInst.tail = listener;
    }
    static void unsubscribe(ThreadExitListener* listener)
    {
        std::lock_guard<std::mutex> guard(mutex());
        if (!listener->chain) {
            return; // race with ~ThreadExitNotifier
        }
        auto& tlsInst = *listener->chain;
        listener->chain = nullptr;
        ThreadExitListener** prev = &tlsInst.tail;
        for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
            if (ptr == listener) {
                *prev = ptr->next;
                break;
            }
            prev = &ptr->next;
        }
    }
private:
    ThreadExitNotifier() : tail(nullptr) {}
    ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
    ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;

    ~ThreadExitNotifier()
    {
        // This thread is about to exit, let everyone know!
        assert(this == &instance() &&
            "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that "
            "MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
        std::lock_guard<std::mutex> guard(mutex());
        for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
            ptr->chain = nullptr;
            ptr->callback(ptr->userData);
        }
    }

    // Thread-local
    static inline ThreadExitNotifier& instance()
    {
        static thread_local ThreadExitNotifier notifier;
        return notifier;
    }

    static inline std::mutex& mutex()
    {
        // Must be static because the ThreadExitNotifier could be destroyed while unsubscribe is called
        static std::mutex mutex;
        return mutex;
    }

    ThreadExitListener* tail;
};
#endif
#endif
template <typename T> struct static_is_lock_free_num { enum { value = 0 }; };
template <> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
template <> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
template <> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
template <> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
template <> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
template <typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> {};
template <> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
template <typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
} // namespace details
struct ProducerToken {
    template <typename T, typename Traits>
    explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);

    ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
        : producer(other.producer)
    {
        other.producer = nullptr;
        if (producer != nullptr) {
            producer->token = this;
        }
    }

    inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
    {
        swap(other);
        return *this;
    }

    void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
    {
        std::swap(producer, other.producer);
        if (producer != nullptr) {
            producer->token = this;
        }
        if (other.producer != nullptr) {
            other.producer->token = &other;
        }
    }

    // A token is always valid unless:
    //     1) Memory allocation failed during construction
    //     2) It was moved via the move constructor
    //        (Note: assignment does a swap, leaving both potentially valid)
    //     3) The associated queue was destroyed
    inline bool valid() const { return producer != nullptr; }

    ~ProducerToken()
    {
        if (producer != nullptr) {
            producer->token = nullptr;
            producer->inactive.store(true, std::memory_order_release);
        }
    }
    // Disable copying and assignment
    ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
    ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;

private:
    template <typename T, typename Traits> friend class ConcurrentQueue;
    friend class ConcurrentQueueTests;

protected:
    details::ConcurrentQueueProducerTypelessBase* producer;
};

struct ConsumerToken {
    template <typename T, typename Traits>
    explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);

    ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
        : initialOffset(other.initialOffset),
          lastKnownGlobalOffset(other.lastKnownGlobalOffset),
          itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
          currentProducer(other.currentProducer),
          desiredProducer(other.desiredProducer)
    {
    }

    inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
    {
        swap(other);
        return *this;
    }

    void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
    {
        std::swap(initialOffset, other.initialOffset);
        std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
        std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
        std::swap(currentProducer, other.currentProducer);
        std::swap(desiredProducer, other.desiredProducer);
    }

    // Disable copying and assignment
    ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
    ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;

private:
    template <typename T, typename Traits> friend class ConcurrentQueue;
    friend class ConcurrentQueueTests;

private: // but shared with ConcurrentQueue
    std::uint32_t initialOffset;
    std::uint32_t lastKnownGlobalOffset;
    std::uint32_t itemsConsumedFromCurrent;
    details::ConcurrentQueueProducerTypelessBase* currentProducer;
    details::ConcurrentQueueProducerTypelessBase* desiredProducer;
};
// Need to forward-declare this swap because it's in a namespace.
template <typename T, typename Traits>
inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a,
    typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;

template <typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue
{
public:
    typedef ::moodycamel::ProducerToken producer_token_t;
    typedef ::moodycamel::ConsumerToken consumer_token_t;
    typedef typename Traits::index_t index_t;
    typedef typename Traits::size_t size_t;

    static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
    static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD =
        static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
    static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
    static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
    static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE =
        static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
    static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE =
        static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4307) // + integral constant overflow (that's what the ternary expression is for!)
#pragma warning(disable : 4309) // static_cast: Truncation of constant value
#endif
    static const size_t MAX_SUBQUEUE_SIZE =
        (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE)
            ? details::const_numeric_max<size_t>::value
            : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
#ifdef _MSC_VER
#pragma warning(pop)
#endif
    static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value,
        "Traits::size_t must be an unsigned integral type");
    static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value,
        "Traits::index_t must be an unsigned integral type");
    static_assert(sizeof(index_t) >= sizeof(size_t),
        "Traits::index_t must be at least as wide as Traits::size_t");
    static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)),
        "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
    static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) &&
            !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)),
        "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
    static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) &&
            !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)),
        "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
    static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) &&
            !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)),
        "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
    static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) ||
            !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)),
        "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
    static_assert(
        INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1,
        "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
public:
    // Creates a queue with at least `capacity` element slots. This method is not
    // thread-safe; the queue must be fully constructed (with the memory effects of
    // construction made visible) before it is used by other threads.
    explicit ConcurrentQueue(size_t capacity = 32 * BLOCK_SIZE)
        : producerListTail(nullptr)
        , producerCount(0)
        , initialBlockPoolIndex(0)
        , nextExplicitConsumerId(0)
        , globalExplicitConsumerOffset(0)
    {
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
        populate_initial_implicit_producer_hash();
        populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        // Track all the producers using a fully-resolved typed list for each kind;
        // this makes it possible to debug them starting from the root queue object.
        explicitProducers.store(nullptr, std::memory_order_relaxed);
        implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
    }
    // Computes the correct number of pre-allocated blocks based on the minimum number
    // of elements you want available at any given time and the maximum concurrent
    // number of each type of producer.
    ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
        : producerListTail(nullptr)
        , producerCount(0)
        , initialBlockPoolIndex(0)
        , nextExplicitConsumerId(0)
        , globalExplicitConsumerOffset(0)
    {
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
        populate_initial_implicit_producer_hash();
        size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) +
            2 * (maxExplicitProducers + maxImplicitProducers);
        populate_initial_block_list(blocks);

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        explicitProducers.store(nullptr, std::memory_order_relaxed);
        implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
    }
    // Note: The queue should not be accessed concurrently while it is being
    // destroyed; it is up to the user to synchronize this. Not thread-safe.
    ~ConcurrentQueue()
    {
        // Destroy producers
        auto ptr = producerListTail.load(std::memory_order_relaxed);
        while (ptr != nullptr) {
            auto next = ptr->next_prod();
            if (ptr->token != nullptr) {
                ptr->token->producer = nullptr;
            }
            destroy(ptr);
            ptr = next;
        }

        // Destroy implicit producer hash tables
        MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0)
        {
            auto hash = implicitProducerHash.load(std::memory_order_relaxed);
            while (hash != nullptr) {
                auto prev = hash->prev;
                if (prev != nullptr) { // The last hash is part of this object and was not allocated dynamically
                    for (size_t i = 0; i != hash->capacity; ++i) {
                        hash->entries[i].~ImplicitProducerKVP();
                    }
                    hash->~ImplicitProducerHash();
                    (Traits::free)(hash);
                }
                hash = prev;
            }
        }

        // Destroy global free list
        auto block = freeList.head_unsafe();
        while (block != nullptr) {
            auto next = block->freeListNext.load(std::memory_order_relaxed);
            if (block->dynamicallyAllocated) {
                destroy(block);
            }
            block = next;
        }

        // Destroy initial block pool
        destroy_array(initialBlockPool, initialBlockPoolSize);
    }
    // Moving is supported, but note that it is *not* a thread-safe operation.
    // Nobody can use the queue while it's being moved, and the memory effects of the
    // move must be propagated to other threads before they can use it. Tokens remain
    // valid but are semantically moved along with the queue itself.
    ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
        : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
          producerCount(other.producerCount.load(std::memory_order_relaxed)),
          initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
          initialBlockPool(other.initialBlockPool),
          initialBlockPoolSize(other.initialBlockPoolSize),
          freeList(std::move(other.freeList)),
          nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
          globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
    {
        // Move the other one into this, and leave the other one as an empty queue
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
        populate_initial_implicit_producer_hash();
        swap_implicit_producer_hashes(other);

        other.producerListTail.store(nullptr, std::memory_order_relaxed);
        other.producerCount.store(0, std::memory_order_relaxed);
        other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
        other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
        other.explicitProducers.store(nullptr, std::memory_order_relaxed);
        implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
        other.implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif

        other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
        other.initialBlockPoolSize = 0;
        other.initialBlockPool = nullptr;

        reown_producers();
    }
    // Swaps this queue's state with the other's. Not thread-safe.
    // Swapping two queues does not invalidate their tokens, but the tokens must then
    // be used with the queue they were swapped into.
    inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT { swap_internal(other); }

private:
    ConcurrentQueue& swap_internal(ConcurrentQueue& other)
    {
        if (this == &other) {
            return *this;
        }

        details::swap_relaxed(producerListTail, other.producerListTail);
        details::swap_relaxed(producerCount, other.producerCount);
        details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
        std::swap(initialBlockPool, other.initialBlockPool);
        std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
        freeList.swap(other.freeList);
        details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
        details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);

        swap_implicit_producer_hashes(other);

        reown_producers();
        other.reown_producers();

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        details::swap_relaxed(explicitProducers, other.explicitProducers);
        details::swap_relaxed(implicitProducers, other.implicitProducers);
#endif

        return *this;
    }

public:
    // Enqueues a single item (by copying it). Allocates memory if required.
    // Only fails if memory allocation fails (or implicit production is disabled
    // because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0, or
    // Traits::MAX_SUBQUEUE_SIZE would be surpassed). Thread-safe.
    inline bool enqueue(T const& item)
    {
        MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        else return inner_enqueue<CanAlloc>(item);
    }

    // Enqueues a single item (by moving it, if possible). Thread-safe.
    inline bool enqueue(T&& item)
    {
        MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        else return inner_enqueue<CanAlloc>(std::move(item));
    }

    // Enqueues a single item (by copying it) using an explicit producer token. Thread-safe.
    inline bool enqueue(producer_token_t const& token, T const& item)
    {
        return inner_enqueue<CanAlloc>(token, item);
    }

    // Enqueues a single item (by moving it, if possible) using an explicit producer token. Thread-safe.
    inline bool enqueue(producer_token_t const& token, T&& item)
    {
        return inner_enqueue<CanAlloc>(token, std::move(item));
    }

    // Enqueues several items. Use std::make_move_iterator if the elements
    // should be moved instead of copied. Thread-safe.
    template <typename It>
    bool enqueue_bulk(It itemFirst, size_t count)
    {
        MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
    }

    // Enqueues several items using an explicit producer token. Thread-safe.
    template <typename It>
    bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
    {
        return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
    }

    // Like enqueue, but will never allocate; fails if there is not enough room. Thread-safe.
    inline bool try_enqueue(T const& item)
    {
        MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        else return inner_enqueue<CannotAlloc>(item);
    }

    inline bool try_enqueue(T&& item)
    {
        MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        else return inner_enqueue<CannotAlloc>(std::move(item));
    }

    inline bool try_enqueue(producer_token_t const& token, T const& item)
    {
        return inner_enqueue<CannotAlloc>(token, item);
    }

    inline bool try_enqueue(producer_token_t const& token, T&& item)
    {
        return inner_enqueue<CannotAlloc>(token, std::move(item));
    }

    // Like enqueue_bulk, but will never allocate. Thread-safe.
    template <typename It>
    bool try_enqueue_bulk(It itemFirst, size_t count)
    {
        MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
        else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
    }

    template <typename It>
    bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
    {
        return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
    }
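    // Illustrative usage of the enqueue family (a sketch, not part of the original
    // header; the queue instance and values here are hypothetical):
    //
    //     moodycamel::ConcurrentQueue<int> q;
    //     moodycamel::ProducerToken ptok(q);
    //     q.enqueue(42);          // implicit producer, may allocate
    //     q.enqueue(ptok, 43);    // explicit producer stream via token
    //     q.try_enqueue(44);      // never allocates; fails instead
    //     int xs[2] = { 45, 46 };
    //     q.enqueue_bulk(xs, 2);  // bulk variant amortizes overhead across items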
    // Attempts to dequeue from the queue. Returns false if all producer streams
    // appeared empty at the time they were checked (so the queue is likely, but
    // not guaranteed, to be empty). Never allocates. Thread-safe.
    template <typename U>
    bool try_dequeue(U& item)
    {
        // Instead of simply trying each producer in turn (which could cause needless
        // contention on the first producer), score them heuristically.
        size_t nonEmptyCount = 0;
        ProducerBase* best = nullptr;
        size_t bestSize = 0;
        for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr;
             ptr = ptr->next_prod()) {
            auto size = ptr->size_approx();
            if (size > 0) {
                if (size > bestSize) {
                    bestSize = size;
                    best = ptr;
                }
                ++nonEmptyCount;
            }
        }

        // If there was at least one non-empty queue but it appears empty at the time
        // we try to dequeue from it, we need to make sure every queue's been tried
        if (nonEmptyCount > 0) {
            if ((details::likely)(best->dequeue(item))) {
                return true;
            }
            for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
                if (ptr != best && ptr->dequeue(item)) {
                    return true;
                }
            }
        }
        return false;
    }
    // Attempts to dequeue from the queue, checking each producer stream in turn
    // (rather than interleaving as try_dequeue does). Never allocates. Thread-safe.
    template <typename U>
    bool try_dequeue_non_interleaved(U& item)
    {
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            if (ptr->dequeue(item)) {
                return true;
            }
        }
        return false;
    }
    // Attempts to dequeue from the queue using an explicit consumer token.
    // Never allocates. Thread-safe.
    template <typename U>
    bool try_dequeue(consumer_token_t& token, U& item)
    {
        // Roughly: every EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE items taken
        // from one producer, the global offset is bumped, making all token-holding
        // consumers rotate to their next designated producer. If the global offset
        // changed, or we run out of items where we are, we move along the producer
        // list until we find one with items.
        if (token.desiredProducer == nullptr ||
            token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
            if (!update_current_producer_after_rotation(token)) {
                return false;
            }
        }

        if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
            if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
                globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
            }
            return true;
        }

        auto tail = producerListTail.load(std::memory_order_acquire);
        auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
        if (ptr == nullptr) {
            ptr = tail;
        }
        while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
            if (ptr->dequeue(item)) {
                token.currentProducer = ptr;
                token.itemsConsumedFromCurrent = 1;
                return true;
            }
            ptr = ptr->next_prod();
            if (ptr == nullptr) {
                ptr = tail;
            }
        }
        return false;
    }
    // Attempts to dequeue several elements from the queue. Returns the number of
    // items actually dequeued (0 if all producer streams appeared empty).
    // Never allocates. Thread-safe.
    template <typename It>
    size_t try_dequeue_bulk(It itemFirst, size_t max)
    {
        size_t count = 0;
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            count += ptr->dequeue_bulk(itemFirst, max - count);
            if (count == max) {
                break;
            }
        }
        return count;
    }
    // Attempts to dequeue several elements using an explicit consumer token.
    // Returns the number of items actually dequeued. Never allocates. Thread-safe.
    template <typename It>
    size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
    {
        if (token.desiredProducer == nullptr ||
            token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
            if (!update_current_producer_after_rotation(token)) {
                return 0;
            }
        }

        size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
        if (count == max) {
            if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >=
                EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
                globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
            }
            return max;
        }
        token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
        max -= count;

        auto tail = producerListTail.load(std::memory_order_acquire);
        auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
        if (ptr == nullptr) {
            ptr = tail;
        }
        while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
            auto dequeued = ptr->dequeue_bulk(itemFirst, max);
            count += dequeued;
            if (dequeued != 0) {
                token.currentProducer = ptr;
                token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
            }
            if (dequeued == max) {
                break;
            }
            max -= dequeued;
            ptr = ptr->next_prod();
            if (ptr == nullptr) {
                ptr = tail;
            }
        }
        return count;
    }
    // Attempts to dequeue from a specific producer's inner queue. If you happen to
    // know which producer you want to dequeue from, this is significantly faster
    // than the general-case try_dequeue methods. Never allocates. Thread-safe.
    template <typename U>
    inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
    {
        return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
    }

    // Attempts to dequeue several elements from a specific producer's inner queue.
    // Returns the number of items actually dequeued. Never allocates. Thread-safe.
    template <typename It>
    inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
    {
        return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
    }
    // Returns an estimate of the total number of elements currently in the queue.
    // The estimate is only accurate if the queue has completely stabilized before
    // it is called. Thread-safe.
    size_t size_approx() const
    {
        size_t size = 0;
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            size += ptr->size_approx();
        }
        return size;
    }

    // Returns true if the underlying atomic variables used by the queue are
    // lock-free (they should be on most platforms). Thread-safe.
    static constexpr bool is_lock_free()
    {
        return details::static_is_lock_free<bool>::value == 2 &&
            details::static_is_lock_free<size_t>::value == 2 &&
            details::static_is_lock_free<std::uint32_t>::value == 2 &&
            details::static_is_lock_free<index_t>::value == 2 &&
            details::static_is_lock_free<void*>::value == 2 &&
            details::static_is_lock_free<
                typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
    }
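    // Note: a value of 2 for the standard ATOMIC_*_LOCK_FREE macros means "always
    // lock-free" (1 means "sometimes", 0 means "never"), hence the == 2 comparisons.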
private:
    // Friends
    struct ExplicitProducer;
    friend struct ExplicitProducer;
    struct ImplicitProducer;
    friend struct ImplicitProducer;
    friend class ConcurrentQueueTests;

    enum AllocationMode { CanAlloc, CannotAlloc };
    ///////////////////////////////
    // Queue methods
    ///////////////////////////////

    template <AllocationMode canAlloc, typename U>
    inline bool inner_enqueue(producer_token_t const& token, U&& element)
    {
        return static_cast<ExplicitProducer*>(token.producer)
            ->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
    }

    template <AllocationMode canAlloc, typename U>
    inline bool inner_enqueue(U&& element)
    {
        auto producer = get_or_add_implicit_producer();
        return producer == nullptr
            ? false
            : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
    }

    template <AllocationMode canAlloc, typename It>
    inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
    {
        return static_cast<ExplicitProducer*>(token.producer)
            ->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
    }

    template <AllocationMode canAlloc, typename It>
    inline bool inner_enqueue_bulk(It itemFirst, size_t count)
    {
        auto producer = get_or_add_implicit_producer();
        return producer == nullptr
            ? false
            : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
    }
    inline bool update_current_producer_after_rotation(consumer_token_t& token)
    {
        // Ah, there's been a rotation, figure out where we should be!
        auto tail = producerListTail.load(std::memory_order_acquire);
        if (token.desiredProducer == nullptr && tail == nullptr) {
            return false;
        }
        auto prodCount = producerCount.load(std::memory_order_relaxed);
        auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
        if ((details::unlikely)(token.desiredProducer == nullptr)) {
            // Aha, first time we're dequeueing anything. Figure out our local position.
            // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
            std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
            token.desiredProducer = tail;
            for (std::uint32_t i = 0; i != offset; ++i) {
                token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
                if (token.desiredProducer == nullptr) {
                    token.desiredProducer = tail;
                }
            }
        }

        std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
        if (delta >= prodCount) {
            delta = delta % prodCount;
        }
        for (std::uint32_t i = 0; i != delta; ++i) {
            token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
            if (token.desiredProducer == nullptr) {
                token.desiredProducer = tail;
            }
        }

        token.lastKnownGlobalOffset = globalOffset;
        token.currentProducer = token.desiredProducer;
        token.itemsConsumedFromCurrent = 0;
        return true;
    }
    ///////////////////////////
    // Free list
    ///////////////////////////

    template <typename N>
    struct FreeListNode {
        FreeListNode() : freeListRefs(0), freeListNext(nullptr) {}

        std::atomic<std::uint32_t> freeListRefs;
        std::atomic<N*> freeListNext;
    };
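    // A simple CAS-based lock-free free list. Not the fastest thing in the world under
    // heavy contention, but simple and correct (assuming nodes are never freed until
    // after the free list itself is destroyed), and fairly speedy under low contention.
    // Each node's refcount does double duty: the high bit (SHOULD_BE_ON_FREELIST) flags
    // a node that should be (re-)added once its reference count drops to zero.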
    template <typename N>
    struct FreeList {
        FreeList() : freeListHead(nullptr) {}
        FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed))
        {
            other.freeListHead.store(nullptr, std::memory_order_relaxed);
        }

        void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }

        FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
        FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
        inline void add(N* node)
        {
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
            debug::DebugLock lock(mutex);
#endif
            // We know that the should-be-on-freelist bit is 0 at this point, so it's
            // safe to set it using a fetch_add
            if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
                // We were the last ones referencing this node, and we know we want to
                // add it to the free list, so let's do it!
                add_knowing_refcount_is_zero(node);
            }
        }
        inline N* try_get()
        {
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
            debug::DebugLock lock(mutex);
#endif
            auto head = freeListHead.load(std::memory_order_acquire);
            while (head != nullptr) {
                auto prevHead = head;
                auto refs = head->freeListRefs.load(std::memory_order_relaxed);
                if ((refs & REFS_MASK) == 0 ||
                    !head->freeListRefs.compare_exchange_strong(
                        refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
                    head = freeListHead.load(std::memory_order_acquire);
                    continue;
                }

                // Good, the reference count was incremented (it wasn't at zero), which means
                // we can read the next pointer without fear of it changing before our CAS
                auto next = head->freeListNext.load(std::memory_order_relaxed);
                if (freeListHead.compare_exchange_strong(
                        head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
                    // Got the node; it was on the list, so shouldBeOnFreeList must be false
                    assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);

                    // Decrease refcount twice: once for our ref, and once for the list's ref
                    head->freeListRefs.fetch_sub(2, std::memory_order_release);
                    return head;
                }

                // The head must have changed on us; decrease the refcount we optimistically
                // incremented, and retry (possibly re-adding the node if we hit zero)
                refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
                if (refs == SHOULD_BE_ON_FREELIST + 1) {
                    add_knowing_refcount_is_zero(prevHead);
                }
            }

            return nullptr;
        }
        // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
        N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
    private:
        inline void add_knowing_refcount_is_zero(N* node)
        {
            // Since the refcount is zero, and nobody can increase it while it's zero
            // (except this method, which runs for at most one copy per node at a time),
            // we can safely change the node's next pointer. If the CAS to splice the node
            // onto the list fails, the refcount may have been raised by a concurrent
            // try_get; in that case, leave the add to whoever brings the count back to
            // zero (which could be us again, hence the loop).
            auto head = freeListHead.load(std::memory_order_relaxed);
            while (true) {
                node->freeListNext.store(head, std::memory_order_relaxed);
                node->freeListRefs.store(1, std::memory_order_release);
                if (!freeListHead.compare_exchange_strong(
                        head, node, std::memory_order_release, std::memory_order_relaxed)) {
                    // The add failed; try again only when the refcount goes back to zero
                    if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
                        continue;
                    }
                }
                return;
            }
        }
    private:
        // Implemented like a stack, but where node order doesn't matter (nodes are
        // inserted out of order under contention)
        std::atomic<N*> freeListHead;

        static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
        static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;

#ifdef MCDBGQ_NOLOCKFREE_FREELIST
        debug::DebugMutex mutex;
#endif
    };
    ///////////////////////////
    // Block
    ///////////////////////////

    enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };

    struct Block {
        Block()
            : next(nullptr)
            , elementsCompletelyDequeued(0)
            , freeListRefs(0)
            , freeListNext(nullptr)
            , dynamicallyAllocated(true)
        {
#ifdef MCDBGQ_TRACKMEM
            owner = nullptr;
#endif
        }
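        // A block tracks its emptiness in one of two ways, chosen at compile time:
        // explicit producers with small blocks (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
        // use one boolean flag per slot, while implicit producers and large blocks use a
        // single atomic count of completely dequeued elements. The template context
        // parameter on the methods below selects between the two schemes.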
        template <InnerQueueContext context>
        inline bool is_empty() const
        {
            MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
            {
                // Check flags
                for (size_t i = 0; i < BLOCK_SIZE; ++i) {
                    if (!emptyFlags[i].load(std::memory_order_relaxed)) {
                        return false;
                    }
                }

                // Aha, empty; make sure we have all other memory effects that happened
                // before the empty flags were set
                std::atomic_thread_fence(std::memory_order_acquire);
                return true;
            }
            else
            {
                // Check counter
                if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
                    std::atomic_thread_fence(std::memory_order_acquire);
                    return true;
                }
                assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
                return false;
            }
        }
        // Returns true if the block is now empty (does not apply in explicit context)
        template <InnerQueueContext context>
        inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i)
        {
            MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
            {
                // Set flag
                assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(
                    std::memory_order_relaxed));
                emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(
                    true, std::memory_order_release);
                return false;
            }
            else
            {
                // Increment counter
                auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
                assert(prevVal < BLOCK_SIZE);
                return prevVal == BLOCK_SIZE - 1;
            }
        }
        // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
        // Returns true if the block is now empty (does not apply in explicit context).
        template <InnerQueueContext context>
        inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count)
        {
            MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
            {
                // Set flags
                std::atomic_thread_fence(std::memory_order_release);
                i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
                for (size_t j = 0; j != count; ++j) {
                    assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
                    emptyFlags[i + j].store(true, std::memory_order_relaxed);
                }
                return false;
            }
            else
            {
                // Increment counter
                auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
                assert(prevVal + count <= BLOCK_SIZE);
                return prevVal + count == BLOCK_SIZE;
            }
        }
        template <InnerQueueContext context>
        inline void set_all_empty()
        {
            MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
            {
                // Set all flags
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                    emptyFlags[i].store(true, std::memory_order_relaxed);
                }
            }
            else
            {
                // Reset counter
                elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
            }
        }

        template <InnerQueueContext context>
        inline void reset_empty()
        {
            MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
            {
                // Reset flags
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
                    emptyFlags[i].store(false, std::memory_order_relaxed);
                }
            }
            else
            {
                // Reset counter
                elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
            }
        }
        inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT
        {
            return static_cast<T*>(static_cast<void*>(elements)) +
                static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
        }
        inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT
        {
            return static_cast<T const*>(static_cast<void const*>(elements)) +
                static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
        }
    private:
        static_assert(std::alignment_of<T>::value <= sizeof(T),
            "The queue does not support types with an alignment greater than their size at this time");
        MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * BLOCK_SIZE], T) elements;

    public:
        Block* next;
        std::atomic<size_t> elementsCompletelyDequeued;
        std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];

        std::atomic<std::uint32_t> freeListRefs;
        std::atomic<Block*> freeListNext;
        bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'

#ifdef MCDBGQ_TRACKMEM
        void* owner;
#endif
    };
    static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value,
        "Internal error: Blocks must be at least as aligned as the type they are wrapping");
#ifdef MCDBGQ_TRACKMEM
public:
    struct MemStats;
private:
#endif

    ///////////////////////////
    // Producer base
    ///////////////////////////

    struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase {
        ProducerBase(ConcurrentQueue* parent_, bool isExplicit_)
            : tailIndex(0)
            , headIndex(0)
            , dequeueOptimisticCount(0)
            , dequeueOvercommit(0)
            , tailBlock(nullptr)
            , isExplicit(isExplicit_)
            , parent(parent_)
        {
        }

        virtual ~ProducerBase() {}

        template <typename U>
        inline bool dequeue(U& element)
        {
            if (isExplicit) {
                return static_cast<ExplicitProducer*>(this)->dequeue(element);
            }
            else {
                return static_cast<ImplicitProducer*>(this)->dequeue(element);
            }
        }

        template <typename It>
        inline size_t dequeue_bulk(It& itemFirst, size_t max)
        {
            if (isExplicit) {
                return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
            }
            else {
                return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
            }
        }

        inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }

        inline size_t size_approx() const
        {
            auto tail = tailIndex.load(std::memory_order_relaxed);
            auto head = headIndex.load(std::memory_order_relaxed);
            return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
        }

        inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }

    protected:
        std::atomic<index_t> tailIndex; // Where to enqueue to next
        std::atomic<index_t> headIndex; // Where to dequeue from next

        std::atomic<index_t> dequeueOptimisticCount;
        std::atomic<index_t> dequeueOvercommit;

        Block* tailBlock;

    public:
        bool isExplicit;
        ConcurrentQueue* parent;

    protected:
#ifdef MCDBGQ_TRACKMEM
        friend struct MemStats;
#endif
    };
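    // Note on the index scheme used by both producer kinds: tailIndex and headIndex
    // are monotonically increasing and are never reset (they simply wrap at the limit
    // of index_t). An index is mapped to a slot within a block by masking with
    // BLOCK_SIZE - 1, and to its owning block via the producer's block index;
    // circular_less_than makes all index comparisons wrap-safe.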
    ///////////////////////////
    // Explicit queue
    ///////////////////////////

    struct ExplicitProducer : public ProducerBase {
        explicit ExplicitProducer(ConcurrentQueue* parent_)
            : ProducerBase(parent_, true)
            , blockIndex(nullptr)
            , pr_blockIndexSlotsUsed(0)
            , pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1)
            , pr_blockIndexFront(0)
            , pr_blockIndexEntries(nullptr)
            , pr_blockIndexRaw(nullptr)
        {
            size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
            if (poolBasedIndexSize > pr_blockIndexSize) {
                pr_blockIndexSize = poolBasedIndexSize;
            }

            // Creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
            new_block_index(0);
        }
        ~ExplicitProducer()
        {
            // Destruct any elements not yet dequeued. Since we're in the destructor, we
            // can assume all elements are either completely dequeued or completely not.
            if (this->tailBlock != nullptr) { // Note this means there must be a block index too
                // First find the block that's partially dequeued, if any
                Block* halfDequeuedBlock = nullptr;
                if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
                    // The head's not on a block boundary, meaning a block somewhere is partially dequeued
                    size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
                    while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE,
                           this->headIndex.load(std::memory_order_relaxed))) {
                        i = (i + 1) & (pr_blockIndexSize - 1);
                    }
                    assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base,
                        this->headIndex.load(std::memory_order_relaxed)));
                    halfDequeuedBlock = pr_blockIndexEntries[i].block;
                }

                // Start at the head block (the first line in the loop gives us the head from the tail on the first iteration)
                auto block = this->tailBlock;
                do {
                    block = block->next;
                    if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
                        continue;
                    }

                    size_t i = 0; // Offset into block
                    if (block == halfDequeuedBlock) {
                        i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) &
                            static_cast<index_t>(BLOCK_SIZE - 1));
                    }

                    // Walk through all the items in the block; if this is the tail block,
                    // we need to stop when we reach the tail index
                    auto lastValidIndex =
                        (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0
                            ? BLOCK_SIZE
                            : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) &
                                  static_cast<index_t>(BLOCK_SIZE - 1));
                    while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
                        (*block)[i++]->~T();
                    }
                } while (block != this->tailBlock);
            }

            // Destroy all blocks that we own
            if (this->tailBlock != nullptr) {
                auto block = this->tailBlock;
                do {
                    auto nextBlock = block->next;
                    this->parent->add_block_to_free_list(block);
                    block = nextBlock;
                } while (block != this->tailBlock);
            }

            // Destroy the block indices
            auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
            while (header != nullptr) {
                auto prev = static_cast<BlockIndexHeader*>(header->prev);
                header->~BlockIndexHeader();
                (Traits::free)(header);
                header = prev;
            }
        }
        template <AllocationMode allocMode, typename U>
        inline bool enqueue(U&& element)
        {
            index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
            index_t newTailIndex = 1 + currentTailIndex;
            if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                // We reached the end of a block, start a new one
                auto startBlock = this->tailBlock;
                auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
                if (this->tailBlock != nullptr &&
                    this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
                    // We can re-use the block ahead of us, it's empty!
                    this->tailBlock = this->tailBlock->next;
                    this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
                }
                else {
                    // Whatever head value we see here is >= the last value we saw here
                    // (relatively), and <= its current value. Since we have the most
                    // recent tail, the head must be <= to it.
                    auto head = this->headIndex.load(std::memory_order_relaxed);
                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                    if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
                        (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
                         (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
                        // We can't enqueue in another block because there's not enough leeway -- the
                        // tail could surpass the head by the time the block fills up! (Or we'll exceed
                        // the size limit, if the second part of the condition was true.)
                        return false;
                    }
                    // We're going to need a new block; check that the block index has room
                    if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
                        // The circular block index is already full -- we'll need to allocate a new index
                        MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc)
                        {
                            return false;
                        }
                        else if (!new_block_index(pr_blockIndexSlotsUsed))
                        {
                            return false;
                        }
                    }

                    // Insert a new block in the circular linked list
                    auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
                    if (newBlock == nullptr) {
                        return false;
                    }
#ifdef MCDBGQ_TRACKMEM
                    newBlock->owner = this;
#endif
                    newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
                    if (this->tailBlock == nullptr) {
                        newBlock->next = newBlock;
                    }
                    else {
                        newBlock->next = this->tailBlock->next;
                        this->tailBlock->next = newBlock;
                    }
                    this->tailBlock = newBlock;
                    ++pr_blockIndexSlotsUsed;
                }

                MOODYCAMEL_CONSTEXPR_IF(
                    !MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
                {
                    // The constructor may throw. We want the element not to appear in the
                    // queue in that case (without corrupting the queue):
                    MOODYCAMEL_TRY
                    {
                        new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
                    }
                    MOODYCAMEL_CATCH(...)
                    {
                        // Revert change to the current block, but leave the new block
                        // available for next time
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                        this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
                        MOODYCAMEL_RETHROW;
                    }
                }
                else
                {
                    (void)startBlock;
                    (void)originalBlockIndexSlotsUsed;
                }

                // Add block to block index
                auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
                entry.base = currentTailIndex;
                entry.block = this->tailBlock;
                blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
                pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);

                MOODYCAMEL_CONSTEXPR_IF(
                    !MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
                {
                    this->tailIndex.store(newTailIndex, std::memory_order_release);
                    return true;
                }
            }

            // Enqueue
            new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

            this->tailIndex.store(newTailIndex, std::memory_order_release);
            return true;
        }
        template <typename U>
        bool dequeue(U& element)
        {
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
            if (details::circular_less_than<index_t>(
                    this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
                // Might be something to dequeue, let's give it a try

                // The acquire fence synchronizes with the release on incrementing
                // dequeueOvercommit below, ensuring that the optimistic count we read in
                // the fetch_add is at least as recent as the overcommit we just loaded.
                std::atomic_thread_fence(std::memory_order_acquire);

                // Increment optimistic counter, then check if it went over the boundary
                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);

                // Reload tail in case it changed; it will be the same value as before or
                // greater, since this load is sequenced after the earlier one (read-read coherency)
                tail = this->tailIndex.load(std::memory_order_acquire);
                if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
                    // Guaranteed to be at least one element to dequeue! Get the index
                    // (acq_rel so we also see the memory effects of the enqueue that
                    // satisfied our condition, whichever thread it came from).
                    auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);

                    // Determine which block the element is in
                    auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
                    auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);

                    // We need to be careful here about subtracting and dividing because of index
                    // wrap-around: preserve the sign of the offset when dividing by the block size
                    auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                    auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    auto offset = static_cast<size_t>(
                        static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) /
                        static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
                    auto block =
                        localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;

                    // Dequeue
                    auto& el = *((*block)[index]);
                    if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
                        // Make sure the element is still fully dequeued and destroyed even
                        // if the assignment throws
                        struct Guard {
                            Block* block;
                            index_t index;

                            ~Guard()
                            {
                                (*block)[index]->~T();
                                block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
                            }
                        } guard = { block, index };

                        element = std::move(el);
                    }
                    else {
                        element = std::move(el);
                        el.~T();
                        block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
                    }

                    return true;
                }
                else {
                    // Wasn't anything to dequeue after all; make the effective dequeue count
                    // eventually consistent (release so the fetch_add on dequeueOptimisticCount
                    // is guaranteed to happen before this write)
                    this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
                }
            }

            return false;
        }
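        // Bulk enqueue works in two phases: first it reserves enough room, re-using
        // empty blocks ahead of the tail where possible and allocating (and indexing)
        // any further blocks needed; only if every allocation succeeds does it construct
        // the elements. If a constructor can throw, already-constructed elements are
        // destroyed and the operation is reverted, though any blocks that were allocated
        // stay linked in for future use.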
        template <AllocationMode allocMode, typename It>
        bool MOODYCAMEL_NO_TSAN enqueue_bulk(It itemFirst, size_t count)
        {
            // First, we need to make sure we have enough room to enqueue all of the
            // elements; this means pre-allocating blocks and putting them in the block
            // index (but only if all the allocations succeeded).
            index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
            auto startBlock = this->tailBlock;
            auto originalBlockIndexFront = pr_blockIndexFront;
            auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;

            Block* firstAllocatedBlock = nullptr;

            // Figure out how many blocks we'll need to allocate, and do so
            size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
                ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
            index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
            if (blockBaseDiff > 0) {
                // Allocate as many blocks as possible from ahead
                while (blockBaseDiff > 0 && this->tailBlock != nullptr &&
                       this->tailBlock->next != firstAllocatedBlock &&
                       this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

                    this->tailBlock = this->tailBlock->next;
                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;

                    auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
                    entry.base = currentTailIndex;
                    entry.block = this->tailBlock;
                    pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
                }

                // Now allocate as many blocks as necessary from the block pool
                while (blockBaseDiff > 0) {
                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

                    auto head = this->headIndex.load(std::memory_order_relaxed);
                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                    bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
                        (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
                         (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
                    if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
                        MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc)
                        {
                            // Failed to allocate, undo changes (but keep injected blocks)
                            pr_blockIndexFront = originalBlockIndexFront;
                            pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                            this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
                            return false;
                        }
                        else if (full || !new_block_index(originalBlockIndexSlotsUsed))
                        {
                            // Failed to allocate, undo changes (but keep injected blocks)
                            pr_blockIndexFront = originalBlockIndexFront;
                            pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                            this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
                            return false;
                        }

                        // pr_blockIndexFront is updated inside new_block_index, so we need to
                        // update our fallback value too (since we keep the new index even if
                        // we later fail)
                        originalBlockIndexFront = originalBlockIndexSlotsUsed;
                    }

                    // Insert a new block in the circular linked list
                    auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
                    if (newBlock == nullptr) {
                        pr_blockIndexFront = originalBlockIndexFront;
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                        this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
                        return false;
                    }

#ifdef MCDBGQ_TRACKMEM
                    newBlock->owner = this;
#endif
                    newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
                    if (this->tailBlock == nullptr) {
                        newBlock->next = newBlock;
                    }
                    else {
                        newBlock->next = this->tailBlock->next;
                        this->tailBlock->next = newBlock;
                    }
                    this->tailBlock = newBlock;
                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;

                    ++pr_blockIndexSlotsUsed;

                    auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
                    entry.base = currentTailIndex;
                    entry.block = this->tailBlock;
                    pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
                }

                // Excellent, all allocations succeeded. Reset each block's emptiness before
                // we fill them up, and publish the new block index front
                auto block = firstAllocatedBlock;
                while (true) {
                    block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
                    if (block == this->tailBlock) {
                        break;
                    }
                    block = block->next;
                }

                MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(
                    T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
                {
                    blockIndex.load(std::memory_order_relaxed)
                        ->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
                }
            }

            // Enqueue, one block at a time
            index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
            currentTailIndex = startTailIndex;
            auto endBlock = this->tailBlock;
            this->tailBlock = startBlock;
            assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr ||
                count == 0);
            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
                this->tailBlock = firstAllocatedBlock;
            }
            while (true) {
                index_t stopIndex =
                    (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
                    stopIndex = newTailIndex;
                }
                MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(
                    T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
                {
                    while (currentTailIndex != stopIndex) {
                        new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
                    }
                }
                else
                {
                    MOODYCAMEL_TRY
                    {
                        while (currentTailIndex != stopIndex) {
                            // Must use copy constructor even if move constructor is available
                            // because we may have to revert if there's an exception
                            new ((*this->tailBlock)[currentTailIndex])
                                T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(
                                    T,
                                    decltype(*itemFirst),
                                    new (static_cast<T*>(nullptr))
                                        T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
                            ++currentTailIndex;
                            ++itemFirst;
                        }
                    }
                    MOODYCAMEL_CATCH(...)
                    {
                        // Oh dear, an exception's been thrown -- destroy the elements that
                        // were enqueued so far and revert the entire bulk operation (we'll
                        // keep any allocated blocks in our linked list for later, though)
                        auto constructedStopIndex = currentTailIndex;
                        auto lastBlockEnqueued = this->tailBlock;

                        pr_blockIndexFront = originalBlockIndexFront;
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
                        this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;

                        if (!details::is_trivially_destructible<T>::value) {
                            auto block = startBlock;
                            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                                block = firstAllocatedBlock;
                            }
                            currentTailIndex = startTailIndex;
                            while (true) {
                                stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
                                    static_cast<index_t>(BLOCK_SIZE);
                                if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
                                    stopIndex = constructedStopIndex;
                                }
                                while (currentTailIndex != stopIndex) {
                                    (*block)[currentTailIndex++]->~T();
                                }
                                if (block == lastBlockEnqueued) {
                                    break;
                                }
                                block = block->next;
                            }
                        }
                        MOODYCAMEL_RETHROW;
                    }
                }

                if (this->tailBlock == endBlock) {
                    assert(currentTailIndex == newTailIndex);
                    break;
                }
                this->tailBlock = this->tailBlock->next;
            }

            MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(
                T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
            {
                if (firstAllocatedBlock != nullptr)
                    blockIndex.load(std::memory_order_relaxed)
                        ->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
            }

            this->tailIndex.store(newTailIndex, std::memory_order_release);
            return true;
        }
        template <typename It>
        size_t dequeue_bulk(It& itemFirst, size_t max)
        {
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
            auto desiredCount =
                static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
            if (details::circular_less_than<size_t>(0, desiredCount)) {
                desiredCount = desiredCount < max ? desiredCount : max;
                std::atomic_thread_fence(std::memory_order_acquire);

                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);

                tail = this->tailIndex.load(std::memory_order_acquire);
                auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
                if (details::circular_less_than<size_t>(0, actualCount)) {
                    actualCount = desiredCount < actualCount ? desiredCount : actualCount;
                    if (actualCount < desiredCount) {
                        this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
                    }

                    // Get the first index. Note that since there's guaranteed to be at
                    // least actualCount elements, this will never exceed tail.
                    auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

                    // Determine which block the first element is in
                    auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
                    auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);

                    auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                    auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    auto offset = static_cast<size_t>(
                        static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) /
                        static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
                    auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);

                    // Iterate the blocks and dequeue
                    auto index = firstIndex;
                    do {
                        auto firstIndexInBlock = index;
                        index_t endIndex =
                            (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                        endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount),
                            endIndex)
                            ? firstIndex + static_cast<index_t>(actualCount)
                            : endIndex;
                        auto block = localBlockIndex->entries[indexIndex].block;
                        if (MOODYCAMEL_NOEXCEPT_ASSIGN(
                                T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
                            while (index != endIndex) {
                                auto& el = *((*block)[index]);
                                *itemFirst++ = std::move(el);
                                el.~T();
                                ++index;
                            }
                        }
                        else {
                            MOODYCAMEL_TRY
                            {
                                while (index != endIndex) {
                                    auto& el = *((*block)[index]);
                                    *itemFirst = std::move(el);
                                    ++itemFirst;
                                    el.~T();
                                    ++index;
                                }
                            }
                            MOODYCAMEL_CATCH(...)
                            {
                                // It's too late to revert the dequeue, but we can make sure
                                // that all the dequeued objects are properly destroyed and the
                                // block index (and empty count) are properly updated before we
                                // propagate the exception
                                do {
                                    block = localBlockIndex->entries[indexIndex].block;
                                    while (index != endIndex) {
                                        (*block)[index++]->~T();
                                    }
                                    block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(
                                        firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
                                    indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);

                                    firstIndexInBlock = index;
                                    endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
                                        static_cast<index_t>(BLOCK_SIZE);
                                    endIndex = details::circular_less_than<index_t>(
                                        firstIndex + static_cast<index_t>(actualCount), endIndex)
                                        ? firstIndex + static_cast<index_t>(actualCount)
                                        : endIndex;
                                } while (index != firstIndex + actualCount);

                                MOODYCAMEL_RETHROW;
                            }
                        }
                        block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(
                            firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
                        indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
                    } while (index != firstIndex + actualCount);

                    return actualCount;
                }
                else {
                    // Wasn't anything to dequeue after all; make the effective dequeue count
                    // eventually consistent
                    this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
                }
            }

            return 0;
        }
    private:
        struct BlockIndexEntry {
            index_t base;
            Block* block;
        };

        struct BlockIndexHeader {
            size_t size;
            std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
            BlockIndexEntry* entries;
            void* prev;
        };
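        // Grows the block index by allocating a new one with double the capacity,
        // copying the live entries across, and linking the old index via `prev` so it
        // can be freed in the destructor (consumers may still be reading the old one).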
    bool new_block_index(size_t numberOfFilledSlotsToExpose)
    {
        auto prevBlockSizeMask = pr_blockIndexSize - 1;

        // Create the new block index, twice the size of the old one
        pr_blockIndexSize <<= 1;
        auto newRawPtr = static_cast<char*>((Traits::malloc)(
            sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 +
            sizeof(BlockIndexEntry) * pr_blockIndexSize));
        if (newRawPtr == nullptr) {
            pr_blockIndexSize >>= 1;    // Reset to allow a graceful retry later
            return false;
        }

        auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(
            details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));

        // Copy in all the old indices, if any
        size_t j = 0;
        if (pr_blockIndexSlotsUsed != 0) {
            auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
            do {
                newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
                i = (i + 1) & prevBlockSizeMask;
            } while (i != pr_blockIndexFront);
        }

        // Update everything
        auto header = new (newRawPtr) BlockIndexHeader;
        header->size = pr_blockIndexSize;
        header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
        header->entries = newBlockIndexEntries;
        header->prev = pr_blockIndexRaw;

        pr_blockIndexFront = j;
        pr_blockIndexEntries = newBlockIndexEntries;
        pr_blockIndexRaw = newRawPtr;
        blockIndex.store(header, std::memory_order_release);

        return true;
    }
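    // Note: the old index memory cannot be freed here even though its entries
    // were just copied out -- concurrent consumers may still be reading it. It
    // is instead chained through header->prev (pr_blockIndexRaw) and reclaimed
    // when the producer is destroyed.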
    std::atomic<BlockIndexHeader*> blockIndex;

    // To be used by the producer only -- consumers must use the ones referenced by blockIndex
    size_t pr_blockIndexSlotsUsed;
    size_t pr_blockIndexSize;
    size_t pr_blockIndexFront;    // Next slot (not current)
    BlockIndexEntry* pr_blockIndexEntries;
    void* pr_blockIndexRaw;

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
public:
    ExplicitProducer* nextExplicitProducer;
private:
#endif
#ifdef MCDBGQ_TRACKMEM
    friend struct MemStats;
#endif
};
// Implicit queue (one producer per enqueueing thread)
struct ImplicitProducer : public ProducerBase
{
    ImplicitProducer(ConcurrentQueue* parent_)
        : ProducerBase(parent_, false)
        , nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE)
        , blockIndex(nullptr)
    {
        new_block_index();
    }

    ~ImplicitProducer()
    {
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
        // Unregister ourselves from thread-termination notification
        if (!this->inactive.load(std::memory_order_relaxed)) {
            details::ThreadExitNotifier::unsubscribe(&threadExitListener);
        }
#endif
        // Destroy all remaining elements!
        auto tail = this->tailIndex.load(std::memory_order_relaxed);
        auto index = this->headIndex.load(std::memory_order_relaxed);
        Block* block = nullptr;
        assert(index == tail || details::circular_less_than(index, tail));
        bool forceFreeLastBlock = index != tail;    // If we enter the loop, the last (tail) block will not be freed inside it
        while (index != tail) {
            if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
                if (block != nullptr) {
                    // Free the old block
                    this->parent->add_block_to_free_list(block);
                }
                block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
            }

            ((*block)[index])->~T();
            ++index;
        }
        // Even if the queue is empty, there's still one block that's not on the free list
        // (unless the head index reached the end of it)
        if (this->tailBlock != nullptr &&
            (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
            this->parent->add_block_to_free_list(this->tailBlock);
        }

        // Destroy block index
        auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
        if (localBlockIndex != nullptr) {
            for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
                localBlockIndex->index[i]->~BlockIndexEntry();
            }
            do {
                auto prev = localBlockIndex->prev;
                localBlockIndex->~BlockIndexHeader();
                (Traits::free)(localBlockIndex);
                localBlockIndex = prev;
            } while (localBlockIndex != nullptr);
        }
    }
    template <AllocationMode allocMode, typename U> inline bool enqueue(U&& element)
    {
        index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
        index_t newTailIndex = 1 + currentTailIndex;
        if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
            // We reached the end of a block, start a new one
            auto head = this->headIndex.load(std::memory_order_relaxed);
            assert(!details::circular_less_than<index_t>(currentTailIndex, head));
            if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
                (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
                 (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
                return false;
            }
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
            debug::DebugLock lock(mutex);
#endif
            // Find out where we'll be inserting this block in the block index
            BlockIndexEntry* idxEntry;
            if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
                return false;
            }

            // Get ahold of a new block
            auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
            if (newBlock == nullptr) {
                rewind_block_index_tail();
                idxEntry->value.store(nullptr, std::memory_order_relaxed);
                return false;
            }
#ifdef MCDBGQ_TRACKMEM
            newBlock->owner = this;
#endif
            newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();

            MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
            {
                // May throw; try to insert now, before we publish the fact that we have this new block
                MOODYCAMEL_TRY {
                    new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
                }
                MOODYCAMEL_CATCH(...)
                {
                    rewind_block_index_tail();
                    idxEntry->value.store(nullptr, std::memory_order_relaxed);
                    this->parent->add_block_to_free_list(newBlock);
                    MOODYCAMEL_RETHROW;
                }
            }

            // Insert the new block into the index
            idxEntry->value.store(newBlock, std::memory_order_relaxed);

            this->tailBlock = newBlock;

            MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
            {
                this->tailIndex.store(newTailIndex, std::memory_order_release);
                return true;
            }
        }

        // Enqueue
        new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

        this->tailIndex.store(newTailIndex, std::memory_order_release);
        return true;
    }
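    // Note: the slow path above runs only once every BLOCK_SIZE enqueues (when
    // currentTailIndex lands on a block boundary); all other enqueues take the
    // two-line fast path at the end -- placement-new into the current block,
    // then a release store of the new tail index.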
    template <typename U> bool dequeue(U& element)
    {
        // See ExplicitProducer::dequeue for rationale and explanation
        index_t tail = this->tailIndex.load(std::memory_order_relaxed);
        index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
        if (details::circular_less_than<index_t>(
                this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
            std::atomic_thread_fence(std::memory_order_acquire);

            index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
            tail = this->tailIndex.load(std::memory_order_acquire);
            if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
                index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);

                // Determine which block the element is in, then dequeue it
                auto entry = get_block_index_entry_for_index(index);
                auto block = entry->value.load(std::memory_order_relaxed);
                auto& el = *((*block)[index]);

                if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                    debug::DebugLock lock(producer->mutex);
#endif
                    struct Guard {
                        Block* block;
                        index_t index;
                        BlockIndexEntry* entry;
                        ConcurrentQueue* parent;

                        ~Guard()
                        {
                            (*block)[index]->~T();
                            if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
                                entry->value.store(nullptr, std::memory_order_relaxed);
                                parent->add_block_to_free_list(block);
                            }
                        }
                    } guard = { block, index, entry, this->parent };

                    element = std::move(el);
                } else {
                    element = std::move(el);
                    el.~T();

                    if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
                        {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                            debug::DebugLock lock(mutex);
#endif
                            // Remove the block from the index before freeing it
                            entry->value.store(nullptr, std::memory_order_relaxed);
                        }
                        this->parent->add_block_to_free_list(block);    // releases the above store
                    }
                }

                return true;
            } else {
                this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
            }
        }

        return false;
    }
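    // Note: the Guard object above exists for exception safety. If the move
    // assignment into `element` throws, the guard's destructor still runs the
    // element's destructor and, once the block becomes empty, unhooks it from
    // the block index and returns it to the free list before the exception
    // propagates.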
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4706)    // assignment within conditional expression
#endif
    template <AllocationMode allocMode, typename It> bool enqueue_bulk(It itemFirst, size_t count)
    {
        // First, we need to make sure we have enough room to enqueue all of the elements;
        // this means pre-allocating blocks and putting them in the block index (but only if
        // all the allocations succeed).
        index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
        auto startBlock = this->tailBlock;
        Block* firstAllocatedBlock = nullptr;
        auto endBlock = this->tailBlock;

        // Figure out how many blocks we'll need to allocate, and do so
        size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
                               ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
        index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
        if (blockBaseDiff > 0) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
            debug::DebugLock lock(mutex);
#endif
            do {
                blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
                currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

                // Find out where we'll be inserting this block in the block index
                BlockIndexEntry* idxEntry = nullptr;
                Block* newBlock;
                bool indexInserted = false;
                auto head = this->headIndex.load(std::memory_order_relaxed);
                assert(!details::circular_less_than<index_t>(currentTailIndex, head));
                bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
                            (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
                             (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));

                if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) ||
                    (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
                    // Index allocation or block allocation failed; revert any other allocations
                    // and index insertions done so far for this operation
                    if (indexInserted) {
                        rewind_block_index_tail();
                        idxEntry->value.store(nullptr, std::memory_order_relaxed);
                    }
                    currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
                        currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
                        idxEntry = get_block_index_entry_for_index(currentTailIndex);
                        idxEntry->value.store(nullptr, std::memory_order_relaxed);
                        rewind_block_index_tail();
                    }
                    this->parent->add_blocks_to_free_list(firstAllocatedBlock);
                    this->tailBlock = startBlock;

                    return false;
                }

#ifdef MCDBGQ_TRACKMEM
                newBlock->owner = this;
#endif
                newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
                newBlock->next = nullptr;

                // Insert the new block into the index
                idxEntry->value.store(newBlock, std::memory_order_relaxed);

                // Store the chain of blocks so that we can undo if later allocations fail,
                // and so that we can find the blocks when we do the actual enqueueing
                if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
                    assert(this->tailBlock != nullptr);
                    this->tailBlock->next = newBlock;
                }
                this->tailBlock = newBlock;
                endBlock = newBlock;
                firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
            } while (blockBaseDiff > 0);
        }
        // Enqueue, one block at a time
        index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
        currentTailIndex = startTailIndex;
        this->tailBlock = startBlock;
        assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
        if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
            this->tailBlock = firstAllocatedBlock;
        }
        while (true) {
            index_t stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
            if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
                stopIndex = newTailIndex;
            }
            MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(
                T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
            {
                while (currentTailIndex != stopIndex) {
                    new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
                }
            }
            else {
                MOODYCAMEL_TRY {
                    while (currentTailIndex != stopIndex) {
                        new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(
                            T, decltype(*itemFirst),
                            new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
                        ++currentTailIndex;
                        ++itemFirst;
                    }
                }
                MOODYCAMEL_CATCH(...)
                {
                    auto constructedStopIndex = currentTailIndex;
                    auto lastBlockEnqueued = this->tailBlock;

                    if (!details::is_trivially_destructible<T>::value) {
                        auto block = startBlock;
                        if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                            block = firstAllocatedBlock;
                        }
                        currentTailIndex = startTailIndex;
                        while (true) {
                            stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                            if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
                                stopIndex = constructedStopIndex;
                            }
                            while (currentTailIndex != stopIndex) {
                                (*block)[currentTailIndex++]->~T();
                            }
                            if (block == lastBlockEnqueued) {
                                break;
                            }
                            block = block->next;
                        }
                    }

                    currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
                    for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
                        currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
                        auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
                        idxEntry->value.store(nullptr, std::memory_order_relaxed);
                        rewind_block_index_tail();
                    }
                    this->parent->add_blocks_to_free_list(firstAllocatedBlock);
                    this->tailBlock = startBlock;
                    MOODYCAMEL_RETHROW;
                }
            }

            if (this->tailBlock == endBlock) {
                assert(currentTailIndex == newTailIndex);
                break;
            }
            this->tailBlock = this->tailBlock->next;
        }
        this->tailIndex.store(newTailIndex, std::memory_order_release);
        return true;
    }
#ifdef _MSC_VER
#pragma warning(pop)
#endif
    template <typename It> size_t dequeue_bulk(It& itemFirst, size_t max)
    {
        auto tail = this->tailIndex.load(std::memory_order_relaxed);
        auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
        auto desiredCount =
            static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
        if (details::circular_less_than<size_t>(0, desiredCount)) {
            desiredCount = desiredCount < max ? desiredCount : max;
            std::atomic_thread_fence(std::memory_order_acquire);

            auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);

            tail = this->tailIndex.load(std::memory_order_acquire);
            auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
            if (details::circular_less_than<size_t>(0, actualCount)) {
                actualCount = desiredCount < actualCount ? desiredCount : actualCount;
                if (actualCount < desiredCount) {
                    this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
                }

                // Get the first index. Since at least actualCount elements exist, this never exceeds tail.
                auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

                // Iterate the blocks and dequeue
                auto index = firstIndex;
                BlockIndexHeader* localBlockIndex;
                auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
                do {
                    auto blockStartIndex = index;
                    index_t endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                    endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex)
                                   ? firstIndex + static_cast<index_t>(actualCount)
                                   : endIndex;

                    auto entry = localBlockIndex->index[indexIndex];
                    auto block = entry->value.load(std::memory_order_relaxed);
                    if (MOODYCAMEL_NOEXCEPT_ASSIGN(
                            T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
                        while (index != endIndex) {
                            auto& el = *((*block)[index]);
                            *itemFirst++ = std::move(el);
                            el.~T();
                            ++index;
                        }
                    } else {
                        MOODYCAMEL_TRY {
                            while (index != endIndex) {
                                auto& el = *((*block)[index]);
                                *itemFirst = std::move(el);
                                ++itemFirst;
                                el.~T();
                                ++index;
                            }
                        }
                        MOODYCAMEL_CATCH(...)
                        {
                            do {
                                entry = localBlockIndex->index[indexIndex];
                                block = entry->value.load(std::memory_order_relaxed);
                                while (index != endIndex) {
                                    (*block)[index++]->~T();
                                }

                                if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(
                                        blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                                    debug::DebugLock lock(mutex);
#endif
                                    entry->value.store(nullptr, std::memory_order_relaxed);
                                    this->parent->add_block_to_free_list(block);
                                }
                                indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);

                                blockStartIndex = index;
                                endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                                endIndex = details::circular_less_than<index_t>(
                                               firstIndex + static_cast<index_t>(actualCount), endIndex)
                                               ? firstIndex + static_cast<index_t>(actualCount)
                                               : endIndex;
                            } while (index != firstIndex + actualCount);

                            MOODYCAMEL_RETHROW;
                        }
                    }
                    if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(
                            blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
                        {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                            debug::DebugLock lock(mutex);
#endif
                            // The set_many_empty above did a release, so anybody who acquires the block
                            // we're about to free can use it safely since our writes (and reads!)
                            // have happened-before then.
                            entry->value.store(nullptr, std::memory_order_relaxed);
                        }
                        this->parent->add_block_to_free_list(block);    // releases the above store
                    }
                    indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
                } while (index != firstIndex + actualCount);

                return actualCount;
            } else {
                this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
            }
        }

        return 0;
    }
private:
    // The block size must be > 1, so any number with the low bit set is an invalid block base index
    static const index_t INVALID_BLOCK_BASE = 1;

    struct BlockIndexEntry {
        std::atomic<index_t> key;
        std::atomic<Block*> value;
    };

    struct BlockIndexHeader {
        size_t capacity;
        std::atomic<size_t> tail;
        BlockIndexEntry* entries;
        BlockIndexEntry** index;
        BlockIndexHeader* prev;
    };
    template <AllocationMode allocMode>
    inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
    {
        auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);    // We're the only writer thread; relaxed is OK
        if (localBlockIndex == nullptr) {
            return false;    // this can happen if new_block_index failed in the constructor
        }
        size_t newTail =
            (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
        idxEntry = localBlockIndex->index[newTail];
        if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
            idxEntry->value.load(std::memory_order_relaxed) == nullptr) {

            idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
            localBlockIndex->tail.store(newTail, std::memory_order_release);
            return true;
        }

        // No room in the old block index, try to allocate another one!
        MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc)
        {
            return false;
        }
        else if (!new_block_index())
        {
            return false;
        }
        else
        {
            localBlockIndex = blockIndex.load(std::memory_order_relaxed);
            newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
            idxEntry = localBlockIndex->index[newTail];
            assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
            idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
            localBlockIndex->tail.store(newTail, std::memory_order_release);
            return true;
        }
    }
    inline void rewind_block_index_tail()
    {
        auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
        localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) &
                                        (localBlockIndex->capacity - 1),
                                    std::memory_order_relaxed);
    }
    inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
    {
        BlockIndexHeader* localBlockIndex;
        auto idx = get_block_index_index_for_index(index, localBlockIndex);
        return localBlockIndex->index[idx];
    }
    inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
    {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
        debug::DebugLock lock(mutex);
#endif
        index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
        localBlockIndex = blockIndex.load(std::memory_order_acquire);
        auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
        auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
        assert(tailBase != INVALID_BLOCK_BASE);
        // Note: Must use division instead of shift because the index may wrap around,
        // causing a negative offset, whose negativity we want to preserve
        auto offset = static_cast<size_t>(
            static_cast<typename std::make_signed<index_t>::type>(index - tailBase) /
            static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
        size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
        assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index &&
               localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
        return idx;
    }
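    // Note: the signed division above is deliberate. Since indices wrap around,
    // `index - tailBase` can be "negative" modulo 2^N; dividing it as a signed
    // quantity preserves that sign, so the offset correctly walks backwards
    // from the tail slot. For instance, index == 0 with tailBase == 2 * BLOCK_SIZE
    // yields an offset of -2, i.e. two slots before the tail (mod capacity);
    // an unsigned shift would compute the wrong slot.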
    bool new_block_index()
    {
        auto prev = blockIndex.load(std::memory_order_relaxed);
        size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
        auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
        auto raw = static_cast<char*>(
            (Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 +
                             sizeof(BlockIndexEntry) * entryCount + std::alignment_of<BlockIndexEntry*>::value - 1 +
                             sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
        if (raw == nullptr) {
            return false;
        }

        auto header = new (raw) BlockIndexHeader;
        auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
        auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(
            reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
        if (prev != nullptr) {
            auto prevTail = prev->tail.load(std::memory_order_relaxed);
            auto prevPos = prevTail;
            size_t i = 0;
            do {
                prevPos = (prevPos + 1) & (prev->capacity - 1);
                index[i++] = prev->index[prevPos];
            } while (prevPos != prevTail);
            assert(i == prevCapacity);
        }
        for (size_t i = 0; i != entryCount; ++i) {
            new (entries + i) BlockIndexEntry;
            entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
            index[prevCapacity + i] = entries + i;
        }
        header->prev = prev;
        header->entries = entries;
        header->index = index;
        header->capacity = nextBlockIndexCapacity;
        header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);

        blockIndex.store(header, std::memory_order_release);

        nextBlockIndexCapacity <<= 1;

        return true;
    }
    size_t nextBlockIndexCapacity;
    std::atomic<BlockIndexHeader*> blockIndex;

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
public:
    details::ThreadExitListener threadExitListener;
private:
#endif
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
public:
    ImplicitProducer* nextImplicitProducer;
private:
#endif
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
    mutable debug::DebugMutex mutex;
#endif
#ifdef MCDBGQ_TRACKMEM
    friend struct MemStats;
#endif
};
    //////////////////////////////////
    // Block pool manipulation
    //////////////////////////////////

    void populate_initial_block_list(size_t blockCount)
    {
        initialBlockPoolSize = blockCount;
        if (initialBlockPoolSize == 0) {
            initialBlockPool = nullptr;
            return;
        }

        initialBlockPool = create_array<Block>(blockCount);
        if (initialBlockPool == nullptr) {
            initialBlockPoolSize = 0;
        }
        for (size_t i = 0; i < initialBlockPoolSize; ++i) {
            initialBlockPool[i].dynamicallyAllocated = false;
        }
    }

    inline Block* try_get_block_from_initial_pool()
    {
        if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
            return nullptr;
        }

        auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);

        return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
    }
    inline void add_block_to_free_list(Block* block)
    {
#ifdef MCDBGQ_TRACKMEM
        block->owner = nullptr;
#endif
        if (!Traits::RECYCLE_ALLOCATED_BLOCKS && block->dynamicallyAllocated) {
            destroy(block);
        } else {
            freeList.add(block);
        }
    }

    inline void add_blocks_to_free_list(Block* block)
    {
        while (block != nullptr) {
            auto next = block->next;
            add_block_to_free_list(block);
            block = next;
        }
    }

    inline Block* try_get_block_from_free_list()
    {
        return freeList.try_get();
    }
    // Gets a free block from one of the memory pools, or allocates a new one (if allowed)
    template <AllocationMode canAlloc> Block* requisition_block()
    {
        auto block = try_get_block_from_initial_pool();
        if (block != nullptr) {
            return block;
        }

        block = try_get_block_from_free_list();
        if (block != nullptr) {
            return block;
        }

        MOODYCAMEL_CONSTEXPR_IF(canAlloc == CanAlloc)
        {
            return create<Block>();
        }
        else
        {
            return nullptr;
        }
    }
#ifdef MCDBGQ_TRACKMEM
public:
    struct MemStats {
        size_t allocatedBlocks;
        size_t usedBlocks;
        size_t freeBlocks;
        size_t ownedBlocksExplicit;
        size_t ownedBlocksImplicit;
        size_t implicitProducers;
        size_t explicitProducers;
        size_t elementsEnqueued;
        size_t blockClassBytes;
        size_t queueClassBytes;
        size_t implicitBlockIndexBytes;
        size_t explicitBlockIndexBytes;

        friend class ConcurrentQueue;

    private:
        static MemStats getFor(ConcurrentQueue* q)
        {
            MemStats stats = { 0 };

            stats.elementsEnqueued = q->size_approx();

            auto block = q->freeList.head_unsafe();
            while (block != nullptr) {
                ++stats.allocatedBlocks;
                ++stats.freeBlocks;
                block = block->freeListNext.load(std::memory_order_relaxed);
            }

            for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr;
                 ptr = ptr->next_prod()) {
                bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
                stats.implicitProducers += implicit ? 1 : 0;
                stats.explicitProducers += implicit ? 0 : 1;

                if (implicit) {
                    auto prod = static_cast<ImplicitProducer*>(ptr);
                    stats.queueClassBytes += sizeof(ImplicitProducer);
                    auto head = prod->headIndex.load(std::memory_order_relaxed);
                    auto tail = prod->tailIndex.load(std::memory_order_relaxed);
                    auto hash = prod->blockIndex.load(std::memory_order_relaxed);
                    if (hash != nullptr) {
                        for (size_t i = 0; i != hash->capacity; ++i) {
                            if (hash->index[i]->key.load(std::memory_order_relaxed) !=
                                    ImplicitProducer::INVALID_BLOCK_BASE &&
                                hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
                                ++stats.allocatedBlocks;
                                ++stats.ownedBlocksImplicit;
                            }
                        }
                        stats.implicitBlockIndexBytes +=
                            hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
                        for (; hash != nullptr; hash = hash->prev) {
                            stats.implicitBlockIndexBytes +=
                                sizeof(typename ImplicitProducer::BlockIndexHeader) +
                                hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
                        }
                    }
                    for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
                        ++stats.usedBlocks;
                    }
                } else {
                    auto prod = static_cast<ExplicitProducer*>(ptr);
                    stats.queueClassBytes += sizeof(ExplicitProducer);
                    auto tailBlock = prod->tailBlock;
                    bool wasNonEmpty = false;
                    if (tailBlock != nullptr) {
                        auto block = tailBlock;
                        do {
                            ++stats.allocatedBlocks;
                            if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
                                ++stats.usedBlocks;
                                wasNonEmpty = wasNonEmpty || block != tailBlock;
                            }
                            ++stats.ownedBlocksExplicit;
                            block = block->next;
                        } while (block != tailBlock);
                    }
                    auto index = prod->blockIndex.load(std::memory_order_relaxed);
                    while (index != nullptr) {
                        stats.explicitBlockIndexBytes +=
                            sizeof(typename ExplicitProducer::BlockIndexHeader) +
                            index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
                        index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
                    }
                }
            }

            auto freeOnInitialPool =
                q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize
                    ? 0
                    : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
            stats.allocatedBlocks += freeOnInitialPool;
            stats.freeBlocks += freeOnInitialPool;

            stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
            stats.queueClassBytes += sizeof(ConcurrentQueue);

            return stats;
        }
    };

public:
    // For debugging only. Not thread-safe.
    MemStats getMemStats()
    {
        return MemStats::getFor(this);
    }

private:
    friend struct MemStats;
#endif
    //////////////////////////////////
    // Producer list manipulation
    //////////////////////////////////

    ProducerBase* recycle_or_create_producer(bool isExplicit)
    {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
        debug::DebugLock lock(implicitProdMutex);
#endif
        // Try to re-use one first
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
                bool expected = true;
                if (ptr->inactive.compare_exchange_strong(
                        expected, false, std::memory_order_acquire, std::memory_order_relaxed)) {
                    // We caught one! It's been marked as activated, the caller can have it
                    return ptr;
                }
            }
        }

        return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this))
                                       : create<ImplicitProducer>(this));
    }
    ProducerBase* add_producer(ProducerBase* producer)
    {
        // Handle failed memory allocation
        if (producer == nullptr) {
            return nullptr;
        }

        producerCount.fetch_add(1, std::memory_order_relaxed);

        // Add it to the lock-free list
        auto prevTail = producerListTail.load(std::memory_order_relaxed);
        do {
            producer->next = prevTail;
        } while (!producerListTail.compare_exchange_weak(
            prevTail, producer, std::memory_order_release, std::memory_order_relaxed));

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        if (producer->isExplicit) {
            auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
            do {
                static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
            } while (!explicitProducers.compare_exchange_weak(prevTailExplicit,
                                                              static_cast<ExplicitProducer*>(producer),
                                                              std::memory_order_release,
                                                              std::memory_order_relaxed));
        } else {
            auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
            do {
                static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
            } while (!implicitProducers.compare_exchange_weak(prevTailImplicit,
                                                              static_cast<ImplicitProducer*>(producer),
                                                              std::memory_order_release,
                                                              std::memory_order_relaxed));
        }
#endif

        return producer;
    }
    void reown_producers()
    {
        // Fix up the parent pointers of producers stolen from another queue via a move or swap
        for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
            ptr->parent = this;
        }
    }
    //////////////////////////////////
    // Implicit producer hash
    //////////////////////////////////

    struct ImplicitProducerKVP {
        std::atomic<details::thread_id_t> key;
        ImplicitProducer* value;    // No need for atomicity; only read by the thread that sets it

        ImplicitProducerKVP() : value(nullptr) {}

        ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
        {
            key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
            value = other.value;
        }

        inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
        {
            swap(other);
            return *this;
        }

        inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
        {
            if (this != &other) {
                details::swap_relaxed(key, other.key);
                std::swap(value, other.value);
            }
        }
    };

    template <typename XT, typename XTraits>
    friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&,
                                 typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;
    struct ImplicitProducerHash {
        size_t capacity;
        ImplicitProducerKVP* entries;
        ImplicitProducerHash* prev;
    };

    inline void populate_initial_implicit_producer_hash()
    {
        MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
        {
            return;
        }
        else
        {
            implicitProducerHashCount.store(0, std::memory_order_relaxed);
            auto hash = &initialImplicitProducerHash;
            hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
            hash->entries = &initialImplicitProducerHashEntries[0];
            for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
                initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
            }
            hash->prev = nullptr;
            implicitProducerHash.store(hash, std::memory_order_relaxed);
        }
    }
    void swap_implicit_producer_hashes(ConcurrentQueue& other)
    {
        MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
        {
            return;
        }
        else
        {
            // Swap (assumes our implicit producer hash is initialized)
            initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
            initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
            other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];

            details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);

            details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
            if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
                implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
            } else {
                ImplicitProducerHash* hash;
                for (hash = implicitProducerHash.load(std::memory_order_relaxed);
                     hash->prev != &other.initialImplicitProducerHash;
                     hash = hash->prev) {
                    continue;
                }
                hash->prev = &initialImplicitProducerHash;
            }
            if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
                other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
            } else {
                ImplicitProducerHash* hash;
                for (hash = other.implicitProducerHash.load(std::memory_order_relaxed);
                     hash->prev != &initialImplicitProducerHash;
                     hash = hash->prev) {
                    continue;
                }
                hash->prev = &other.initialImplicitProducerHash;
            }
        }
    }
    // Only fails (returns nullptr) if memory allocation fails
    ImplicitProducer* get_or_add_implicit_producer()
    {
        // Since the data is essentially thread-local (the key is the thread ID), there's a
        // reduced need for fences -- memory ordering is already consistent for any individual
        // thread -- except for the current table itself.
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
        debug::DebugLock lock(implicitProdMutex);
#endif
        auto id = details::thread_id();
        auto hashedId = details::hash_thread_id(id);

        auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
        assert(mainHash != nullptr);    // the hash is populated before any producers are added
        for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
            // Look for the id in this hash
            auto index = hashedId;
            while (true) {    // Not an infinite loop because at least one slot is free in the hash table
                index &= hash->capacity - 1u;

                auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
                if (probedKey == id) {
                    // Found it! If we had to search several hashes deep, though, we should lazily
                    // add it to the current main hash table to avoid the extended search next time.
                    auto value = hash->entries[index].value;
                    if (hash != mainHash) {
                        index = hashedId;
                        while (true) {
                            index &= mainHash->capacity - 1u;
                            auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                            auto reusable = details::invalid_thread_id2;
                            if (mainHash->entries[index].key.compare_exchange_strong(
                                    empty, id, std::memory_order_seq_cst, std::memory_order_relaxed) ||
                                mainHash->entries[index].key.compare_exchange_strong(
                                    reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
#else
                            if (mainHash->entries[index].key.compare_exchange_strong(
                                    empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
#endif
                                mainHash->entries[index].value = value;
                                break;
                            }
                            ++index;
                        }
                    }

                    return value;
                }
                if (probedKey == details::invalid_thread_id) {
                    break;    // Not in this hash table
                }
                ++index;
            }
        }

        // Insert!
        auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
        while (true) {
            if (newCount >= (mainHash->capacity >> 1) &&
                !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
                // We've acquired the resize lock, try to allocate a bigger hash table.
                // Note the acquire fence synchronizes with the release fence at the end of this
                // block, and hence when we reload implicitProducerHash it must be up-to-date.
                mainHash = implicitProducerHash.load(std::memory_order_acquire);
                if (newCount >= (mainHash->capacity >> 1)) {
                    size_t newCapacity = mainHash->capacity << 1;
                    while (newCount >= (newCapacity >> 1)) {
                        newCapacity <<= 1;
                    }
                    auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) +
                                                                   std::alignment_of<ImplicitProducerKVP>::value - 1 +
                                                                   sizeof(ImplicitProducerKVP) * newCapacity));
                    if (raw == nullptr) {
                        // Allocation failed; try to continue with the old hash anyway
                        implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
                        return nullptr;
                    }

                    auto newHash = new (raw) ImplicitProducerHash;
                    newHash->capacity = static_cast<size_t>(newCapacity);
                    newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(
                        details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
                    for (size_t i = 0; i != newCapacity; ++i) {
                        new (newHash->entries + i) ImplicitProducerKVP;
                        newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
                    }
                    newHash->prev = mainHash;
                    implicitProducerHash.store(newHash, std::memory_order_release);
                    implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                    mainHash = newHash;
                } else {
                    implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                }
            }

            // If it's less than three-quarters full, add to the old one anyway so we don't have to wait
            // for the next table to finish being allocated by another thread (and if we just finished
            // allocating above, the condition will always be true)
            if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
                auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false));
                if (producer == nullptr) {
                    implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                    return nullptr;
                }

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
                producer->threadExitListener.userData = producer;
                details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
#endif

                auto index = hashedId;
                while (true) {
                    index &= mainHash->capacity - 1u;
                    auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                    auto reusable = details::invalid_thread_id2;
                    if (mainHash->entries[index].key.compare_exchange_strong(
                            reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                        implicitProducerHashCount.fetch_sub(
                            1, std::memory_order_relaxed);    // already counted as a used slot
                        mainHash->entries[index].value = producer;
                        break;
                    }
#endif
                    if (mainHash->entries[index].key.compare_exchange_strong(
                            empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                        mainHash->entries[index].value = producer;
                        break;
                    }
                    ++index;
                }
                return producer;
            }

            // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
            // We need to wait for the allocating thread to finish (if it succeeds, we add; if
            // not, we try to allocate ourselves).
            mainHash = implicitProducerHash.load(std::memory_order_acquire);
        }
    }
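    // Note: resizing is serialized by the implicitProducerHashResizeInProgress
    // flag, but lookups are never blocked -- the superseded tables stay
    // reachable through the prev chain, so a reader can always find an entry
    // that was inserted before a resize completed.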
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
    void implicit_producer_thread_exited(ImplicitProducer* producer)
    {
        // Remove from hash
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
        debug::DebugLock lock(implicitProdMutex);
#endif
        auto hash = implicitProducerHash.load(std::memory_order_acquire);
        assert(hash != nullptr);    // the thread exit listener is only registered if we were added to a hash
        auto id = details::thread_id();
        auto hashedId = details::hash_thread_id(id);
        details::thread_id_t probedKey;

        // We need to traverse all the hashes just in case other threads aren't on the current one yet
        // and are trying to add an entry thinking there's a free slot (because they reused a producer)
        for (; hash != nullptr; hash = hash->prev) {
            auto index = hashedId;
            do {
                index &= hash->capacity - 1u;
                probedKey = id;
                if (hash->entries[index].key.compare_exchange_strong(
                        probedKey, details::invalid_thread_id2, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                    break;
                }
                ++index;
            } while (probedKey !=
                     details::invalid_thread_id);    // Can happen if the hash has changed but we weren't put back in it yet

            // Mark the queue as being recyclable
            producer->inactive.store(true, std::memory_order_release);
        }
    }

    static void implicit_producer_thread_exited_callback(void* userData)
    {
        auto producer = static_cast<ImplicitProducer*>(userData);
        auto queue = producer->parent;
        queue->implicit_producer_thread_exited(producer);
    }
#endif
    //////////////////////////////////
    // Utility functions
    //////////////////////////////////

    template <typename TAlign> static inline void* aligned_malloc(size_t size)
    {
        MOODYCAMEL_CONSTEXPR_IF(std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
            return (Traits::malloc)(size);
        else {
            size_t alignment = std::alignment_of<TAlign>::value;
            void* raw = (Traits::malloc)(size + alignment - 1 + sizeof(void*));
            if (!raw)
                return nullptr;
            char* ptr = details::align_for<TAlign>(reinterpret_cast<char*>(raw) + sizeof(void*));
            *(reinterpret_cast<void**>(ptr) - 1) = raw;
            return ptr;
        }
    }

    template <typename TAlign> static inline void aligned_free(void* ptr)
    {
        MOODYCAMEL_CONSTEXPR_IF(std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
            return (Traits::free)(ptr);
        else
            (Traits::free)(ptr ? *(reinterpret_cast<void**>(ptr) - 1) : nullptr);
    }
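    // Sketch of the over-aligned layout produced above (when TAlign needs more
    // alignment than max_align_t): the allocation is padded so that a hidden
    // copy of the raw pointer sits immediately before the aligned user pointer,
    // which is how aligned_free recovers it:
    //
    //     raw -> [ padding ... ][ void* raw ][ aligned user data ... ]
    //                                        ^-- returned pointer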
    template <typename U> static inline U* create_array(size_t count)
    {
        assert(count > 0);
        U* p = static_cast<U*>(aligned_malloc<U>(sizeof(U) * count));
        if (p == nullptr)
            return nullptr;

        for (size_t i = 0; i != count; ++i)
            new (p + i) U();
        return p;
    }

    template <typename U> static inline void destroy_array(U* p, size_t count)
    {
        if (p != nullptr) {
            assert(count > 0);
            for (size_t i = count; i != 0;)
                (p + --i)->~U();
        }
        aligned_free<U>(p);
    }

    template <typename U> static inline U* create()
    {
        void* p = aligned_malloc<U>(sizeof(U));
        return p != nullptr ? new (p) U : nullptr;
    }

    template <typename U, typename A1> static inline U* create(A1&& a1)
    {
        void* p = aligned_malloc<U>(sizeof(U));
        return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
    }

    template <typename U> static inline void destroy(U* p)
    {
        if (p != nullptr)
            p->~U();
        aligned_free<U>(p);
    }
private:
    std::atomic<ProducerBase*> producerListTail;
    std::atomic<std::uint32_t> producerCount;

    std::atomic<size_t> initialBlockPoolIndex;
    Block* initialBlockPool;
    size_t initialBlockPoolSize;

#ifndef MCDBGQ_USEDEBUGFREELIST
    FreeList<Block> freeList;
#else
    debug::DebugFreeList<Block> freeList;
#endif

    std::atomic<ImplicitProducerHash*> implicitProducerHash;
    std::atomic<size_t> implicitProducerHashCount;    // Number of slots logically used
    ImplicitProducerHash initialImplicitProducerHash;
    std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
    std::atomic_flag implicitProducerHashResizeInProgress;

    std::atomic<std::uint32_t> nextExplicitConsumerId;
    std::atomic<std::uint32_t> globalExplicitConsumerOffset;

#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugMutex implicitProdMutex;
#endif

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
    std::atomic<ExplicitProducer*> explicitProducers;
    std::atomic<ImplicitProducer*> implicitProducers;
#endif
};
template <typename T, typename Traits>
ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
    : producer(queue.recycle_or_create_producer(true))
{
    if (producer != nullptr) {
        producer->token = this;
    }
}
template <typename T, typename Traits>
ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
    : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
{
    if (producer != nullptr) {
        producer->token = this;
    }
}
template <typename T, typename Traits>
ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0)
    , currentProducer(nullptr)
    , desiredProducer(nullptr)
{
    initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
    lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}
template <typename T, typename Traits>
ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0)
    , currentProducer(nullptr)
    , desiredProducer(nullptr)
{
    initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(
        1, std::memory_order_release);
    lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}
template <typename T, typename Traits>
inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

template <typename T, typename Traits>
inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a,
                 typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

}

#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
#pragma warning(pop)
#endif

#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
#pragma GCC diagnostic pop
#endif
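// Example usage of the queue defined above (a minimal sketch; see the project
// README for the full API):
//
//     moodycamel::ConcurrentQueue<int> q;
//     q.enqueue(25);                    // uses this thread's implicit producer
//
//     int item;
//     if (q.try_dequeue(item)) {
//         // item == 25
//     }
//
//     // Tokens amortize the producer/consumer lookups (see
//     // recycle_or_create_producer and get_or_add_implicit_producer above):
//     moodycamel::ProducerToken ptok(q);
//     q.enqueue(ptok, 26);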