barretenberg
Loading...
Searching...
No Matches
blockingconcurrentqueue.h
1// Provides an efficient blocking version of moodycamel::ConcurrentQueue.
2// ©2015-2020 Cameron Desrochers. Distributed under the terms of the simplified
3// BSD license, available at the top of concurrentqueue.h.
4// Also dual-licensed under the Boost Software License (see LICENSE.md)
5// Uses Jeff Preshing's semaphore implementation (under the terms of its
6// separate zlib license, see lightweightsemaphore.h).
7
8#pragma once
9
10#include "concurrentqueue.h"
11#include "lightweightsemaphore.h"
12
13#include <type_traits>
14#include <cerrno>
15#include <memory>
16#include <chrono>
17#include <ctime>
18
19namespace moodycamel {
20// This is a blocking version of the queue. It has an almost identical interface to
21// the normal non-blocking version, with the addition of various wait_dequeue() methods
22// and the removal of producer-specific dequeue methods.
23template <typename T, typename Traits = ConcurrentQueueDefaultTraits> class BlockingConcurrentQueue {
24 private:
25 typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
27
28 public:
31
32 typedef typename ConcurrentQueue::index_t index_t;
33 typedef typename ConcurrentQueue::size_t size_t;
34 typedef typename std::make_signed<size_t>::type ssize_t;
35
36 static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
37 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD =
38 ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
39 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
40 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
41 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
42 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE =
43 ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
44 static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
45
46 public:
47 // Creates a queue with at least `capacity` element slots; note that the
48 // actual number of elements that can be inserted without additional memory
49 // allocation depends on the number of producers and the block size (e.g. if
50 // the block size is equal to `capacity`, only a single block will be allocated
51 // up-front, which means only a single producer will be able to enqueue elements
52 // without an extra allocation -- blocks aren't shared between producers).
53 // This method is not thread safe -- it is up to the user to ensure that the
54 // queue is fully constructed before it starts being used by other threads (this
55 // includes making the memory effects of construction visible, possibly with a
56 // memory barrier).
57 explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
58 : inner(capacity)
59 , sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS),
60 &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
61 {
62 assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) ==
63 &((BlockingConcurrentQueue*)1)->inner &&
64 "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
65 if (!sema) {
66 MOODYCAMEL_THROW(std::bad_alloc());
67 }
68 }
69
70 BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
71 : inner(minCapacity, maxExplicitProducers, maxImplicitProducers)
72 , sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS),
73 &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
74 {
75 assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) ==
76 &((BlockingConcurrentQueue*)1)->inner &&
77 "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
78 if (!sema) {
79 MOODYCAMEL_THROW(std::bad_alloc());
80 }
81 }
82
83 // Disable copying and copy assignment
84 BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
85 BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
86
87 // Moving is supported, but note that it is *not* a thread-safe operation.
88 // Nobody can use the queue while it's being moved, and the memory effects
89 // of that move must be propagated to other threads before they can use it.
90 // Note: When a queue is moved, its tokens are still valid but can only be
91 // used with the destination queue (i.e. semantically they are moved along
92 // with the queue itself).
93 BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT : inner(std::move(other.inner)),
94 sema(std::move(other.sema))
95 {}
96
97 inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
98 {
99 return swap_internal(other);
100 }
101
102 // Swaps this queue's state with the other's. Not thread-safe.
103 // Swapping two queues does not invalidate their tokens, however
104 // the tokens that were created for one queue must be used with
105 // only the swapped queue (i.e. the tokens are tied to the
106 // queue's movable state, not the object itself).
107 inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT { swap_internal(other); }
108
109 private:
111 {
112 if (this == &other) {
113 return *this;
114 }
115
116 inner.swap(other.inner);
117 sema.swap(other.sema);
118 return *this;
119 }
120
121 public:
122 // Enqueues a single item (by copying it).
123 // Allocates memory if required. Only fails if memory allocation fails (or implicit
124 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
125 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
126 // Thread-safe.
127 inline bool enqueue(T const& item)
128 {
129 if ((details::likely)(inner.enqueue(item))) {
130 sema->signal();
131 return true;
132 }
133 return false;
134 }
135
136 // Enqueues a single item (by moving it, if possible).
137 // Allocates memory if required. Only fails if memory allocation fails (or implicit
138 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
139 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
140 // Thread-safe.
141 inline bool enqueue(T&& item)
142 {
143 if ((details::likely)(inner.enqueue(std::move(item)))) {
144 sema->signal();
145 return true;
146 }
147 return false;
148 }
149
150 // Enqueues a single item (by copying it) using an explicit producer token.
151 // Allocates memory if required. Only fails if memory allocation fails (or
152 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
153 // Thread-safe.
154 inline bool enqueue(producer_token_t const& token, T const& item)
155 {
156 if ((details::likely)(inner.enqueue(token, item))) {
157 sema->signal();
158 return true;
159 }
160 return false;
161 }
162
163 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
164 // Allocates memory if required. Only fails if memory allocation fails (or
165 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
166 // Thread-safe.
167 inline bool enqueue(producer_token_t const& token, T&& item)
168 {
169 if ((details::likely)(inner.enqueue(token, std::move(item)))) {
170 sema->signal();
171 return true;
172 }
173 return false;
174 }
175
176 // Enqueues several items.
177 // Allocates memory if required. Only fails if memory allocation fails (or
178 // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
179 // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
180 // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
181 // Thread-safe.
182 template <typename It> inline bool enqueue_bulk(It itemFirst, size_t count)
183 {
184 if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
185 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
186 return true;
187 }
188 return false;
189 }
190
191 // Enqueues several items using an explicit producer token.
192 // Allocates memory if required. Only fails if memory allocation fails
193 // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
194 // Note: Use std::make_move_iterator if the elements should be moved
195 // instead of copied.
196 // Thread-safe.
197 template <typename It> inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
198 {
199 if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
200 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
201 return true;
202 }
203 return false;
204 }
205
206 // Enqueues a single item (by copying it).
207 // Does not allocate memory. Fails if not enough room to enqueue (or implicit
208 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
209 // is 0).
210 // Thread-safe.
211 inline bool try_enqueue(T const& item)
212 {
213 if (inner.try_enqueue(item)) {
214 sema->signal();
215 return true;
216 }
217 return false;
218 }
219
220 // Enqueues a single item (by moving it, if possible).
221 // Does not allocate memory (except for one-time implicit producer).
222 // Fails if not enough room to enqueue (or implicit production is
223 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
224 // Thread-safe.
225 inline bool try_enqueue(T&& item)
226 {
227 if (inner.try_enqueue(std::move(item))) {
228 sema->signal();
229 return true;
230 }
231 return false;
232 }
233
234 // Enqueues a single item (by copying it) using an explicit producer token.
235 // Does not allocate memory. Fails if not enough room to enqueue.
236 // Thread-safe.
237 inline bool try_enqueue(producer_token_t const& token, T const& item)
238 {
239 if (inner.try_enqueue(token, item)) {
240 sema->signal();
241 return true;
242 }
243 return false;
244 }
245
246 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
247 // Does not allocate memory. Fails if not enough room to enqueue.
248 // Thread-safe.
249 inline bool try_enqueue(producer_token_t const& token, T&& item)
250 {
251 if (inner.try_enqueue(token, std::move(item))) {
252 sema->signal();
253 return true;
254 }
255 return false;
256 }
257
258 // Enqueues several items.
259 // Does not allocate memory (except for one-time implicit producer).
260 // Fails if not enough room to enqueue (or implicit production is
261 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
262 // Note: Use std::make_move_iterator if the elements should be moved
263 // instead of copied.
264 // Thread-safe.
265 template <typename It> inline bool try_enqueue_bulk(It itemFirst, size_t count)
266 {
267 if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
268 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
269 return true;
270 }
271 return false;
272 }
273
274 // Enqueues several items using an explicit producer token.
275 // Does not allocate memory. Fails if not enough room to enqueue.
276 // Note: Use std::make_move_iterator if the elements should be moved
277 // instead of copied.
278 // Thread-safe.
279 template <typename It> inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
280 {
281 if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
282 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
283 return true;
284 }
285 return false;
286 }
287
288 // Attempts to dequeue from the queue.
289 // Returns false if all producer streams appeared empty at the time they
290 // were checked (so, the queue is likely but not guaranteed to be empty).
291 // Never allocates. Thread-safe.
292 template <typename U> inline bool try_dequeue(U& item)
293 {
294 if (sema->tryWait()) {
295 while (!inner.try_dequeue(item)) {
296 continue;
297 }
298 return true;
299 }
300 return false;
301 }
302
303 // Attempts to dequeue from the queue using an explicit consumer token.
304 // Returns false if all producer streams appeared empty at the time they
305 // were checked (so, the queue is likely but not guaranteed to be empty).
306 // Never allocates. Thread-safe.
307 template <typename U> inline bool try_dequeue(consumer_token_t& token, U& item)
308 {
309 if (sema->tryWait()) {
310 while (!inner.try_dequeue(token, item)) {
311 continue;
312 }
313 return true;
314 }
315 return false;
316 }
317
318 // Attempts to dequeue several elements from the queue.
319 // Returns the number of items actually dequeued.
320 // Returns 0 if all producer streams appeared empty at the time they
321 // were checked (so, the queue is likely but not guaranteed to be empty).
322 // Never allocates. Thread-safe.
323 template <typename It> inline size_t try_dequeue_bulk(It itemFirst, size_t max)
324 {
325 size_t count = 0;
326 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
327 while (count != max) {
328 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
329 }
330 return count;
331 }
332
333 // Attempts to dequeue several elements from the queue using an explicit consumer token.
334 // Returns the number of items actually dequeued.
335 // Returns 0 if all producer streams appeared empty at the time they
336 // were checked (so, the queue is likely but not guaranteed to be empty).
337 // Never allocates. Thread-safe.
338 template <typename It> inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
339 {
340 size_t count = 0;
341 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
342 while (count != max) {
343 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
344 }
345 return count;
346 }
347
348 // Blocks the current thread until there's something to dequeue, then
349 // dequeues it.
350 // Never allocates. Thread-safe.
351 template <typename U> inline void wait_dequeue(U& item)
352 {
353 while (!sema->wait()) {
354 continue;
355 }
356 while (!inner.try_dequeue(item)) {
357 continue;
358 }
359 }
360
361 // Blocks the current thread until either there's something to dequeue
362 // or the timeout (specified in microseconds) expires. Returns false
363 // without setting `item` if the timeout expires, otherwise assigns
364 // to `item` and returns true.
365 // Using a negative timeout indicates an indefinite timeout,
366 // and is thus functionally equivalent to calling wait_dequeue.
367 // Never allocates. Thread-safe.
368 template <typename U> inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
369 {
370 if (!sema->wait(timeout_usecs)) {
371 return false;
372 }
373 while (!inner.try_dequeue(item)) {
374 continue;
375 }
376 return true;
377 }
378
379 // Blocks the current thread until either there's something to dequeue
380 // or the timeout expires. Returns false without setting `item` if the
381 // timeout expires, otherwise assigns to `item` and returns true.
382 // Never allocates. Thread-safe.
383 template <typename U, typename Rep, typename Period>
384 inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
385 {
386 return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
387 }
388
389 // Blocks the current thread until there's something to dequeue, then
390 // dequeues it using an explicit consumer token.
391 // Never allocates. Thread-safe.
392 template <typename U> inline void wait_dequeue(consumer_token_t& token, U& item)
393 {
394 while (!sema->wait()) {
395 continue;
396 }
397 while (!inner.try_dequeue(token, item)) {
398 continue;
399 }
400 }
401
402 // Blocks the current thread until either there's something to dequeue
403 // or the timeout (specified in microseconds) expires. Returns false
404 // without setting `item` if the timeout expires, otherwise assigns
405 // to `item` and returns true.
406 // Using a negative timeout indicates an indefinite timeout,
407 // and is thus functionally equivalent to calling wait_dequeue.
408 // Never allocates. Thread-safe.
409 template <typename U> inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
410 {
411 if (!sema->wait(timeout_usecs)) {
412 return false;
413 }
414 while (!inner.try_dequeue(token, item)) {
415 continue;
416 }
417 return true;
418 }
419
420 // Blocks the current thread until either there's something to dequeue
421 // or the timeout expires. Returns false without setting `item` if the
422 // timeout expires, otherwise assigns to `item` and returns true.
423 // Never allocates. Thread-safe.
424 template <typename U, typename Rep, typename Period>
425 inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
426 {
427 return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
428 }
429
430 // Attempts to dequeue several elements from the queue.
431 // Returns the number of items actually dequeued, which will
432 // always be at least one (this method blocks until the queue
433 // is non-empty) and at most max.
434 // Never allocates. Thread-safe.
435 template <typename It> inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
436 {
437 size_t count = 0;
438 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
439 while (count != max) {
440 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
441 }
442 return count;
443 }
444
445 // Attempts to dequeue several elements from the queue.
446 // Returns the number of items actually dequeued, which can
447 // be 0 if the timeout expires while waiting for elements,
448 // and at most max.
449 // Using a negative timeout indicates an indefinite timeout,
450 // and is thus functionally equivalent to calling wait_dequeue_bulk.
451 // Never allocates. Thread-safe.
452 template <typename It> inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
453 {
454 size_t count = 0;
455 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
456 while (count != max) {
457 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
458 }
459 return count;
460 }
461
462 // Attempts to dequeue several elements from the queue.
463 // Returns the number of items actually dequeued, which can
464 // be 0 if the timeout expires while waiting for elements,
465 // and at most max.
466 // Never allocates. Thread-safe.
467 template <typename It, typename Rep, typename Period>
468 inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
469 {
470 return wait_dequeue_bulk_timed<It&>(
471 itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
472 }
473
474 // Attempts to dequeue several elements from the queue using an explicit consumer token.
475 // Returns the number of items actually dequeued, which will
476 // always be at least one (this method blocks until the queue
477 // is non-empty) and at most max.
478 // Never allocates. Thread-safe.
479 template <typename It> inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
480 {
481 size_t count = 0;
482 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
483 while (count != max) {
484 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
485 }
486 return count;
487 }
488
489 // Attempts to dequeue several elements from the queue using an explicit consumer token.
490 // Returns the number of items actually dequeued, which can
491 // be 0 if the timeout expires while waiting for elements,
492 // and at most max.
493 // Using a negative timeout indicates an indefinite timeout,
494 // and is thus functionally equivalent to calling wait_dequeue_bulk.
495 // Never allocates. Thread-safe.
496 template <typename It>
497 inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
498 {
499 size_t count = 0;
500 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
501 while (count != max) {
502 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
503 }
504 return count;
505 }
506
507 // Attempts to dequeue several elements from the queue using an explicit consumer token.
508 // Returns the number of items actually dequeued, which can
509 // be 0 if the timeout expires while waiting for elements,
510 // and at most max.
511 // Never allocates. Thread-safe.
512 template <typename It, typename Rep, typename Period>
513 inline size_t wait_dequeue_bulk_timed(consumer_token_t& token,
514 It itemFirst,
515 size_t max,
516 std::chrono::duration<Rep, Period> const& timeout)
517 {
518 return wait_dequeue_bulk_timed<It&>(
519 token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
520 }
521
522 // Returns an estimate of the total number of elements currently in the queue. This
523 // estimate is only accurate if the queue has completely stabilized before it is called
524 // (i.e. all enqueue and dequeue operations have completed and their memory effects are
525 // visible on the calling thread, and no further operations start while this method is
526 // being called).
527 // Thread-safe.
528 inline size_t size_approx() const { return (size_t)sema->availableApprox(); }
529
530 // Returns true if the underlying atomic variables used by
531 // the queue are lock-free (they should be on most platforms).
532 // Thread-safe.
533 static constexpr bool is_lock_free() { return ConcurrentQueue::is_lock_free(); }
534
535 private:
536 template <typename U, typename A1, typename A2> static inline U* create(A1&& a1, A2&& a2)
537 {
538 void* p = (Traits::malloc)(sizeof(U));
539 return p != nullptr ? new (p) U(std::forward<A1>(a1), std::forward<A2>(a2)) : nullptr;
540 }
541
542 template <typename U> static inline void destroy(U* p)
543 {
544 if (p != nullptr) {
545 p->~U();
546 }
547 (Traits::free)(p);
548 }
549
550 private:
551 ConcurrentQueue inner;
552 std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
553};
554
555template <typename T, typename Traits>
556inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
557{
558 a.swap(b);
559}
560
561} // end namespace moodycamel
Definition: blockingconcurrentqueue.h:23
Definition: concurrentqueue.h:845
Definition: lightweightsemaphore.h:246
Definition: concurrentqueue.h:795
Definition: concurrentqueue.h:735