Intel(R) Threading Building Blocks Doxygen Documentation, version 4.2.3
flow_graph.h
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #include "tbb_stddef.h"
21 #include "atomic.h"
22 #include "spin_mutex.h"
23 #include "null_mutex.h"
24 #include "spin_rw_mutex.h"
25 #include "null_rw_mutex.h"
26 #include "task.h"
28 #include "tbb_exception.h"
31 #include "tbb_profiling.h"
32 #include "task_arena.h"
33 
34 #if __TBB_PREVIEW_ASYNC_MSG
35 #include <vector> // std::vector in internal::async_storage
36 #include <memory> // std::shared_ptr in async_msg
37 #endif
38 
39 #if __TBB_PREVIEW_STREAMING_NODE
40 // For streaming_node
41 #include <array> // std::array
42 #include <unordered_map> // std::unordered_map
43 #include <type_traits> // std::decay, std::true_type, std::false_type
44 #endif // __TBB_PREVIEW_STREAMING_NODE
45 
46 #if TBB_DEPRECATED_FLOW_ENQUEUE
47 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
48 #else
49 #define FLOW_SPAWN(a) tbb::task::spawn((a))
50 #endif
51 
52 // use the VC10 or gcc version of tuple if it is available.
53 #if __TBB_CPP11_TUPLE_PRESENT
54  #include <tuple>
55 namespace tbb {
56  namespace flow {
57  using std::tuple;
58  using std::tuple_size;
59  using std::tuple_element;
60  using std::get;
61  }
62 }
63 #else
64  #include "compat/tuple"
65 #endif
66 
67 #include <list>
68 #include <queue>
69 
80 namespace tbb {
81 namespace flow {
82 
84 enum concurrency { unlimited = 0, serial = 1 };
85 
86 namespace interface10 {
87 
89 struct null_type {};
90 
92 class continue_msg {};
93 
95 template< typename T > class sender;
96 template< typename T > class receiver;
98 } // namespace interface10
99 namespace interface11 {
100 template< typename T, typename U > class limiter_node; // needed for resetting decrementer
101 }
102 namespace interface10 {
103 template< typename R, typename B > class run_and_put_task;
104 
105 namespace internal {
107 template<typename T, typename M> class successor_cache;
108 template<typename T, typename M> class broadcast_cache;
109 template<typename T, typename M> class round_robin_cache;
110 template<typename T, typename M> class predecessor_cache;
111 template<typename T, typename M> class reservable_predecessor_cache;
113 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
114 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
115 // C == receiver< ... > or sender< ... >, depending.
116 template<typename C>
117 class edge_container {
118 
119 public:
120  typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
122  void add_edge(C &s) {
123  built_edges.push_back(&s);
124  }
126  void delete_edge(C &s) {
127  for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
128  if (*i == &s) {
129  (void)built_edges.erase(i);
130  return; // only remove one predecessor per request
131  }
132  }
133  }
135  void copy_edges(edge_list_type &v) {
136  v = built_edges;
137  }
139  size_t edge_count() {
140  return (size_t)(built_edges.size());
141  }
143  void clear() {
144  built_edges.clear();
145  }
147  // These methods remove the given node from all predecessors/successors listed in the edge
148  // container.
149  template< typename S > void sender_extract(S &s);
150  template< typename R > void receiver_extract(R &r);
152 private:
153  edge_list_type built_edges;
154 }; // class edge_container
155 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
157 } // namespace internal
158 
159 } // namespace interface10
160 } // namespace flow
161 } // namespace tbb
165 
166 namespace tbb {
167 namespace flow {
168 namespace interface10 {
170 // Spawns the left task if necessary. Returns the task that was not spawned, if there is one.
171 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
172  // if no RHS task, don't change left.
173  if (right == NULL) return left;
174  // right != NULL
175  if (left == NULL) return right;
176  if (left == SUCCESSFULLY_ENQUEUED) return right;
177  // left contains a task
178  if (right != SUCCESSFULLY_ENQUEUED) {
179  // both are valid tasks
180  internal::spawn_in_graph_arena(g, *left);
181  return right;
182  }
183  return left;
184 }
186 #if __TBB_PREVIEW_ASYNC_MSG
188 template < typename T > class async_msg;
190 namespace internal {
192 template < typename T > class async_storage;
194 template< typename T, typename = void >
196 struct async_helpers {
197  typedef T filtered_type;
199  static const bool is_async_type = false;
201  static const void* to_void_ptr(const T& t) {
202  return static_cast<const void*>(&t);
203  }
205  static void* to_void_ptr(T& t) {
206  return static_cast<void*>(&t);
207  }
209  static const T& from_void_ptr(const void* p) {
210  return *static_cast<const T*>(p);
211  }
213  static T& from_void_ptr(void* p) {
214  return *static_cast<T*>(p);
215  }
217  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
218  if (is_async) {
219  // This (T) is NOT async and incoming 'A<X> t' IS async
220  // Get data from async_msg
222  task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
223  // finalize() must be called after subscribe() because set() can be called in finalize()
224  // and 'this_recv' client must be subscribed by this moment
225  msg.finalize();
226  return new_task;
227  }
228  else {
229  // Incoming 't' is NOT async
230  return this_recv->try_put_task(from_void_ptr(p));
231  }
232  }
233 };
235 template< typename T >
236 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
237  typedef T async_type;
238  typedef typename T::async_msg_data_type filtered_type;
240  static const bool is_async_type = true;
242  // Receiver-classes use const interfaces
243  static const void* to_void_ptr(const T& t) {
244  return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
245  }
247  static void* to_void_ptr(T& t) {
248  return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
249  }
251  // Sender-classes use non-const interfaces
252  static const T& from_void_ptr(const void* p) {
253  return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
254  }
256  static T& from_void_ptr(void* p) {
257  return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
258  }
260  // Used in receiver<T> class
261  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
262  if (is_async) {
263  // Both are async
264  return this_recv->try_put_task(from_void_ptr(p));
265  }
266  else {
267  // This (T) is async and incoming 'X t' is NOT async
268  // Create async_msg for X
270  const T msg(t);
271  return this_recv->try_put_task(msg);
272  }
273  }
274 };
275 
276 class untyped_receiver;
278 class untyped_sender {
279  template< typename, typename > friend class internal::predecessor_cache;
280  template< typename, typename > friend class internal::reservable_predecessor_cache;
281 public:
285  virtual ~untyped_sender() {}
287  // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
288 
289  // TODO: Prevent untyped successor registration
290 
292  virtual bool register_successor( successor_type &r ) = 0;
295  virtual bool remove_successor( successor_type &r ) = 0;
298  virtual bool try_release( ) { return false; }
301  virtual bool try_consume( ) { return false; }
302 
303 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
304  typedef internal::edge_container<successor_type> built_successors_type;
306  typedef built_successors_type::edge_list_type successor_list_type;
307  virtual built_successors_type &built_successors() = 0;
308  virtual void internal_add_built_successor( successor_type & ) = 0;
309  virtual void internal_delete_built_successor( successor_type & ) = 0;
310  virtual void copy_successors( successor_list_type &) = 0;
311  virtual size_t successor_count() = 0;
312 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
313 protected:
315  template< typename X >
316  bool try_get( X &t ) {
318  }
321  template< typename X >
322  bool try_reserve( X &t ) {
324  }
326  virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
327  virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
328 };
330 class untyped_receiver {
331  template< typename, typename > friend class run_and_put_task;
332 
333  template< typename, typename > friend class internal::broadcast_cache;
334  template< typename, typename > friend class internal::round_robin_cache;
335  template< typename, typename > friend class internal::successor_cache;
337 #if __TBB_PREVIEW_OPENCL_NODE
338  template< typename, typename > friend class proxy_dependency_receiver;
339 #endif /* __TBB_PREVIEW_OPENCL_NODE */
340 public:
345  virtual ~untyped_receiver() {}
348  template<typename X>
349  bool try_put(const X& t) {
350  task *res = try_put_task(t);
351  if (!res) return false;
353  return true;
354  }
356  // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
358  // TODO: Prevent untyped predecessor registration
361  virtual bool register_predecessor( predecessor_type & ) { return false; }
364  virtual bool remove_predecessor( predecessor_type & ) { return false; }
365 
366 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
367  typedef internal::edge_container<predecessor_type> built_predecessors_type;
368  typedef built_predecessors_type::edge_list_type predecessor_list_type;
369  virtual built_predecessors_type &built_predecessors() = 0;
370  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
371  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
372  virtual void copy_predecessors( predecessor_list_type & ) = 0;
373  virtual size_t predecessor_count() = 0;
374 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
375 protected:
376  template<typename X>
377  task *try_put_task(const X& t) {
379  }
381  virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
383  virtual graph& graph_reference() = 0;
385  // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
388  virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
389 
390  virtual bool is_continue_receiver() { return false; }
391 };
393 } // namespace internal
396 template< typename T >
397 class sender : public internal::untyped_sender {
398 public:
400  typedef T output_type;
403 
405  virtual bool try_get( T & ) { return false; }
408  virtual bool try_reserve( T & ) { return false; }
410 protected:
411  virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
412  // Both async OR both are NOT async
415  }
416  // Else: this (T) is async OR incoming 't' is async
417  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
418  return false;
419  }
421  virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
422  // Both async OR both are NOT async
425  }
426  // Else: this (T) is async OR incoming 't' is async
427  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
428  return false;
429  }
430 }; // class sender<T>
433 template< typename T >
434 class receiver : public internal::untyped_receiver {
435  template< typename > friend class internal::async_storage;
436  template< typename, typename > friend struct internal::async_helpers;
437 public:
439  typedef T input_type;
446  }
450  }
452 protected:
453  virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
455  }
458  virtual task *try_put_task(const T& t) = 0;
460 }; // class receiver<T>
462 #else // __TBB_PREVIEW_ASYNC_MSG
465 template< typename T >
466 class sender {
467 public:
469  typedef T output_type;
474  virtual ~sender() {}
476  // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
479  virtual bool register_successor( successor_type &r ) = 0;
482  virtual bool remove_successor( successor_type &r ) = 0;
483 
485  virtual bool try_get( T & ) { return false; }
488  virtual bool try_reserve( T & ) { return false; }
491  virtual bool try_release( ) { return false; }
494  virtual bool try_consume( ) { return false; }
496 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
497  typedef typename internal::edge_container<successor_type> built_successors_type;
499  typedef typename built_successors_type::edge_list_type successor_list_type;
500  virtual built_successors_type &built_successors() = 0;
501  virtual void internal_add_built_successor( successor_type & ) = 0;
502  virtual void internal_delete_built_successor( successor_type & ) = 0;
503  virtual void copy_successors( successor_list_type &) = 0;
504  virtual size_t successor_count() = 0;
505 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
506 }; // class sender<T>
507 
509 template< typename T >
510 class receiver {
511 public:
513  typedef T input_type;
516  typedef sender<T> predecessor_type;
519  virtual ~receiver() {}
522  bool try_put( const T& t ) {
523  task *res = try_put_task(t);
524  if (!res) return false;
526  return true;
527  }
528 
530 protected:
531  template< typename R, typename B > friend class run_and_put_task;
532  template< typename X, typename Y > friend class internal::broadcast_cache;
533  template< typename X, typename Y > friend class internal::round_robin_cache;
534  virtual task *try_put_task(const T& t) = 0;
535  virtual graph& graph_reference() = 0;
536 public:
537  // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
540  virtual bool register_predecessor( predecessor_type & ) { return false; }
543  virtual bool remove_predecessor( predecessor_type & ) { return false; }
545 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
546  typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
547  typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
548  virtual built_predecessors_type &built_predecessors() = 0;
549  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
550  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
551  virtual void copy_predecessors( predecessor_list_type & ) = 0;
552  virtual size_t predecessor_count() = 0;
553 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
555 protected:
558 
559  template<typename TT, typename M> friend class internal::successor_cache;
560  virtual bool is_continue_receiver() { return false; }
562 #if __TBB_PREVIEW_OPENCL_NODE
563  template< typename, typename > friend class proxy_dependency_receiver;
564 #endif /* __TBB_PREVIEW_OPENCL_NODE */
565 }; // class receiver<T>
567 #endif // __TBB_PREVIEW_ASYNC_MSG
571 class continue_receiver : public receiver< continue_msg > {
572 public:
581  explicit continue_receiver(
582  __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
583  my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
584  my_current_count = 0;
585  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
586  }
592  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
593  }
599  return true;
600  }
601 
603 
609  return true;
610  }
611 
612 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
613  typedef internal::edge_container<predecessor_type> built_predecessors_type;
614  typedef built_predecessors_type::edge_list_type predecessor_list_type;
615  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
617  void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
619  my_built_predecessors.add_edge( s );
620  }
621 
622  void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
624  my_built_predecessors.delete_edge(s);
625  }
627  void copy_predecessors( predecessor_list_type &v) __TBB_override {
629  my_built_predecessors.copy_edges(v);
630  }
632  size_t predecessor_count() __TBB_override {
634  return my_built_predecessors.edge_count();
635  }
637 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
639 protected:
640  template< typename R, typename B > friend class run_and_put_task;
641  template<typename X, typename Y> friend class internal::broadcast_cache;
642  template<typename X, typename Y> friend class internal::round_robin_cache;
643  // execute body is supposed to be too small to create a task for.
644  task * try_put_task( const input_type & ) __TBB_override {
645  {
646  spin_mutex::scoped_lock l(my_mutex);
647  if ( ++my_current_count < my_predecessor_count )
648  return SUCCESSFULLY_ENQUEUED;
649  else
650  my_current_count = 0;
651  }
652  task * res = execute();
653  return res? res : SUCCESSFULLY_ENQUEUED;
654  }
656 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
657  // continue_receiver must contain its own built_predecessors because it does
658  // not have a node_cache.
659  built_predecessors_type my_built_predecessors;
660 #endif
666  // the friend declaration in the base class did not eliminate the "protected class"
667  // error in gcc 4.1.2
668  template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
672  if (f & rf_clear_edges) {
673 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
674  my_built_predecessors.clear();
675 #endif
677  }
678  }
683  virtual task * execute() = 0;
684  template<typename TT, typename M> friend class internal::successor_cache;
685  bool is_continue_receiver() __TBB_override { return true; }
687 }; // class continue_receiver
689 } // namespace interface10
691 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
692  template <typename K, typename T>
693  K key_from_message( const T &t ) {
694  return t.key();
695  }
696 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
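/* Editorial sketch, not part of the original flow_graph.h: key_from_message() above is the
   default hook that the preview message-based key matching uses to obtain a key from a
   message.  The struct below is hypothetical and only illustrates the required shape.

       struct order_msg {
           int id;
           int key() const { return id; }   // picked up by the default key_from_message
       };
       // key_from_message<int>( order_msg{ 7 } ) yields 7; for types without a key() member,
       // a user-supplied overload of key_from_message can provide the key instead.
*/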
701 } // flow
702 } // tbb
703 
707 namespace tbb {
708 namespace flow {
709 namespace interface10 {
714 #if __TBB_PREVIEW_ASYNC_MSG
716 #endif
719 template <typename C, typename N>
720 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
721 {
722  if (begin) current_node = my_graph->my_nodes;
723  //else it is an end iterator by default
724 }
725 
726 template <typename C, typename N>
728  __TBB_ASSERT(current_node, "graph_iterator at end");
729  return *operator->();
730 }
731 
732 template <typename C, typename N>
734  return current_node;
735 }
737 template <typename C, typename N>
739  if (current_node) current_node = current_node->next;
740 }
741 
743 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
745  own_context = true;
746  cancelled = false;
747  caught_exception = false;
748  my_context = new task_group_context(tbb::internal::FLOW_TASKS);
752  my_is_active = true;
753 }
754 
755 inline graph::graph(task_group_context& use_this_context) :
756  my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
758  own_context = false;
762  my_is_active = true;
763 }
764 
765 inline graph::~graph() {
766  wait_for_all();
768  tbb::task::destroy(*my_root_task);
769  if (own_context) delete my_context;
770  delete my_task_arena;
771 }
773 inline void graph::reserve_wait() {
774  if (my_root_task) {
777  }
778 }
780 inline void graph::release_wait() {
781  if (my_root_task) {
784  }
785 }
788  n->next = NULL;
789  {
793  my_nodes_last = n;
794  if (!my_nodes) my_nodes = n;
795  }
796 }
799  {
801  __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
802  if (n->prev) n->prev->next = n->next;
803  if (n->next) n->next->prev = n->prev;
804  if (my_nodes_last == n) my_nodes_last = n->prev;
805  if (my_nodes == n) my_nodes = n->next;
806  }
807  n->prev = n->next = NULL;
808 }
809 
810 inline void graph::reset( reset_flags f ) {
811  // reset context
813 
814  if(my_context) my_context->reset();
815  cancelled = false;
816  caught_exception = false;
817  // reset all the nodes comprising the graph
818  for(iterator ii = begin(); ii != end(); ++ii) {
819  graph_node *my_p = &(*ii);
820  my_p->reset_node(f);
821  }
822  // Reattach the arena. Might be useful to run the graph in a particular task_arena
823  // while not limiting graph lifetime to a single task_arena::execute() call.
824  prepare_task_arena( /*reinit=*/true );
826  // now spawn the tasks necessary to start the graph
827  for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
828  internal::spawn_in_graph_arena(*this, *(*rti));
829  }
830  my_reset_task_list.clear();
831 }
832 
833 inline graph::iterator graph::begin() { return iterator(this, true); }
834 
835 inline graph::iterator graph::end() { return iterator(this, false); }
836 
837 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
838 
839 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
840 
841 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
842 
843 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
844 
845 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
846 inline void graph::set_name(const char *name) {
848 }
849 #endif
851 inline graph_node::graph_node(graph& g) : my_graph(g) {
852  my_graph.register_node(this);
853 }
855 inline graph_node::~graph_node() {
856  my_graph.remove_node(this);
857 }
858 
862 template < typename Output >
863 class source_node : public graph_node, public sender< Output > {
864 public:
866  typedef Output output_type;
871  //Source node has no input type
873 
874 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
875  typedef typename sender<output_type>::built_successors_type built_successors_type;
876  typedef typename sender<output_type>::successor_list_type successor_list_type;
877 #endif
878 
880  template< typename Body >
881  source_node( graph &g, Body body, bool is_active = true )
882  : graph_node(g), my_active(is_active), init_my_active(is_active),
883  my_body( new internal::source_body_leaf< output_type, Body>(body) ),
884  my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
885  my_reserved(false), my_has_cached_item(false)
886  {
887  my_successors.set_owner(this);
888  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
889  static_cast<sender<output_type> *>(this), this->my_body );
890  }
893  source_node( const source_node& src ) :
894  graph_node(src.my_graph), sender<Output>(),
895  my_active(src.init_my_active),
896  init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
897  my_reserved(false), my_has_cached_item(false)
898  {
899  my_successors.set_owner(this);
900  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
901  static_cast<sender<output_type> *>(this), this->my_body );
902  }
905  ~source_node() { delete my_body; delete my_init_body; }
906 
907 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
908  void set_name( const char *name ) __TBB_override {
910  }
911 #endif
912 
914  bool register_successor( successor_type &r ) __TBB_override {
915  spin_mutex::scoped_lock lock(my_mutex);
916  my_successors.register_successor(r);
917  if ( my_active )
918  spawn_put();
919  return true;
920  }
921 
923  bool remove_successor( successor_type &r ) __TBB_override {
924  spin_mutex::scoped_lock lock(my_mutex);
925  my_successors.remove_successor(r);
926  return true;
927  }
928 
929 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
930 
931  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
933  void internal_add_built_successor( successor_type &r) __TBB_override {
934  spin_mutex::scoped_lock lock(my_mutex);
935  my_successors.internal_add_built_successor(r);
936  }
937 
938  void internal_delete_built_successor( successor_type &r) __TBB_override {
940  my_successors.internal_delete_built_successor(r);
941  }
942 
943  size_t successor_count() __TBB_override {
944  spin_mutex::scoped_lock lock(my_mutex);
945  return my_successors.successor_count();
946  }
948  void copy_successors(successor_list_type &v) __TBB_override {
950  my_successors.copy_successors(v);
951  }
952 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
953 
956  spin_mutex::scoped_lock lock(my_mutex);
957  if ( my_reserved )
958  return false;
959 
960  if ( my_has_cached_item ) {
961  v = my_cached_item;
962  my_has_cached_item = false;
963  return true;
964  }
965  // we've been asked to provide an item, but we have none. enqueue a task to
966  // provide one.
967  spawn_put();
968  return false;
969  }
973  spin_mutex::scoped_lock lock(my_mutex);
974  if ( my_reserved ) {
975  return false;
976  }
977 
978  if ( my_has_cached_item ) {
979  v = my_cached_item;
980  my_reserved = true;
981  return true;
982  } else {
983  return false;
984  }
985  }
986 
988 
990  spin_mutex::scoped_lock lock(my_mutex);
991  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
992  my_reserved = false;
993  if(!my_successors.empty())
994  spawn_put();
995  return true;
996  }
997 
1000  spin_mutex::scoped_lock lock(my_mutex);
1001  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1002  my_reserved = false;
1003  my_has_cached_item = false;
1004  if ( !my_successors.empty() ) {
1005  spawn_put();
1006  }
1007  return true;
1008  }
1009 
1011  void activate() {
1012  spin_mutex::scoped_lock lock(my_mutex);
1013  my_active = true;
1014  if (!my_successors.empty())
1015  spawn_put();
1016  }
1017 
1018  template<typename Body>
1019  Body copy_function_object() {
1020  internal::source_body<output_type> &body_ref = *this->my_body;
1021  return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1022  }
1023 
1024 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1025  void extract( ) __TBB_override {
1026  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1027  my_active = init_my_active;
1028  my_reserved = false;
1029  if(my_has_cached_item) my_has_cached_item = false;
1030  }
1031 #endif
1032 
1033 protected:
1034 
1037  my_active = init_my_active;
1038  my_reserved =false;
1039  if(my_has_cached_item) {
1040  my_has_cached_item = false;
1041  }
1042  if(f & rf_clear_edges) my_successors.clear();
1044  internal::source_body<output_type> *tmp = my_init_body->clone();
1045  delete my_body;
1046  my_body = tmp;
1047  }
1048  if(my_active)
1049  internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
1050  }
1051 
1052 private:
1054  bool my_active;
1063  // used by apply_body_bypass, can invoke body of node.
1066  if ( my_reserved ) {
1067  return false;
1068  }
1069  if ( !my_has_cached_item ) {
1071  bool r = (*my_body)(my_cached_item);
1072  tbb::internal::fgt_end_body( my_body );
1073  if (r) {
1074  my_has_cached_item = true;
1075  }
1076  }
1077  if ( my_has_cached_item ) {
1078  v = my_cached_item;
1079  my_reserved = true;
1080  return true;
1081  } else {
1082  return false;
1083  }
1084  }
1085 
1086  // when resetting, and if the source_node was created with my_active == true, then
1087  // when we reset the node we must store a task to run the node, and spawn it only
1088  // after the reset is complete and is_active() is again true. This is why we don't
1089  // test for is_active() here.
1091  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1093  }
1094 
1096  void spawn_put( ) {
1097  if(internal::is_graph_active(this->my_graph)) {
1098  internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1099  }
1100  }
1101 
1105  output_type v;
1106  if ( !try_reserve_apply_body(v) )
1107  return NULL;
1108 
1109  task *last_task = my_successors.try_put_task(v);
1110  if ( last_task )
1111  try_consume();
1112  else
1113  try_release();
1114  return last_task;
1115  }
1116 }; // class source_node
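/* Editorial usage sketch, not part of the original flow_graph.h.  A source_node constructed
   inactive, wired to a successor, then activated; names and the item count are illustrative.
   Assumes #include "tbb/flow_graph.h".

       tbb::flow::graph g;
       int i = 0;
       tbb::flow::source_node<int> src( g,
           [&i]( int &out ) -> bool {                  // body: produce the next item or return false
               if ( i < 10 ) { out = i++; return true; }
               return false;
           },
           false );                                    // construct inactive
       tbb::flow::function_node<int, int> sink( g, tbb::flow::serial,
           []( int v ) { return v; } );                // consumes (here: forwards) each item
       tbb::flow::make_edge( src, sink );
       src.activate();                                 // start emitting items
       g.wait_for_all();
*/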
1117 
1119 template < typename Input, typename Output = continue_msg, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1120 class function_node : public graph_node, public internal::function_input<Input,Output,Policy,Allocator>, public internal::function_output<Output> {
1121 public:
1122  typedef Input input_type;
1123  typedef Output output_type;
1129 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1130  typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1131  typedef typename fOutput_type::successor_list_type successor_list_type;
1132 #endif
1133  using input_impl_type::my_predecessors;
1134 
1136  // input_queue_type is allocated here, but destroyed in the function_input_base.
1137  // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1138  // be done in one place. This would be an interface-breaking change.
1139  template< typename Body >
1141  graph &g, size_t concurrency,
1144  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1145  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1146  }
1147 
1150  graph_node(src.my_graph),
1151  input_impl_type(src),
1152  fOutput_type() {
1153  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1154  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1155  }
1156 
1157 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1158  void set_name( const char *name ) __TBB_override {
1160  }
1161 #endif
1162 
1163 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1164  void extract( ) __TBB_override {
1165  my_predecessors.built_predecessors().receiver_extract(*this);
1166  successors().built_successors().sender_extract(*this);
1167  }
1168 #endif
1169 
1170 protected:
1171  template< typename R, typename B > friend class run_and_put_task;
1172  template<typename X, typename Y> friend class internal::broadcast_cache;
1173  template<typename X, typename Y> friend class internal::round_robin_cache;
1174  using input_impl_type::try_put_task;
1175 
1176  internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1177 
1179  input_impl_type::reset_function_input(f);
1180  // TODO: use clear() instead.
1181  if(f & rf_clear_edges) {
1182  successors().clear();
1183  my_predecessors.clear();
1184  }
1185  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1186  __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1187  }
1188 
1189 }; // class function_node
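/* Editorial usage sketch, not part of the original flow_graph.h.  Two function_nodes in a
   small pipeline; names are illustrative.  Assumes #include "tbb/flow_graph.h".

       tbb::flow::graph g;
       tbb::flow::function_node<int, int> doubler( g, tbb::flow::unlimited,
           []( int v ) { return 2 * v; } );            // 'unlimited': bodies may run concurrently
       tbb::flow::function_node<int, int> consumer( g, tbb::flow::serial,
           []( int v ) { return v; } );                // 'serial': at most one invocation at a time
       tbb::flow::make_edge( doubler, consumer );
       doubler.try_put( 21 );
       g.wait_for_all();
*/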
1190 
1192 // Output is a tuple of output types.
1193 template < typename Input, typename Output, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1195  public graph_node,
1197  <
1198  Input,
1199  typename internal::wrap_tuple_elements<
1200  tbb::flow::tuple_size<Output>::value, // #elements in tuple
1201  internal::multifunction_output, // wrap this around each element
1202  Output // the tuple providing the types
1203  >::type,
1204  Policy,
1205  Allocator
1206  > {
1207 protected:
1209 public:
1210  typedef Input input_type;
1215 private:
1217  using input_impl_type::my_predecessors;
1218 public:
1219  template<typename Body>
1221  graph &g, size_t concurrency,
1223  ) : graph_node(g), base_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1224  tbb::internal::fgt_multioutput_node_with_body<N>(
1225  tbb::internal::FLOW_MULTIFUNCTION_NODE,
1226  &this->my_graph, static_cast<receiver<input_type> *>(this),
1227  this->output_ports(), this->my_body
1228  );
1229  }
1230 
1232  graph_node(other.my_graph), base_type(other) {
1233  tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1234  &this->my_graph, static_cast<receiver<input_type> *>(this),
1235  this->output_ports(), this->my_body );
1236  }
1237 
1238 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1239  void set_name( const char *name ) __TBB_override {
1241  }
1242 #endif
1243 
1244 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1245  void extract( ) __TBB_override {
1246  my_predecessors.built_predecessors().receiver_extract(*this);
1247  base_type::extract();
1248  }
1249 #endif
1250  // all the guts are in multifunction_input...
1251 protected:
1252  void reset_node(reset_flags f) __TBB_override { base_type::reset(f); }
1253 }; // multifunction_node
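/* Editorial usage sketch, not part of the original flow_graph.h.  A multifunction_node routing
   its input to one of two output ports; names are illustrative.

       typedef tbb::flow::multifunction_node< int, tbb::flow::tuple<int, int> > mf_node;
       tbb::flow::graph g;
       mf_node router( g, tbb::flow::unlimited,
           []( const int &v, mf_node::output_ports_type &ports ) {
               if ( v % 2 ) std::get<0>( ports ).try_put( v );   // odd values to port 0
               else         std::get<1>( ports ).try_put( v );   // even values to port 1
           } );
       tbb::flow::queue_node<int> odds( g ), evens( g );
       tbb::flow::make_edge( tbb::flow::output_port<0>( router ), odds );
       tbb::flow::make_edge( tbb::flow::output_port<1>( router ), evens );
       for ( int i = 0; i < 6; ++i ) router.try_put( i );
       g.wait_for_all();
*/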
1254 
1256 // successors. The node has unlimited concurrency, so it does not reject inputs.
1257 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1258 class split_node : public graph_node, public receiver<TupleType> {
1261 public:
1262  typedef TupleType input_type;
1263  typedef Allocator allocator_type;
1264 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1265  typedef typename base_type::predecessor_type predecessor_type;
1266  typedef typename base_type::predecessor_list_type predecessor_list_type;
1267  typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1268  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1269 #endif
1270 
1271  typedef typename internal::wrap_tuple_elements<
1272  N, // #elements in tuple
1273  internal::multifunction_output, // wrap this around each element
1274  TupleType // the tuple providing the types
1277  explicit split_node(graph &g) : graph_node(g)
1278  {
1279  tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1280  static_cast<receiver<input_type> *>(this), this->output_ports());
1281  }
1282  split_node( const split_node & other) : graph_node(other.my_graph), base_type(other)
1283  {
1284  tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1285  static_cast<receiver<input_type> *>(this), this->output_ports());
1286  }
1288 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1289  void set_name( const char *name ) __TBB_override {
1291  }
1292 #endif
1293 
1294  output_ports_type &output_ports() { return my_output_ports; }
1296 protected:
1297  task *try_put_task(const TupleType& t) __TBB_override {
1298  // Sending split messages in parallel is not justified, as the overheads would prevail.
1299  // Also, we do not have successors here, so we simply report the returned task as successful.
1300  return internal::emit_element<N>::emit_this(this->my_graph, t, output_ports());
1301  }
1306  __TBB_ASSERT(!(f & rf_clear_edges) || internal::clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
1307  }
1309  graph& graph_reference() __TBB_override {
1310  return my_graph;
1311  }
1312 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1313 private:
1314  void extract() __TBB_override {}
1315 
1317  void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1318 
1320  void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1321 
1322  size_t predecessor_count() __TBB_override { return 0; }
1323 
1324  void copy_predecessors(predecessor_list_type&) __TBB_override {}
1325 
1326  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors; }
1327 
1329  built_predecessors_type my_predecessors;
1330 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1331 
1332 private:
1334 };
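/* Editorial usage sketch, not part of the original flow_graph.h.  A split_node distributing
   tuple elements to per-element successors; names are illustrative.

       tbb::flow::graph g;
       tbb::flow::split_node< tbb::flow::tuple<int, float> > s( g );
       tbb::flow::queue_node<int>   ints( g );
       tbb::flow::queue_node<float> floats( g );
       tbb::flow::make_edge( tbb::flow::output_port<0>( s ), ints );
       tbb::flow::make_edge( tbb::flow::output_port<1>( s ), floats );
       s.try_put( tbb::flow::tuple<int, float>( 1, 2.0f ) );   // element 0 -> ints, element 1 -> floats
       g.wait_for_all();
*/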
1335 
1337 template <typename Output, typename Policy = internal::Policy<void> >
1338 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1339  public internal::function_output<Output> {
1340 public:
1342  typedef Output output_type;
1347 
1349  template <typename Body >
1351  graph &g,
1353  ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ) {
1354  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1355  static_cast<receiver<input_type> *>(this),
1356  static_cast<sender<output_type> *>(this), this->my_body );
1357  }
1358 
1360  template <typename Body >
1362  graph &g, int number_of_predecessors,
1364  ) : graph_node(g)
1365  , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1366  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1367  static_cast<receiver<input_type> *>(this),
1368  static_cast<sender<output_type> *>(this), this->my_body );
1369  }
1370 
1373  graph_node(src.my_graph), input_impl_type(src),
1374  internal::function_output<Output>() {
1375  tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1376  static_cast<receiver<input_type> *>(this),
1377  static_cast<sender<output_type> *>(this), this->my_body );
1378  }
1379 
1380 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1381  void set_name( const char *name ) __TBB_override {
1383  }
1384 #endif
1385 
1386 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1387  void extract() __TBB_override {
1388  input_impl_type::my_built_predecessors.receiver_extract(*this);
1389  successors().built_successors().sender_extract(*this);
1390  }
1391 #endif
1392 
1393 protected:
1394  template< typename R, typename B > friend class run_and_put_task;
1395  template<typename X, typename Y> friend class internal::broadcast_cache;
1396  template<typename X, typename Y> friend class internal::round_robin_cache;
1397  using input_impl_type::try_put_task;
1398  internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1399 
1401  input_impl_type::reset_receiver(f);
1402  if(f & rf_clear_edges)successors().clear();
1403  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1404  }
1405 }; // continue_node
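/* Editorial usage sketch, not part of the original flow_graph.h.  continue_nodes model a
   dependency graph: 'tail' runs only after both 'a' and 'b' have signalled.  Names are
   illustrative.

       tbb::flow::graph g;
       typedef tbb::flow::continue_msg msg_t;
       tbb::flow::broadcast_node<msg_t> start( g );
       tbb::flow::continue_node<msg_t> a( g, []( const msg_t& ) { return msg_t(); } );   // work A
       tbb::flow::continue_node<msg_t> b( g, []( const msg_t& ) { return msg_t(); } );   // work B
       tbb::flow::continue_node<msg_t> tail( g, []( const msg_t& ) { return msg_t(); } );
       tbb::flow::make_edge( start, a );
       tbb::flow::make_edge( start, b );
       tbb::flow::make_edge( a, tail );
       tbb::flow::make_edge( b, tail );     // tail waits for a continue_msg from both predecessors
       start.try_put( msg_t() );
       g.wait_for_all();
*/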
1406 
1408 template <typename T>
1409 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1410 public:
1411  typedef T input_type;
1412  typedef T output_type;
1415 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1416  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1417  typedef typename sender<output_type>::successor_list_type successor_list_type;
1418 #endif
1419 private:
1421 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1422  internal::edge_container<predecessor_type> my_built_predecessors;
1423  spin_mutex pred_mutex; // serialize accesses on edge_container
1424 #endif
1425 public:
1426 
1427  explicit broadcast_node(graph& g) : graph_node(g) {
1428  my_successors.set_owner( this );
1429  tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1430  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1431  }
1432 
1433  // Copy constructor
1435  graph_node(src.my_graph), receiver<T>(), sender<T>()
1436  {
1437  my_successors.set_owner( this );
1438  tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1439  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1440  }
1441 
1442 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1443  void set_name( const char *name ) __TBB_override {
1445  }
1446 #endif
1447 
1450  my_successors.register_successor( r );
1451  return true;
1452  }
1453 
1456  my_successors.remove_successor( r );
1457  return true;
1458  }
1459 
1460 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1461  typedef typename sender<T>::built_successors_type built_successors_type;
1462 
1463  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1464 
1465  void internal_add_built_successor(successor_type &r) __TBB_override {
1466  my_successors.internal_add_built_successor(r);
1467  }
1468 
1469  void internal_delete_built_successor(successor_type &r) __TBB_override {
1470  my_successors.internal_delete_built_successor(r);
1471  }
1472 
1473  size_t successor_count() __TBB_override {
1474  return my_successors.successor_count();
1475  }
1476 
1477  void copy_successors(successor_list_type &v) __TBB_override {
1478  my_successors.copy_successors(v);
1479  }
1481  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1482 
1483  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1484 
1485  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1487  my_built_predecessors.add_edge(p);
1488  }
1489 
1490  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1491  spin_mutex::scoped_lock l(pred_mutex);
1492  my_built_predecessors.delete_edge(p);
1493  }
1494 
1495  size_t predecessor_count() __TBB_override {
1496  spin_mutex::scoped_lock l(pred_mutex);
1497  return my_built_predecessors.edge_count();
1498  }
1499 
1500  void copy_predecessors(predecessor_list_type &v) __TBB_override {
1501  spin_mutex::scoped_lock l(pred_mutex);
1502  my_built_predecessors.copy_edges(v);
1503  }
1504 
1505  void extract() __TBB_override {
1506  my_built_predecessors.receiver_extract(*this);
1507  my_successors.built_successors().sender_extract(*this);
1508  }
1509 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1510 
1511 protected:
1512  template< typename R, typename B > friend class run_and_put_task;
1513  template<typename X, typename Y> friend class internal::broadcast_cache;
1514  template<typename X, typename Y> friend class internal::round_robin_cache;
1517  task *new_task = my_successors.try_put_task(t);
1518  if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1519  return new_task;
1520  }
1522  graph& graph_reference() __TBB_override {
1523  return my_graph;
1524  }
1525 
1527 
1529  if (f&rf_clear_edges) {
1530  my_successors.clear();
1531 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1532  my_built_predecessors.clear();
1533 #endif
1534  }
1535  __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1536  }
1537 }; // broadcast_node
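/* Editorial usage sketch, not part of the original flow_graph.h.  A broadcast_node forwards
   each message to every registered successor; names are illustrative.

       tbb::flow::graph g;
       tbb::flow::broadcast_node<int> b( g );
       tbb::flow::queue_node<int> q1( g ), q2( g );
       tbb::flow::make_edge( b, q1 );
       tbb::flow::make_edge( b, q2 );
       b.try_put( 42 );                     // both q1 and q2 receive a copy of 42
       g.wait_for_all();
       int v1 = 0, v2 = 0;
       q1.try_get( v1 );                    // v1 == 42
       q2.try_get( v2 );                    // v2 == 42
*/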
1540 template <typename T, typename A=cache_aligned_allocator<T> >
1541 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1542 public:
1543  typedef T input_type;
1544  typedef T output_type;
1548 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1549  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1550  typedef typename sender<output_type>::successor_list_type successor_list_type;
1551 #endif
1552 protected:
1553  typedef size_t size_type;
1555 
1556 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1557  internal::edge_container<predecessor_type> my_built_predecessors;
1558 #endif
1559 
1562  enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1563 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1564  , add_blt_succ, del_blt_succ,
1565  add_blt_pred, del_blt_pred,
1566  blt_succ_cnt, blt_pred_cnt,
1567  blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
1568 #endif
1569  };
1570 
1571  // implements the aggregator_operation concept
1572  class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1573  public:
1574  char type;
1575 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1576  task * ltask;
1577  union {
1578  input_type *elem;
1579  successor_type *r;
1580  predecessor_type *p;
1581  size_t cnt_val;
1582  successor_list_type *svec;
1583  predecessor_list_type *pvec;
1584  };
1585 #else
1586  T *elem;
1589 #endif
1590  buffer_operation(const T& e, op_type t) : type(char(t))
1591 
1592 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1593  , ltask(NULL), elem(const_cast<T*>(&e))
1594 #else
1595  , elem(const_cast<T*>(&e)) , ltask(NULL)
1596 #endif
1597  {}
1598  buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1599  };
1600 
1602  typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
1603  friend class internal::aggregating_functor<class_type, buffer_operation>;
1604  internal::aggregator< handler_type, buffer_operation> my_aggregator;
1606  virtual void handle_operations(buffer_operation *op_list) {
1607  handle_operations_impl(op_list, this);
1608  }
1610  template<typename derived_type>
1611  void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1612  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1614  buffer_operation *tmp = NULL;
1615  bool try_forwarding = false;
1616  while (op_list) {
1617  tmp = op_list;
1618  op_list = op_list->next;
1619  switch (tmp->type) {
1620  case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1621  case rem_succ: internal_rem_succ(tmp); break;
1622  case req_item: internal_pop(tmp); break;
1623  case res_item: internal_reserve(tmp); break;
1624  case rel_res: internal_release(tmp); try_forwarding = true; break;
1625  case con_res: internal_consume(tmp); try_forwarding = true; break;
1626  case put_item: try_forwarding = internal_push(tmp); break;
1627  case try_fwd_task: internal_forward_task(tmp); break;
1628 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1629  // edge recording
1630  case add_blt_succ: internal_add_built_succ(tmp); break;
1631  case del_blt_succ: internal_del_built_succ(tmp); break;
1632  case add_blt_pred: internal_add_built_pred(tmp); break;
1633  case del_blt_pred: internal_del_built_pred(tmp); break;
1634  case blt_succ_cnt: internal_succ_cnt(tmp); break;
1635  case blt_pred_cnt: internal_pred_cnt(tmp); break;
1636  case blt_succ_cpy: internal_copy_succs(tmp); break;
1637  case blt_pred_cpy: internal_copy_preds(tmp); break;
1638 #endif
1639  }
1640  }
1641 
1642  derived->order();
1644  if (try_forwarding && !forwarder_busy) {
1645  if(internal::is_graph_active(this->my_graph)) {
1646  forwarder_busy = true;
1647  task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
1650  // tmp should point to the last item handled by the aggregator. This is the operation
1651  // the handling thread enqueued. So modifying that record will be okay.
1652  // workaround for icc bug
1653  tbb::task *z = tmp->ltask;
1654  graph &g = this->my_graph;
1655  tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
1656  }
1657  }
1658  } // handle_operations
1661  return op_data.ltask;
1662  }
1665  task *ft = grab_forwarding_task(op_data);
1666  if(ft) {
1667  internal::spawn_in_graph_arena(graph_reference(), *ft);
1668  return true;
1669  }
1670  return false;
1671  }
1672 
1674  virtual task *forward_task() {
1675  buffer_operation op_data(try_fwd_task);
1676  task *last_task = NULL;
1677  do {
1678  op_data.status = internal::WAIT;
1679  op_data.ltask = NULL;
1680  my_aggregator.execute(&op_data);
1681 
1682  // workaround for icc bug
1683  tbb::task *xtask = op_data.ltask;
1684  graph& g = this->my_graph;
1685  last_task = combine_tasks(g, last_task, xtask);
1686  } while (op_data.status ==internal::SUCCEEDED);
1687  return last_task;
1688  }
1691  virtual void internal_reg_succ(buffer_operation *op) {
1692  my_successors.register_successor(*(op->r));
1694  }
1698  my_successors.remove_successor(*(op->r));
1700  }
1702 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1703  typedef typename sender<T>::built_successors_type built_successors_type;
1705  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1707  virtual void internal_add_built_succ(buffer_operation *op) {
1708  my_successors.internal_add_built_successor(*(op->r));
1710  }
1712  virtual void internal_del_built_succ(buffer_operation *op) {
1713  my_successors.internal_delete_built_successor(*(op->r));
1715  }
1716 
1717  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1718 
1719  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1720 
1721  virtual void internal_add_built_pred(buffer_operation *op) {
1722  my_built_predecessors.add_edge(*(op->p));
1724  }
1725 
1726  virtual void internal_del_built_pred(buffer_operation *op) {
1727  my_built_predecessors.delete_edge(*(op->p));
1729  }
1730 
1731  virtual void internal_succ_cnt(buffer_operation *op) {
1732  op->cnt_val = my_successors.successor_count();
1734  }
1735 
1736  virtual void internal_pred_cnt(buffer_operation *op) {
1737  op->cnt_val = my_built_predecessors.edge_count();
1739  }
1741  virtual void internal_copy_succs(buffer_operation *op) {
1742  my_successors.copy_successors(*(op->svec));
1744  }
1746  virtual void internal_copy_preds(buffer_operation *op) {
1747  my_built_predecessors.copy_edges(*(op->pvec));
1749  }
1751 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1752 
1753 private:
1754  void order() {}
1756  bool is_item_valid() {
1757  return this->my_item_valid(this->my_tail - 1);
1758  }
1760  void try_put_and_add_task(task*& last_task) {
1761  task *new_task = my_successors.try_put_task(this->back());
1762  if (new_task) {
1763  // workaround for icc bug
1764  graph& g = this->my_graph;
1765  last_task = combine_tasks(g, last_task, new_task);
1766  this->destroy_back();
1767  }
1768  }
1769 
1770 protected:
1773  internal_forward_task_impl(op, this);
1774  }
1775 
1776  template<typename derived_type>
1777  void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1778  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1779 
1780  if (this->my_reserved || !derived->is_item_valid()) {
1782  this->forwarder_busy = false;
1783  return;
1784  }
1785  // Try forwarding, giving each successor a chance
1786  task * last_task = NULL;
1787  size_type counter = my_successors.size();
1788  for (; counter > 0 && derived->is_item_valid(); --counter)
1789  derived->try_put_and_add_task(last_task);
1791  op->ltask = last_task; // return task
1792  if (last_task && !counter) {
1794  }
1795  else {
1797  forwarder_busy = false;
1798  }
1799  }
1801  virtual bool internal_push(buffer_operation *op) {
1802  this->push_back(*(op->elem));
1804  return true;
1805  }
1806 
1807  virtual void internal_pop(buffer_operation *op) {
1808  if(this->pop_back(*(op->elem))) {
1810  }
1811  else {
1813  }
1814  }
1817  if(this->reserve_front(*(op->elem))) {
1819  }
1820  else {
1822  }
1823  }
1824 
1826  this->consume_front();
1828  }
1829 
1831  this->release_front();
1833  }
1834 
1835 public:
1837  explicit buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
1838  forwarder_busy(false) {
1839  my_successors.set_owner(this);
1840  my_aggregator.initialize_handler(handler_type(this));
1841  tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1842  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1843  }
1844 
1846  buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
1847  internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
1848  forwarder_busy = false;
1849  my_successors.set_owner(this);
1850  my_aggregator.initialize_handler(handler_type(this));
1851  tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
1852  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1853  }
1854 
1855 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1856  void set_name( const char *name ) __TBB_override {
1858  }
1859 #endif
1861  //
1862  // message sender implementation
1863  //
1868  buffer_operation op_data(reg_succ);
1869  op_data.r = &r;
1870  my_aggregator.execute(&op_data);
1871  (void)enqueue_forwarding_task(op_data);
1872  return true;
1873  }
1875 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1876  void internal_add_built_successor( successor_type &r) __TBB_override {
1877  buffer_operation op_data(add_blt_succ);
1878  op_data.r = &r;
1879  my_aggregator.execute(&op_data);
1880  }
1882  void internal_delete_built_successor( successor_type &r) __TBB_override {
1883  buffer_operation op_data(del_blt_succ);
1884  op_data.r = &r;
1885  my_aggregator.execute(&op_data);
1886  }
1887 
1888  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1889  buffer_operation op_data(add_blt_pred);
1890  op_data.p = &p;
1891  my_aggregator.execute(&op_data);
1892  }
1893 
1894  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1895  buffer_operation op_data(del_blt_pred);
1896  op_data.p = &p;
1897  my_aggregator.execute(&op_data);
1898  }
1899 
1900  size_t predecessor_count() __TBB_override {
1901  buffer_operation op_data(blt_pred_cnt);
1902  my_aggregator.execute(&op_data);
1903  return op_data.cnt_val;
1904  }
1905 
1906  size_t successor_count() __TBB_override {
1907  buffer_operation op_data(blt_succ_cnt);
1908  my_aggregator.execute(&op_data);
1909  return op_data.cnt_val;
1910  }
1911 
1912  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
1913  buffer_operation op_data(blt_pred_cpy);
1914  op_data.pvec = &v;
1915  my_aggregator.execute(&op_data);
1916  }
1917 
1918  void copy_successors( successor_list_type &v ) __TBB_override {
1919  buffer_operation op_data(blt_succ_cpy);
1920  op_data.svec = &v;
1921  my_aggregator.execute(&op_data);
1922  }
1924 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1929  bool remove_successor( successor_type &r ) __TBB_override {
1930  r.remove_predecessor(*this);
1931  buffer_operation op_data(rem_succ);
1932  op_data.r = &r;
1933  my_aggregator.execute(&op_data);
1934  // even though this operation does not cause a forward, if we are the handler, and
1935  // a forward is scheduled, we may be the first to reach this point after the aggregator,
1936  // and so should check for the task.
1937  (void)enqueue_forwarding_task(op_data);
1938  return true;
1939  }
1944  bool try_get( T &v ) __TBB_override {
1945  buffer_operation op_data(req_item);
1946  op_data.elem = &v;
1947  my_aggregator.execute(&op_data);
1948  (void)enqueue_forwarding_task(op_data);
1949  return (op_data.status==internal::SUCCEEDED);
1950  }
1951 
1953 
1956  buffer_operation op_data(res_item);
1957  op_data.elem = &v;
1958  my_aggregator.execute(&op_data);
1959  (void)enqueue_forwarding_task(op_data);
1960  return (op_data.status==internal::SUCCEEDED);
1961  }
1962 
1964 
1966  buffer_operation op_data(rel_res);
1967  my_aggregator.execute(&op_data);
1968  (void)enqueue_forwarding_task(op_data);
1969  return true;
1970  }
1971 
1973 
1975  buffer_operation op_data(con_res);
1976  my_aggregator.execute(&op_data);
1977  (void)enqueue_forwarding_task(op_data);
1978  return true;
1979  }
1980 
1981 protected:
1983  template< typename R, typename B > friend class run_and_put_task;
1984  template<typename X, typename Y> friend class internal::broadcast_cache;
1985  template<typename X, typename Y> friend class internal::round_robin_cache;
1988  buffer_operation op_data(t, put_item);
1989  my_aggregator.execute(&op_data);
1990  task *ft = grab_forwarding_task(op_data);
1991  // sequencer_nodes can return failure (if an item has been previously inserted)
1992  // We have to spawn the returned task if our own operation fails.
1993 
1994  if(ft && op_data.status ==internal::FAILED) {
1995  // we haven't succeeded queueing the item, but for some reason the
1996  // call returned a task (if another request resulted in a successful
1997  // forward this could happen.) Queue the task and reset the pointer.
1998  internal::spawn_in_graph_arena(graph_reference(), *ft); ft = NULL;
1999  }
2000  else if(!ft && op_data.status ==internal::SUCCEEDED) {
2001  ft = SUCCESSFULLY_ENQUEUED;
2002  }
2003  return ft;
2004  }
2005 
2006  graph& graph_reference() __TBB_override {
2007  return my_graph;
2008  }
2009 
2011 
2012 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2013 public:
2014  void extract() __TBB_override {
2015  my_built_predecessors.receiver_extract(*this);
2016  my_successors.built_successors().sender_extract(*this);
2017  }
2018 #endif
2019 
2020 protected:
2023  // TODO: just clear structures
2024  if (f&rf_clear_edges) {
2025  my_successors.clear();
2026 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2027  my_built_predecessors.clear();
2028 #endif
2029  }
2030  forwarder_busy = false;
2031  }
2032 }; // buffer_node
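/* Editorial usage sketch, not part of the original flow_graph.h.  A buffer_node stores items
   and hands them to a successor (or to try_get) in no particular order; names are illustrative.

       tbb::flow::graph g;
       tbb::flow::buffer_node<int> buf( g );
       buf.try_put( 1 );
       buf.try_put( 2 );
       g.wait_for_all();
       int v = 0;
       while ( buf.try_get( v ) ) {
           // items are retrieved in an unspecified order
       }
*/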
2033 
2035 template <typename T, typename A=cache_aligned_allocator<T> >
2036 class queue_node : public buffer_node<T, A> {
2037 protected:
2042 
2043 private:
2044  template<typename, typename> friend class buffer_node;
2045 
2046  bool is_item_valid() {
2047  return this->my_item_valid(this->my_head);
2048  }
2049 
2050  void try_put_and_add_task(task*& last_task) {
2051  task *new_task = this->my_successors.try_put_task(this->front());
2052  if (new_task) {
2053  // workaround for icc bug
2054  graph& graph_ref = this->graph_reference();
2055  last_task = combine_tasks(graph_ref, last_task, new_task);
2056  this->destroy_front();
2057  }
2058  }
2059 
2060 protected:
2062  this->internal_forward_task_impl(op, this);
2063  }
2064 
2066  if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2068  }
2069  else {
2070  this->pop_front(*(op->elem));
2072  }
2073  }
2075  if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2077  }
2078  else {
2079  this->reserve_front(*(op->elem));
2081  }
2082  }
2084  this->consume_front();
2086  }
2087 
2088 public:
2089  typedef T input_type;
2090  typedef T output_type;
2093 
2095  explicit queue_node( graph &g ) : base_type(g) {
2096  tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2097  static_cast<receiver<input_type> *>(this),
2098  static_cast<sender<output_type> *>(this) );
2099  }
2100 
2102  queue_node( const queue_node& src) : base_type(src) {
2103  tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2104  static_cast<receiver<input_type> *>(this),
2105  static_cast<sender<output_type> *>(this) );
2106  }
2107 
2108 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2109  void set_name( const char *name ) __TBB_override {
2111  }
2112 #endif
2113 
2114 protected:
2116  base_type::reset_node(f);
2117  }
2118 }; // queue_node
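// Illustrative usage sketch (not part of the original header): a queue_node buffers messages
// in FIFO order between two function_node stages. The node names below are hypothetical and
// the snippet assumes tbb/flow_graph.h is included by user code.
#if 0
#include "tbb/flow_graph.h"

void queue_node_example() {
    tbb::flow::graph g;
    tbb::flow::function_node<int, int> producer( g, tbb::flow::unlimited,
        []( int v ) { return v * 2; } );                          // transforms incoming items
    tbb::flow::queue_node<int> buffer( g );                       // FIFO buffer between the stages
    tbb::flow::function_node<int, tbb::flow::continue_msg> consumer( g, tbb::flow::serial,
        []( int /*v*/ ) { return tbb::flow::continue_msg(); } );  // drains items one at a time
    tbb::flow::make_edge( producer, buffer );
    tbb::flow::make_edge( buffer, consumer );
    producer.try_put( 1 );
    producer.try_put( 2 );
    g.wait_for_all();                                             // wait until the graph is idle
}
#endif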
2119 
2121 template< typename T, typename A=cache_aligned_allocator<T> >
2122 class sequencer_node : public queue_node<T, A> {
2124  // my_sequencer should be a benign (side-effect-free) function and must be callable
2125  // from a parallel context. Open question: does this mean it needn't be reset?
2126 public:
2127  typedef T input_type;
2128  typedef T output_type;
2131 
2133  template< typename Sequencer >
2134  sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2135  my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2136  tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2137  static_cast<receiver<input_type> *>(this),
2138  static_cast<sender<output_type> *>(this) );
2139  }
2140 
2142  sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
2143  my_sequencer( src.my_sequencer->clone() ) {
2144  tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2145  static_cast<receiver<input_type> *>(this),
2146  static_cast<sender<output_type> *>(this) );
2147  }
2148 
2150  ~sequencer_node() { delete my_sequencer; }
2151 
2152 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2153  void set_name( const char *name ) __TBB_override {
2155  }
2156 #endif
2157 
2158 protected:
2161 
2162 private:
2164  size_type tag = (*my_sequencer)(*(op->elem));
2165 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2166  if (tag < this->my_head) {
2167  // have already emitted a message with this tag
2169  return false;
2170  }
2171 #endif
2172  // cannot modify this->my_tail now; the buffer would be inconsistent.
2173  size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2174 
2175  if (this->size(new_tail) > this->capacity()) {
2176  this->grow_my_array(this->size(new_tail));
2177  }
2178  this->my_tail = new_tail;
2179 
2180  const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2181  __TBB_store_with_release(op->status, res);
2182  return res ==internal::SUCCEEDED;
2183  }
2184 }; // sequencer_node
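// Illustrative usage sketch (not part of the original header): a sequencer_node re-establishes
// the original order of items that may arrive out of order; the user-supplied body maps each
// item to its 0-based sequence number. The "message" struct and node names are hypothetical.
#if 0
#include "tbb/flow_graph.h"

struct message { size_t seq; int payload; };

void sequencer_node_example() {
    tbb::flow::graph g;
    tbb::flow::sequencer_node<message> ordered( g,
        []( const message &m ) -> size_t { return m.seq; } );          // extract the sequence number
    tbb::flow::function_node<message, tbb::flow::continue_msg> sink( g, tbb::flow::serial,
        []( const message & ) { return tbb::flow::continue_msg(); } ); // receives items 0, 1, 2, ...
    tbb::flow::make_edge( ordered, sink );

    message m2 = { 2, 30 }, m0 = { 0, 10 }, m1 = { 1, 20 };
    ordered.try_put( m2 );   // buffered until items 0 and 1 have been forwarded
    ordered.try_put( m0 );
    ordered.try_put( m1 );
    g.wait_for_all();
}
#endif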
2185 
2187 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2188 class priority_queue_node : public buffer_node<T, A> {
2189 public:
2190  typedef T input_type;
2191  typedef T output_type;
2196 
2198  explicit priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
2199  tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2200  static_cast<receiver<input_type> *>(this),
2201  static_cast<sender<output_type> *>(this) );
2202  }
2203 
2205  priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
2206  tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2207  static_cast<receiver<input_type> *>(this),
2208  static_cast<sender<output_type> *>(this) );
2209  }
2210 
2211 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2212  void set_name( const char *name ) __TBB_override {
2214  }
2215 #endif
2216 
2217 protected:
2218 
2220  mark = 0;
2221  base_type::reset_node(f);
2222  }
2223 
2227 
2230  this->internal_forward_task_impl(op, this);
2231  }
2232 
2234  this->handle_operations_impl(op_list, this);
2235  }
2236 
2238  prio_push(*(op->elem));
2240  return true;
2241  }
2242 
2244  // if empty or already reserved, don't pop
2245  if ( this->my_reserved == true || this->my_tail == 0 ) {
2247  return;
2248  }
2249 
2250  *(op->elem) = prio();
2252  prio_pop();
2253 
2254  }
2255 
2256  // reserves the highest-priority item: pops it and saves a copy in reserved_item
2258  if (this->my_reserved == true || this->my_tail == 0) {
2260  return;
2261  }
2262  this->my_reserved = true;
2263  *(op->elem) = prio();
2264  reserved_item = *(op->elem);
2266  prio_pop();
2267  }
2268 
2271  this->my_reserved = false;
2272  reserved_item = input_type();
2273  }
2274 
2277  prio_push(reserved_item);
2278  this->my_reserved = false;
2279  reserved_item = input_type();
2280  }
2281 
2282 private:
2283  template<typename, typename> friend class buffer_node;
2284 
2285  void order() {
2286  if (mark < this->my_tail) heapify();
2287  __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2288  }
2289 
2290  bool is_item_valid() {
2291  return this->my_tail > 0;
2292  }
2293 
2294  void try_put_and_add_task(task*& last_task) {
2295  task * new_task = this->my_successors.try_put_task(this->prio());
2296  if (new_task) {
2297  // workaround for icc bug
2298  graph& graph_ref = this->graph_reference();
2299  last_task = combine_tasks(graph_ref, last_task, new_task);
2300  prio_pop();
2301  }
2302  }
2303 
2304 private:
2305  Compare compare;
2307 
2309 
2310  // in case a reheap has not been done after a push, check whether the most recently pushed item (at my_tail-1) has higher priority than the 0'th item
2311  bool prio_use_tail() {
2312  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2313  return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2314  }
2315 
2316  // prio_push: checks that the item will fit, expands the array if necessary, and puts the item at the end
2317  void prio_push(const T &src) {
2318  if ( this->my_tail >= this->my_array_size )
2319  this->grow_my_array( this->my_tail + 1 );
2320  (void) this->place_item(this->my_tail, src);
2321  ++(this->my_tail);
2322  __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2323  }
2324 
2325  // prio_pop: deletes the highest-priority item from the array. If it is item 0,
2326  // moves the last item to 0 and reheaps; if it is the item at the end of the array,
2327  // just destroys it and decrements tail and mark. Assumes the array has already been tested for emptiness; never fails.
2328  void prio_pop() {
2329  if (prio_use_tail()) {
2330  // there are newly pushed elements; last one higher than top
2331  // copy the data
2332  this->destroy_item(this->my_tail-1);
2333  --(this->my_tail);
2334  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2335  return;
2336  }
2337  this->destroy_item(0);
2338  if(this->my_tail > 1) {
2339  // push the last element down heap
2340  __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2341  this->move_item(0,this->my_tail - 1);
2342  }
2343  --(this->my_tail);
2344  if(mark > this->my_tail) --mark;
2345  if (this->my_tail > 1) // don't reheap for heap of size 1
2346  reheap();
2347  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2348  }
2349 
2350  const T& prio() {
2351  return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2352  }
2353 
2354  // turn array into heap
2355  void heapify() {
2356  if(this->my_tail == 0) {
2357  mark = 0;
2358  return;
2359  }
2360  if (!mark) mark = 1;
2361  for (; mark<this->my_tail; ++mark) { // for each unheaped element
2362  size_type cur_pos = mark;
2363  input_type to_place;
2364  this->fetch_item(mark,to_place);
2365  do { // push to_place up the heap
2366  size_type parent = (cur_pos-1)>>1;
2367  if (!compare(this->get_my_item(parent), to_place))
2368  break;
2369  this->move_item(cur_pos, parent);
2370  cur_pos = parent;
2371  } while( cur_pos );
2372  (void) this->place_item(cur_pos, to_place);
2373  }
2374  }
2375 
2376  // the array is a heap except for a new root element; sift it down to restore the heap property
2377  void reheap() {
2378  size_type cur_pos=0, child=1;
2379  while (child < mark) {
2380  size_type target = child;
2381  if (child+1<mark &&
2382  compare(this->get_my_item(child),
2383  this->get_my_item(child+1)))
2384  ++target;
2385  // target now has the higher priority child
2386  if (compare(this->get_my_item(target),
2387  this->get_my_item(cur_pos)))
2388  break;
2389  // swap
2390  this->swap_items(cur_pos, target);
2391  cur_pos = target;
2392  child = (cur_pos<<1)+1;
2393  }
2394  }
2395 }; // priority_queue_node
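// Illustrative usage sketch (not part of the original header): a priority_queue_node forwards
// the buffered item with the highest priority first (the largest item under the default
// std::less<T> Compare). Names below are hypothetical.
#if 0
#include "tbb/flow_graph.h"

void priority_queue_node_example() {
    tbb::flow::graph g;
    tbb::flow::priority_queue_node<int> pq( g );
    pq.try_put( 3 );
    pq.try_put( 7 );
    g.wait_for_all();
    int v = 0;
    pq.try_get( v );   // v == 7: the largest buffered item comes out first
    pq.try_get( v );   // v == 3
}
#endif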
2396 
2397 } // interfaceX
2398 
2399 namespace interface11 {
2400 
2401 using namespace interface10;
2402 namespace internal = interface10::internal;
2403 
2405 
2408 template< typename T, typename DecrementType=continue_msg >
2409 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2410 public:
2411  typedef T input_type;
2412  typedef T output_type;
2415 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2416  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2417  typedef typename sender<output_type>::built_successors_type built_successors_type;
2418  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2419  typedef typename sender<output_type>::successor_list_type successor_list_type;
2420 #endif
2421  //TODO: predefined types for the controlling "decrementer" port are missing; add them later.
2422 
2423 private:
2425  size_t my_count; //number of successful puts
2426  size_t my_tries; //number of active put attempts
2430  __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
2431 
2433 
2434  // Let decrementer call decrement_counter()
2435  friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
2436 
2437  bool check_conditions() { // always called under lock
2438  return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2439  }
2440 
2441  // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
2443  input_type v;
2444  task *rval = NULL;
2445  bool reserved = false;
2446  {
2447  spin_mutex::scoped_lock lock(my_mutex);
2448  if ( check_conditions() )
2449  ++my_tries;
2450  else
2451  return NULL;
2452  }
2453 
2454  // SUCCESS
2455  // If we can reserve and can put, we consume the reservation,
2456  // increment the count, and decrement the tries.
2457  if ( (my_predecessors.try_reserve(v)) == true ){
2458  reserved=true;
2459  if ( (rval = my_successors.try_put_task(v)) != NULL ){
2460  {
2461  spin_mutex::scoped_lock lock(my_mutex);
2462  ++my_count;
2463  --my_tries;
2464  my_predecessors.try_consume();
2465  if ( check_conditions() ) {
2466  if ( internal::is_graph_active(this->my_graph) ) {
2467  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2469  internal::spawn_in_graph_arena(graph_reference(), *rtask);
2470  }
2471  }
2472  }
2473  return rval;
2474  }
2475  }
2476  // FAILURE
2477  // If we can't reserve, we decrement the tries.
2478  // If we can reserve but can't put, we decrement the tries and release the reservation.
2479  {
2480  spin_mutex::scoped_lock lock(my_mutex);
2481  --my_tries;
2482  if (reserved) my_predecessors.try_release();
2483  if ( check_conditions() ) {
2484  if ( internal::is_graph_active(this->my_graph) ) {
2485  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2487  __TBB_ASSERT(!rval, "Have two tasks to handle");
2488  return rtask;
2489  }
2490  }
2491  return rval;
2492  }
2493  }
2494 
2495  void forward() {
2496  __TBB_ASSERT(false, "Should never be called");
2497  return;
2498  }
2499 
2500  task* decrement_counter( long long delta ) {
2501  {
2502  spin_mutex::scoped_lock lock(my_mutex);
2503  if( delta > 0 && size_t(delta) > my_count )
2504  my_count = 0;
2505  else if( delta < 0 && size_t(-delta) > my_threshold - my_count )
2506  my_count = my_threshold;
2507  else
2508  my_count -= size_t(delta); // absolute value of delta is sufficiently small
2509  }
2510  return forward_task();
2511  }
2512 
2513  void initialize() {
2514  my_predecessors.set_owner(this);
2515  my_successors.set_owner(this);
2516  decrement.set_owner(this);
2518  tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2519  static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2520  static_cast<sender<output_type> *>(this)
2521  );
2522  }
2523 public:
2526 
2527 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
2529  "Deprecated interface of the limiter node can be used only in conjunction "
2530  "with continue_msg as the type of DecrementType template parameter." );
2531 #endif // Check for incompatible interface
2532 
2535  __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
2536  : graph_node(g), my_threshold(threshold), my_count(0),
2538  my_tries(0), decrement(),
2539  init_decrement_predecessors(num_decrement_predecessors),
2540  decrement(num_decrement_predecessors)) {
2541  initialize();
2542  }
2543 
2545  limiter_node( const limiter_node& src ) :
2546  graph_node(src.my_graph), receiver<T>(), sender<T>(),
2547  my_threshold(src.my_threshold), my_count(0),
2549  my_tries(0), decrement(),
2550  init_decrement_predecessors(src.init_decrement_predecessors),
2551  decrement(src.init_decrement_predecessors)) {
2552  initialize();
2553  }
2554 
2555 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2556  void set_name( const char *name ) __TBB_override {
2558  }
2559 #endif
2560 
2563  spin_mutex::scoped_lock lock(my_mutex);
2564  bool was_empty = my_successors.empty();
2565  my_successors.register_successor(r);
2566  //spawn a forward task if this is the only successor
2567  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2568  if ( internal::is_graph_active(this->my_graph) ) {
2569  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2571  internal::spawn_in_graph_arena(graph_reference(), *task);
2572  }
2573  }
2574  return true;
2575  }
2576 
2578 
2580  r.remove_predecessor(*this);
2581  my_successors.remove_successor(r);
2582  return true;
2583  }
2584 
2585 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2586  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2587  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
2588 
2589  void internal_add_built_successor(successor_type &src) __TBB_override {
2590  my_successors.internal_add_built_successor(src);
2591  }
2592 
2593  void internal_delete_built_successor(successor_type &src) __TBB_override {
2594  my_successors.internal_delete_built_successor(src);
2595  }
2596 
2597  size_t successor_count() __TBB_override { return my_successors.successor_count(); }
2598 
2599  void copy_successors(successor_list_type &v) __TBB_override {
2600  my_successors.copy_successors(v);
2601  }
2602 
2603  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
2604  my_predecessors.internal_add_built_predecessor(src);
2605  }
2606 
2607  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
2608  my_predecessors.internal_delete_built_predecessor(src);
2609  }
2610 
2611  size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
2612 
2613  void copy_predecessors(predecessor_list_type &v) __TBB_override {
2614  my_predecessors.copy_predecessors(v);
2615  }
2616 
2617  void extract() __TBB_override {
2618  my_count = 0;
2619  my_successors.built_successors().sender_extract(*this);
2620  my_predecessors.built_predecessors().receiver_extract(*this);
2621  decrement.built_predecessors().receiver_extract(decrement);
2622  }
2623 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2624 
2627  spin_mutex::scoped_lock lock(my_mutex);
2628  my_predecessors.add( src );
2629  if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
2630  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2632  internal::spawn_in_graph_arena(graph_reference(), *task);
2633  }
2634  return true;
2635  }
2636 
2639  my_predecessors.remove( src );
2640  return true;
2641  }
2642 
2643 protected:
2644 
2645  template< typename R, typename B > friend class run_and_put_task;
2646  template<typename X, typename Y> friend class internal::broadcast_cache;
2647  template<typename X, typename Y> friend class internal::round_robin_cache;
2650  {
2652  if ( my_count + my_tries >= my_threshold )
2653  return NULL;
2654  else
2655  ++my_tries;
2656  }
2657 
2658  task * rtask = my_successors.try_put_task(t);
2659 
2660  if ( !rtask ) { // try_put_task failed.
2662  --my_tries;
2663  if (check_conditions() && internal::is_graph_active(this->my_graph)) {
2664  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2666  }
2667  }
2668  else {
2670  ++my_count;
2671  --my_tries;
2672  }
2673  return rtask;
2674  }
2675 
2676  graph& graph_reference() __TBB_override { return my_graph; }
2677 
2679  __TBB_ASSERT(false,NULL); // should never be called
2680  }
2681 
2683  my_count = 0;
2684  if(f & rf_clear_edges) {
2685  my_predecessors.clear();
2686  my_successors.clear();
2687  }
2688  else
2689  {
2690  my_predecessors.reset( );
2691  }
2692  decrement.reset_receiver(f);
2693  }
2694 }; // limiter_node
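// Illustrative usage sketch (not part of the original header): a limiter_node lets at most
// `threshold` messages through until its public `decrement` port is signalled. Wiring the
// consumer back to the decrement port caps the number of in-flight items. Names are
// hypothetical; in practice a reserving buffer usually feeds the limiter so that rejected
// puts are retried rather than dropped.
#if 0
#include "tbb/flow_graph.h"

void limiter_node_example() {
    tbb::flow::graph g;
    tbb::flow::limiter_node<int> limiter( g, 2 );                  // at most 2 items in flight
    tbb::flow::function_node<int, tbb::flow::continue_msg> worker( g, tbb::flow::unlimited,
        []( int /*v*/ ) { return tbb::flow::continue_msg(); } );   // processes an item
    tbb::flow::make_edge( limiter, worker );
    tbb::flow::make_edge( worker, limiter.decrement );             // finishing an item frees a slot
    for ( int i = 0; i < 10; ++i )
        limiter.try_put( i );                                      // puts beyond the limit return false
    g.wait_for_all();
}
#endif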
2695 } // namespace interfaceX
2696 
2697 namespace interface10 {
2698 
2700 
2704 using internal::input_port;
2705 using internal::tag_value;
2706 
2707 template<typename OutputTuple, typename JP=queueing> class join_node;
2708 
2709 template<typename OutputTuple>
2710 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2711 private:
2714 public:
2715  typedef OutputTuple output_type;
2717  explicit join_node(graph &g) : unfolded_type(g) {
2718  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2719  this->input_ports(), static_cast< sender< output_type > *>(this) );
2720  }
2721  join_node(const join_node &other) : unfolded_type(other) {
2722  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2723  this->input_ports(), static_cast< sender< output_type > *>(this) );
2724  }
2725 
2726 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2727  void set_name( const char *name ) __TBB_override {
2729  }
2730 #endif
2731 
2732 };
2733 
2734 template<typename OutputTuple>
2735 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2736 private:
2739 public:
2740  typedef OutputTuple output_type;
2742  explicit join_node(graph &g) : unfolded_type(g) {
2743  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2744  this->input_ports(), static_cast< sender< output_type > *>(this) );
2745  }
2746  join_node(const join_node &other) : unfolded_type(other) {
2747  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2748  this->input_ports(), static_cast< sender< output_type > *>(this) );
2749  }
2750 
2751 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2752  void set_name( const char *name ) __TBB_override {
2754  }
2755 #endif
2756 
2757 };
2758 
2759 // template for key_matching join_node
2760 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
2761 template<typename OutputTuple, typename K, typename KHash>
2762 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2763  key_matching_port, OutputTuple, key_matching<K,KHash> > {
2764 private:
2767 public:
2768  typedef OutputTuple output_type;
2770 
2771 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2773 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2774 
2775  template<typename __TBB_B0, typename __TBB_B1>
2776  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2777  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2778  this->input_ports(), static_cast< sender< output_type > *>(this) );
2779  }
2780  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2781  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2782  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2783  this->input_ports(), static_cast< sender< output_type > *>(this) );
2784  }
2785  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2786  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2787  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2788  this->input_ports(), static_cast< sender< output_type > *>(this) );
2789  }
2790  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2791  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2792  unfolded_type(g, b0, b1, b2, b3, b4) {
2793  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2794  this->input_ports(), static_cast< sender< output_type > *>(this) );
2795  }
2796 #if __TBB_VARIADIC_MAX >= 6
2797  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2798  typename __TBB_B5>
2799  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2800  unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2801  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2802  this->input_ports(), static_cast< sender< output_type > *>(this) );
2803  }
2804 #endif
2805 #if __TBB_VARIADIC_MAX >= 7
2806  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2807  typename __TBB_B5, typename __TBB_B6>
2808  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2809  unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2810  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2811  this->input_ports(), static_cast< sender< output_type > *>(this) );
2812  }
2813 #endif
2814 #if __TBB_VARIADIC_MAX >= 8
2815  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2816  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
2817  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2818  __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2819  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2820  this->input_ports(), static_cast< sender< output_type > *>(this) );
2821  }
2822 #endif
2823 #if __TBB_VARIADIC_MAX >= 9
2824  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2825  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
2826  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2827  __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2828  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2829  this->input_ports(), static_cast< sender< output_type > *>(this) );
2830  }
2831 #endif
2832 #if __TBB_VARIADIC_MAX >= 10
2833  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2834  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
2835  join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2836  __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2837  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2838  this->input_ports(), static_cast< sender< output_type > *>(this) );
2839  }
2840 #endif
2841  join_node(const join_node &other) : unfolded_type(other) {
2842  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2843  this->input_ports(), static_cast< sender< output_type > *>(this) );
2844  }
2845 
2846 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2847  void set_name( const char *name ) __TBB_override {
2849  }
2850 #endif
2851 
2852 };
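// Illustrative usage sketch (not part of the original header): a queueing join_node combines
// one message from each input port into a tuple; key_matching joins pair messages by key
// instead of by arrival order. Names below are hypothetical.
#if 0
#include "tbb/flow_graph.h"

void join_node_example() {
    tbb::flow::graph g;
    typedef tbb::flow::tuple<int, float> pair_t;
    tbb::flow::join_node<pair_t, tbb::flow::queueing> join( g );
    tbb::flow::function_node<pair_t, tbb::flow::continue_msg> sink( g, tbb::flow::serial,
        []( const pair_t &p ) {
            int i = tbb::flow::get<0>( p );                        // value from input port 0
            float f = tbb::flow::get<1>( p );                      // value from input port 1
            (void)i; (void)f;
            return tbb::flow::continue_msg();
        } );
    tbb::flow::make_edge( join, sink );
    tbb::flow::input_port<0>( join ).try_put( 1 );
    tbb::flow::input_port<1>( join ).try_put( 2.0f );              // completes the pair (1, 2.0f)
    g.wait_for_all();
}
#endif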
2853 
2854 // indexer node
2856 
2857 // TODO: Implement interface with variadic template or tuple
2858 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2859  typename T4=null_type, typename T5=null_type, typename T6=null_type,
2860  typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2861 
2862 //indexer node specializations
2863 template<typename T0>
2864 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
2865 private:
2866  static const int N = 1;
2867 public:
2868  typedef tuple<T0> InputTuple;
2872  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2873  this->input_ports(), static_cast< sender< output_type > *>(this) );
2874  }
2875  // Copy constructor
2876  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2877  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2878  this->input_ports(), static_cast< sender< output_type > *>(this) );
2879  }
2880 
2881 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2882  void set_name( const char *name ) __TBB_override {
2884  }
2885 #endif
2886 };
2887 
2888 template<typename T0, typename T1>
2889 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
2890 private:
2891  static const int N = 2;
2892 public:
2893  typedef tuple<T0, T1> InputTuple;
2897  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2898  this->input_ports(), static_cast< sender< output_type > *>(this) );
2899  }
2900  // Copy constructor
2901  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2902  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2903  this->input_ports(), static_cast< sender< output_type > *>(this) );
2904  }
2905 
2906 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2907  void set_name( const char *name ) __TBB_override {
2909  }
2910 #endif
2911 };
2912 
2913 template<typename T0, typename T1, typename T2>
2914 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
2915 private:
2916  static const int N = 3;
2917 public:
2918  typedef tuple<T0, T1, T2> InputTuple;
2922  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2923  this->input_ports(), static_cast< sender< output_type > *>(this) );
2924  }
2925  // Copy constructor
2926  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2927  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2928  this->input_ports(), static_cast< sender< output_type > *>(this) );
2929  }
2930 
2931 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2932  void set_name( const char *name ) __TBB_override {
2934  }
2935 #endif
2936 };
2937 
2938 template<typename T0, typename T1, typename T2, typename T3>
2939 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
2940 private:
2941  static const int N = 4;
2942 public:
2943  typedef tuple<T0, T1, T2, T3> InputTuple;
2947  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2948  this->input_ports(), static_cast< sender< output_type > *>(this) );
2949  }
2950  // Copy constructor
2951  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2952  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2953  this->input_ports(), static_cast< sender< output_type > *>(this) );
2954  }
2955 
2956 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2957  void set_name( const char *name ) __TBB_override {
2959  }
2960 #endif
2961 };
2962 
2963 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2964 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
2965 private:
2966  static const int N = 5;
2967 public:
2968  typedef tuple<T0, T1, T2, T3, T4> InputTuple;
2972  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2973  this->input_ports(), static_cast< sender< output_type > *>(this) );
2974  }
2975  // Copy constructor
2976  indexer_node( const indexer_node& other ) : unfolded_type(other) {
2977  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2978  this->input_ports(), static_cast< sender< output_type > *>(this) );
2979  }
2980 
2981 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2982  void set_name( const char *name ) __TBB_override {
2984  }
2985 #endif
2986 };
2987 
2988 #if __TBB_VARIADIC_MAX >= 6
2989 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
2990 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
2991 private:
2992  static const int N = 6;
2993 public:
2994  typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
2998  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
2999  this->input_ports(), static_cast< sender< output_type > *>(this) );
3000  }
3001  // Copy constructor
3002  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3003  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3004  this->input_ports(), static_cast< sender< output_type > *>(this) );
3005  }
3006 
3007 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3008  void set_name( const char *name ) __TBB_override {
3010  }
3011 #endif
3012 };
3013 #endif //variadic max 6
3014 
3015 #if __TBB_VARIADIC_MAX >= 7
3016 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3017  typename T6>
3018 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3019 private:
3020  static const int N = 7;
3021 public:
3022  typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3026  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3027  this->input_ports(), static_cast< sender< output_type > *>(this) );
3028  }
3029  // Copy constructor
3030  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3031  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3032  this->input_ports(), static_cast< sender< output_type > *>(this) );
3033  }
3034 
3035 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3036  void set_name( const char *name ) __TBB_override {
3038  }
3039 #endif
3040 };
3041 #endif //variadic max 7
3042 
3043 #if __TBB_VARIADIC_MAX >= 8
3044 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3045  typename T6, typename T7>
3046 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3047 private:
3048  static const int N = 8;
3049 public:
3050  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3054  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3055  this->input_ports(), static_cast< sender< output_type > *>(this) );
3056  }
3057  // Copy constructor
3058  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3059  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3060  this->input_ports(), static_cast< sender< output_type > *>(this) );
3061  }
3062 
3063 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3064  void set_name( const char *name ) __TBB_override {
3066  }
3067 #endif
3068 };
3069 #endif //variadic max 8
3070 
3071 #if __TBB_VARIADIC_MAX >= 9
3072 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3073  typename T6, typename T7, typename T8>
3074 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3075 private:
3076  static const int N = 9;
3077 public:
3078  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3082  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3083  this->input_ports(), static_cast< sender< output_type > *>(this) );
3084  }
3085  // Copy constructor
3086  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3087  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3088  this->input_ports(), static_cast< sender< output_type > *>(this) );
3089  }
3090 
3091 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3092  void set_name( const char *name ) __TBB_override {
3094  }
3095 #endif
3096 };
3097 #endif //variadic max 9
3098 
3099 #if __TBB_VARIADIC_MAX >= 10
3100 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3101  typename T6, typename T7, typename T8, typename T9>
3102 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3103 private:
3104  static const int N = 10;
3105 public:
3106  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3110  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3111  this->input_ports(), static_cast< sender< output_type > *>(this) );
3112  }
3113  // Copy constructor
3114  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3115  tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3116  this->input_ports(), static_cast< sender< output_type > *>(this) );
3117  }
3118 
3119 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3120  void set_name( const char *name ) __TBB_override {
3122  }
3123 #endif
3124 };
3125 #endif //variadic max 10
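// Illustrative usage sketch (not part of the original header): an indexer_node forwards a
// message from any of its input ports as a tagged_msg that records which port it arrived on;
// cast_to<T>() retrieves the payload. Names below are hypothetical.
#if 0
#include "tbb/flow_graph.h"

void indexer_node_example() {
    tbb::flow::graph g;
    typedef tbb::flow::indexer_node<int, float> indexer_t;
    indexer_t indexer( g );
    tbb::flow::function_node<indexer_t::output_type, tbb::flow::continue_msg> sink( g, tbb::flow::serial,
        []( const indexer_t::output_type &msg ) {
            if ( msg.is_a<int>() ) {
                int i = tbb::flow::cast_to<int>( msg );            // arrived on port 0
                (void)i;
            } else {
                float f = tbb::flow::cast_to<float>( msg );        // arrived on port 1
                (void)f;
            }
            return tbb::flow::continue_msg();
        } );
    tbb::flow::make_edge( indexer, sink );
    tbb::flow::input_port<0>( indexer ).try_put( 42 );
    tbb::flow::input_port<1>( indexer ).try_put( 3.14f );
    g.wait_for_all();
}
#endif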
3126 
3127 #if __TBB_PREVIEW_ASYNC_MSG
3129 #else
3130 template< typename T >
3131 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3132 #endif
3133 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3134  s.internal_add_built_predecessor(p);
3135  p.internal_add_built_successor(s);
3136 #endif
3137  p.register_successor( s );
3139 }
3140 
3142 template< typename T >
3143 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3144  internal_make_edge( p, s );
3145 }
3146 
3147 #if __TBB_PREVIEW_ASYNC_MSG
3148 template< typename TS, typename TR,
3151 inline void make_edge( TS &p, TR &s ) {
3152  internal_make_edge( p, s );
3153 }
3154 
3155 template< typename T >
3157  internal_make_edge( p, s );
3158 }
3159 
3160 template< typename T >
3162  internal_make_edge( p, s );
3163 }
3164 
3165 #endif // __TBB_PREVIEW_ASYNC_MSG
3166 
3167 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3168 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
3169 template< typename T, typename V,
3170  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3171 inline void make_edge( T& output, V& input) {
3172  make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3173 }
3174 
3175 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
3176 template< typename T, typename R,
3177  typename = typename T::output_ports_type >
3178 inline void make_edge( T& output, receiver<R>& input) {
3179  make_edge(get<0>(output.output_ports()), input);
3180 }
3181 
3182 //Makes an edge from a sender to port 0 of a multi-input successor.
3183 template< typename S, typename V,
3184  typename = typename V::input_ports_type >
3185 inline void make_edge( sender<S>& output, V& input) {
3186  make_edge(output, get<0>(input.input_ports()));
3187 }
3188 #endif
3189 
3190 #if __TBB_PREVIEW_ASYNC_MSG
3192 #else
3193 template< typename T >
3194 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3195 #endif
3196  p.remove_successor( s );
3197 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3198  // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3199  p.internal_delete_built_successor(s);
3200  s.internal_delete_built_predecessor(p);
3201 #endif
3203 }
3204 
3206 template< typename T >
3207 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3208  internal_remove_edge( p, s );
3209 }
3210 
3211 #if __TBB_PREVIEW_ASYNC_MSG
3212 template< typename TS, typename TR,
3215 inline void remove_edge( TS &p, TR &s ) {
3216  internal_remove_edge( p, s );
3217 }
3218 
3219 template< typename T >
3221  internal_remove_edge( p, s );
3222 }
3223 
3224 template< typename T >
3226  internal_remove_edge( p, s );
3227 }
3228 #endif // __TBB_PREVIEW_ASYNC_MSG
3229 
3230 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3231 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
3232 template< typename T, typename V,
3233  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3234 inline void remove_edge( T& output, V& input) {
3235  remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3236 }
3237 
3238 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
3239 template< typename T, typename R,
3240  typename = typename T::output_ports_type >
3241 inline void remove_edge( T& output, receiver<R>& input) {
3242  remove_edge(get<0>(output.output_ports()), input);
3243 }
3244 //Removes an edge between a sender and port 0 of a multi-input successor.
3245 template< typename S, typename V,
3246  typename = typename V::input_ports_type >
3247 inline void remove_edge( sender<S>& output, V& input) {
3248  remove_edge(output, get<0>(input.input_ports()));
3249 }
3250 #endif
3251 
3252 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3253 template<typename C >
3254 template< typename S >
3255 void internal::edge_container<C>::sender_extract( S &s ) {
3256  edge_list_type e = built_edges;
3257  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3258  remove_edge(s, **i);
3259  }
3260 }
3261 
3262 template<typename C >
3263 template< typename R >
3264 void internal::edge_container<C>::receiver_extract( R &r ) {
3265  edge_list_type e = built_edges;
3266  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3267  remove_edge(**i, r);
3268  }
3269 }
3270 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3271 
3273 template< typename Body, typename Node >
3274 Body copy_body( Node &n ) {
3275  return n.template copy_function_object<Body>();
3276 }
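// Illustrative usage sketch (not part of the original header): copy_body retrieves a copy of
// the body object held inside a node, which is handy for inspecting state the body has
// accumulated. The "counting_body" functor is hypothetical.
#if 0
#include "tbb/flow_graph.h"

struct counting_body {
    int calls;
    counting_body() : calls( 0 ) {}
    int operator()( int v ) { ++calls; return v; }
};

void copy_body_example() {
    tbb::flow::graph g;
    tbb::flow::function_node<int, int> n( g, tbb::flow::serial, counting_body() );
    n.try_put( 1 );
    n.try_put( 2 );
    g.wait_for_all();
    counting_body b = tbb::flow::copy_body<counting_body>( n );   // copy of the node's body
    // b.calls == 2 here, assuming both puts were accepted and processed
}
#endif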
3277 
3278 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3279 
3280 //composite_node
3281 template< typename InputTuple, typename OutputTuple > class composite_node;
3282 
3283 template< typename... InputTypes, typename... OutputTypes>
3284 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3285 
3286 public:
3287  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3288  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3289 
3290 private:
3291  std::unique_ptr<input_ports_type> my_input_ports;
3292  std::unique_ptr<output_ports_type> my_output_ports;
3293 
3294  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3295  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3296 
3297 protected:
3299 
3300 public:
3301 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3302  composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3303  tbb::internal::fgt_multiinput_multioutput_node( tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3305  }
3306 #else
3308  tbb::internal::fgt_multiinput_multioutput_node( tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3309  }
3310 #endif
3311 
3312  template<typename T1, typename T2>
3313  void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
3314  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3315  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3316  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
3317  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
3318 
3321  }
3322 
3323  template< typename... NodeTypes >
3324  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3325 
3326  template< typename... NodeTypes >
3327  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3328 
3329 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3330  void set_name( const char *name ) __TBB_override {
3332  }
3333 #endif
3334 
3336  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3337  return *my_input_ports;
3338  }
3339 
3341  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3342  return *my_output_ports;
3343  }
3344 
3345 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3346  void extract() __TBB_override {
3347  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3348  }
3349 #endif
3350 }; // class composite_node
3351 
3352 //composite_node with only input ports
3353 template< typename... InputTypes>
3354 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
3355 public:
3356  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3357 
3358 private:
3359  std::unique_ptr<input_ports_type> my_input_ports;
3360  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3361 
3362 protected:
3364 
3365 public:
3366 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3367  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3368  tbb::internal::fgt_composite( this, &g );
3370  }
3371 #else
3373  tbb::internal::fgt_composite( this, &g );
3374  }
3375 #endif
3376 
3377  template<typename T>
3378  void set_external_ports(T&& input_ports_tuple) {
3379  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3380 
3381  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
3382 
3383  tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
3384  }
3385 
3386  template< typename... NodeTypes >
3387  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3388 
3389  template< typename... NodeTypes >
3390  void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3391 
3392 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3393  void set_name( const char *name ) __TBB_override {
3395  }
3396 #endif
3397 
3399  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3400  return *my_input_ports;
3401  }
3402 
3403 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3404  void extract() __TBB_override {
3405  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3406  }
3407 #endif
3408 
3409 }; // class composite_node
3410 
3411 //composite_node with only output ports
3412 template<typename... OutputTypes>
3413 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
3414 public:
3415  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3416 
3417 private:
3418  std::unique_ptr<output_ports_type> my_output_ports;
3419  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3420 
3421 protected:
3423 
3424 public:
3425 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3426  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3427  tbb::internal::fgt_composite( this, &g );
3429  }
3430 #else
3432  tbb::internal::fgt_composite( this, &g );
3433  }
3434 #endif
3435 
3436  template<typename T>
3437  void set_external_ports(T&& output_ports_tuple) {
3438  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3439 
3440  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
3441 
3442  tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
3443  }
3444 
3445  template<typename... NodeTypes >
3446  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3447 
3448  template<typename... NodeTypes >
3449  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3450 
3451 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3452  void set_name( const char *name ) __TBB_override {
3454  }
3455 #endif
3456 
3458  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3459  return *my_output_ports;
3460  }
3461 
3462 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3463  void extract() __TBB_override {
3464  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3465  }
3466 #endif
3467 
3468 }; // class composite_node
3469 
3470 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
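// Illustrative usage sketch (not part of the original header): a composite_node packages
// several inner nodes behind externally visible ports using set_external_ports. The "adder"
// class and node names below are hypothetical.
#if 0
#include "tbb/flow_graph.h"

class adder : public tbb::flow::composite_node< tbb::flow::tuple<int, int>, tbb::flow::tuple<int> > {
    typedef tbb::flow::composite_node< tbb::flow::tuple<int, int>, tbb::flow::tuple<int> > base_type;
    tbb::flow::join_node< tbb::flow::tuple<int, int>, tbb::flow::queueing > j;
    tbb::flow::function_node< tbb::flow::tuple<int, int>, int > f;
public:
    adder( tbb::flow::graph &g ) : base_type( g ), j( g ),
        f( g, tbb::flow::unlimited,
           []( const tbb::flow::tuple<int, int> &t ) {
               return tbb::flow::get<0>( t ) + tbb::flow::get<1>( t );
           } ) {
        tbb::flow::make_edge( j, f );                              // internal wiring
        base_type::set_external_ports(
            base_type::input_ports_type( tbb::flow::input_port<0>( j ), tbb::flow::input_port<1>( j ) ),
            base_type::output_ports_type( f ) );                   // expose the inner ports
    }
};

void composite_node_example() {
    tbb::flow::graph g;
    adder a( g );
    tbb::flow::input_port<0>( a ).try_put( 3 );
    tbb::flow::input_port<1>( a ).try_put( 4 );                    // the adder emits 7 on its output port
    g.wait_for_all();
}
#endif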
3471 
3472 namespace internal {
3473 
3474 template<typename Gateway>
3476 public:
3477  typedef Gateway gateway_type;
3478 
3479  async_body_base(gateway_type *gateway): my_gateway(gateway) { }
3480  void set_gateway(gateway_type *gateway) {
3481  my_gateway = gateway;
3482  }
3483 
3484 protected:
3486 };
3487 
3488 template<typename Input, typename Ports, typename Gateway, typename Body>
3489 class async_body: public async_body_base<Gateway> {
3490 public:
3492  typedef Gateway gateway_type;
3493 
3494  async_body(const Body &body, gateway_type *gateway)
3495  : base_type(gateway), my_body(body) { }
3496 
3497  void operator()( const Input &v, Ports & ) {
3498  my_body(v, *this->my_gateway);
3499  }
3500 
3501  Body get_body() { return my_body; }
3502 
3503 private:
3504  Body my_body;
3505 };
3506 
3507 }
3508 
3510 template < typename Input, typename Output,
3511  typename Policy = queueing_lightweight,
3512  typename Allocator=cache_aligned_allocator<Input> >
3513 class async_node : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output > {
3516 
3517 public:
3518  typedef Input input_type;
3519  typedef Output output_type;
3526 
3527 private:
3531  // TODO: pass the value by copy, since we do not want to block the asynchronous thread.
3532  const Output *value;
3533  bool result;
3534  try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
3535  void operator()() {
3536  result = port->try_put(*value);
3537  }
3538  };
3539 
3540  class receiver_gateway_impl: public receiver_gateway<Output> {
3541  public:
3542  receiver_gateway_impl(async_node* node): my_node(node) {}
3544  tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3545  my_node->my_graph.reserve_wait();
3546  }
3547 
3549  my_node->my_graph.release_wait();
3550  tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3551  }
3552 
3554  bool try_put(const Output &i) __TBB_override {
3555  return my_node->try_put_impl(i);
3556  }
3557 
3558  private:
3560  } my_gateway;
3561 
3562  // A substitute for 'this' during member construction, used to prevent compiler warnings
3563  async_node* self() { return this; }
3564 
3566  bool try_put_impl(const Output &i) {
3567  internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
3568  internal::broadcast_cache<output_type>& port_successors = port_0.successors();
3570  task_list tasks;
3571  bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
3572  __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
3573  "Return status is inconsistent with the method operation." );
3574 
3575  while( !tasks.empty() ) {
3576  internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
3577  }
3578  tbb::internal::fgt_async_try_put_end(this, &port_0);
3579  return is_at_least_one_put_successful;
3580  }
3581 
3582 public:
3583  template<typename Body>
3585  graph &g, size_t concurrency,
3587  ) : base_type(
3588  g, concurrency,
3589  internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
3590  (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
3591  tbb::internal::fgt_multioutput_node_with_body<1>(
3592  tbb::internal::FLOW_ASYNC_NODE,
3593  &this->my_graph, static_cast<receiver<input_type> *>(this),
3594  this->output_ports(), this->my_body
3595  );
3596  }
3597 
3598  async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
3599  static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
3600  static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
3601 
3602  tbb::internal::fgt_multioutput_node_with_body<1>( tbb::internal::FLOW_ASYNC_NODE,
3603  &this->my_graph, static_cast<receiver<input_type> *>(this),
3604  this->output_ports(), this->my_body );
3605  }
3606 
3608  return my_gateway;
3609  }
3610 
3611 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3612  void set_name( const char *name ) __TBB_override {
3614  }
3615 #endif
3616 
3617  // Define sender< Output >
3618 
3621  return internal::output_port<0>(*this).register_successor(r);
3622  }
3623 
3626  return internal::output_port<0>(*this).remove_successor(r);
3627  }
3628 
3629  template<typename Body>
3633  mfn_body_type &body_ref = *this->my_body;
3634  async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3635  return ab.get_body();
3636  }
3637 
3638 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3639  typedef typename internal::edge_container<successor_type> built_successors_type;
3641  typedef typename built_successors_type::edge_list_type successor_list_type;
3642  built_successors_type &built_successors() __TBB_override {
3643  return internal::output_port<0>(*this).built_successors();
3644  }
3645 
3646  void internal_add_built_successor( successor_type &r ) __TBB_override {
3647  internal::output_port<0>(*this).internal_add_built_successor(r);
3648  }
3649 
3650  void internal_delete_built_successor( successor_type &r ) __TBB_override {
3651  internal::output_port<0>(*this).internal_delete_built_successor(r);
3652  }
3653 
3654  void copy_successors( successor_list_type &l ) __TBB_override {
3655  internal::output_port<0>(*this).copy_successors(l);
3656  }
3657 
3658  size_t successor_count() __TBB_override {
3659  return internal::output_port<0>(*this).successor_count();
3660  }
3661 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3662 
3663 protected:
3664 
3666  base_type::reset_node(f);
3667  }
3668 };
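// Illustrative usage sketch (not part of the original header): an async_node hands work to a
// user-managed thread and receives results back through its gateway; reserve_wait()/release_wait()
// keep wait_for_all() from returning while asynchronous work is outstanding. Names are hypothetical.
#if 0
#include "tbb/flow_graph.h"
#include <thread>

typedef tbb::flow::async_node<int, int> async_t;

void async_node_example() {
    tbb::flow::graph g;
    async_t async( g, tbb::flow::unlimited,
        []( const int &input, async_t::gateway_type &gw ) {
            gw.reserve_wait();                                     // tell the graph to wait for async work
            std::thread( [input, &gw]() {
                gw.try_put( input * 2 );                           // send the result back into the graph
                gw.release_wait();                                 // the async work is done
            } ).detach();
        } );
    tbb::flow::function_node<int, tbb::flow::continue_msg> sink( g, tbb::flow::serial,
        []( int /*v*/ ) { return tbb::flow::continue_msg(); } );
    tbb::flow::make_edge( async, sink );
    async.try_put( 21 );
    g.wait_for_all();                                              // returns after release_wait()
}
#endif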
3669 
3670 #if __TBB_PREVIEW_STREAMING_NODE
3672 #endif // __TBB_PREVIEW_STREAMING_NODE
3673 
3674 } // interfaceX
3675 
3676 
3677 namespace interface10a {
3678 
3679 using namespace interface10;
3680 namespace internal = interface10::internal;
3681 
3682 template< typename T >
3683 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
3684 public:
3685  typedef T input_type;
3686  typedef T output_type;
3689 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3690  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
3691  typedef typename sender<output_type>::built_successors_type built_successors_type;
3692  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
3693  typedef typename sender<output_type>::successor_list_type successor_list_type;
3694 #endif
3695 
3696  explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
3697  my_successors.set_owner( this );
3698  tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
3699  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3700  }
3701 
3704  graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
3705  {
3706  my_successors.set_owner( this );
3707  tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
3708  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3709  }
3710 
3712 
3713 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3714  void set_name( const char *name ) __TBB_override {
3716  }
3717 #endif
3718 
3720  spin_mutex::scoped_lock l( my_mutex );
3721  if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
3722  // We have a valid value that must be forwarded immediately.
3723  bool ret = s.try_put( my_buffer );
3724  if ( ret ) {
3725  // We add the successor that accepted our put
3726  my_successors.register_successor( s );
3727  } else {
3728  // With a reserving successor, a race can occur between the moment of reservation and register_successor,
3729  // because a failed reserve does not mean the successor cannot accept a message immediately.
3730  // This can loop indefinitely: the reserving node tries to set pull state for the edge, while the
3731  // overwrite_node tries to switch it back to push state. We break the loop by deferring the registration to a spawned task.
3732  task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
3733  register_predecessor_task( *this, s );
3734  internal::spawn_in_graph_arena( my_graph, *rtask );
3735  }
3736  } else {
3737  // No valid value yet, just add as successor
3738  my_successors.register_successor( s );
3739  }
3740  return true;
3741  }
3742 
3744  spin_mutex::scoped_lock l( my_mutex );
3745  my_successors.remove_successor(s);
3746  return true;
3747  }
3748 
3749 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3750  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
3751  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
3752 
3753  void internal_add_built_successor( successor_type &s) __TBB_override {
3754  spin_mutex::scoped_lock l( my_mutex );
3755  my_successors.internal_add_built_successor(s);
3756  }
3757 
3758  void internal_delete_built_successor( successor_type &s) __TBB_override {
3759  spin_mutex::scoped_lock l( my_mutex );
3760  my_successors.internal_delete_built_successor(s);
3761  }
3762 
3763  size_t successor_count() __TBB_override {
3764  spin_mutex::scoped_lock l( my_mutex );
3765  return my_successors.successor_count();
3766  }
3767 
3768  void copy_successors(successor_list_type &v) __TBB_override {
3769  spin_mutex::scoped_lock l( my_mutex );
3770  my_successors.copy_successors(v);
3771  }
3772 
3773  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
3774  spin_mutex::scoped_lock l( my_mutex );
3775  my_built_predecessors.add_edge(p);
3776  }
3777 
3778  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
3779  spin_mutex::scoped_lock l( my_mutex );
3780  my_built_predecessors.delete_edge(p);
3781  }
3782 
3783  size_t predecessor_count() __TBB_override {
3784  spin_mutex::scoped_lock l( my_mutex );
3785  return my_built_predecessors.edge_count();
3786  }
3787 
3788  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
3789  spin_mutex::scoped_lock l( my_mutex );
3790  my_built_predecessors.copy_edges(v);
3791  }
3792 
3793  void extract() __TBB_override {
3794  my_buffer_is_valid = false;
3795  built_successors().sender_extract(*this);
3796  built_predecessors().receiver_extract(*this);
3797  }
3798 
3799 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3800 
3801  bool try_get( input_type &v ) __TBB_override {
3802  spin_mutex::scoped_lock l( my_mutex );
3803  if ( my_buffer_is_valid ) {
3804  v = my_buffer;
3805  return true;
3806  }
3807  return false;
3808  }
3809 
3810  //! Reserves an item
3811  bool try_reserve( T &v ) __TBB_override {
3812  return try_get(v);
3813  }
3814 
3816  bool try_release() __TBB_override { return true; }
3817 
3819  bool try_consume() __TBB_override { return true; }
3820 
3821  bool is_valid() {
3822  spin_mutex::scoped_lock l( my_mutex );
3823  return my_buffer_is_valid;
3824  }
3825 
3826  void clear() {
3827  spin_mutex::scoped_lock l( my_mutex );
3828  my_buffer_is_valid = false;
3829  }
3830 
3831 protected:
3832 
3833  template< typename R, typename B > friend class run_and_put_task;
3834  template<typename X, typename Y> friend class internal::broadcast_cache;
3835  template<typename X, typename Y> friend class internal::round_robin_cache;
3836  task * try_put_task( const input_type &v ) __TBB_override {
3837  spin_mutex::scoped_lock l( my_mutex );
3838  return try_put_task_impl(v);
3839  }
3840 
3841  task * try_put_task_impl(const input_type &v) {
3842  my_buffer = v;
3843  my_buffer_is_valid = true;
3844  task * rtask = my_successors.try_put_task(v);
3845  if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
3846  return rtask;
3847  }
3848 
3849  graph& graph_reference() __TBB_override {
3850  return my_graph;
3851  }
3852 
3853  //! Breaks an infinite loop between the node reservation and register_successor call
3854  struct register_predecessor_task : public graph_task {
3855 
3856  register_predecessor_task(predecessor_type& owner, successor_type& succ) :
3857  o(owner), s(succ) {};
3858 
3859  tbb::task *execute() __TBB_override {
3860  if (!s.register_predecessor(o)) {
3861  o.register_successor(s);
3862  }
3863  return NULL;
3864  }
3865 
3866  predecessor_type &o;
3867  successor_type &s;
3868  };
3869 
3870  spin_mutex my_mutex;
3871  internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
3872 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3873  internal::edge_container<predecessor_type> my_built_predecessors;
3874 #endif
3875  input_type my_buffer;
3876  bool my_buffer_is_valid;
3877  void reset_receiver( reset_flags /*f*/ ) __TBB_override {}
3878 
3879  void reset_node( reset_flags f ) __TBB_override {
3880  my_buffer_is_valid = false;
3881  if (f&rf_clear_edges) {
3882  my_successors.clear();
3883  }
3884  }
3885 }; // overwrite_node
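For orientation, here is a minimal usage sketch of overwrite_node; it is not part of this header and assumes a program that compiles against TBB and links the library. The node stores the most recent message and serves it to try_get until clear() invalidates the buffer.

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    tbb::flow::overwrite_node<int> latest( g );   // single-item buffer, overwritten on every put

    latest.try_put( 1 );
    latest.try_put( 2 );               // overwrites the previously stored 1
    g.wait_for_all();

    int v = 0;
    if ( latest.try_get( v ) )         // succeeds while the buffer is valid
        std::cout << v << std::endl;   // prints 2
    latest.clear();                    // invalidates the buffer again
    return 0;
}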
3886 
3887 template< typename T >
3888 class write_once_node : public overwrite_node<T> {
3889 public:
3890  typedef T input_type;
3891  typedef T output_type;
3892  typedef overwrite_node<T> base_type;
3893  typedef typename receiver<input_type>::predecessor_type predecessor_type;
3894  typedef typename sender<output_type>::successor_type successor_type;
3895 
3897  explicit write_once_node(graph& g) : base_type(g) {
3898  tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3899  static_cast<receiver<input_type> *>(this),
3900  static_cast<sender<output_type> *>(this) );
3901  }
3902 
3903  //! Copy constructor: call base class copy constructor
3904  write_once_node( const write_once_node& src ) : base_type(src) {
3905  tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3906  static_cast<receiver<input_type> *>(this),
3907  static_cast<sender<output_type> *>(this) );
3908  }
3909 
3910 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3911  void set_name( const char *name ) __TBB_override {
3912  tbb::internal::fgt_node_desc( this, name );
3913  }
3914 #endif
3915 
3916 protected:
3917  template< typename R, typename B > friend class run_and_put_task;
3918  template<typename X, typename Y> friend class internal::broadcast_cache;
3919  template<typename X, typename Y> friend class internal::round_robin_cache;
3920  task *try_put_task( const T &v ) __TBB_override {
3921  spin_mutex::scoped_lock l( this->my_mutex );
3922  return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
3923  }
3924 };
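Similarly, a minimal sketch of write_once_node (again outside this header, assuming the TBB library): it keeps only the first value it receives and rejects later puts until clear() is called.

#include "tbb/flow_graph.h"
#include <cassert>

int main() {
    tbb::flow::graph g;
    tbb::flow::write_once_node<int> once( g );

    once.try_put( 42 );   // accepted: the buffer was empty
    once.try_put( 7 );    // rejected: a valid value is already stored
    g.wait_for_all();

    int v = 0;
    assert( once.try_get( v ) && v == 42 );

    once.clear();         // allow a new value to be written
    once.try_put( 7 );    // now accepted
    return 0;
}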
3925 } // interfaceX
3926 
3927  using interface10::reset_flags;
3928  using interface10::rf_reset_protocol;
3929  using interface10::rf_reset_bodies;
3930  using interface10::rf_clear_edges;
3931 
3932  using interface10::graph;
3933  using interface10::graph_node;
3934  using interface10::continue_msg;
3935 
3936  using interface10::source_node;
3937  using interface10::function_node;
3938  using interface10::multifunction_node;
3939  using interface10::split_node;
3940  using interface10::internal::output_port;
3941  using interface10::indexer_node;
3942  using interface10::internal::tagged_msg;
3943  using interface10::internal::cast_to;
3944  using interface10::internal::is_a;
3945  using interface10::continue_node;
3946  using interface10a::overwrite_node;
3947  using interface10a::write_once_node;
3948  using interface10::broadcast_node;
3949  using interface10::buffer_node;
3950  using interface10::queue_node;
3951  using interface10::sequencer_node;
3952  using interface10::priority_queue_node;
3953  using interface11::limiter_node;
3954  using namespace interface10::internal::graph_policy_namespace;
3955  using interface10::join_node;
3956  using interface10::input_port;
3957  using interface10::copy_body;
3958  using interface10::make_edge;
3959  using interface10::remove_edge;
3960  using interface10::internal::tag_value;
3961 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3962  using interface10::composite_node;
3963 #endif
3964  using interface10::async_node;
3965 #if __TBB_PREVIEW_ASYNC_MSG
3966  using interface10::async_msg;
3967 #endif
3968 #if __TBB_PREVIEW_STREAMING_NODE
3969  using interface10::port_ref;
3970  using interface10::streaming_node;
3971 #endif // __TBB_PREVIEW_STREAMING_NODE
3972 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3973  using internal::node_priority_t;
3974  using internal::no_priority;
3975 #endif
3976 
3977 
3978 } // flow
3979 } // tbb
3980 
3981 #undef __TBB_PFG_RESET_ARG
3982 #undef __TBB_COMMA
3983 
3984 #endif // __TBB_flow_graph_H
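As a closing orientation point, a small end-to-end sketch of the public API exported above (graph, function_node, overwrite_node, make_edge); it assumes C++11 and the TBB library, and is not part of the header itself.

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    using namespace tbb::flow;

    graph g;
    function_node<int, int> doubler( g, unlimited, []( int x ) { return 2 * x; } );
    overwrite_node<int> latest( g );        // keeps only the most recent result

    make_edge( doubler, latest );           // doubler's output overwrites latest's buffer
    doubler.try_put( 10 );
    g.wait_for_all();                       // wait until all spawned node bodies finish

    int v = 0;
    if ( latest.try_get( v ) )
        std::cout << "last result: " << v << std::endl;   // prints 20
    return 0;
}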