SeqAn3
The Modern C++ library for sequence analysis.
buffer_queue.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2019, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2019, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <atomic>
16 #include <cmath>
17 #include <mutex>
18 #include <new>
19 #include <shared_mutex>
20 #include <type_traits>
21 #include <vector>
22 
26 #include <seqan3/std/algorithm>
27 #include <seqan3/std/concepts>
28 #include <seqan3/std/new>
29 #include <seqan3/std/ranges>
30 #include <seqan3/std/span>
31 
32 namespace seqan3::contrib
33 {
34 
// Result codes returned by the queue operations of buffer_queue.
36 enum class queue_op_status : uint8_t
37 {
38  success = 0, // the value was pushed or popped
39  empty, // try_pop found no element and the queue is not closed
40  full, // try_push found no free slot and the buffer could not be extended
41  closed // the queue was closed (try_pop reports this only once the queue is also empty)
42 };
43 
// Selects which overflow() overload buffer_queue uses when a push finds the buffer full.
44 enum struct buffer_queue_policy : uint8_t
45 {
46  fixed, // overflow() returns false: the buffer is never enlarged
47  dynamic // overflow() tries to enlarge the buffer
48 };
49 
50 template <std::Semiregular value_t,
51  SequenceContainer buffer_t = std::vector<value_t>,
52  buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
53 class buffer_queue
54 {
55 public:
56 
57  using buffer_type = buffer_t;
58  using value_type = typename buffer_type::value_type;
59  using size_type = typename buffer_type::size_type;
60  using reference = void;
61  using const_reference = void;
62 
63  // Default constructor sets capacity to 1 (still empty)
64  buffer_queue() : buffer_queue{0u}
65  {}
66  buffer_queue(buffer_queue const &) = delete;
67  buffer_queue(buffer_queue &&) = default;
68  buffer_queue & operator=(buffer_queue const &) = delete;
69  buffer_queue & operator=(buffer_queue &&) = default;
70  ~buffer_queue() = default;
71 
72  // you can set the initial capacity here
73  explicit buffer_queue(size_type const init_capacity)
74  {
75  data.resize(init_capacity + 1);
76  roundSize = static_cast<size_type>(1) << static_cast<size_type>(std::log2(data.size() - 1) + 1);
77  }
78 
79  template <std::ranges::InputRange range_type>
81  buffer_queue(size_type const init_capacity, range_type && r) : buffer_queue{init_capacity}
82  {
84  }
85 
89  template <typename value2_t>
91  void push(value2_t && value)
92  {
93  detail::spin_delay delay{};
94 
95  for (;;)
96  {
97  auto status = try_push(std::forward<value2_t>(value));
98  if (status == queue_op_status::closed)
99  throw queue_op_status::closed;
100  else if (status == queue_op_status::success)
101  return;
102 
103  assert(status != queue_op_status::empty);
104  assert(status == queue_op_status::full);
105  delay.wait(); // pause and then try again.
106  }
107  } // throws if closed
108 
109  template <typename value2_t>
111  queue_op_status wait_push(value2_t && value)
112  {
113  detail::spin_delay delay{};
114 
115  for (;;)
116  {
117  auto status = try_push(std::forward<value2_t>(value));
118  // wait until queue is not full anymore..
119  if (status != queue_op_status::full)
120  return status;
121 
122  assert(status != queue_op_status::empty);
123  assert(status == queue_op_status::full);
124  delay.wait(); // pause and then try again.
125  }
126  }
127 
128  value_type value_pop() // throws if closed
129  {
130  detail::spin_delay delay{};
131 
132  value_type value{};
133  for (;;)
134  {
135  auto status = try_pop(value);
136 
137  if (status == queue_op_status::closed)
138  throw queue_op_status::closed;
139  else if (status == queue_op_status::success)
140  return value;
141 
142  assert(status != queue_op_status::full);
143  assert(status == queue_op_status::empty);
144  delay.wait(); // pause and then try again.
145  }
146  }
147 
148  queue_op_status wait_pop(value_type & value)
149  {
150  detail::spin_delay delay{};
151 
152  queue_op_status status;
153  for (;;)
154  {
155  status = try_pop(value);
156 
157  if (status == queue_op_status::closed || status == queue_op_status::success)
158  break;
159 
160  assert(status != queue_op_status::full);
161  assert(status == queue_op_status::empty);
162  delay.wait(); // pause and then try again.
163  }
164  return status;
165  }
167 
171  template <typename value2_t>
173  queue_op_status try_push(value2_t &&);
174 
175  queue_op_status try_pop(value_t &);
177 
181  void close() noexcept
182  {
183  closed_flag.store(true, std::memory_order_release);
184  }
185 
186  bool is_closed() const noexcept
187  {
188  return closed_flag.load(std::memory_order_acquire);
189  }
190 
191  bool is_empty() const noexcept
192  {
193  std::unique_lock write_lock(mutex);
194  return headPos == tailPos;
195  }
196 
197  bool is_full() const noexcept
198  {
199  std::unique_lock write_lock(mutex);
200  return size_impl() == data.max_size();
201  }
202 
203  size_type size() const noexcept
204  {
205  std::unique_lock write_lock(mutex);
206  return size_impl();
207  }
209 private:
210 
211  // Needed in two functions that both acquire exclusive rights.
212  size_type size_impl() const noexcept
213  {
214  size_type mask = roundSize - 1;
215  if ((headPos & mask) <= (tailPos & mask))
216  return tailPos - headPos;
217  else
218  return tailPos - headPos - (roundSize - data.size());
219  }
220 
221  size_type cyclic_increment(size_type value,
222  size_type modulo,
223  size_type roundSize)
224  {
225  // invariants:
226  // - roundSize is a power of 2
227  // - (value % roundSize) is in [0, modulo)
228  //
229  // return the next greater value that fulfils the invariants
230  // increment write position & roundSize 2^b- 1 -> all bits set.
231  if ((++value & (roundSize - 1)) >= modulo)
232  value += roundSize - modulo;
233  return value;
234  }
235 
236  template <typename value2_t>
238  (buffer_policy == buffer_queue_policy::fixed)
239  bool overflow(value2_t &&)
240  {
241  return false;
242  }
243 
244  template <typename value2_t>
246  (buffer_policy == buffer_queue_policy::dynamic)
247  bool overflow(value2_t && value);
248 
250  buffer_t data;
258 };
259 
260 // Alias for a buffer_queue with buffer_queue_policy::fixed (its overflow() never enlarges the buffer).
261 template <std::Semiregular value_t, SequenceContainer buffer_t = std::vector<value_t>>
262 using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
263 
264 // Alias for a buffer_queue with buffer_queue_policy::dynamic (its overflow() may enlarge the buffer).
265 template <std::Semiregular value_t, SequenceContainer buffer_t = std::vector<value_t>>
266 using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
267 
268 // ============================================================================
269 // Metafunctions
270 // ============================================================================
271 
272 // ============================================================================
273 // Functions
274 // ============================================================================
275 
// overflow() for buffer_queue_policy::dynamic.
//
// Takes a unique_lock on mutex and, if the queue is still full, stores `value` in the last free
// slot, enlarges `data` by one slot and re-bases the head/tail positions.
// Returns true if `value` was stored in the queue, false otherwise (for example when the
// fullness check fails because the positions changed before the lock was acquired).
// NOTE(review): listing line 278 is not shown here; the line before the policy condition is missing.
276 template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
277 template <typename value2_t>
279  (buffer_policy == buffer_queue_policy::dynamic)
280 inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
281 {
282  // try to extend capacity
283  std::unique_lock write_lock{mutex};
 // Local snapshots; these deliberately shadow the members of the same name.
284  size_type cap = data.size();
285  size_type roundSize = this->roundSize;
286  size_type headPos = this->headPos;
287  size_type tailPos = this->tailPos;
288 
 // Under the exclusive lock the published and the reserved positions are expected to agree.
289  assert(tailPos == this->tailWritePos);
290  assert(headPos == this->headReadPos);
291 
292  bool valueWasAppended = false;
293 
294  // did we reach the capacity limit (another thread could have done the upgrade already)?
295  if (cyclic_increment(tailPos, cap, roundSize) >= headPos + roundSize)
296  {
297  if (cap != 0)
298  {
299  // tailPos & roundSize - 1 == tailPos % capacity
 // Store the value in the slot at the current tail before growing.
300  auto it = std::ranges::begin(data) + (tailPos & (roundSize - 1));
301  *it = std::forward<value2_t>(value);
302  tailPos = headPos + roundSize;
303  valueWasAppended = true;
304  }
305 
306  assert(tailPos == headPos + roundSize);
307 
308  // get positions of head/tail in current data sequence
309  size_type headIdx = headPos & (roundSize - 1);
310  size_type tailIdx = tailPos & (roundSize - 1);
311 
312  // increase capacity
313  data.resize(cap + 1);
314  size_type delta = data.size() - cap;
315  assert(delta == 1);
 // Recompute the power of two; a one-slot buffer uses a fixed shift of 1, so log2(0) is never evaluated.
316  roundSize = static_cast<size_type>(1) << ((data.size() > 1) ? static_cast<size_type>(std::log2(data.size() - 1) + 1) : 1);
317 
 // Shift the elements in [headIdx, cap) to the end of the enlarged buffer.
318  std::ranges::move_backward(std::span{data.data() + headIdx, data.data() + cap}, data.data() + data.size());
319  if (cap != 0)
320  {
 // Publish the re-based positions: head moves by delta, tail is placed one round ahead.
321  this->headReadPos = this->headPos = headIdx + delta;
322  this->tailWritePos = this->tailPos = tailIdx + roundSize;
323  }
324  this->roundSize = roundSize;
325  }
326  return valueWasAppended;
327 }
328 
329 // ----------------------------------------------------------------------------
330 // Function try_pop()
331 // ----------------------------------------------------------------------------
332 
333 /*
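334  * @fn ConcurrentQueue#tryPopFront (legacy SeqAn 2 documentation; implemented below as buffer_queue::try_pop, which returns a queue_op_status)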
335  * @headerfile <seqan/parallel.h>
336  * @brief Try to dequeue a value from a queue.
337  *
338  * @signature bool tryPopFront(result, queue[, parallelTag]);
339  *
340  *
341  * @param[in,out] queue A queue.
342  * @param[out] result The dequeued value (if available).
343  * @param[in] parallelTag The concurrency scheme. If multiple threads dequeue values concurrently this tag must be
344  * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
345  * @endlink tag can only be used if one thread calls <tt>popFront</tt> at a time.
346  * Default is @link ParallelismTags#Parallel @endlink.
347  * @return bool Returns <tt>true</tt> if a value could be dequeued and <tt>false</tt> otherwise.
348  */
349 
350 //
351 // [ ? ] [ 4 ] [ 3 ] [ 8 ] [ 0 ] [ x ] [ ? ]
352 // | ^
353 // v |
354 // head headRead tail tailWrite
355 //
356 // empty = (head == tail)
357 // full = (tail + 1 == head)
358 //
359 // valid data between [headRead, tail)
360 // currently filled [tail, tailWrite)
361 // currently removed [head, headRead)
362 
// Single attempt to dequeue one element into `result`.
//
// Returns queue_op_status::success if an element was moved into `result`,
// queue_op_status::closed if the queue is empty and closed, and queue_op_status::empty if it is
// empty but not closed. Runs under a shared_lock on mutex.
363 template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
364 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
365 {
366  // try to extract a value
367  std::shared_lock read_lock{mutex};
368 
369  size_type cap = data.size();
370  size_type roundSize = this->roundSize;
371  size_type headReadPos;
372  size_type newHeadReadPos;
373  detail::spin_delay spinDelay;
374 
375  // wait for queue to become filled
376  while (true)
377  {
378  headReadPos = this->headReadPos;
379  size_type tailPos = this->tailPos;
380 
381  assert(headReadPos <= tailPos);
382 
383  // return if queue is empty
384  if (headReadPos == tailPos)
385  {
386  if (is_closed()) // if empty and closed, no more data is expected.
387  return queue_op_status::closed;
388  return queue_op_status::empty;
389  }
390 
391  newHeadReadPos = cyclic_increment(headReadPos, cap, roundSize);
392 
 // Claim the slot at headReadPos; if the exchange fails, pause and retry with fresh positions.
393  if (this->headReadPos.compare_exchange_weak(headReadPos, newHeadReadPos))
394  break;
395 
396  spinDelay.wait();
397  }
398 
399  // extract value and destruct it in the data string
400  result = std::ranges::iter_move(data.begin() + (headReadPos & (roundSize - 1)));
401 
402  // wait for pending previous reads and synchronize headPos to headReadPos
403  {
404  detail::spin_delay delay{};
405  size_type old = headReadPos;
 // headPos only advances once it equals the position this call claimed; `old` is reset
 // after every failed exchange because compare_exchange_weak overwrites it.
406  while (!this->headPos.compare_exchange_weak(old, newHeadReadPos))
407  {
408  old = headReadPos;
409  delay.wait(); // add adapting delay in case of high contention.
410  }
411  }
412 
413  return queue_op_status::success;
414 }
415 
416 // ----------------------------------------------------------------------------
417 // Function try_push()
418 // ----------------------------------------------------------------------------
419 
420 /*
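421  * @fn ConcurrentQueue#appendValue (legacy SeqAn 2 documentation; implemented below as buffer_queue::try_push, which returns a queue_op_status)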
422  * @headerfile <seqan/parallel.h>
423  * @brief Enqueue a value to a queue.
424  *
425  * @signature void appendValue(queue, val[, expandTag[, parallelTag]);
426  *
427  *
428  * @param[in,out] queue A queue.
429  * @param[in] val The value to enqueue.
430  * @param[in] expandTag The overflow strategy. If @link OverflowStrategyTags#Generous @endlink the queue will be
431  * automatically resized if the capacity is exceeded, otherwise the thread spinlocks until
432  * the element can be enqueued.
433  * Default is the @link DefaultOverflowImplicit @endlink result for the <tt>queue</tt> type.
434  * @param[in] parallelTag The concurrency scheme. If multiple threads enqueue values concurrently this tag must be
435  * @link ParallelismTags#Parallel @endlink. The more efficient @link ParallelismTags#Serial
436  * @endlink tag can only be used if one thread calls <tt>appendValue</tt> at a time.
437  * Default is @link ParallelismTags#Parallel @endlink.
438  */
439 //
// Single attempt to enqueue `value`.
//
// Returns queue_op_status::closed if the queue is closed, queue_op_status::success if the value
// was stored (either in a free slot or by overflow()), and queue_op_status::full if no slot was
// free and overflow() did not store the value.
// NOTE(review): listing line 442 is not shown here; a line between the template header and the
// signature is missing.
440 template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
441 template <typename value2_t>
443 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
444 {
445  // try to push the value
 // This scope bounds the shared lock: it is released before overflow() takes its unique_lock.
446  {
447  detail::spin_delay delay{};
448 
449  std::shared_lock read_lock(mutex);
450 
451  if (is_closed())
452  return queue_op_status::closed;
453 
454  size_type cap = data.size();
455  size_type roundSize = this->roundSize;
456 
457  while (true)
458  {
459  size_type tailWritePos = this->tailWritePos;
460  size_type newTailWritePos = cyclic_increment(tailWritePos, cap, roundSize);
461  size_type headPos = this->headPos;
462 
463  assert(newTailWritePos <= (headPos + roundSize + 1));
464 
465  // break if we have a wrap around, i.e. queue is full
466  if (newTailWritePos >= headPos + roundSize)
467  break;
468 
 // Reserve the slot at tailWritePos; if the exchange fails, pause and retry.
469  if (this->tailWritePos.compare_exchange_weak(tailWritePos, newTailWritePos))
470  {
471  auto it = std::ranges::begin(data) + (tailWritePos & (roundSize - 1));
472  *it = std::forward<value2_t>(value);
473  // here we construct the value into the reserved storage.
474 
475  // wait for pending previous writes and synchronise tailPos to tailWritePos
476  {
477  detail::spin_delay delay{};
478  size_type old = tailWritePos;
 // tailPos only advances once it equals the position this call reserved; `old` is
 // reset after every failed exchange because compare_exchange_weak overwrites it.
479  while (!this->tailPos.compare_exchange_weak(old, newTailWritePos))
480  {
481  old = tailWritePos;
482  delay.wait(); // add adapting delay in case of high contention.
483  }
484  }
485  return queue_op_status::success;
486  }
487 
488  delay.wait();
489  }
490  }
491 
492  // if possible extend capacity and return.
493  if (overflow(std::forward<value2_t>(value)))
494  {
495  return queue_op_status::success; // always return success, since the queue dynamically resizes and cannot be full.
496  }
497 
498  // We could not extend the queue so it must be full.
499  return queue_op_status::full;
500 }
502 } // namespace seqan3::contrib
Provides C++17/20 additions to the <new> header, if they are not already available.
Subsumes std::Copyable and std::DefaultConstructible.
::ranges::size size
Alias for ranges::size. Obtains the size of a range whose size can be calculated in constant time...
Definition: ranges:189
::ranges::data data
Alias for ranges::data. Returns a pointer to the block of data of a ContiguousRange. ...
Definition: ranges:184
::ranges::iter_move iter_move
Alias for ranges::iter_move. Casts the result of dereferencing an object to its associated rvalue ref...
Definition: iterator:336
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition: new:33
The Concepts library.
Definition: buffer_queue.hpp:32
Adaptations of concepts from the Ranges TS.
::ranges::begin begin
Alias for ranges::begin. Returns an iterator to the beginning of a range.
Definition: ranges:174
Provides std::span from the C++20 standard library.
::ranges::copy copy
Alias for ranges::copy. Copies a range of elements to a new location.
Definition: algorithm:44
T fixed(T... args)
::ranges::move_backward move_backward
Alias for ranges::move_backward. Moves a range of elements backward to a new location starting from t...
Definition: algorithm:84
Adaptations of algorithms from the Ranges TS.
::ranges::empty empty
Alias for ranges::empty. Checks whether a range is empty.
Definition: ranges:194
Provides seqan3::detail::spin_delay.
Provides various transformation traits used by the range module.
Adaptations of concepts from the standard library.
The concept std::ConvertibleTo<From, To> specifies that an expression of the type and value category ...