SourceXtractorPlusPlus
0.11
Please provide a description of the project.
SEImplementation
src
lib
Measurement
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1
17
/*
18
* MultithreadedMeasurement.cpp
19
*
20
* Created on: May 23, 2018
21
* Author: mschefer
22
*/
23
24
#include <iostream>
#include <chrono>
#include <atomic>
#include <exception>
#include <ElementsKernel/Logging.h>
#include <csignal>

#include "SEImplementation/Plugin/SourceIDs/SourceID.h"
#include "SEImplementation/Measurement/MultithreadedMeasurement.h"
32
33
using namespace
SourceXtractor
;
34
35
// File-local logger, obtained from Elements under the name "Multithreading";
// used for all debug/fatal messages emitted by the functions in this file.
static
Elements::Logging
logger
=
Elements::Logging::getLogger
(
"Multithreading"
);
36
37
// Out-of-class definition of the static recursive mutex member g_global_mutex.
// It is not locked anywhere in the code visible in this file.
std::recursive_mutex
MultithreadedMeasurement::g_global_mutex
;
38
39
void
MultithreadedMeasurement::startThreads
() {
40
// Start worker threads
41
m_active_threads
=
m_worker_threads_nb
;
42
for
(
int
i=0; i<
m_worker_threads_nb
; i++) {
43
m_worker_threads
.
emplace_back
(std::make_shared<std::thread>(
workerThreadStatic
,
this
, i));
44
}
45
46
// Start output thread
47
m_output_thread
= std::make_shared<std::thread>(
outputThreadStatic
,
this
,
m_worker_threads_nb
);
48
}
49
50
void
MultithreadedMeasurement::waitForThreads
() {
51
logger
.
debug
() <<
"Waiting for worker threads"
;
52
53
// set flag to indicate no new input will be coming
54
{
55
std::unique_lock<std::mutex>
input_lock(
m_input_queue_mutex
);
56
m_input_done
=
true
;
57
m_new_input
.
notify_all
();
58
}
59
60
// Wait for all threads to finish
61
for
(
int
i=0; i<
m_worker_threads_nb
; i++) {
62
m_worker_threads
[i]->join();
63
}
64
m_output_thread
->
join
();
65
66
logger
.
debug
() <<
"All worker threads done!"
;
67
}
68
69
void
MultithreadedMeasurement::handleMessage
(
const
std::shared_ptr<SourceGroupInterface>
& source_group) {
70
std::unique_lock<std::mutex>
input_lock(
m_input_queue_mutex
);
71
72
//Force computation of SourceID here, where the order is still deterministic
73
for
(
auto
& source : *source_group) {
74
source.getProperty<
SourceID
>();
75
}
76
77
// put the new SourceGroup into the input queue
78
m_input_queue
.emplace_back(
m_group_counter
++, source_group);
79
80
// notify one worker thread that there is an available input
81
m_new_input
.
notify_one
();
82
}
83
84
/**
 * Entry point of a worker thread.
 *
 * Runs workerThreadLoop() of @p measurement and makes sure that no exception
 * leaves the thread function: an exception escaping a std::thread function
 * ends the whole process through std::terminate, without any log message.
 * On any failure the error is logged and SIGTERM is raised; the m_abort_raised
 * flag guarantees the signal is raised at most once per measurement object.
 *
 * @param measurement  object whose worker loop is executed
 * @param id           worker index, only used in the log messages
 */
