00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef _SessionImpl_
00023 #define _SessionImpl_
00024
00025 #include "Demux.h"
00026 #include "Execution.h"
00027 #include "Results.h"
00028
00029 #include "qpid/SessionId.h"
00030 #include "qpid/SessionState.h"
00031 #include "boost/shared_ptr.hpp"
00032 #include "boost/weak_ptr.hpp"
00033 #include "qpid/framing/FrameHandler.h"
00034 #include "qpid/framing/ChannelHandler.h"
00035 #include "qpid/framing/SequenceNumber.h"
00036 #include "qpid/framing/AMQP_ClientOperations.h"
00037 #include "qpid/framing/AMQP_ServerProxy.h"
00038 #include "qpid/sys/Semaphore.h"
00039 #include "qpid/sys/StateMonitor.h"
00040 #include "qpid/sys/ExceptionHolder.h"
00041
00042 #include <boost/optional.hpp>
00043
00044 namespace qpid {
00045
// Forward declarations of framing types referenced by the SessionImpl
// declaration below.
// NOTE(review): within this header only MethodContent is used purely by
// reference/pointer. FrameSet is used through its nested
// FrameSet::shared_ptr type and SequenceSet is held by value as a data
// member, so both must already be complete types via one of the headers
// included above - confirm which header provides them.
00046 namespace framing {
00047
00048 class FrameSet;
00049 class MethodContent;
00050 class SequenceSet;
00051
00052 }
00053
00054 namespace client {
00055
// Forward declarations of client-side types that this header only names:
//  - Future is used as a return type in function declarations,
//  - ConnectionImpl is used inside shared_ptr<> / boost::weak_ptr<>,
//  - SessionHandler is named in SessionImpl's friend declaration.
// Note that SessionImpl also declares a private typedef called
// SessionHandler, which hides this client::SessionHandler inside the class.
00056 class Future;
00057 class ConnectionImpl;
00058 class SessionHandler;
00059
// Client-side session implementation class.
//
// Publicly it is a framing::FrameHandler::InOutHandler and an Execution;
// privately it implements the framing::AMQP_ClientOperations
// SessionHandler, ExecutionHandler and MessageHandler interfaces, so those
// operations are not part of the public API of this class.
//
// NOTE(review): `shared_ptr` is used unqualified throughout; it must be made
// visible in this scope by one of the included headers - confirm which.
// NOTE(review): thread-safety and blocking behaviour of individual methods
// cannot be determined from this header; see the implementation file.
00061 class SessionImpl : public framing::FrameHandler::InOutHandler,
00062 public Execution,
00063 private framing::AMQP_ClientOperations::SessionHandler,
00064 private framing::AMQP_ClientOperations::ExecutionHandler,
00065 private framing::AMQP_ClientOperations::MessageHandler
00066 {
00067 public:
      // Constructs a session with the given name on the given connection.
      // maxFrameSize and id are const members, so they have to be set in the
      // constructor's initializer list.
00068 SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>);
00069 ~SessionImpl();
00070
00071
00072
      // Returns a shared pointer to a FrameSet.
      // NOTE(review): presumably the next received frame set, possibly
      // blocking until one is available - confirm in the implementation.
00073 framing::FrameSet::shared_ptr get();
00074
      // Returns the session id by value (the top-level const on the return
      // type has no effect for callers).
00075 const SessionId getId() const;
00076
      // Accessors for the 16-bit channel number.
      // NOTE(review): presumably stored in the `channel` member - confirm.
00077 uint16_t getChannel() const;
00078 void setChannel(uint16_t channel);
00079
      // Session lifecycle operations.
      // NOTE(review): detachedLifetime is presumably a duration in seconds
      // (compare setTimeout(requestedSeconds)) - confirm.
00080 void open(uint32_t detachedLifetime);
00081 void close();
      // Overload taking a connection; distinct from the private
      // resume(const std::string&, const std::string&) declared further down.
00082 void resume(shared_ptr<ConnectionImpl>);
00083 void suspend();
00084
      // NOTE(review): presumably throws if the session is not open - confirm.
00085 void assertOpen() const;
00086
      // Send a command, optionally accompanied by content, and return a
      // Future associated with it.
00087 Future send(const framing::AMQBody& command);
00088 Future send(const framing::AMQBody& command, const framing::MethodContent& content);
00089 Future send(const framing::AMQBody& command, const framing::FrameSet& content);
      // Sends a single frame supplied by the caller.
00090 void sendRawFrame(framing::AMQFrame& frame);
00091
      // Returns a reference to a Demux; the reference is only valid while
      // the object it refers to is alive.
      // NOTE(review): presumably the `demux` member - confirm.
00092 Demux& getDemux();
      // Command-completion bookkeeping, keyed by sequence number or set.
      // NOTE(review): presumably backed by the incomplete*/completed*
      // SequenceSet members below - confirm.
00093 void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
00094 void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
00095 bool isComplete(const framing::SequenceNumber& id);
00096 bool isCompleteUpTo(const framing::SequenceNumber& id);
00097 void waitForCompletion(const framing::SequenceNumber& id);
00098 void sendCompletion();
00099 void sendFlush();
00100
      // Records an exception for this session.
      // NOTE(review): presumably stored in `exceptionHolder` - confirm.
00101 void setException(const sys::ExceptionHolder&);
00102
00103
      // Notifications about the state of the underlying connection.
00104 void connectionClosed(uint16_t code, const std::string& text);
00105 void connectionBroke(const std::string& text);
00106
      // Requests a timeout of requestedSeconds.
      // NOTE(review): the return value is presumably the timeout actually
      // granted - confirm.
00108 uint32_t setTimeout(uint32_t requestedSeconds);
00109
      // Returns the current timeout value.
00111 uint32_t getTimeout() const;
00112
      // Selects weak (default argument: true) or non-weak connection
      // referencing.
      // NOTE(review): presumably switches between the connectionWeak and
      // connectionShared members via the weakPtr flag - confirm.
00116 void setWeakPtr(bool weak=true);
00117
      // Returns a shared pointer to the connection.
      // NOTE(review): may presumably be empty if the connection has gone
      // away while held weakly - confirm.
00121 shared_ptr<ConnectionImpl> getConnection();
00122
00123 private:
      // Session states tracked by the `state` monitor.
00124 enum State {
00125 INACTIVE,
00126 ATTACHING,
00127 ATTACHED,
00128 DETACHING,
00129 DETACHED
00130 };
      // Short names for the privately inherited handler interfaces. The
      // SessionHandler typedef hides client::SessionHandler inside this
      // class, which is why the friend declaration at the end of the class
      // spells the name fully qualified.
00131 typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
00132 typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
00133 typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler;
      // NOTE(review): DETACHED, the last enumerator, is presumably passed as
      // the upper bound of the state range - confirm against StateMonitor.
00134 typedef sys::StateMonitor<State, DETACHED> StateMonitor;
00135 typedef StateMonitor::Set States;
00136
      // Declared inline here; the definitions are not in this header.
00137 inline void setState(State s);
00138 inline void waitFor(State);
00139
      // NOTE(review): the "LH" suffix presumably means "lock held", i.e. the
      // caller must already hold the state lock - confirm.
00140 void setExceptionLH(const sys::ExceptionHolder&);
      // No-argument overload; distinct from detach(const std::string&) below.
00141 void detach();
00142
00143 void check() const;
00144 void checkOpen() const;
00145 void handleClosed();
00146
      // NOTE(review): presumably the overrides required by
      // framing::FrameHandler::InOutHandler - confirm.
00147 void handleIn(framing::AMQFrame& frame);
00148 void handleOut(framing::AMQFrame& frame);
      // proxyOut is the member function bound into `ioHandler` below.
00155 void proxyOut(framing::AMQFrame& frame);
00156 void sendFrame(framing::AMQFrame& frame, bool canBlock);
00157 void deliver(framing::AMQFrame& frame);
00158
      // Content is optional here: the pointer defaults to null (0).
00159 Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
00160 void sendContent(const framing::MethodContent&);
00161 void waitForCompletionImpl(const framing::SequenceNumber& id);
00162
00163 void sendCompletionImpl();
00164
00165
00166
      // NOTE(review): the following group presumably implements the
      // privately inherited SessionHandler interface - confirm against
      // framing::AMQP_ClientOperations.
00167 void attach(const std::string& name, bool force);
00168 void attached(const std::string& name);
00169 void detach(const std::string& name);
00170 void detached(const std::string& name, uint8_t detachCode);
00171 void requestTimeout(uint32_t timeout);
00172 void timeout(uint32_t timeout);
00173 void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
00174 void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
00175 void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);
00176 void completed(const framing::SequenceSet& commands, bool timelyReply);
00177 void knownCompleted(const framing::SequenceSet& commands);
00178 void flush(bool expected, bool confirmed, bool completed);
00179 void gap(const framing::SequenceSet& commands);
00180
00181
00182
      // NOTE(review): the following group presumably implements the
      // privately inherited ExecutionHandler interface - confirm.
00183 void sync();
00184 void result(const framing::SequenceNumber& commandId, const std::string& value);
00185 void exception(uint16_t errorCode,
00186 const framing::SequenceNumber& commandId,
00187 uint8_t classCode,
00188 uint8_t commandCode,
00189 uint8_t fieldIndex,
00190 const std::string& description,
00191 const framing::FieldTable& errorInfo);
00192
00193
00194
00195
      // NOTE(review): the following group presumably implements the
      // privately inherited MessageHandler interface - confirm.
00196 void accept(const qpid::framing::SequenceSet&);
00197 void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&);
00198 void release(const qpid::framing::SequenceSet&, bool);
      // Overload of resume(); unrelated to the public
      // resume(shared_ptr<ConnectionImpl>) above.
