00001 #ifndef _sys_AsynchIO
00002 #define _sys_AsynchIO
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "Dispatcher.h"
00025
00026 #include <boost/function.hpp>
00027 #include <deque>
00028
00029 namespace qpid {
00030 namespace sys {
00031
00032 class Socket;
00033
00034
00035
00036
00037
00038 class AsynchAcceptor {
00039 public:
00040 typedef boost::function1<void, const Socket&> Callback;
00041
00042 private:
00043 Callback acceptedCallback;
00044 DispatchHandle handle;
00045 const Socket& socket;
00046
00047 public:
00048 AsynchAcceptor(const Socket& s, Callback callback);
00049 void start(Poller::shared_ptr poller);
00050
00051 private:
00052 void readable(DispatchHandle& handle);
00053 };
00054
00055
00056
00057
00058
00059 class AsynchConnector : private DispatchHandle {
00060 public:
00061 typedef boost::function1<void, const Socket&> ConnectedCallback;
00062 typedef boost::function2<void, int, std::string> FailedCallback;
00063
00064 private:
00065 ConnectedCallback connCallback;
00066 FailedCallback failCallback;
00067 const Socket& socket;
00068
00069 public:
00070 AsynchConnector(const Socket& socket,
00071 Poller::shared_ptr poller,
00072 std::string hostname,
00073 uint16_t port,
00074 ConnectedCallback connCb,
00075 FailedCallback failCb = 0);
00076
00077 private:
00078 void connComplete(DispatchHandle& handle);
00079 void failure(int, std::string);
00080 };
00081
00082 struct AsynchIOBufferBase {
00083 char* const bytes;
00084 const int32_t byteCount;
00085 int32_t dataStart;
00086 int32_t dataCount;
00087
00088 AsynchIOBufferBase(char* const b, const int32_t s) :
00089 bytes(b),
00090 byteCount(s),
00091 dataStart(0),
00092 dataCount(0)
00093 {}
00094
00095 virtual ~AsynchIOBufferBase()
00096 {}
00097 };
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111 class AsynchIO : private DispatchHandle {
00112 public:
00113 typedef AsynchIOBufferBase BufferBase;
00114
00115 typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
00116 typedef boost::function1<void, AsynchIO&> EofCallback;
00117 typedef boost::function1<void, AsynchIO&> DisconnectCallback;
00118 typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
00119 typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
00120 typedef boost::function1<void, AsynchIO&> IdleCallback;
00121
00122 private:
00123 ReadCallback readCallback;
00124 EofCallback eofCallback;
00125 DisconnectCallback disCallback;
00126 ClosedCallback closedCallback;
00127 BuffersEmptyCallback emptyCallback;
00128 IdleCallback idleCallback;
00129 const Socket& socket;
00130 std::deque<BufferBase*> bufferQueue;
00131 std::deque<BufferBase*> writeQueue;
00132 bool queuedClose;
00139 volatile bool writePending;
00140
00141 public:
00142 AsynchIO(const Socket& s,
00143 ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
00144 ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
00145 void queueForDeletion();
00146
00147 void start(Poller::shared_ptr poller);
00148 void queueReadBuffer(BufferBase* buff);
00149 void unread(BufferBase* buff);
00150 void queueWrite(BufferBase* buff);
00151 void notifyPendingWrite();
00152 void queueWriteClose();
00153 bool writeQueueEmpty() { return writeQueue.empty(); }
00154 BufferBase* getQueuedBuffer();
00155
00156 private:
00157 ~AsynchIO();
00158 void readable(DispatchHandle& handle);
00159 void writeable(DispatchHandle& handle);
00160 void disconnected(DispatchHandle& handle);
00161 void close(DispatchHandle& handle);
00162 };
00163
00164 }}
00165
00166 #endif // _sys_AsynchIO