00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <netcomm/fawkes/client.h>
00025 #include <netcomm/fawkes/client_handler.h>
00026 #include <netcomm/fawkes/message_queue.h>
00027 #include <netcomm/fawkes/transceiver.h>
00028 #include <netcomm/socket/stream.h>
00029 #include <netcomm/utils/exceptions.h>
00030
00031 #include <core/threading/thread.h>
00032 #include <core/threading/mutex.h>
00033 #include <core/threading/mutex_locker.h>
00034 #include <core/threading/wait_condition.h>
00035 #include <core/exceptions/system.h>
00036
00037 #include <list>
00038 #include <cstring>
00039 #include <cstdlib>
00040 #include <unistd.h>
00041
00042 namespace fawkes {
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053 HandlerAlreadyRegisteredException::HandlerAlreadyRegisteredException()
00054 : Exception("A handler for this component has already been registered")
00055 {
00056 }
00057
00058
00059
00060
00061
00062
00063
00064
00065 class FawkesNetworkClientSendThread : public Thread
00066 {
00067 public:
00068
00069
00070
00071
00072
00073 FawkesNetworkClientSendThread(StreamSocket *s, FawkesNetworkClient *parent)
00074 : Thread("FawkesNetworkClientSendThread", Thread::OPMODE_WAITFORWAKEUP)
00075 {
00076 __s = s;
00077 __parent = parent;
00078 __outbound_mutex = new Mutex();
00079 __outbound_msgqs[0] = new FawkesNetworkMessageQueue();
00080 __outbound_msgqs[1] = new FawkesNetworkMessageQueue();
00081 __outbound_active = 0;
00082 __outbound_msgq = __outbound_msgqs[0];
00083 }
00084
00085
00086 ~FawkesNetworkClientSendThread()
00087 {
00088 for (unsigned int i = 0; i < 2; ++i) {
00089 while ( ! __outbound_msgqs[i]->empty() ) {
00090 FawkesNetworkMessage *m = __outbound_msgqs[i]->front();
00091 m->unref();
00092 __outbound_msgqs[i]->pop();
00093 }
00094 }
00095 delete __outbound_msgqs[0];
00096 delete __outbound_msgqs[1];
00097 delete __outbound_mutex;
00098 }
00099
00100 virtual void once()
00101 {
00102 __parent->set_send_slave_alive();
00103 }
00104
00105 virtual void loop()
00106 {
00107 if ( ! __parent->connected() ) return;
00108
00109 while ( __outbound_havemore ) {
00110 __outbound_mutex->lock();
00111 __outbound_havemore = false;
00112 FawkesNetworkMessageQueue *q = __outbound_msgq;
00113 __outbound_active = 1 - __outbound_active;
00114 __outbound_msgq = __outbound_msgqs[__outbound_active];
00115 __outbound_mutex->unlock();
00116
00117 if ( ! q->empty() ) {
00118 try {
00119 FawkesNetworkTransceiver::send(__s, q);
00120 } catch (ConnectionDiedException &e) {
00121 __parent->connection_died();
00122 exit();
00123 }
00124 }
00125 }
00126 }
00127
00128
00129
00130
00131 void force_send()
00132 {
00133 if ( loop_mutex->try_lock() ) {
00134 loop();
00135 loop_mutex->unlock();
00136 }
00137 }
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 void enqueue(FawkesNetworkMessage *message)
00151 {
00152 __outbound_mutex->lock();
00153 __outbound_msgq->push(message);
00154 __outbound_havemore = true;
00155 __outbound_mutex->unlock();
00156 wakeup();
00157 }
00158
00159
00160 protected: virtual void run() { Thread::run(); }
00161
00162 private:
00163 StreamSocket *__s;
00164 FawkesNetworkClient *__parent;
00165 Mutex *__outbound_mutex;
00166 unsigned int __outbound_active;
00167 bool __outbound_havemore;
00168 FawkesNetworkMessageQueue *__outbound_msgq;
00169 FawkesNetworkMessageQueue *__outbound_msgqs[2];
00170
00171 };
00172
00173
00174
00175
00176
00177
00178
00179
00180 class FawkesNetworkClientRecvThread : public Thread
00181 {
00182 public:
00183
00184
00185
00186
00187
00188 FawkesNetworkClientRecvThread(StreamSocket *s, FawkesNetworkClient *parent,
00189 Mutex *recv_mutex)
00190 : Thread("FawkesNetworkClientRecvThread")
00191 {
00192 __s = s;
00193 __parent = parent;
00194 __inbound_msgq = new FawkesNetworkMessageQueue();
00195 __recv_mutex = recv_mutex;
00196 }
00197
00198
00199 ~FawkesNetworkClientRecvThread()
00200 {
00201 while ( ! __inbound_msgq->empty() ) {
00202 FawkesNetworkMessage *m = __inbound_msgq->front();
00203 m->unref();
00204 __inbound_msgq->pop();
00205 }
00206 delete __inbound_msgq;
00207 }
00208
00209
00210 void recv()
00211 {
00212 std::list<unsigned int> wakeup_list;
00213
00214 try {
00215 FawkesNetworkTransceiver::recv(__s, __inbound_msgq);
00216
00217 MutexLocker lock(__recv_mutex);
00218
00219 __inbound_msgq->lock();
00220 while ( ! __inbound_msgq->empty() ) {
00221 FawkesNetworkMessage *m = __inbound_msgq->front();
00222 wakeup_list.push_back(m->cid());
00223 __parent->dispatch_message(m);
00224 m->unref();
00225 __inbound_msgq->pop();
00226 }
00227 __inbound_msgq->unlock();
00228
00229 lock.unlock();
00230
00231 wakeup_list.sort();
00232 wakeup_list.unique();
00233 for (std::list<unsigned int>::iterator i = wakeup_list.begin(); i != wakeup_list.end(); ++i) {
00234 __parent->wake_handlers(*i);
00235 }
00236 } catch (ConnectionDiedException &e) {
00237 throw;
00238 }
00239 }
00240
00241 virtual void once()
00242 {
00243 __parent->set_recv_slave_alive();
00244 }
00245
00246 virtual void loop()
00247 {
00248
00249 if (! __s ) return;
00250
00251 short p = 0;
00252 try {
00253 p = __s->poll();
00254 } catch (InterruptedException &e) {
00255 return;
00256 }
00257
00258 if ( (p & Socket::POLL_ERR) ||
00259 (p & Socket::POLL_HUP) ||
00260 (p & Socket::POLL_RDHUP)) {
00261 __parent->connection_died();
00262 exit();
00263 } else if ( p & Socket::POLL_IN ) {
00264
00265 try {
00266 recv();
00267 } catch (ConnectionDiedException &e) {
00268 __parent->connection_died();
00269 exit();
00270 }
00271 }
00272 }
00273
00274
00275 protected: virtual void run() { Thread::run(); }
00276
00277 private:
00278 StreamSocket *__s;
00279 FawkesNetworkClient *__parent;
00280 FawkesNetworkMessageQueue * __inbound_msgq;
00281 Mutex *__recv_mutex;
00282 };
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298 FawkesNetworkClient::FawkesNetworkClient(const char *hostname, unsigned short int port, const char *ip)
00299 {
00300 __hostname = strdup(hostname);
00301 __ip = ip ? strdup(ip) : NULL;
00302 __port = port;
00303
00304 s = NULL;
00305 __send_slave = NULL;
00306 __recv_slave = NULL;
00307
00308 connection_died_recently = false;
00309 __send_slave_alive = false;
00310 __recv_slave_alive = false;
00311
00312 slave_status_mutex = new Mutex();
00313
00314 _id = 0;
00315 _has_id = false;
00316
00317 __recv_mutex = new Mutex();
00318 __recv_waitcond = new WaitCondition(__recv_mutex);
00319 __connest_mutex = new Mutex();
00320 __connest_waitcond = new WaitCondition(__connest_mutex);
00321 __connest = false;
00322 __connest_interrupted = false;
00323 }
00324
00325
00326
00327
00328
00329
00330 FawkesNetworkClient::FawkesNetworkClient()
00331 {
00332 __hostname = NULL;
00333 __ip = NULL;
00334 __port = 0;
00335
00336 s = NULL;
00337 __send_slave = NULL;
00338 __recv_slave = NULL;
00339
00340 connection_died_recently = false;
00341 __send_slave_alive = false;
00342 __recv_slave_alive = false;
00343
00344 slave_status_mutex = new Mutex();
00345
00346 _id = 0;
00347 _has_id = false;
00348
00349 __recv_mutex = new Mutex();
00350 __recv_waitcond = new WaitCondition(__recv_mutex);
00351 __connest_mutex = new Mutex();
00352 __connest_waitcond = new WaitCondition(__connest_mutex);
00353 __connest = false;
00354 __connest_interrupted = false;
00355 }
00356
00357
00358
00359
00360
00361
00362
00363
00364 FawkesNetworkClient::FawkesNetworkClient(unsigned int id, const char *hostname,
00365 unsigned short int port, const char *ip)
00366 {
00367 __hostname = strdup(hostname);
00368 __ip = ip ? strdup(ip) : NULL;
00369 __port = port;
00370
00371 s = NULL;
00372 __send_slave = NULL;
00373 __recv_slave = NULL;
00374
00375 connection_died_recently = false;
00376 __send_slave_alive = false;
00377 __recv_slave_alive = false;
00378
00379 slave_status_mutex = new Mutex();
00380
00381 _id = id;
00382 _has_id = true;
00383
00384 __recv_mutex = new Mutex();
00385 __recv_waitcond = new WaitCondition(__recv_mutex);
00386 __connest_mutex = new Mutex();
00387 __connest_waitcond = new WaitCondition(__connest_mutex);
00388 __connest = false;
00389 __connest_interrupted = false;
00390 }
00391
00392
00393
00394 FawkesNetworkClient::~FawkesNetworkClient()
00395 {
00396 disconnect();
00397
00398 delete s;
00399 if (__hostname) free(__hostname);
00400 if (__ip) free(__ip);
00401 delete slave_status_mutex;
00402
00403 delete __connest_waitcond;
00404 delete __connest_mutex;
00405 delete __recv_waitcond;
00406 delete __recv_mutex;
00407 }
00408
00409
00410
00411
00412
00413
00414 void
00415 FawkesNetworkClient::connect()
00416 {
00417 if ( __hostname == NULL && __ip == NULL) {
00418 throw NullPointerException("Hostname not set. Cannot connect.");
00419 }
00420
00421 if ( s != NULL ) {
00422 disconnect();
00423 }
00424
00425
00426 connection_died_recently = false;
00427
00428 try {
00429 s = new StreamSocket();
00430 s->connect(__ip ? __ip : __hostname, __port);
00431 __send_slave = new FawkesNetworkClientSendThread(s, this);
00432 __send_slave->start();
00433 __recv_slave = new FawkesNetworkClientRecvThread(s, this, __recv_mutex);
00434 __recv_slave->start();
00435 } catch (SocketException &e) {
00436 connection_died_recently = true;
00437 if ( __send_slave ) {
00438 __send_slave->cancel();
00439 __send_slave->join();
00440 delete __send_slave;
00441 __send_slave = NULL;
00442 }
00443 if ( __recv_slave ) {
00444 __recv_slave->cancel();
00445 __recv_slave->join();
00446 delete __recv_slave;
00447 __recv_slave = NULL;
00448 }
00449 __send_slave_alive = false;
00450 __recv_slave_alive = false;
00451 delete s;
00452 s = NULL;
00453 throw;
00454 }
00455
00456 __connest_mutex->lock();
00457 while ( ! __connest && ! __connest_interrupted ) {
00458 __connest_waitcond->wait();
00459 }
00460 bool interrupted = __connest_interrupted;
00461 __connest_interrupted = false;
00462 __connest_mutex->unlock();
00463 if ( interrupted ) {
00464 throw InterruptedException("FawkesNetworkClient::connect()");
00465 }
00466
00467 notify_of_connection_established();
00468 }
00469
00470
00471
00472
00473
00474
00475
00476
00477 void
00478 FawkesNetworkClient::connect(const char *hostname, unsigned short int port)
00479 {
00480 connect(hostname, NULL, port);
00481 }
00482
00483
00484
00485
00486
00487
00488
00489
00490 void
00491 FawkesNetworkClient::connect(const char *hostname, const char *ip, unsigned short int port)
00492 {
00493 if (__hostname) free(__hostname);
00494 if (__ip) free(__ip);
00495 __hostname = strdup(hostname);
00496 __ip = ip ? strdup(ip) : NULL;
00497 __port = port;
00498 connect();
00499 }
00500
00501
00502 void
00503 FawkesNetworkClient::disconnect()
00504 {
00505 if ( s == NULL ) return;
00506
00507 if ( __send_slave_alive ) {
00508 if ( ! connection_died_recently ) {
00509 __send_slave->force_send();
00510
00511 usleep(100000);
00512 }
00513 __send_slave->cancel();
00514 __send_slave->join();
00515 delete __send_slave;
00516 __send_slave = NULL;
00517 }
00518 if ( __recv_slave_alive ) {
00519 __recv_slave->cancel();
00520 __recv_slave->join();
00521 delete __recv_slave;
00522 __recv_slave = NULL;
00523 }
00524 __send_slave_alive = false;
00525 __recv_slave_alive = false;
00526 delete s;
00527 s = NULL;
00528
00529 if (! connection_died_recently) {
00530 connection_died();
00531 }
00532 }
00533
00534
00535
00536
00537
00538
00539 void
00540 FawkesNetworkClient::interrupt_connect()
00541 {
00542 __connest_mutex->lock();
00543 __connest_interrupted = true;
00544 __connest_waitcond->wake_all();
00545 __connest_mutex->unlock();
00546 }
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560 void
00561 FawkesNetworkClient::enqueue(FawkesNetworkMessage *message)
00562 {
00563 if (__send_slave) __send_slave->enqueue(message);
00564 }
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578 void
00579 FawkesNetworkClient::enqueue_and_wait(FawkesNetworkMessage *message,
00580 unsigned int timeout_sec)
00581 {
00582 if (__send_slave && __recv_slave) {
00583 __recv_mutex->lock();
00584 if ( __recv_received.find(message->cid()) != __recv_received.end()) {
00585 __recv_mutex->unlock();
00586 unsigned int cid = message->cid();
00587 throw Exception("There is already a thread waiting for messages of "
00588 "component id %u", cid);
00589 }
00590 __send_slave->enqueue(message);
00591 unsigned int cid = message->cid();
00592 __recv_received[cid] = false;
00593 while (!__recv_received[cid] && ! connection_died_recently) {
00594 if (!__recv_waitcond->reltimed_wait(timeout_sec, 0)) {
00595 __recv_received.erase(cid);
00596 __recv_mutex->unlock();
00597 throw TimeoutException("Timeout reached while waiting for incoming message "
00598 "(outgoing was %u:%u)", message->cid(), message->msgid());
00599 }
00600 }
00601 __recv_received.erase(cid);
00602 __recv_mutex->unlock();
00603 } else {
00604 unsigned int cid = message->cid();
00605 unsigned int msgid = message->msgid();
00606 throw Exception("Cannot enqueue given message %u:%u, sender or "
00607 "receiver missing", cid, msgid);
00608 }
00609 }
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619 void
00620 FawkesNetworkClient::register_handler(FawkesNetworkClientHandler *handler,
00621 unsigned int component_id)
00622 {
00623 handlers.lock();
00624 if ( handlers.find(component_id) != handlers.end() ) {
00625 handlers.unlock();
00626 throw HandlerAlreadyRegisteredException();
00627 } else {
00628 handlers[component_id] = handler;
00629 }
00630 handlers.unlock();
00631 }
00632
00633
00634
00635
00636
00637
00638 void
00639 FawkesNetworkClient::deregister_handler(unsigned int component_id)
00640 {
00641 handlers.lock();
00642 if ( handlers.find(component_id) != handlers.end() ) {
00643 handlers[component_id]->deregistered(_id);
00644 handlers.erase(component_id);
00645 }
00646 handlers.unlock();
00647 __recv_mutex->lock();
00648 if (__recv_received.find(component_id) != __recv_received.end()) {
00649 __recv_received[component_id] = true;
00650 __recv_waitcond->wake_all();
00651 }
00652 __recv_mutex->unlock();
00653 }
00654
00655
00656 void
00657 FawkesNetworkClient::dispatch_message(FawkesNetworkMessage *m)
00658 {
00659 unsigned int cid = m->cid();
00660 handlers.lock();
00661 if (handlers.find(cid) != handlers.end()) {
00662 handlers[cid]->inbound_received(m, _id);
00663 }
00664 handlers.unlock();
00665 }
00666
00667
00668 void
00669 FawkesNetworkClient::wake_handlers(unsigned int cid)
00670 {
00671 __recv_mutex->lock();
00672 if (__recv_received.find(cid) != __recv_received.end()) {
00673 __recv_received[cid] = true;
00674 }
00675 __recv_waitcond->wake_all();
00676 __recv_mutex->unlock();
00677 }
00678
00679 void
00680 FawkesNetworkClient::notify_of_connection_dead()
00681 {
00682 __connest_mutex->lock();
00683 __connest = false;
00684 __connest_mutex->unlock();
00685
00686 handlers.lock();
00687 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
00688 i->second->connection_died(_id);
00689 }
00690 handlers.unlock();
00691
00692 __recv_mutex->lock();
00693 __recv_waitcond->wake_all();
00694 __recv_mutex->unlock();
00695 }
00696
00697 void
00698 FawkesNetworkClient::notify_of_connection_established()
00699 {
00700 handlers.lock();
00701 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
00702 i->second->connection_established(_id);
00703 }
00704 handlers.unlock();
00705 }
00706
00707
00708 void
00709 FawkesNetworkClient::connection_died()
00710 {
00711 connection_died_recently = true;
00712 notify_of_connection_dead();
00713 }
00714
00715
00716 void
00717 FawkesNetworkClient::set_send_slave_alive()
00718 {
00719 slave_status_mutex->lock();
00720 __send_slave_alive = true;
00721 if ( __send_slave_alive && __recv_slave_alive ) {
00722 __connest_mutex->lock();
00723 __connest = true;
00724 __connest_waitcond->wake_all();
00725 __connest_mutex->unlock();
00726 }
00727 slave_status_mutex->unlock();
00728 }
00729
00730
00731 void
00732 FawkesNetworkClient::set_recv_slave_alive()
00733 {
00734 slave_status_mutex->lock();
00735 __recv_slave_alive = true;
00736 if ( __send_slave_alive && __recv_slave_alive ) {
00737 __connest_mutex->lock();
00738 __connest = true;
00739 __connest_waitcond->wake_all();
00740 __connest_mutex->unlock();
00741 }
00742 slave_status_mutex->unlock();
00743 }
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753 void
00754 FawkesNetworkClient::wait(unsigned int component_id, unsigned int timeout_sec)
00755 {
00756 __recv_mutex->lock();
00757 if ( __recv_received.find(component_id) != __recv_received.end()) {
00758 __recv_mutex->unlock();
00759 throw Exception("There is already a thread waiting for messages of "
00760 "component id %u", component_id);
00761 }
00762 __recv_received[component_id] = false;
00763 while (! __recv_received[component_id] && ! connection_died_recently) {
00764 if (!__recv_waitcond->reltimed_wait(timeout_sec, 0)) {
00765 __recv_received.erase(component_id);
00766 __recv_mutex->unlock();
00767 throw TimeoutException("Timeout reached while waiting for incoming message "
00768 "(component %u)", component_id);
00769 }
00770 }
00771 __recv_received.erase(component_id);
00772 __recv_mutex->unlock();
00773 }
00774
00775
00776
00777
00778
00779
00780
00781 void
00782 FawkesNetworkClient::wake(unsigned int component_id)
00783 {
00784 __recv_mutex->lock();
00785 if ( __recv_received.find(component_id) != __recv_received.end()) {
00786 __recv_received[component_id] = true;
00787 }
00788 __recv_waitcond->wake_all();
00789 __recv_mutex->unlock();
00790 }
00791
00792
00793
00794
00795
00796 bool
00797 FawkesNetworkClient::connected() const throw()
00798 {
00799 return (! connection_died_recently && (s != NULL));
00800 }
00801
00802
00803
00804
00805
00806 bool
00807 FawkesNetworkClient::has_id() const
00808 {
00809 return _has_id;
00810 }
00811
00812
00813
00814
00815
00816 unsigned int
00817 FawkesNetworkClient::id() const
00818 {
00819 if ( !_has_id ) {
00820 throw Exception("Trying to get the ID of a client that has no ID");
00821 }
00822
00823 return _id;
00824 }
00825
00826
00827
00828
00829 const char *
00830 FawkesNetworkClient::get_hostname() const
00831 {
00832 return __hostname;
00833 }
00834
00835
00836
00837
00838 const char *
00839 FawkesNetworkClient::get_ip() const
00840 {
00841 return __ip;
00842 }
00843
00844 }