void MultithreadedMeasurement::workerThreadStatic(MultithreadedMeasurement* measurement, int id) {
  logger.debug() << "Starting worker thread " << id;

  // Common failure path shared by all the handlers below
  auto abort_execution = [measurement, id](const char* reason) {
    logger.fatal() << "Worker thread " << id << " got an exception!";
    logger.fatal() << reason;
    // exchange() returns the previous value: only the first failing thread raises
    if (!measurement->m_abort_raised.exchange(true)) {
      logger.fatal() << "Aborting the execution";
      ::raise(SIGTERM);
    }
  };

  try {
    measurement->workerThreadLoop();
  }
  catch (const Elements::Exception& e) {
    abort_execution(e.what());
  }
  catch (const std::exception& e) {
    // Standard library (or other std::exception based) failures
    abort_execution(e.what());
  }
  catch (...) {
    // Anything else: there is no message to report, but the abort must still happen
    abort_execution("Unknown exception type");
  }

  logger.debug() << "Stopping worker thread " << id;
}
99
100
/**
 * Entry point of the output thread.
 *
 * Runs outputThreadLoop() of @p measurement and makes sure that no exception
 * leaves the thread function: an exception escaping a std::thread function
 * ends the whole process through std::terminate, without any log message.
 * On any failure the error is logged and SIGTERM is raised; the m_abort_raised
 * flag guarantees the signal is raised at most once per measurement object.
 *
 * @param measurement  object whose output loop is executed
 * @param id           thread id, only used in the start/stop log messages
 */
