00001 #ifndef QPID_BROKER_SEMANTICSTATE_H
00002 #define QPID_BROKER_SEMANTICSTATE_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "Consumer.h"
00026 #include "Deliverable.h"
00027 #include "DeliveryAdapter.h"
00028 #include "DeliveryRecord.h"
00029 #include "DeliveryToken.h"
00030 #include "DtxBuffer.h"
00031 #include "DtxManager.h"
00032 #include "NameGenerator.h"
00033 #include "TxBuffer.h"
00034
00035 #include "qpid/framing/FrameHandler.h"
00036 #include "qpid/framing/SequenceSet.h"
00037 #include "qpid/framing/Uuid.h"
00038 #include "qpid/sys/AggregateOutput.h"
00039 #include "qpid/sys/Mutex.h"
00040 #include "qpid/shared_ptr.h"
00041 #include "AclModule.h"
00042
00043 #include <list>
00044 #include <map>
00045 #include <vector>
00046
00047 #include <boost/intrusive_ptr.hpp>
00048 #include <boost/cast.hpp>
00049
00050 namespace qpid {
00051 namespace broker {
00052
00053 class SessionContext;
00054
/**
 * Broker-side state attached to one SessionContext.
 *
 * As declared here it holds a map of consumers keyed by string, a list of
 * DeliveryRecord named `unacked`, a TxBuffer and a DtxBuffer, and an
 * AggregateOutput through which hasOutput()/doOutput() are forwarded.
 *
 * It is publicly a sys::OutputTask and privately boost::noncopyable, so
 * instances cannot be copied.
 *
 * NOTE(review): only declarations are visible in this header. Remarks below
 * that go beyond what the signatures show are marked as assumptions and
 * should be confirmed against the implementation file.
 */
00059 class SemanticState : public sys::OutputTask,
00060 private boost::noncopyable
00061 {
00062 public:
/**
 * A single consumer registered on a queue.
 *
 * Implements both Consumer and sys::OutputTask, and derives from
 * boost::enable_shared_from_this so it can hand out shared_ptrs to itself.
 * Members declared const below cannot change after construction; the
 * remaining members are mutable per-consumer state.
 */
00063 class ConsumerImpl : public Consumer, public sys::OutputTask,
00064 public boost::enable_shared_from_this<ConsumerImpl>
00065 {
// NOTE(review): presumably guards the mutable members below - confirm in the .cpp.
00066 qpid::sys::Mutex lock;
// Pointer to a SemanticState; the pointer itself is const (never re-seated).
00067 SemanticState* const parent;
00068 const DeliveryToken::shared_ptr token;
00069 const string name;
00070 const Queue::shared_ptr queue;
00071 const bool ackExpected;
00072 const bool nolocal;
00073 const bool acquire;
// Mutable state, exposed read-only through the accessors further down.
00074 bool blocked;
00075 bool windowing;
00076 uint32_t msgCredit;
00077 uint32_t byteCredit;
00078 bool notifyEnabled;
00078
// Private credit helpers. NOTE(review): from the names, checkCredit tests
// whether msg fits the remaining credit and allocateCredit consumes it -
// confirm against the definitions.
00080 bool checkCredit(boost::intrusive_ptr<Message>& msg);
00081 void allocateCredit(boost::intrusive_ptr<Message>& msg);
00082
00083 public:
/// Shared-ownership handle used by SemanticState to store consumers.
00084 typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
00085
/// Parameters correspond by name to the const members declared above
/// (`ack` presumably initialises `ackExpected` - TODO confirm).
00086 ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
00087 const string& name, Queue::shared_ptr queue,
00088 bool ack, bool nolocal, bool acquire);
00089 ~ConsumerImpl();
00090 OwnershipToken* getSession();
// NOTE(review): deliver/filter/accept are presumably the Consumer
// interface overrides; the base class is not visible here.
00091 bool deliver(QueuedMessage& msg);
00092 bool filter(boost::intrusive_ptr<Message> msg);
00093 bool accept(boost::intrusive_ptr<Message> msg);
00094
// Notification control; presumably toggles/uses `notifyEnabled` - confirm.
00095 void disableNotify();
00096 void enableNotify();
00097 void notify();
00098
// Flow-control operations on this consumer. SemanticState declares
// same-named methods that take a destination string.
00099 void setWindowMode();
00100 void setCreditMode();
00101 void addByteCredit(uint32_t value);
00102 void addMessageCredit(uint32_t value);
00103 void flush();
00104 void stop();
00105 void complete(DeliveryRecord&);
/// Returns a copy of the queue shared_ptr held by this consumer.
00106 Queue::shared_ptr getQueue() { return queue; }
/// Returns the current value of `blocked`.
00107 bool isBlocked() const { return blocked; }
00108
// sys::OutputTask interface (see the base list above).
00109 bool hasOutput();
00110 bool doOutput();
00111
/// Returns the consumer name by value.
00112 std::string getName() const { return name; }
00113
// Read-only accessors for the corresponding members.
00114 bool isAckExpected() const { return ackExpected; }
00115 bool isAcquire() const { return acquire; }
00116 bool isWindowing() const { return windowing; }
00117 uint32_t getMsgCredit() const { return msgCredit; }
00118 uint32_t getByteCredit() const { return byteCredit; }
00119 };
00120
00121 private:
// Both maps are keyed by std::string. NOTE(review): keys are presumably
// the consumer tag / destination and the dtx xid respectively - confirm.
00122 typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
00123 typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
00124
// References supplied to the constructor; returned by getSession() (session only).
00125 SessionContext& session;
00126 DeliveryAdapter& deliveryAdapter;
00127 ConsumerImplMap consumers;
00128 NameGenerator tagGenerator;
// Delivery records kept in a std::list; presumably those not yet acknowledged.
00129 std::list<DeliveryRecord> unacked;
00130 TxBuffer::shared_ptr txBuffer;
00131 DtxBuffer::shared_ptr dtxBuffer;
00132 bool dtxSelected;
00133 DtxBufferMap suspendedXids;
00134 framing::SequenceSet accumulatedAck;
00135 boost::shared_ptr<Exchange> cacheExchange;
// Aggregate that hasOutput()/doOutput()/eachConsumer() delegate to.
00136 sys::AggregateOutput outputTasks;
// Raw pointer; ownership and nullability are not visible here - TODO confirm.
00137 AclModule* acl;
00138 const bool authMsg;
00139 const string userID;
00140
// Private helpers; definitions are outside this header.
00141 void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
00142 void record(const DeliveryRecord& delivery);
00143 void checkDtxTimeout();
// Returns a reference, so a missing destination cannot be reported as null.
// NOTE(review): presumably throws when not found - confirm.
00144 ConsumerImpl& find(const std::string& destination);
00145 void complete(DeliveryRecord&);
00146 AckRange findRange(DeliveryId first, DeliveryId last);
00147 void requestDispatch();
00148 void requestDispatch(ConsumerImpl&);
00149 void cancel(ConsumerImpl::shared_ptr);
00150
00151 public:
00152 SemanticState(DeliveryAdapter&, SessionContext&);
00153 ~SemanticState();
00154
/// Returns the SessionContext reference held by this object.
00155 SessionContext& getSession() { return session; }
00156
/// Looks up a queue by name. NOTE(review): behaviour for an unknown or
/// empty name is defined in the .cpp - confirm before relying on it.
00163 Queue::shared_ptr getQueue(const std::string& name) const;
00164
00165 bool exists(const string& consumerTag);
00166
/// Registers a consumer on `queue`.
/// `tagInOut` is taken by non-const reference, so the callee may rewrite it.
/// NOTE(review): presumably a tag is generated via tagGenerator when the
/// caller passes none - confirm.
/// The trailing FieldTable pointer is optional and defaults to 0.
00170 void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue,
00171 bool nolocal, bool ackRequired, bool acquire, bool exclusive, const framing::FieldTable* = 0);
00172
00173 void cancel(const string& tag);
00174
// Flow control addressed by destination string; ConsumerImpl declares
// same-named methods without the destination argument.
// NOTE(review): presumably these resolve the consumer via find() and forward - confirm.
00175 void setWindowMode(const std::string& destination);
00176 void setCreditMode(const std::string& destination);
00177 void addByteCredit(const std::string& destination, uint32_t value);
00178 void addMessageCredit(const std::string& destination, uint32_t value);
00179 void flush(const std::string& destination);
00180 void stop(const std::string& destination);
00181
// Local (tx) and distributed (dtx) transaction operations.
00182 void startTx();
00183 void commit(MessageStore* const store);
00184 void rollback();
00185 void selectDtx();
00186 void startDtx(const std::string& xid, DtxManager& mgr, bool join);
00187 void endDtx(const std::string& xid, bool fail);
00188 void suspendDtx(const std::string& xid);
00189 void resumeDtx(const std::string& xid);
00190 void recover(bool requeue);
00191 DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
// Range operations over delivery ids [first, last].
// NOTE(review): inclusive bounds are assumed - confirm.
// acquire() reports its result through the `acquired` out-parameter.
00192 void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
00193 void release(DeliveryId first, DeliveryId last, bool setRedelivered);
00194 void reject(DeliveryId first, DeliveryId last);
00195 void handle(boost::intrusive_ptr<Message> msg);
// sys::OutputTask interface: both calls forward directly to outputTasks.
00196 bool hasOutput() { return outputTasks.hasOutput(); }
00197 bool doOutput() { return outputTasks.doOutput(); }
00198
00199
00200 void completed(DeliveryId deliveryTag, DeliveryId endTag);
00201 void accepted(DeliveryId deliveryTag, DeliveryId endTag);
00202
00203 void attached();
00204 void detached();
00205
/// Downcasts an OutputTask pointer to ConsumerImpl using
/// boost::polymorphic_downcast. The caller must only pass tasks that really
/// are ConsumerImpl objects.
00206 static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); }
/// Hands outputTasks.eachOutput a functor that converts its argument with
/// castToConsumerImpl and then calls `f` with the resulting ConsumerImpl*.
/// This relies on every task held in outputTasks being a ConsumerImpl.
00207 template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
00208 };
00209
00210 }}
00211
00212
00213
00214
00215 #endif