blob: 210dbb766ac27d3fae5210d5e2541e131c6423df [file] [log] [blame]
Anthony Barbier871448e2017-03-24 14:54:29 +00001/*
2 * Copyright (c) 2016, 2017 ARM Limited.
3 *
4 * SPDX-License-Identifier: MIT
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
#include "arm_compute/runtime/CPP/CPPScheduler.h"

#include "arm_compute/core/CPP/ICPPKernel.h"
#include "arm_compute/core/Error.h"
#include "arm_compute/core/Helpers.h"
#include "arm_compute/core/Utils.h"

#include <algorithm>
#include <cerrno>
#include <exception>
#include <iostream>
#include <semaphore.h>
#include <system_error>
#include <thread>
35
36using namespace arm_compute;
37
38#ifdef NO_MULTI_THREADING
39namespace
40{
41void delete_threads(Thread *t)
42{
43}
44}
45#else /* NO_MULTI_THREADING */
46class arm_compute::Thread
47{
48public:
49 /** Start a new thread
50 */
51 Thread();
52 Thread(const Thread &) = delete;
53 Thread &operator=(const Thread &) = delete;
54 Thread(Thread &&) = delete;
55 Thread &operator=(Thread &&) = delete;
56 /** Make the thread join
57 */
58 ~Thread();
59 /** Request the worker thread to start executing the given kernel
60 * This function will return as soon as the kernel has been sent to the worker thread.
61 * wait() needs to be called to ensure the execution is complete.
62 */
63 void start(ICPPKernel *kernel, const Window &window);
64 /** Wait for the current kernel execution to complete
65 */
66 void wait();
67 /** Function ran by the worker thread
68 */
69 void worker_thread();
70
71private:
72 std::thread _thread;
73 ICPPKernel *_kernel{ nullptr };
74 Window _window;
75 sem_t _wait_for_work;
76 sem_t _job_complete;
77 std::exception_ptr _current_exception;
78};
79
80Thread::Thread()
81 : _thread(), _window(), _wait_for_work(), _job_complete(), _current_exception(nullptr)
82{
83 int ret = sem_init(&_wait_for_work, 0, 0);
84 ARM_COMPUTE_ERROR_ON(ret < 0);
85 ARM_COMPUTE_UNUSED(ret);
86
87 ret = sem_init(&_job_complete, 0, 0);
88 ARM_COMPUTE_ERROR_ON(ret < 0);
89 ARM_COMPUTE_UNUSED(ret);
90
91 _thread = std::thread(&Thread::worker_thread, this);
92}
93
94Thread::~Thread()
95{
96 ARM_COMPUTE_ERROR_ON(!_thread.joinable());
97
98 start(nullptr, Window());
99 _thread.join();
100
101 int ret = sem_destroy(&_wait_for_work);
102 ARM_COMPUTE_ERROR_ON(ret < 0);
103 ARM_COMPUTE_UNUSED(ret);
104
105 ret = sem_destroy(&_job_complete);
106 ARM_COMPUTE_ERROR_ON(ret < 0);
107 ARM_COMPUTE_UNUSED(ret);
108}
109
110void Thread::start(ICPPKernel *kernel, const Window &window)
111{
112 _kernel = kernel;
113 _window = window;
114 int ret = sem_post(&_wait_for_work);
115 ARM_COMPUTE_UNUSED(ret);
116 ARM_COMPUTE_ERROR_ON(ret < 0);
117}
118
119void Thread::wait()
120{
121 int ret = sem_wait(&_job_complete);
122 ARM_COMPUTE_UNUSED(ret);
123 ARM_COMPUTE_ERROR_ON(ret < 0);
124 if(_current_exception)
125 {
126 std::rethrow_exception(_current_exception);
127 }
128}
129
130void Thread::worker_thread()
131{
132 while(sem_wait(&_wait_for_work) >= 0)
133 {
134 _current_exception = nullptr;
135 // Time to exit
136 if(_kernel == nullptr)
137 {
138 return;
139 }
140
141 try
142 {
143 _window.validate();
144 _kernel->run(_window);
145 }
146 catch(...)
147 {
148 _current_exception = std::current_exception();
149 }
150 int ret = sem_post(&_job_complete);
151 ARM_COMPUTE_UNUSED(ret);
152 ARM_COMPUTE_ERROR_ON(ret < 0);
153 }
154
155 ARM_COMPUTE_ERROR("Wait failed");
156}
157
158namespace
159{
160void delete_threads(Thread *t)
161{
162 delete[] t;
163}
164} // namespace
165#endif /* NO_MULTI_THREADING */
166
167CPPScheduler &CPPScheduler::get()
168{
169 static CPPScheduler scheduler;
170 return scheduler;
171}
172
173CPPScheduler::CPPScheduler()
174 : _num_threads(0), _threads(nullptr, delete_threads)
175{
176 force_number_of_threads(0);
177}
178
179void CPPScheduler::force_number_of_threads(int num_threads)
180{
181#ifdef NO_MULTI_THREADING
182 ARM_COMPUTE_ERROR_ON(num_threads > 1);
183 _num_threads = 1;
184#else /* NO_MULTI_THREADING */
185 _num_threads = num_threads > 0 ? num_threads : std::thread::hardware_concurrency();
186 ARM_COMPUTE_ERROR_ON(_num_threads < 1);
187
188 if(_num_threads > 1)
189 {
190 _threads = std::unique_ptr<Thread[], void (*)(Thread *)>(new Thread[_num_threads - 1], delete_threads);
191 }
192 else
193 {
194 _threads = nullptr;
195 }
196#endif /* NO_MULTI_THREADING */
197}
198
199void CPPScheduler::multithread(ICPPKernel *kernel, const size_t split_dimension)
200{
201 ARM_COMPUTE_ERROR_ON_MSG(!kernel, "The child class didn't set the kernel");
202
203 /** [Scheduler example] */
204 const Window &max_window = kernel->window();
205 const int num_iterations = max_window.num_iterations(split_dimension);
206 int num_threads = std::min(num_iterations, _num_threads);
207
208 if(!kernel->is_parallelisable() || 1 == num_threads)
209 {
210 kernel->run(max_window);
211 }
212#ifndef NO_MULTI_THREADING
213 else
214 {
215 for(int t = 0; t < num_threads; ++t)
216 {
217 Window win = max_window.split_window(split_dimension, t, num_threads);
218 win.set_thread_id(t);
219 win.set_num_threads(num_threads);
220
221 if(t != num_threads - 1)
222 {
223 _threads[t].start(kernel, win);
224 }
225 else
226 {
227 kernel->run(win);
228 }
229 }
230
231 try
232 {
233 for(int t = 1; t < num_threads; ++t)
234 {
235 _threads[t - 1].wait();
236 }
237 }
238 catch(const std::system_error &e)
239 {
240 std::cout << "Caught system_error with code " << e.code() << " meaning " << e.what() << '\n';
241 }
242 }
243#endif /* NO_MULTI_THREADING */
244 /** [Scheduler example] */
245}