void MultithreadedMeasurement::outputThreadStatic(MultithreadedMeasurement* measurement, int id) {
  logger.debug() << "Starting output thread " << id;

  // Common failure path shared by all the handlers below
  auto abort_execution = [measurement](const char* reason) {
    logger.fatal() << "Output thread got an exception!";
    logger.fatal() << reason;
    // exchange() returns the previous value: only the first failing thread raises
    if (!measurement->m_abort_raised.exchange(true)) {
      logger.fatal() << "Aborting the execution";
      ::raise(SIGTERM);
    }
  };

  try {
    measurement->outputThreadLoop();
  }
  catch (const Elements::Exception& e) {
    abort_execution(e.what());
  }
  catch (const std::exception& e) {
    // Standard library (or other std::exception based) failures
    abort_execution(e.what());
  }
  catch (...) {
    // Anything else: there is no message to report, but the abort must still happen
    abort_execution("Unknown exception type");
  }

  logger.debug() << "Stopping output thread " << id;
}
115
116
void
MultithreadedMeasurement::workerThreadLoop
() {
117
while
(
true
) {
118
int
order_number;
119
std::shared_ptr<SourceGroupInterface>
source_group;
120
{
121
std::unique_lock<std::mutex>
input_lock(
m_input_queue_mutex
);
122
123
// We should end the thread once we're done with all input
124
if
(
m_input_done
&&
m_input_queue
.empty()) {
125
break
;
126
}
127
128
// If the queue is empty but we expect more data, wait
129
if
(
m_input_queue
.empty()) {
130
m_new_input
.
wait_for
(input_lock,
std::chrono::milliseconds
(100));
131
continue
;
132
}
133
134
order_number =
m_input_queue
.front().first;
135
source_group =
m_input_queue
.front().second;
136
m_input_queue
.pop_front();
137
}
138
139
// Trigger measurements
140
for
(
auto
& source : *source_group) {
141
m_source_to_row
(source);
142
}
143
144
{
145
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
146
m_output_queue
.emplace_back(order_number, source_group);
147
source_group =
nullptr
;
148
m_new_output
.
notify_one
();
149
}
150
}
151
152
// Before ending the thread, decrement active threads counter
153
{
154
std::unique_lock<std::mutex>
active_threads_lock(
m_active_threads_mutex
);
155
m_active_threads
--;
156
}
157
}
158
159
void
MultithreadedMeasurement::outputThreadLoop
() {
160
while
(
true
) {
161
{
162
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
163
164
// Wait for something in the output queue
165
if
(
m_output_queue
.empty()) {
166
m_new_output
.
wait_for
(output_lock,
std::chrono::milliseconds
(100));
167
}
168
169
// Process the output queue
170
while
(!
m_output_queue
.empty()) {
171
notifyObservers
(
m_output_queue
.front().second);
172
m_output_queue
.pop_front();
173
}
174
}
175
176
{
177
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
178
std::unique_lock<std::mutex>
active_threads_lock(
m_active_threads_mutex
);
179
if
(
m_active_threads
<= 0 &&
m_output_queue
.empty()) {
180
break
;
181
}
182
}
183
}
184
}
185
186
SourceXtractor::Observable< std::shared_ptr< SourceGroupInterface > >::notifyObservers
void notifyObservers(const std::shared_ptr< SourceGroupInterface > &message) const
Definition:
Observable.h:71
std::shared_ptr< SourceGroupInterface >
Elements::Logging
SourceXtractor::MultithreadedMeasurement::m_new_input
std::condition_variable m_new_input
Definition:
MultithreadedMeasurement.h:74
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::shared_ptr< std::thread > m_output_thread
Definition:
MultithreadedMeasurement.h:64
std::chrono::milliseconds
std::recursive_mutex
STL class.
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement, int id)
Definition:
MultithreadedMeasurement.cpp:100
SourceXtractor::MultithreadedMeasurement::handleMessage
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
Definition:
MultithreadedMeasurement.cpp:69
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition:
MultithreadedMeasurement.h:73
SourceXtractor::MultithreadedMeasurement::m_worker_threads
std::vector< std::shared_ptr< std::thread > > m_worker_threads
Definition:
MultithreadedMeasurement.h:67
SourceXtractor::MultithreadedMeasurement::g_global_mutex
static std::recursive_mutex g_global_mutex
Definition:
MultithreadedMeasurement.h:54
SourceXtractor::MultithreadedMeasurement::m_input_queue
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_input_queue
Definition:
MultithreadedMeasurement.h:75
SourceID.h
SourceXtractor::SourceID
Definition:
SourceID.h:33
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition:
MultithreadedMeasurement.cpp:159
SourceXtractor
Definition:
Aperture.h:30
SourceXtractor::MultithreadedMeasurement::m_abort_raised
std::atomic_bool m_abort_raised
Definition:
MultithreadedMeasurement.h:73
std::unique_lock
STL class.
SourceXtractor::MultithreadedMeasurement::m_worker_threads_nb
int m_worker_threads_nb
Definition:
MultithreadedMeasurement.h:66
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition:
MultithreadedMeasurement.h:62
Elements::Exception
Elements::Logging::debug
void debug(const std::string &logMessage)
SourceXtractor::MultithreadedMeasurement::workerThreadLoop
void workerThreadLoop()
Definition:
MultithreadedMeasurement.cpp:116
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition:
MultithreadedMeasurement.h:80
SourceXtractor::MultithreadedMeasurement::m_active_threads_mutex
std::mutex m_active_threads_mutex
Definition:
MultithreadedMeasurement.h:70
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition:
MultithreadedMeasurement.h:78
Elements::Logging::fatal
void fatal(const std::string &logMessage)
std::condition_variable::wait_for
T wait_for(T... args)
SourceXtractor::logger
static Elements::Logging logger
Definition:
PluginManager.cpp:45
e
constexpr double e
std::condition_variable::notify_one
T notify_one(T... args)
Elements::Logging::getLogger
static Logging getLogger(const std::string &name="")
std::vector::emplace_back
T emplace_back(T... args)
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
Definition:
MultithreadedMeasurement.h:79
SourceXtractor::MultithreadedMeasurement::waitForThreads
void waitForThreads() override
Definition:
MultithreadedMeasurement.cpp:50
SourceXtractor::MultithreadedMeasurement::m_input_queue_mutex
std::mutex m_input_queue_mutex
Definition:
MultithreadedMeasurement.h:76
MultithreadedMeasurement.h
Logging.h
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition:
MultithreadedMeasurement.cpp:39
SourceXtractor::MultithreadedMeasurement
Definition:
MultithreadedMeasurement.h:37
SourceXtractor::MultithreadedMeasurement::workerThreadStatic
static void workerThreadStatic(MultithreadedMeasurement *measurement, int id)
Definition:
MultithreadedMeasurement.cpp:84
SourceXtractor::MultithreadedMeasurement::m_active_threads
int m_active_threads
Definition:
MultithreadedMeasurement.h:69
std::condition_variable::notify_all
T notify_all(T... args)
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition:
MultithreadedMeasurement.h:72
std::thread::join
T join(T... args)
Generated by
1.8.18