00199 qpid::framing::MessageResumeResult resume(const std::string&, const std::string&);
00200 void setFlowMode(const std::string&, uint8_t);
00201 void flow(const std::string&, uint8_t, uint32_t);
00202 void stop(const std::string&);
00203
00204
      // Data members. Non-static members are constructed in declaration
      // order, so do not reorder them without checking the constructor.
00205 sys::ExceptionHolder exceptionHolder;
      // mutable: usable from const member functions such as check().
00206 mutable StateMonitor state;
00207 mutable sys::Semaphore sendLock;
00208 uint32_t detachedLifetime;
00209 const uint64_t maxFrameSize;
00210 const SessionId id;
00211
      // Two handles to the connection, one owning and one weak, plus a flag.
      // NOTE(review): weakPtr presumably selects which handle is in use
      // (see setWeakPtr) - confirm.
00212 shared_ptr<ConnectionImpl> connectionShared;
00213 boost::weak_ptr<ConnectionImpl> connectionWeak;
00214 bool weakPtr;
00215
      // Frame handler that forwards to SessionImpl::proxyOut.
00216 framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
00217 framing::ChannelHandler channel;
00218 framing::AMQP_ServerProxy::Session proxy;
00219
00220 Results results;
00221 Demux demux;
      // NOTE(review): presumably the frame set currently being assembled
      // from incoming frames - confirm.
00222 framing::FrameSet::shared_ptr arriving;
00223
      // Sequence tracking for incoming ("In") and outgoing ("Out") commands.
00224 framing::SequenceSet incompleteIn;
00225 framing::SequenceSet completedIn;
00226 framing::SequenceSet incompleteOut;
00227 framing::SequenceSet completedOut;
00228 framing::SequenceNumber nextIn;
00229 framing::SequenceNumber nextOut;
00230
00231 SessionState sessionState;
00232
00233
      // Raw pointer to a semaphore.
      // NOTE(review): ownership and lifetime are not visible in this header;
      // confirm whether SessionImpl allocates/deletes it or merely borrows it.
00234 sys::Semaphore* sendMsgCredit;
00235
      // Qualified with client:: because the unqualified name SessionHandler
      // refers to the private typedef declared above.
00236 friend class client::SessionHandler;
00237 };
00238
00239 }}
00240
00241 #endif