00001 #ifndef QPID_SYS_POLLABLEQUEUE_H
00002 #define QPID_SYS_POLLABLEQUEUE_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "qpid/sys/PollableCondition.h"
00026 #include "qpid/sys/Dispatcher.h"
00027 #include "qpid/sys/Monitor.h"
00028 #include <boost/function.hpp>
00029 #include <boost/bind.hpp>
00030 #include <algorithm>
00031 #include <deque>
00032
00033 namespace qpid {
00034 namespace sys {
00035
00036 class Poller;
00037
00043 template <class T>
00044 class PollableQueue {
00045 public:
00047 typedef boost::function<void (const T&)> Callback;
00048
00050 PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
00051
00052 ~PollableQueue();
00053
00055 void push(const T& t);
00056
00058 void start();
00059
00061 void stop();
00062
00064 bool isStopped() const { ScopedLock l(lock); return stopped; }
00065
00066 size_t size() { ScopedLock l(lock); return queue.size(); }
00067 bool empty() { ScopedLock l(lock); return queue.empty(); }
00068 private:
00069 typedef std::deque<T> Queue;
00070 typedef sys::Monitor::ScopedLock ScopedLock;
00071 typedef sys::Monitor::ScopedUnlock ScopedUnlock;
00072
00073 void dispatch(sys::DispatchHandle&);
00074
00075 mutable sys::Monitor lock;
00076 Callback callback;
00077 boost::shared_ptr<sys::Poller> poller;
00078 PollableCondition condition;
00079 DispatchHandle handle;
00080 Queue queue;
00081 Thread dispatcher;
00082 bool stopped;
00083 };
00084
00085 template <class T> PollableQueue<T>::PollableQueue(
00086 const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
00087 : callback(cb), poller(p),
00088 handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true)
00089 {
00090 handle.startWatch(poller);
00091 handle.unwatch();
00092 }
00093
00094 template <class T> void PollableQueue<T>::start() {
00095 ScopedLock l(lock);
00096 assert(stopped);
00097 stopped = false;
00098 if (!queue.empty()) condition.set();
00099 handle.rewatch();
00100 }
00101
00102 template <class T> PollableQueue<T>::~PollableQueue() {
00103 handle.stopWatch();
00104 }
00105
00106 template <class T> void PollableQueue<T>::push(const T& t) {
00107 ScopedLock l(lock);
00108 if (queue.empty()) condition.set();
00109 queue.push_back(t);
00110 }
00111
00112 template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
00113 ScopedLock l(lock);
00114 assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
00115 dispatcher = Thread::current();
00116 while (!stopped && !queue.empty()) {
00117 T value = queue.front();
00118 queue.pop_front();
00119 {
00120 ScopedUnlock u(lock);
00121 callback(value);
00122 }
00123 }
00124 if (queue.empty()) condition.clear();
00125 if (stopped) lock.notifyAll();
00126 dispatcher = Thread();
00127 if (!stopped) h.rewatch();
00128 }
00129
00130 template <class T> void PollableQueue<T>::stop() {
00131 ScopedLock l(lock);
00132 assert(!stopped);
00133 handle.unwatch();
00134 stopped = true;
00135
00136 while (dispatcher.id() && dispatcher.id() != Thread::current().id())
00137 lock.wait();
00138 }
00139
00140 }}
00141
00142 #endif