00001 #ifndef QPID_CLUSTER_CLUSTER_H
00002 #define QPID_CLUSTER_CLUSTER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "Cpg.h"
00023 #include "Event.h"
00024 #include "NoOpConnectionOutputHandler.h"
00025 #include "ClusterMap.h"
00026 #include "FailoverExchange.h"
00027
00028 #include "qpid/broker/Broker.h"
00029 #include "qpid/sys/PollableQueue.h"
00030 #include "qpid/sys/Monitor.h"
00031 #include "qpid/sys/LockPtr.h"
00032 #include "qpid/management/Manageable.h"
00033 #include "qpid/Url.h"
00034 #include "qmf/org/apache/qpid/cluster/Cluster.h"
00035
00036 #include <boost/intrusive_ptr.hpp>
00037 #include <boost/bind.hpp>
00038 #include <boost/optional.hpp>
00039
00040 #include <algorithm>
00041 #include <vector>
00042 #include <map>
00043
00044 namespace qpid {
00045 namespace cluster {
00046
00047 class Connection;
00048
00053 class Cluster : private Cpg::Handler, public management::Manageable {
00054 public:
00055 typedef boost::intrusive_ptr<Connection> ConnectionPtr;
00056 typedef std::vector<ConnectionPtr> Connections;
00057
00063 Cluster(const std::string& name, const Url& url, broker::Broker&);
00064
00065 virtual ~Cluster();
00066
00067
00068 bool insert(const ConnectionPtr&);
00069 void erase(ConnectionId);
00070
00071
00072 void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id);
00073 void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
00074 void mcast(const Event& e);
00075
00076
00077 std::vector<Url> getUrls() const;
00078 boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
00079
00080
00081 void leave();
00082
00083
00084 void dumpInDone(const ClusterMap&);
00085
00086 MemberId getId() const;
00087 broker::Broker& getBroker() const;
00088
00089 private:
00090 typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
00091 typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
00092 typedef sys::Monitor::ScopedLock Lock;
00093
00094 typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
00095 typedef sys::PollableQueue<Event> PollableEventQueue;
00096 typedef std::deque<Event> PlainEventQueue;
00097
00098
00099
00100
00101
00102
00103
00104 void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t, Lock&);
00105 void mcastControl(const framing::AMQBody& controlBody, Lock&);
00106 void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&);
00107 void mcast(const Event& e, Lock&);
00108 void leave(Lock&);
00109 std::vector<Url> getUrls(Lock&) const;
00110
00111
00112 void tryMakeOffer(const MemberId&, Lock&);
00113
00114
00115 void brokerShutdown();
00116
00117
00118
00119
00120 void dumpRequest(const MemberId&, const std::string&, Lock&);
00121 void dumpOffer(const MemberId& dumper, uint64_t dumpee, Lock&);
00122 void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&);
00123 void ready(const MemberId&, const std::string&, Lock&);
00124 void configChange(const MemberId&, const std::string& addresses, Lock& l);
00125 void shutdown(const MemberId&, Lock&);
00126 void delivered(const Event&);
00127 void delivered(const Event&, Lock&);
00128
00129
00130 void dispatch(sys::DispatchHandle&);
00131 void disconnect(sys::DispatchHandle&);
00132
00133 void deliver(
00134 cpg_handle_t ,
00135 struct cpg_name *group,
00136 uint32_t ,
00137 uint32_t ,
00138 void* ,
00139 int );
00140
00141 void deliver(const Event& e, Lock&);
00142
00143 void configChange(
00144 cpg_handle_t ,
00145 struct cpg_name *,
00146 struct cpg_address *, int ,
00147 struct cpg_address *, int ,
00148 struct cpg_address *, int
00149 );
00150
00151 boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&, Lock&);
00152 Connections getConnections(Lock&);
00153
00154 virtual qpid::management::ManagementObject* GetManagementObject() const;
00155 virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
00156
00157 void stopClusterNode(Lock&);
00158 void stopFullCluster(Lock&);
00159 void memberUpdate(Lock&);
00160
00161
00162 void checkDumpIn(Lock&);
00163
00164
00165 void dumpOutDone();
00166 void dumpOutError(const std::exception&);
00167 void dumpOutDone(Lock&);
00168
00169 mutable sys::Monitor lock;
00170
00171 broker::Broker& broker;
00172 boost::shared_ptr<sys::Poller> poller;
00173 Cpg cpg;
00174 const Cpg::Name name;
00175 const Url myUrl;
00176 const MemberId myId;
00177
00178 ConnectionMap connections;
00179 NoOpConnectionOutputHandler shadowOut;
00180 sys::DispatchHandle cpgDispatchHandle;
00181 PollableEventQueue deliverQueue;
00182 PlainEventQueue mcastQueue;
00183 uint32_t mcastId;
00184
00185 qmf::org::apache::qpid::cluster::Cluster* mgmtObject;
00186
00187 enum {
00188 INIT,
00189 NEWBIE,
00190 DUMPEE,
00191 CATCHUP,
00192 READY,
00193 OFFER,
00194 DUMPER,
00195 LEFT
00196 } state;
00197
00198 ClusterMap map;
00199 sys::Thread dumpThread;
00200 boost::optional<ClusterMap> dumpedMap;
00201
00202 size_t lastSize;
00203 boost::shared_ptr<FailoverExchange> failoverExchange;
00204
00205 friend std::ostream& operator<<(std::ostream&, const Cluster&);
00206 friend class ClusterDispatcher;
00207 };
00208
00209 }}
00210
00211
00212
00213 #endif