00001 #ifndef _Broker_
00002 #define _Broker_
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "ConnectionFactory.h"
00026 #include "ConnectionToken.h"
00027 #include "DirectExchange.h"
00028 #include "DtxManager.h"
00029 #include "ExchangeRegistry.h"
00030 #include "MessageStore.h"
00031 #include "QueueRegistry.h"
00032 #include "LinkRegistry.h"
00033 #include "SessionManager.h"
00034 #include "QueueCleaner.h"
00035 #include "Vhost.h"
00036 #include "System.h"
00037 #include "Timer.h"
00038 #include "qpid/management/Manageable.h"
00039 #include "qpid/management/ManagementBroker.h"
00040 #include "qmf/org/apache/qpid/broker/Broker.h"
00041 #include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
00042 #include "qpid/Options.h"
00043 #include "qpid/Plugin.h"
00044 #include "qpid/DataDir.h"
00045 #include "qpid/framing/FrameHandler.h"
00046 #include "qpid/framing/OutputHandler.h"
00047 #include "qpid/framing/ProtocolInitiation.h"
00048 #include "qpid/sys/Runnable.h"
00049 #include "qpid/RefCounted.h"
00050 #include "AclModule.h"
00051
00052 #include <boost/intrusive_ptr.hpp>
00053 #include <vector>
00054
00055 namespace qpid {
00056
00057 namespace sys {
00058 class ProtocolFactory;
00059 class Poller;
00060 }
00061
00062 class Url;
00063
00064 namespace broker {
00065
00066 static const uint16_t DEFAULT_PORT=5672;
00067
00068 struct NoSuchTransportException : qpid::Exception
00069 {
00070 NoSuchTransportException(const std::string& s) : Exception(s) {}
00071 virtual ~NoSuchTransportException() throw() {}
00072 };
00073
00077 class Broker : public sys::Runnable, public Plugin::Target,
00078 public management::Manageable, public RefCounted
00079 {
00080 public:
00081
00082 struct Options : public qpid::Options {
00083 Options(const std::string& name="Broker Options");
00084
00085 bool noDataDir;
00086 std::string dataDir;
00087 uint16_t port;
00088 int workerThreads;
00089 int maxConnections;
00090 int connectionBacklog;
00091 uint64_t stagingThreshold;
00092 bool enableMgmt;
00093 uint16_t mgmtPubInterval;
00094 uint16_t queueCleanInterval;
00095 bool auth;
00096 std::string realm;
00097 size_t replayFlushLimit;
00098 size_t replayHardLimit;
00099 uint queueLimit;
00100 bool tcpNoDelay;
00101 };
00102
00103 private:
00104 typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
00105
00106 boost::shared_ptr<sys::Poller> poller;
00107 Options config;
00108 management::ManagementAgent::Singleton managementAgentSingleton;
00109 ProtocolFactoryMap protocolFactories;
00110 MessageStore* store;
00111 AclModule* acl;
00112 DataDir dataDir;
00113
00114 QueueRegistry queues;
00115 ExchangeRegistry exchanges;
00116 LinkRegistry links;
00117 boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
00118 Timer timer;
00119 DtxManager dtxManager;
00120 SessionManager sessionManager;
00121 management::ManagementAgent* managementAgent;
00122 qmf::org::apache::qpid::broker::Broker* mgmtObject;
00123 Vhost::shared_ptr vhostObject;
00124 System::shared_ptr systemObject;
00125 QueueCleaner queueCleaner;
00126
00127 void declareStandardExchange(const std::string& name, const std::string& type);
00128
00129 std::vector<Url> knownBrokers;
00130 std::vector<Url> getKnownBrokersImpl();
00131
00132 public:
00133
00134
00135 virtual ~Broker();
00136
00137 Broker(const Options& configuration);
00138 static boost::intrusive_ptr<Broker> create(const Options& configuration);
00139 static boost::intrusive_ptr<Broker> create(int16_t port = DEFAULT_PORT);
00140
00147 virtual uint16_t getPort() const;
00148
00153 virtual void run();
00154
00156 virtual void shutdown();
00157
00158 void setStore (MessageStore*);
00159 MessageStore& getStore() { return *store; }
00160 void setAcl (AclModule* _acl) {acl = _acl;}
00161 AclModule* getAcl() { return acl; }
00162 QueueRegistry& getQueues() { return queues; }
00163 ExchangeRegistry& getExchanges() { return exchanges; }
00164 LinkRegistry& getLinks() { return links; }
00165 uint64_t getStagingThreshold() { return config.stagingThreshold; }
00166 DtxManager& getDtxManager() { return dtxManager; }
00167 DataDir& getDataDir() { return dataDir; }
00168 Options& getOptions() { return config; }
00169
00170 SessionManager& getSessionManager() { return sessionManager; }
00171
00172 management::ManagementObject* GetManagementObject (void) const;
00173 management::Manageable* GetVhostObject (void) const;
00174 management::Manageable::status_t ManagementMethod (uint32_t methodId,
00175 management::Args& args,
00176 std::string& text);
00177
00179 void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
00180
00182 void accept();
00183
00185 void connect(const std::string& host, uint16_t port,
00186 const std::string& transport,
00187 boost::function2<void, int, std::string> failed,
00188 sys::ConnectionCodec::Factory* =0);
00190 void connect(const Url& url,
00191 boost::function2<void, int, std::string> failed,
00192 sys::ConnectionCodec::Factory* =0);
00193
00197 uint32_t queueMoveMessages( const std::string& srcQueue,
00198 const std::string& destQueue,
00199 uint32_t qty);
00200
00201
00202
00203 boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
00204
00206 boost::shared_ptr<sys::Poller> getPoller();
00207
00208 boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
00209 void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
00210
00211 Timer& getTimer() { return timer; }
00212
00213 boost::function<std::vector<Url> ()> getKnownBrokers;
00214
00215 static const std::string TCP_TRANSPORT;
00216 };
00217
00218 }}
00219
00220
00221
00222 #endif