00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef _SessionImpl_
00023 #define _SessionImpl_
00024
00025 #include "Demux.h"
00026 #include "Execution.h"
00027 #include "Results.h"
00028
00029 #include "qpid/SessionId.h"
00030 #include "qpid/SessionState.h"
00031 #include "boost/shared_ptr.hpp"
00032 #include "boost/weak_ptr.hpp"
00033 #include "qpid/framing/FrameHandler.h"
00034 #include "qpid/framing/ChannelHandler.h"
00035 #include "qpid/framing/SequenceNumber.h"
00036 #include "qpid/framing/AMQP_ClientOperations.h"
00037 #include "qpid/framing/AMQP_ServerProxy.h"
00038 #include "qpid/sys/Semaphore.h"
00039 #include "qpid/sys/StateMonitor.h"
00040 #include "qpid/sys/ExceptionHolder.h"
00041
00042 #include <boost/optional.hpp>
00043
00044 namespace qpid {
00045
00046 namespace framing {
00047
00048 class FrameSet;
00049 class MethodContent;
00050 class SequenceSet;
00051
00052 }
00053
00054 namespace client {
00055
00056 class Future;
00057 class ConnectionImpl;
00058 class SessionHandler;
00059
00061 class SessionImpl : public framing::FrameHandler::InOutHandler,
00062 public Execution,
00063 private framing::AMQP_ClientOperations::SessionHandler,
00064 private framing::AMQP_ClientOperations::ExecutionHandler
00065 {
00066 public:
00067 SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>);
00068 ~SessionImpl();
00069
00070
00071
00072 framing::FrameSet::shared_ptr get();
00073
00074 const SessionId getId() const;
00075
00076 uint16_t getChannel() const;
00077 void setChannel(uint16_t channel);
00078
00079 void open(uint32_t detachedLifetime);
00080 void close();
00081 void resume(shared_ptr<ConnectionImpl>);
00082 void suspend();
00083
00084 void assertOpen() const;
00085
00086 Future send(const framing::AMQBody& command);
00087 Future send(const framing::AMQBody& command, const framing::MethodContent& content);
00088 Future send(const framing::AMQBody& command, const framing::FrameSet& content);
00089
00090 Demux& getDemux();
00091 void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
00092 bool isComplete(const framing::SequenceNumber& id);
00093 bool isCompleteUpTo(const framing::SequenceNumber& id);
00094 void waitForCompletion(const framing::SequenceNumber& id);
00095 void sendCompletion();
00096 void sendFlush();
00097
00098 void setException(const sys::ExceptionHolder&);
00099
00100
00101 void connectionClosed(uint16_t code, const std::string& text);
00102 void connectionBroke(uint16_t code, const std::string& text);
00103
00105 uint32_t setTimeout(uint32_t requestedSeconds);
00106
00108 uint32_t getTimeout() const;
00109
00113 void setWeakPtr(bool weak=true);
00114
00115 private:
00116 enum State {
00117 INACTIVE,
00118 ATTACHING,
00119 ATTACHED,
00120 DETACHING,
00121 DETACHED
00122 };
00123 typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
00124 typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
00125 typedef sys::StateMonitor<State, DETACHED> StateMonitor;
00126 typedef StateMonitor::Set States;
00127
00128 inline void setState(State s);
00129 inline void waitFor(State);
00130
00131 void setExceptionLH(const sys::ExceptionHolder&);
00132 void detach();
00133
00134 void check() const;
00135 void checkOpen() const;
00136 void handleClosed();
00137
00138 void handleIn(framing::AMQFrame& frame);
00139 void handleOut(framing::AMQFrame& frame);
00146 void proxyOut(framing::AMQFrame& frame);
00147 void sendFrame(framing::AMQFrame& frame, bool canBlock);
00148 void deliver(framing::AMQFrame& frame);
00149
00150 Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
00151 void sendContent(const framing::MethodContent&);
00152 void waitForCompletionImpl(const framing::SequenceNumber& id);
00153
00154 void sendCompletionImpl();
00155
00156
00157
00158 void attach(const std::string& name, bool force);
00159 void attached(const std::string& name);
00160 void detach(const std::string& name);
00161 void detached(const std::string& name, uint8_t detachCode);
00162 void requestTimeout(uint32_t timeout);
00163 void timeout(uint32_t timeout);
00164 void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
00165 void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
00166 void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);
00167 void completed(const framing::SequenceSet& commands, bool timelyReply);
00168 void knownCompleted(const framing::SequenceSet& commands);
00169 void flush(bool expected, bool confirmed, bool completed);
00170 void gap(const framing::SequenceSet& commands);
00171
00172
00173
00174 void sync();
00175 void result(const framing::SequenceNumber& commandId, const std::string& value);
00176 void exception(uint16_t errorCode,
00177 const framing::SequenceNumber& commandId,
00178 uint8_t classCode,
00179 uint8_t commandCode,
00180 uint8_t fieldIndex,
00181 const std::string& description,
00182 const framing::FieldTable& errorInfo);
00183
00184 sys::ExceptionHolder exceptionHolder;
00185 mutable StateMonitor state;
00186 mutable sys::Semaphore sendLock;
00187 uint32_t detachedLifetime;
00188 const uint64_t maxFrameSize;
00189 const SessionId id;
00190
00191 shared_ptr<ConnectionImpl> connection();
00192 shared_ptr<ConnectionImpl> connectionShared;
00193 boost::weak_ptr<ConnectionImpl> connectionWeak;
00194 bool weakPtr;
00195
00196 framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
00197 framing::ChannelHandler channel;
00198 framing::AMQP_ServerProxy::Session proxy;
00199
00200 Results results;
00201 Demux demux;
00202 framing::FrameSet::shared_ptr arriving;
00203
00204 framing::SequenceSet incompleteIn;
00205 framing::SequenceSet completedIn;
00206 framing::SequenceSet incompleteOut;
00207 framing::SequenceSet completedOut;
00208 framing::SequenceNumber nextIn;
00209 framing::SequenceNumber nextOut;
00210
00211 SessionState sessionState;
00212
00213 friend class client::SessionHandler;
00214 };
00215
00216 }}
00217
00218 #endif