00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include <blackboard/local.h>
00028 #include <blackboard/remote.h>
00029 #include <blackboard/exceptions.h>
00030 #include <blackboard/bbconfig.h>
00031 #include <blackboard/interface_listener.h>
00032
00033 #include <interfaces/TestInterface.h>
00034
00035 #include <interface/interface_info.h>
00036 #include <core/exceptions/system.h>
00037 #include <netcomm/fawkes/client.h>
00038 #include <netcomm/fawkes/server_thread.h>
00039 #include <utils/time/time.h>
00040
00041 #include <signal.h>
00042 #include <cstdlib>
00043 #include <cstring>
00044 #include <cstdio>
00045
00046 #include <iostream>
00047 #include <vector>
00048
00049 using namespace std;
00050 using namespace fawkes;
00051
00052
00053 bool quit = false;
00054
00055 void
00056 signal_handler(int signum)
00057 {
00058 quit = true;
00059 }
00060
00061
00062 #define NUM_CHUNKS 5
00063
00064 void
00065 test_messaging(TestInterface *ti_reader, TestInterface *ti_writer)
00066 {
00067 while (! quit) {
00068 int expval = ti_reader->test_int() + 1;
00069 TestInterface::SetTestIntMessage *m = new TestInterface::SetTestIntMessage(expval);
00070 unsigned int msgid = ti_reader->msgq_enqueue(m);
00071 printf("Sent with message ID %u\n", msgid);
00072
00073 if ( ti_writer->msgq_size() > 1 ) {
00074 cout << "Error, more than one message! flushing." << endl;
00075 ti_writer->msgq_flush();
00076 }
00077
00078 usleep(100000);
00079
00080 if ( ti_writer->msgq_first() != NULL ) {
00081 if ( ti_writer->msgq_first_is<TestInterface::SetTestStringMessage>() ) {
00082 TestInterface::SetTestStringMessage *msg = ti_writer->msgq_first(msg);
00083 printf("Received message of ID %u, Message improperly detected to be a SetTestStringMessage\n", msg->id());
00084 }
00085 if ( ti_writer->msgq_first_is<TestInterface::SetTestIntMessage>() ) {
00086 TestInterface::SetTestIntMessage *m2 = ti_writer->msgq_first<TestInterface::SetTestIntMessage>();
00087 printf("Received message with ID %u (enqueue time: %s)\n", m2->id(), m2->time_enqueued()->str());
00088 ti_writer->set_test_int( m2->test_int() );
00089 try {
00090 ti_writer->write();
00091 } catch (InterfaceWriteDeniedException &e) {
00092 cout << "BUG: caught write denied exception" << endl;
00093 e.print_trace();
00094 }
00095 ti_writer->msgq_pop();
00096 } else {
00097 cout << "Illegal message '" << ti_writer->msgq_first()->type() << "' type received" << endl;
00098 }
00099
00100 usleep(100000);
00101
00102
00103 ti_reader->read();
00104 int val = ti_reader->test_int();
00105 if ( val == expval ) {
00106
00107 } else {
00108 cout << " failure, value is " << ti_reader->test_int() << ", expected "
00109 << expval << endl;
00110 }
00111 } else {
00112 printf("No message in queue, if network test this means the message was dropped\n");
00113 }
00114
00115 usleep(10);
00116 }
00117 }
00118
00119 class SyncInterfaceListener : public fawkes::BlackBoardInterfaceListener
00120 {
00121 public:
00122 SyncInterfaceListener(fawkes::Interface *reader,
00123 fawkes::Interface *writer,
00124 fawkes::BlackBoard *reader_bb,
00125 fawkes::BlackBoard *writer_bb)
00126 : BlackBoardInterfaceListener("SyncInterfaceListener(%s-%s)", writer->uid(), reader->id())
00127 {
00128 __reader = reader;
00129 __writer = writer;
00130 __reader_bb = reader_bb;
00131 __writer_bb = writer_bb;
00132
00133 bbil_add_data_interface(__reader);
00134 bbil_add_message_interface(__writer);
00135
00136 __reader_bb->register_listener(this, BlackBoard::BBIL_FLAG_DATA);
00137 __writer_bb->register_listener(this, BlackBoard::BBIL_FLAG_MESSAGES);
00138 }
00139
00140
00141
00142 ~SyncInterfaceListener()
00143 {
00144 __reader_bb->unregister_listener(this);
00145 __writer_bb->unregister_listener(this);
00146 }
00147
00148
00149 bool
00150 bb_interface_message_received(Interface *interface,
00151 Message *message) throw()
00152 {
00153 try {
00154 if ( interface == __writer ) {
00155 printf("%s: Forwarding message\n", bbil_name());
00156 Message *m = message->clone();
00157 m->set_hops(message->hops());
00158 m->ref();
00159 __reader->msgq_enqueue(m);
00160 message->set_id(m->id());
00161 m->unref();
00162 return false;
00163 } else {
00164
00165 printf("%s: Message received for unknown interface\n", bbil_name());
00166 return true;
00167 }
00168 } catch (Exception &e) {
00169 printf("%s: Exception when message received\n", bbil_name());
00170 e.print_trace();
00171 return false;
00172 }
00173 }
00174
00175
00176 void
00177 bb_interface_data_changed(Interface *interface) throw()
00178 {
00179 try {
00180 if ( interface == __reader ) {
00181
00182 __reader->read();
00183 __writer->copy_values(__reader);
00184 __writer->write();
00185 } else {
00186
00187 printf("%s: Data changed for unknown interface", bbil_name());
00188 }
00189 } catch (Exception &e) {
00190 printf("%s: Exception when data changed\n", bbil_name());
00191 e.print_trace();
00192 }
00193 }
00194
00195 private:
00196 fawkes::Interface *__writer;
00197 fawkes::Interface *__reader;
00198
00199 fawkes::BlackBoard *__writer_bb;
00200 fawkes::BlackBoard *__reader_bb;
00201
00202 };
00203
00204
00205 int
00206 main(int argc, char **argv)
00207 {
00208 signal(SIGINT, signal_handler);
00209
00210 LocalBlackBoard *llbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
00211 BlackBoard *lbb = llbb;
00212
00213 FawkesNetworkServerThread *fns = new FawkesNetworkServerThread(1910);
00214 fns->start();
00215
00216 llbb->start_nethandler(fns);
00217
00218 BlackBoard *rbb = new RemoteBlackBoard("localhost", 1910);
00219
00220 InterfaceInfoList *infl = rbb->list_all();
00221 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00222 const unsigned char *hash = (*i).hash();
00223 char phash[__INTERFACE_HASH_SIZE * 2 + 1];
00224 memset(phash, 0, sizeof(phash));
00225 for (unsigned int j = 0; j < __INTERFACE_HASH_SIZE; ++j) {
00226 sprintf(&phash[j * 2], "%02x", hash[j]);
00227 }
00228 printf("%s::%s (%s), w:%i r:%u s:%u\n",
00229 (*i).type(), (*i).id(), phash, (*i).has_writer(),
00230 (*i).num_readers(), (*i).serial());
00231 }
00232 delete infl;
00233
00234
00235 TestInterface *ti_reader;
00236 TestInterface *ti_writer;
00237 try {
00238 cout << "Opening interfaces.. " << flush;
00239 ti_writer = rbb->open_for_writing<TestInterface>("SomeID");
00240 ti_reader = rbb->open_for_reading<TestInterface>("SomeID");
00241 cout << "success, "
00242 << "writer hash=" << ti_writer->hash_printable()
00243 << " reader hash=" << ti_reader->hash_printable()
00244 << endl;
00245 } catch (Exception &e) {
00246 cout << "failed! Aborting" << endl;
00247 e.print_trace();
00248 exit(1);
00249 }
00250
00251 try {
00252 cout << "Trying to open second writer.. " << flush;
00253 TestInterface *ti_writer_two;
00254 ti_writer_two = rbb->open_for_writing<TestInterface>("SomeID");
00255 cout << "BUG: Detection of second writer did NOT work!" << endl;
00256 exit(2);
00257 } catch (BlackBoardWriterActiveException &e) {
00258 cout << "exception caught as expected, detected and prevented second writer!" << endl;
00259 }
00260
00261 try {
00262 cout << "Trying to open third writer.. " << flush;
00263 TestInterface *ti_writer_three;
00264 ti_writer_three = rbb->open_for_writing<TestInterface>("AnotherID");
00265 cout << "No exception as expected, different ID ok!" << endl;
00266 rbb->close(ti_writer_three);
00267 } catch (BlackBoardWriterActiveException &e) {
00268 cout << "BUG: Third writer with different ID detected as another writer!" << endl;
00269 exit(3);
00270 }
00271
00272 cout << endl << endl
00273 << "Running data tests ==================================================" << endl;
00274
00275 cout << "Writing initial value ("
00276 << TestInterface::TEST_CONSTANT << ") into interface as TestInt" << endl;
00277 ti_writer->set_test_int( TestInterface::TEST_CONSTANT );
00278 try {
00279 ti_writer->write();
00280 } catch (InterfaceWriteDeniedException &e) {
00281 cout << "BUG: caught write denied exception" << endl;
00282 e.print_trace();
00283 }
00284
00285 cout << "Giving some time to have value processed" << endl;
00286 usleep(100000);
00287
00288 cout << "Reading value from reader interface.. " << flush;
00289 ti_reader->read();
00290 int val = ti_reader->test_int();
00291 if ( val == TestInterface::TEST_CONSTANT ) {
00292 cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
00293 } else {
00294 cout << " failure, value is " << ti_reader->test_int() << ", expected "
00295 << TestInterface::TEST_CONSTANT << endl;
00296 }
00297
00298 cout << "Closing interfaces.. " << flush;
00299 try {
00300 rbb->close(ti_reader);
00301 rbb->close(ti_writer);
00302 cout << "done" << endl;
00303 } catch (Exception &e) {
00304 cout << "failed" << endl;
00305 e.print_trace();
00306 }
00307
00308 cout << endl << endl << "Starting MESSAGING tests" << endl
00309 << "Press Ctrl-C to continue with next test" << endl << endl;
00310
00311 ti_writer = lbb->open_for_writing<TestInterface>("Messaging");
00312 ti_reader = rbb->open_for_reading<TestInterface>("Messaging");
00313
00314 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
00315 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
00316
00317 test_messaging(ti_reader, ti_writer);
00318
00319 rbb->close(ti_reader);
00320 lbb->close(ti_writer);
00321
00322 cout << endl << endl << "Starting MESSAGING tests, doing repeater scenario" << endl
00323 << "Press Ctrl-C to continue with next test" << endl << endl;
00324 quit = false;
00325
00326 delete rbb;
00327
00328 LocalBlackBoard *repllbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
00329
00330 FawkesNetworkServerThread *repfns = new FawkesNetworkServerThread(1911);
00331 repfns->start();
00332
00333 repllbb->start_nethandler(repfns);
00334
00335 BlackBoard *rep_rbb = new RemoteBlackBoard("localhost", 1911);
00336 rbb = new RemoteBlackBoard("localhost", 1911);
00337
00338 TestInterface *rep_reader;
00339 TestInterface *rep_writer;
00340
00341 ti_writer = rbb->open_for_writing<TestInterface>("Messaging");
00342 ti_reader = lbb->open_for_reading<TestInterface>("Messaging");
00343
00344 rep_reader = rep_rbb->open_for_reading<TestInterface>("Messaging");
00345 rep_writer = lbb->open_for_writing<TestInterface>("Messaging");
00346
00347 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
00348 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
00349
00350 SyncInterfaceListener *sil = new SyncInterfaceListener(rep_reader, rep_writer, rep_rbb, lbb);
00351
00352 test_messaging(ti_reader, ti_writer);
00353
00354 delete sil;
00355 lbb->close(ti_reader);
00356 rbb->close(ti_writer);
00357 rep_rbb->close(rep_reader);
00358 lbb->close(rep_writer);
00359 delete repllbb;
00360 delete rep_rbb;
00361
00362 cout << "Tests done" << endl;
00363
00364 delete rbb;
00365 delete llbb;
00366 delete fns;
00367 }
00368
00369
00370