00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef RDMA_WRAP_H
00022 #define RDMA_WRAP_H
00023
00024 #include "rdma_factories.h"
00025
00026 #include <rdma/rdma_cma.h>
00027
00028 #include "qpid/RefCounted.h"
00029 #include "qpid/sys/IOHandle.h"
00030 #include "qpid/sys/posix/PrivatePosix.h"
00031
00032 #include <fcntl.h>
00033
00034 #include <netdb.h>
00035
00036 #include <vector>
00037 #include <algorithm>
00038 #include <iostream>
00039 #include <stdexcept>
00040 #include <boost/shared_ptr.hpp>
00041 #include <boost/intrusive_ptr.hpp>
00042
00043 namespace Rdma {
00044 const int DEFAULT_TIMEOUT = 2000;
00045 const int DEFAULT_BACKLOG = 100;
00046 const int DEFAULT_CQ_ENTRIES = 256;
00047 const int DEFAULT_WR_ENTRIES = 64;
00048 extern const ::rdma_conn_param DEFAULT_CONNECT_PARAM;
00049
00050 struct Buffer {
00051 friend class QueuePair;
00052
00053 char* const bytes;
00054 const int32_t byteCount;
00055 int32_t dataStart;
00056 int32_t dataCount;
00057
00058 Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
00059 bytes(b),
00060 byteCount(s),
00061 dataStart(0),
00062 dataCount(0),
00063 mr(CHECK_NULL(::ibv_reg_mr(
00064 pd, bytes, byteCount,
00065 ::IBV_ACCESS_LOCAL_WRITE)))
00066 {}
00067
00068 ~Buffer() {
00069 (void) ::ibv_dereg_mr(mr);
00070 delete [] bytes;
00071 }
00072
00073 private:
00074 ::ibv_mr* mr;
00075 };
00076
00077 class Connection;
00078
// Tells which completion queue a QueuePairEvent was taken from.
// NONE marks an empty (default constructed) event.
enum QueueDirection { NONE, SEND, RECV };
00084
00085 class QueuePairEvent {
00086 boost::shared_ptr< ::ibv_cq > cq;
00087 ::ibv_wc wc;
00088 QueueDirection dir;
00089
00090 friend class QueuePair;
00091
00092 QueuePairEvent() :
00093 dir(NONE)
00094 {}
00095
00096 QueuePairEvent(
00097 const ::ibv_wc& w,
00098 boost::shared_ptr< ::ibv_cq > c,
00099 QueueDirection d) :
00100 cq(c),
00101 wc(w),
00102 dir(d)
00103 {
00104 assert(dir != NONE);
00105 }
00106
00107 public:
00108 operator bool() const {
00109 return dir != NONE;
00110 }
00111
00112 bool immPresent() const {
00113 return wc.wc_flags & IBV_WC_WITH_IMM;
00114 }
00115
00116 uint32_t getImm() const {
00117 return ntohl(wc.imm_data);
00118 }
00119
00120 QueueDirection getDirection() const {
00121 return dir;
00122 }
00123
00124 ::ibv_wc_opcode getEventType() const {
00125 return wc.opcode;
00126 }
00127
00128 ::ibv_wc_status getEventStatus() const {
00129 return wc.status;
00130 }
00131
00132 Buffer* getBuffer() const {
00133 Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
00134 b->dataCount = wc.byte_len;
00135 return b;
00136 }
00137 };
00138
00139
00140
00141
00142 class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
00143 boost::shared_ptr< ::ibv_pd > pd;
00144 boost::shared_ptr< ::ibv_comp_channel > cchannel;
00145 boost::shared_ptr< ::ibv_cq > scq;
00146 boost::shared_ptr< ::ibv_cq > rcq;
00147 boost::shared_ptr< ::ibv_qp > qp;
00148 int outstandingSendEvents;
00149 int outstandingRecvEvents;
00150
00151 friend class Connection;
00152
00153 QueuePair(boost::shared_ptr< ::rdma_cm_id > id);
00154 ~QueuePair();
00155
00156 public:
00157 typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
00158
00159
00160 Buffer* createBuffer(int s) {
00161 return new Buffer(pd.get(), new char[s], s);
00162 }
00163
00164
00165
00166 void nonblocking() {
00167 ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK);
00168 }
00169
00170
00171
00172 QueuePair::intrusive_ptr getNextChannelEvent() {
00173
00174 ::ibv_cq* cq;
00175 void* ctx;
00176 int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
00177 if (rc == -1 && errno == EAGAIN)
00178 return 0;
00179 CHECK(rc);
00180
00181
00182 if (cq == scq.get()) {
00183 if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
00184 ::ibv_ack_cq_events(cq, outstandingSendEvents);
00185 outstandingSendEvents = 0;
00186 }
00187 } else if (cq == rcq.get()) {
00188 if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
00189 ::ibv_ack_cq_events(cq, outstandingRecvEvents);
00190 outstandingRecvEvents = 0;
00191 }
00192 }
00193
00194 return static_cast<QueuePair*>(ctx);
00195 }
00196
00197 QueuePairEvent getNextEvent() {
00198 ::ibv_wc w;
00199 if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
00200 return QueuePairEvent(w, scq, SEND);
00201 else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
00202 return QueuePairEvent(w, rcq, RECV);
00203 else
00204 return QueuePairEvent();
00205 }
00206
00207 void postRecv(Buffer* buf);
00208 void postSend(Buffer* buf);
00209 void postSend(uint32_t imm, Buffer* buf);
00210 void notifyRecv();
00211 void notifySend();
00212 };
00213
00214 class ConnectionEvent {
00215 friend class Connection;
00216
00217
00218
00219 boost::intrusive_ptr<Connection> id;
00220 boost::intrusive_ptr<Connection> listen_id;
00221 boost::shared_ptr< ::rdma_cm_event > event;
00222
00223 ConnectionEvent() {}
00224 ConnectionEvent(::rdma_cm_event* e);
00225
00226
00227 public:
00228 operator bool() const {
00229 return event;
00230 }
00231
00232 ::rdma_cm_event_type getEventType() const {
00233 return event->event;
00234 }
00235
00236 ::rdma_conn_param getConnectionParam() const;
00237
00238 boost::intrusive_ptr<Connection> getConnection () const {
00239 return id;
00240 }
00241
00242 boost::intrusive_ptr<Connection> getListenId() const {
00243 return listen_id;
00244 }
00245 };
00246
00247
00248
00249
00250
00251
00252
00253 class Connection : public qpid::sys::IOHandle, public qpid::RefCounted {
00254 boost::shared_ptr< ::rdma_event_channel > channel;
00255 boost::shared_ptr< ::rdma_cm_id > id;
00256 QueuePair::intrusive_ptr qp;
00257
00258 void* context;
00259
00260 friend class ConnectionEvent;
00261 friend class QueuePair;
00262
00263
00264
00265 Connection(::rdma_cm_id* i) :
00266 qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
00267 id(i, destroyId),
00268 context(0)
00269 {
00270 impl->fd = id->channel->fd;
00271
00272
00273
00274 if (i)
00275 i->context = this;
00276 }
00277
00278 Connection() :
00279 qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
00280 channel(mkEChannel()),
00281 id(mkId(channel.get(), this, RDMA_PS_TCP)),
00282 context(0)
00283 {
00284 impl->fd = channel->fd;
00285 }
00286
00287 ~Connection() {
00288
00289 id->context = 0;
00290 }
00291
00292
00293
00294 void ensureQueuePair() {
00295 assert(id.get());
00296
00297
00298 if (qp)
00299 return;
00300
00301 qp = new QueuePair(id);
00302 }
00303
00304 public:
00305 typedef boost::intrusive_ptr<Connection> intrusive_ptr;
00306
00307 static intrusive_ptr make() {
00308 return new Connection();
00309 }
00310
00311 static intrusive_ptr find(::rdma_cm_id* i) {
00312 if (!i)
00313 return 0;
00314 Connection* id = static_cast< Connection* >(i->context);
00315 if (!id)
00316 throw std::logic_error("Couldn't find existing Connection");
00317 return id;
00318 }
00319
00320 template <typename T>
00321 void addContext(T* c) {
00322
00323 if (!context)
00324 context = c;
00325 }
00326
00327 template <typename T>
00328 T* getContext() {
00329 return static_cast<T*>(context);
00330 }
00331
00332
00333
00334 void nonblocking() {
00335 assert(id.get());
00336 ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK);
00337 }
00338
00339
00340
00341 ConnectionEvent getNextEvent() {
00342 assert(id.get());
00343 ::rdma_cm_event* e;
00344 int rc = ::rdma_get_cm_event(id->channel, &e);
00345 if (rc == -1 && errno == EAGAIN)
00346 return ConnectionEvent();
00347 CHECK(rc);
00348 return ConnectionEvent(e);
00349 }
00350
00351 void bind(sockaddr& src_addr) const {
00352 assert(id.get());
00353 CHECK(::rdma_bind_addr(id.get(), &src_addr));
00354 }
00355
00356 void listen(int backlog = DEFAULT_BACKLOG) const {
00357 assert(id.get());
00358 CHECK(::rdma_listen(id.get(), backlog));
00359 }
00360
00361 void resolve_addr(
00362 sockaddr& dst_addr,
00363 sockaddr* src_addr = 0,
00364 int timeout_ms = DEFAULT_TIMEOUT) const
00365 {
00366 assert(id.get());
00367 CHECK(::rdma_resolve_addr(id.get(), src_addr, &dst_addr, timeout_ms));
00368 }
00369
00370 void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const {
00371 assert(id.get());
00372 CHECK(::rdma_resolve_route(id.get(), timeout_ms));
00373 }
00374
00375 void disconnect() const {
00376 assert(id.get());
00377 CHECK(::rdma_disconnect(id.get()));
00378 }
00379
00380
00381 void connect() {
00382 assert(id.get());
00383
00384
00385 ensureQueuePair();
00386
00387 ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
00388 CHECK(::rdma_connect(id.get(), &p));
00389 }
00390
00391 template <typename T>
00392 void connect(const T* data) {
00393 assert(id.get());
00394
00395 ensureQueuePair();
00396
00397 ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
00398 p.private_data = data;
00399 p.private_data_len = sizeof(T);
00400 CHECK(::rdma_connect(id.get(), &p));
00401 }
00402
00403
00404
00405 template <typename T>
00406 void accept(const ::rdma_conn_param& param, const T* data) {
00407 assert(id.get());
00408
00409 ensureQueuePair();
00410
00411 ::rdma_conn_param p = param;
00412 p.private_data = data;
00413 p.private_data_len = sizeof(T);
00414 CHECK(::rdma_accept(id.get(), &p));
00415 }
00416
00417 void accept(const ::rdma_conn_param& param) {
00418 assert(id.get());
00419
00420 ensureQueuePair();
00421
00422 ::rdma_conn_param p = param;
00423 p.private_data = 0;
00424 p.private_data_len = 0;
00425 CHECK(::rdma_accept(id.get(), &p));
00426 }
00427
00428 template <typename T>
00429 void reject(const T* data) const {
00430 assert(id.get());
00431 CHECK(::rdma_reject(id.get(), data, sizeof(T)));
00432 }
00433
00434 void reject() const {
00435 assert(id.get());
00436 CHECK(::rdma_reject(id.get(), 0, 0));
00437 }
00438
00439 QueuePair::intrusive_ptr getQueuePair() {
00440 assert(id.get());
00441
00442 ensureQueuePair();
00443
00444 return qp;
00445 }
00446
00447 std::string getLocalName() const {
00448 ::sockaddr* addr = ::rdma_get_local_addr(id.get());
00449 char hostName[NI_MAXHOST];
00450 char portName[NI_MAXSERV];
00451 CHECK_IBV(::getnameinfo(
00452 addr, sizeof(::sockaddr_storage),
00453 hostName, sizeof(hostName),
00454 portName, sizeof(portName),
00455 NI_NUMERICHOST | NI_NUMERICSERV));
00456 std::string r(hostName);
00457 r += ":";
00458 r += portName;
00459 return r;
00460 }
00461
00462 std::string getPeerName() const {
00463 ::sockaddr* addr = ::rdma_get_peer_addr(id.get());
00464 char hostName[NI_MAXHOST];
00465 char portName[NI_MAXSERV];
00466 CHECK_IBV(::getnameinfo(
00467 addr, sizeof(::sockaddr_storage),
00468 hostName, sizeof(hostName),
00469 portName, sizeof(portName),
00470 NI_NUMERICHOST | NI_NUMERICSERV));
00471 std::string r(hostName);
00472 r += ":";
00473 r += portName;
00474 return r;
00475 }
00476 };
00477
00478 inline void QueuePair::notifyRecv() {
00479 CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
00480 }
00481
00482 inline void QueuePair::notifySend() {
00483 CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
00484 }
00485
00486 inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
00487 id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
00488 Connection::find(e->id) : new Connection(e->id)),
00489 listen_id(Connection::find(e->listen_id)),
00490 event(e, acker)
00491 {}
00492 }
00493
00494 std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t);
00495
00496 #endif // RDMA_WRAP_H