10#include "concurrentqueue.h"
11#include "lightweightsemaphore.h"
32 typedef typename ConcurrentQueue::index_t index_t;
33 typedef typename ConcurrentQueue::size_t size_t;
34 typedef typename std::make_signed<size_t>::type ssize_t;
36 static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
37 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD =
38 ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
39 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
40 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
41 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
42 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE =
43 ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
44 static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
59 , sema(create<LightweightSemaphore, ssize_t, int>(0, (
int)Traits::MAX_SEMA_SPINS),
60 &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
64 "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
66 MOODYCAMEL_THROW(std::bad_alloc());
71 : inner(minCapacity, maxExplicitProducers, maxImplicitProducers)
72 , sema(create<LightweightSemaphore, ssize_t, int>(0, (
int)Traits::MAX_SEMA_SPINS),
73 &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
77 "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
79 MOODYCAMEL_THROW(std::bad_alloc());
94 sema(std::move(other.sema))
99 return swap_internal(other);
112 if (
this == &other) {
116 inner.swap(other.inner);
117 sema.swap(other.sema);
127 inline bool enqueue(T
const& item)
129 if ((details::likely)(inner.enqueue(item))) {
141 inline bool enqueue(T&& item)
143 if ((details::likely)(inner.enqueue(std::move(item)))) {
156 if ((details::likely)(inner.enqueue(token, item))) {
169 if ((details::likely)(inner.enqueue(token, std::move(item)))) {
182 template <
typename It>
inline bool enqueue_bulk(It itemFirst,
size_t count)
184 if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
185 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
197 template <
typename It>
inline bool enqueue_bulk(
producer_token_t const& token, It itemFirst,
size_t count)
199 if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
200 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
211 inline bool try_enqueue(T
const& item)
213 if (inner.try_enqueue(item)) {
225 inline bool try_enqueue(T&& item)
227 if (inner.try_enqueue(std::move(item))) {
239 if (inner.try_enqueue(token, item)) {
251 if (inner.try_enqueue(token, std::move(item))) {
265 template <
typename It>
inline bool try_enqueue_bulk(It itemFirst,
size_t count)
267 if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
268 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
279 template <
typename It>
inline bool try_enqueue_bulk(
producer_token_t const& token, It itemFirst,
size_t count)
281 if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
282 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
292 template <
typename U>
inline bool try_dequeue(U& item)
294 if (sema->tryWait()) {
295 while (!inner.try_dequeue(item)) {
307 template <
typename U>
inline bool try_dequeue(
consumer_token_t& token, U& item)
309 if (sema->tryWait()) {
310 while (!inner.try_dequeue(token, item)) {
323 template <
typename It>
inline size_t try_dequeue_bulk(It itemFirst,
size_t max)
326 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
327 while (count != max) {
328 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
338 template <
typename It>
inline size_t try_dequeue_bulk(
consumer_token_t& token, It itemFirst,
size_t max)
341 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
342 while (count != max) {
343 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
351 template <
typename U>
inline void wait_dequeue(U& item)
353 while (!sema->wait()) {
356 while (!inner.try_dequeue(item)) {
368 template <
typename U>
inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
370 if (!sema->wait(timeout_usecs)) {
373 while (!inner.try_dequeue(item)) {
383 template <
typename U,
typename Rep,
typename Period>
384 inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period>
const& timeout)
386 return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
392 template <
typename U>
inline void wait_dequeue(
consumer_token_t& token, U& item)
394 while (!sema->wait()) {
397 while (!inner.try_dequeue(token, item)) {
409 template <
typename U>
inline bool wait_dequeue_timed(
consumer_token_t& token, U& item, std::int64_t timeout_usecs)
411 if (!sema->wait(timeout_usecs)) {
414 while (!inner.try_dequeue(token, item)) {
424 template <
typename U,
typename Rep,
typename Period>
425 inline bool wait_dequeue_timed(
consumer_token_t& token, U& item, std::chrono::duration<Rep, Period>
const& timeout)
427 return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
435 template <
typename It>
inline size_t wait_dequeue_bulk(It itemFirst,
size_t max)
438 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
439 while (count != max) {
440 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
452 template <
typename It>
inline size_t wait_dequeue_bulk_timed(It itemFirst,
size_t max, std::int64_t timeout_usecs)
455 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
456 while (count != max) {
457 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
467 template <
typename It,
typename Rep,
typename Period>
468 inline size_t wait_dequeue_bulk_timed(It itemFirst,
size_t max, std::chrono::duration<Rep, Period>
const& timeout)
470 return wait_dequeue_bulk_timed<It&>(
471 itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
479 template <
typename It>
inline size_t wait_dequeue_bulk(
consumer_token_t& token, It itemFirst,
size_t max)
482 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
483 while (count != max) {
484 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
496 template <
typename It>
497 inline size_t wait_dequeue_bulk_timed(
consumer_token_t& token, It itemFirst,
size_t max, std::int64_t timeout_usecs)
500 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
501 while (count != max) {
502 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
512 template <
typename It,
typename Rep,
typename Period>
516 std::chrono::duration<Rep, Period>
const& timeout)
518 return wait_dequeue_bulk_timed<It&>(
519 token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
528 inline size_t size_approx()
const {
return (
size_t)sema->availableApprox(); }
533 static constexpr bool is_lock_free() {
return ConcurrentQueue::is_lock_free(); }
536 template <
typename U,
typename A1,
typename A2>
static inline U* create(A1&& a1, A2&& a2)
538 void* p = (Traits::malloc)(
sizeof(U));
539 return p !=
nullptr ?
new (p) U(std::forward<A1>(a1), std::forward<A2>(a2)) :
nullptr;
542 template <
typename U>
static inline void destroy(U* p)
555template <
typename T,
typename Traits>
Definition: blockingconcurrentqueue.h:23
Definition: concurrentqueue.h:845
Definition: lightweightsemaphore.h:246
Definition: concurrentqueue.h:795
Definition: concurrentqueue.h:735