SeqAn3
The Modern C++ library for sequence analysis.
execution_handler_parallel.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2019, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2019, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <functional>
16 #include <thread>
17 #include <type_traits>
18 #include <vector>
19 
22 #include <seqan3/core/platform.hpp>
23 #include <seqan3/std/concepts>
24 #include <seqan3/std/ranges>
25 
26 namespace seqan3::detail
27 {
28 
44 class execution_handler_parallel
45 {
46 private:
48  using task_type = std::function<void()>;
49 
50 public:
63  execution_handler_parallel(size_t const thread_count) : state{std::make_unique<internal_state>()}
64  {
65  auto * q = &(state->queue);
66  for (size_t i = 0; i < thread_count; ++i)
67  {
68  state->thread_pool.emplace_back([q] ()
69  {
70  for (;;)
71  {
72  task_type task;
73  if (q->wait_pop(task) == contrib::queue_op_status::closed)
74  return;
75 
76  task();
77  }
78  });
79  }
80  }
81 
83  execution_handler_parallel() : execution_handler_parallel{std::thread::hardware_concurrency()}
84  {}
85 
86  execution_handler_parallel(execution_handler_parallel const &) = delete;
87  execution_handler_parallel(execution_handler_parallel &&) = default;
88  execution_handler_parallel & operator=(execution_handler_parallel const &) = delete;
89  execution_handler_parallel & operator=(execution_handler_parallel &&) = default;
90 
92  ~execution_handler_parallel()
93  {
94  if (state != nullptr)
95  wait();
96  }
98 
102  template <typename fn_type, typename first_range_type, typename second_range_type, typename delegate_type>
105  std::Invocable<delegate_type, std::invoke_result_t<fn_type,
106  size_t const,
107  first_range_type,
108  second_range_type>>
110  void execute(fn_type && func,
111  size_t const idx,
112  first_range_type first_range,
113  second_range_type second_range,
114  delegate_type && delegate)
115  {
116  static_assert(std::ranges::View<first_range_type>, "Expected a view!");
117  static_assert(std::ranges::View<second_range_type>, "Expected a view!");
118 
119  assert(state != nullptr);
120 
121  task_type task = [=, first_range = std::move(first_range), second_range = std::move(second_range)] ()
122  {
123  delegate(func(idx, std::move(first_range), std::move(second_range)));
124  };
125  // Asynchronously pushes the task to the queue.
126  [[maybe_unused]] contrib::queue_op_status status = state->queue.wait_push(std::move(task));
127  assert(status == contrib::queue_op_status::success);
128  }
129 
131  void wait()
132  {
133  assert(state != nullptr);
134 
135  if (!state->is_waiting)
136  {
137  state->is_waiting = true;
138  state->queue.close();
139 
140  for (auto & t : state->thread_pool)
141  {
142  if (t.joinable())
143  t.join();
144  }
145  }
146  }
147 
148 private:
150  struct internal_state
151  {
153  std::vector<std::thread> thread_pool{};
155  contrib::fixed_buffer_queue<task_type> queue{10000};
157  bool is_waiting{false};
158  };
159 
161  std::unique_ptr<internal_state> state{nullptr};
162 };
163 
164 } // namespace seqan3::detail
Provides platform and dependency checks.
Provides seqan3::buffer_queue.
Specifies the requirements of a Range type that has constant time copy, move and assignment operators...
Specifies whether the given callable is invocable with the given arguments.
T hardware_concurrency(T... args)
The Concepts library.
Adaptations of concepts from the Ranges TS.
Provides seqan3::detail::reader_writer_manager.
Definition: aligned_sequence_concept.hpp:35