SourceXtractorPlusPlus
0.11
Please provide a description of the project.
SEImplementation
SEImplementation
Measurement
MultithreadedMeasurement.h
Go to the documentation of this file.
1
17
/*
18
* Multithreadedmeasurement->h
19
*
20
* Created on: May 17, 2018
21
* Author: mschefer
22
*/
23
24
#ifndef _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_
25
#define _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_
26
27
#include <atomic>
28
#include <thread>
29
#include <mutex>
30
#include <condition_variable>
31
#include <atomic>
32
33
#include "
SEFramework/Pipeline/Measurement.h
"
34
35
namespace
SourceXtractor
{
36
37
class
MultithreadedMeasurement
:
public
Measurement
{
38
public
:
39
40
using
SourceToRowConverter
=
std::function
<
Euclid::Table::Row
(
const
SourceInterface
&)>;
41
MultithreadedMeasurement
(
SourceToRowConverter
source_to_row,
int
worker_threads_nb)
42
:
m_source_to_row
(source_to_row),
43
m_worker_threads_nb
(worker_threads_nb),
44
m_active_threads
(0),
45
m_group_counter
(0),
46
m_input_done
(false),
m_abort_raised
(false) {}
47
48
void
handleMessage
(
const
std::shared_ptr<SourceGroupInterface>
& source_group)
override
;
49
50
void
startThreads
()
override
;
51
void
waitForThreads
()
override
;
52
53
public
:
54
static
std::recursive_mutex
g_global_mutex
;
55
56
private
:
57
static
void
workerThreadStatic
(
MultithreadedMeasurement
* measurement,
int
id
);
58
static
void
outputThreadStatic
(
MultithreadedMeasurement
* measurement,
int
id
);
59
void
workerThreadLoop
();
60
void
outputThreadLoop
();
61
62
SourceToRowConverter
m_source_to_row
;
63
64
std::shared_ptr<std::thread>
m_output_thread
;
65
66
int
m_worker_threads_nb
;
67
std::vector<std::shared_ptr<std::thread>
>
m_worker_threads
;
68
69
int
m_active_threads
;
70
std::mutex
m_active_threads_mutex
;
71
72
int
m_group_counter
;
73
std::atomic_bool
m_input_done
,
m_abort_raised
;
74
std::condition_variable
m_new_input
;
75
std::list<std::pair<int, std::shared_ptr<SourceGroupInterface>
>>
m_input_queue
;
76
std::mutex
m_input_queue_mutex
;
77
78
std::condition_variable
m_new_output
;
79
std::list<std::pair<int, std::shared_ptr<SourceGroupInterface>
>>
m_output_queue
;
80
std::mutex
m_output_queue_mutex
;
81
};
82
83
}
84
85
#endif
/* _SEIMPLEMENTATION_OUTPUT_MULTITHREADEDMEASUREMENT_H_ */
std::shared_ptr< SourceGroupInterface >
std::list
STL class.
std::vector
STL class.
SourceXtractor::MultithreadedMeasurement::m_new_input
std::condition_variable m_new_input
Definition:
MultithreadedMeasurement.h:74
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::shared_ptr< std::thread > m_output_thread
Definition:
MultithreadedMeasurement.h:64
std::recursive_mutex
STL class.
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement, int id)
Definition:
MultithreadedMeasurement.cpp:100
SourceXtractor::MultithreadedMeasurement::handleMessage
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
Definition:
MultithreadedMeasurement.cpp:69
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition:
MultithreadedMeasurement.h:73
SourceXtractor::MultithreadedMeasurement::m_worker_threads
std::vector< std::shared_ptr< std::thread > > m_worker_threads
Definition:
MultithreadedMeasurement.h:67
std::function< Euclid::Table::Row(const SourceInterface &)>
SourceXtractor::MultithreadedMeasurement::g_global_mutex
static std::recursive_mutex g_global_mutex
Definition:
MultithreadedMeasurement.h:54
SourceXtractor::MultithreadedMeasurement::m_input_queue
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_input_queue
Definition:
MultithreadedMeasurement.h:75
SourceXtractor::Measurement
Definition:
Measurement.h:36
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition:
MultithreadedMeasurement.cpp:159
SourceXtractor
Definition:
Aperture.h:30
SourceXtractor::MultithreadedMeasurement::m_abort_raised
std::atomic_bool m_abort_raised
Definition:
MultithreadedMeasurement.h:73
Measurement.h
SourceXtractor::MultithreadedMeasurement::m_worker_threads_nb
int m_worker_threads_nb
Definition:
MultithreadedMeasurement.h:66
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition:
MultithreadedMeasurement.h:62
SourceXtractor::MultithreadedMeasurement::workerThreadLoop
void workerThreadLoop()
Definition:
MultithreadedMeasurement.cpp:116
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition:
MultithreadedMeasurement.h:80
SourceXtractor::MultithreadedMeasurement::MultithreadedMeasurement
MultithreadedMeasurement(SourceToRowConverter source_to_row, int worker_threads_nb)
Definition:
MultithreadedMeasurement.h:41
SourceXtractor::MultithreadedMeasurement::m_active_threads_mutex
std::mutex m_active_threads_mutex
Definition:
MultithreadedMeasurement.h:70
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition:
MultithreadedMeasurement.h:78
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
Definition:
MultithreadedMeasurement.h:79
SourceXtractor::MultithreadedMeasurement::waitForThreads
void waitForThreads() override
Definition:
MultithreadedMeasurement.cpp:50
SourceXtractor::MultithreadedMeasurement::m_input_queue_mutex
std::mutex m_input_queue_mutex
Definition:
MultithreadedMeasurement.h:76
std::condition_variable
std::mutex
STL class.
Euclid::Table::Row
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition:
MultithreadedMeasurement.cpp:39
SourceXtractor::SourceInterface
The SourceInterface is an abstract "source" that has properties attached to it.
Definition:
SourceInterface.h:46
SourceXtractor::MultithreadedMeasurement
Definition:
MultithreadedMeasurement.h:37
SourceXtractor::MultithreadedMeasurement::workerThreadStatic
static void workerThreadStatic(MultithreadedMeasurement *measurement, int id)
Definition:
MultithreadedMeasurement.cpp:84
SourceXtractor::MultithreadedMeasurement::m_active_threads
int m_active_threads
Definition:
MultithreadedMeasurement.h:69
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition:
MultithreadedMeasurement.h:72
Generated by
1.8.18