00001 #ifndef QPID_CLIENT_SUBSCRIPTIONMANAGER_H
00002 #define QPID_CLIENT_SUBSCRIPTIONMANAGER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "qpid/sys/Mutex.h"
00025 #include <qpid/client/Dispatcher.h>
00026 #include <qpid/client/Completion.h>
00027 #include <qpid/client/Session.h>
00028 #include <qpid/client/MessageListener.h>
00029 #include <qpid/client/LocalQueue.h>
00030 #include <qpid/client/FlowControl.h>
00031 #include <qpid/sys/Runnable.h>
00032 #include <set>
00033 #include <sstream>
00034
00035 namespace qpid {
00036 namespace client {
00037
00046 class SubscriptionManager : public sys::Runnable
00047 {
00048 typedef sys::Mutex::ScopedLock Lock;
00049 typedef sys::Mutex::ScopedUnlock Unlock;
00050
00051 void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&);
00052
00053 qpid::client::Dispatcher dispatcher;
00054 qpid::client::AsyncSession session;
00055 FlowControl flowControl;
00056 AckPolicy autoAck;
00057 bool acceptMode;
00058 bool acquireMode;
00059 bool autoStop;
00060
00061 public:
00063 SubscriptionManager(const Session& session);
00064
00077 void subscribe(MessageListener& listener,
00078 const std::string& queue,
00079 const FlowControl& flow,
00080 const std::string& tag=std::string());
00081
00092 void subscribe(LocalQueue& localQueue,
00093 const std::string& queue,
00094 const FlowControl& flow,
00095 const std::string& tag=std::string());
00096
00108 void subscribe(MessageListener& listener,
00109 const std::string& queue,
00110 const std::string& tag=std::string());
00111
00121 void subscribe(LocalQueue& localQueue,
00122 const std::string& queue,
00123 const std::string& tag=std::string());
00124
00125
00132 bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
00133
00135 void cancel(const std::string tag);
00136
00141 void run();
00142
00147 void start();
00148
00153 void setAutoStop(bool set=true);
00154
00156 void stop();
00157
00158 static const uint32_t UNLIMITED=0xFFFFFFFF;
00159
00161 void setFlowControl(const std::string& destintion, const FlowControl& flow);
00162
00164 void setFlowControl(const FlowControl& flow);
00165
00167 const FlowControl& getFlowControl() const;
00168
00175 void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true);
00176
00182 void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
00183
00188 void setAcceptMode(bool required);
00189
00195 void setAcquireMode(bool acquire);
00196
00200 void setAckPolicy(const AckPolicy& autoAck);
00201
00202 AckPolicy& getAckPolicy();
00203
00204 void registerFailoverHandler ( boost::function<void ()> fh );
00205
00206 Session getSession() const;
00207 };
00208
00210 class AutoCancel {
00211 public:
00212 AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
00213 ~AutoCancel() { sm.cancel(tag); }
00214 private:
00215 SubscriptionManager& sm;
00216 std::string tag;
00217 };
00218
00219 }}
00220
00221 #endif