00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <fvutils/net/fuse_server_client_thread.h>
00025
00026 #include <fvutils/net/fuse_server.h>
00027 #include <fvutils/net/fuse_server.h>
00028 #include <fvutils/net/fuse_transceiver.h>
00029 #include <fvutils/net/fuse_message_queue.h>
00030 #include <fvutils/net/fuse_image_content.h>
00031 #include <fvutils/net/fuse_lut_content.h>
00032 #include <fvutils/net/fuse_imagelist_content.h>
00033 #include <fvutils/net/fuse_lutlist_content.h>
00034 #include <fvutils/ipc/shm_image.h>
00035 #include <fvutils/ipc/shm_lut.h>
00036 #include <fvutils/compression/jpeg_compressor.h>
00037
00038 #include <core/exceptions/system.h>
00039 #include <netcomm/socket/stream.h>
00040 #include <netcomm/utils/exceptions.h>
00041 #include <utils/logging/liblogger.h>
00042
00043 #include <netinet/in.h>
00044 #include <cstring>
00045 #include <cstdlib>
00046
00047 using namespace fawkes;
00048
00049 namespace firevision {
00050 #if 0
00051 }
00052 #endif
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067 FuseServerClientThread::FuseServerClientThread(FuseServer *fuse_server, StreamSocket *s)
00068 : Thread("FuseServerClientThread")
00069 {
00070 __fuse_server = fuse_server;
00071 __socket = s;
00072 __jpeg_compressor = NULL;
00073
00074 __inbound_queue = new FuseNetworkMessageQueue();
00075 __outbound_queue = new FuseNetworkMessageQueue();
00076
00077 FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t));
00078 greetmsg->version = htonl(FUSE_CURRENT_VERSION);
00079 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_GREETING,
00080 greetmsg, sizeof(FUSE_greeting_message_t)));
00081
00082 __alive = true;
00083 }
00084
00085
00086
00087 FuseServerClientThread::~FuseServerClientThread()
00088 {
00089 delete __socket;
00090 delete __jpeg_compressor;
00091
00092 for (__bit = __buffers.begin(); __bit != __buffers.end(); ++__bit) {
00093 delete __bit->second;
00094 }
00095 __buffers.clear();
00096
00097 for (__lit = __luts.begin(); __lit != __luts.end(); ++__lit ) {
00098 delete __lit->second;
00099 }
00100 __luts.clear();
00101
00102 while ( ! __inbound_queue->empty() ) {
00103 FuseNetworkMessage *m = __inbound_queue->front();
00104 m->unref();
00105 __inbound_queue->pop();
00106 }
00107
00108 while ( ! __outbound_queue->empty() ) {
00109 FuseNetworkMessage *m = __outbound_queue->front();
00110 m->unref();
00111 __outbound_queue->pop();
00112 }
00113
00114 delete __inbound_queue;
00115 delete __outbound_queue;
00116 }
00117
00118
00119
00120 void
00121 FuseServerClientThread::send()
00122 {
00123 if ( ! __outbound_queue->empty() ) {
00124 try {
00125 FuseNetworkTransceiver::send(__socket, __outbound_queue);
00126 } catch (Exception &e) {
00127 __fuse_server->connection_died(this);
00128 __alive = false;
00129 }
00130 }
00131 }
00132
00133
00134
00135
00136
00137
00138 void
00139 FuseServerClientThread::recv()
00140 {
00141 try {
00142 FuseNetworkTransceiver::recv(__socket, __inbound_queue);
00143 } catch (ConnectionDiedException &e) {
00144 __socket->close();
00145 __fuse_server->connection_died(this);
00146 __alive = false;
00147 }
00148 }
00149
00150
00151
00152
00153
00154 void
00155 FuseServerClientThread::process_greeting_message(FuseNetworkMessage *m)
00156 {
00157 FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>();
00158 if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
00159 throw Exception("Invalid version on other side");
00160 }
00161 }
00162
00163
00164 SharedMemoryImageBuffer *
00165 FuseServerClientThread::get_shmimgbuf(const char *id)
00166 {
00167 char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1];
00168 tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0;
00169 strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH);
00170
00171 if ( (__bit = __buffers.find( tmp_image_id )) == __buffers.end() ) {
00172
00173 try {
00174 SharedMemoryImageBuffer *b = new SharedMemoryImageBuffer(tmp_image_id);
00175 __buffers[tmp_image_id] = b;
00176 return b;
00177 } catch (Exception &e) {
00178 throw;
00179 }
00180 } else {
00181 return __bit->second;
00182 }
00183 }
00184
00185
00186
00187
00188
00189 void
00190 FuseServerClientThread::process_getimage_message(FuseNetworkMessage *m)
00191 {
00192 FUSE_imagereq_message_t *irm = m->msg<FUSE_imagereq_message_t>();
00193
00194 SharedMemoryImageBuffer *b;
00195 try {
00196 b = get_shmimgbuf(irm->image_id);
00197 } catch (Exception &e) {
00198 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00199 m->payload(), m->payload_size(),
00200 true);
00201 __outbound_queue->push(nm);
00202 return;
00203 }
00204
00205 if ( irm->format == FUSE_IF_RAW ) {
00206 FuseImageContent *im = new FuseImageContent(b);
00207 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00208 } else if ( irm->format == FUSE_IF_JPEG ) {
00209 if ( ! __jpeg_compressor) {
00210 __jpeg_compressor = new JpegImageCompressor();
00211 __jpeg_compressor->set_compression_destination(ImageCompressor::COMP_DEST_MEM);
00212 }
00213 b->lock_for_read();
00214 __jpeg_compressor->set_image_dimensions(b->width(), b->height());
00215 __jpeg_compressor->set_image_buffer(b->colorspace(), b->buffer());
00216 unsigned char *compressed_buffer = (unsigned char *)malloc(__jpeg_compressor->recommended_compressed_buffer_size());
00217 __jpeg_compressor->set_destination_buffer(compressed_buffer, __jpeg_compressor->recommended_compressed_buffer_size());
00218 __jpeg_compressor->compress();
00219 b->unlock();
00220 size_t compressed_buffer_size = __jpeg_compressor->compressed_size();
00221 long int sec = 0, usec = 0;
00222 b->capture_time(&sec, &usec);
00223 FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG, b->image_id(),
00224 compressed_buffer, compressed_buffer_size,
00225 CS_UNKNOWN, b->width(), b->height(),
00226 sec, usec);
00227 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00228 free(compressed_buffer);
00229 } else {
00230 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00231 m->payload(), m->payload_size(),
00232 true);
00233 __outbound_queue->push(nm);
00234 }
00235 }
00236
00237
00238
00239
00240 void
00241 FuseServerClientThread::process_getimageinfo_message(FuseNetworkMessage *m)
00242 {
00243 FUSE_imagedesc_message_t *idm = m->msg<FUSE_imagedesc_message_t>();
00244
00245 SharedMemoryImageBuffer *b;
00246 try {
00247 b = get_shmimgbuf(idm->image_id);
00248
00249 FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t));
00250
00251 strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH);
00252 ii->colorspace = htons(b->colorspace());
00253 ii->width = htonl(b->width());
00254 ii->height = htonl(b->height());
00255 ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height());
00256
00257 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_IMAGE_INFO,
00258 ii, sizeof(FUSE_imageinfo_t));
00259 __outbound_queue->push(nm);
00260 } catch (Exception &e) {
00261 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00262 m->payload(), m->payload_size(),
00263 true);
00264 __outbound_queue->push(nm);
00265 }
00266 }
00267
00268
00269
00270
00271
00272 void
00273 FuseServerClientThread::process_getlut_message(FuseNetworkMessage *m)
00274 {
00275 FUSE_lutdesc_message_t *idm = m->msg<FUSE_lutdesc_message_t>();
00276
00277 char tmp_lut_id[LUT_ID_MAX_LENGTH + 1];
00278 tmp_lut_id[LUT_ID_MAX_LENGTH] = 0;
00279 strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH);
00280
00281 if ( (__lit = __luts.find( tmp_lut_id )) != __luts.end() ) {
00282
00283 FuseLutContent *lm = new FuseLutContent(__lit->second);
00284 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00285 } else {
00286 try {
00287 SharedMemoryLookupTable *b = new SharedMemoryLookupTable(tmp_lut_id);
00288 __luts[tmp_lut_id] = b;
00289 FuseLutContent *lm = new FuseLutContent(b);
00290 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00291 } catch (Exception &e) {
00292
00293 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED,
00294 m->payload(), m->payload_size(),
00295 true);
00296 __outbound_queue->push(nm);
00297 }
00298 }
00299 }
00300
00301
00302
00303
00304
00305 void
00306 FuseServerClientThread::process_setlut_message(FuseNetworkMessage *m)
00307 {
00308 FuseLutContent *lc = m->msgc<FuseLutContent>();
00309 FUSE_lutdesc_message_t *reply = (FUSE_lutdesc_message_t *)malloc(sizeof(FUSE_lutdesc_message_t));
00310 strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH);
00311
00312
00313 SharedMemoryLookupTable *b;
00314 if ( (__lit = __luts.find( lc->lut_id() )) != __luts.end() ) {
00315
00316 b = __lit->second;
00317 } else {
00318 try {
00319 b = new SharedMemoryLookupTable(lc->lut_id(), false);
00320 __luts[lc->lut_id()] = b;
00321 } catch (Exception &e) {
00322 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00323 reply, sizeof(FUSE_lutdesc_message_t)));
00324 e.append("Cannot open shared memory lookup table %s", lc->lut_id());
00325 LibLogger::log_warn("FuseServerClientThread", e);
00326 delete lc;
00327 return;
00328 }
00329 }
00330
00331 if ( (b->width() != lc->width()) ||
00332 (b->height() != lc->height()) ||
00333 (b->depth() != lc->depth()) ||
00334 (b->bytes_per_cell() != lc->bytes_per_cell()) ) {
00335 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00336 reply, sizeof(FUSE_lutdesc_message_t)));
00337 LibLogger::log_warn("FuseServerClientThread", "LUT upload: dimensions do not match. "
00338 "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)",
00339 b->width(), b->height(), b->depth(), b->bytes_per_cell(),
00340 lc->width(), lc->height(), lc->depth(), lc->bytes_per_cell());
00341 } else {
00342 b->set(lc->buffer());
00343 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED,
00344 reply, sizeof(FUSE_lutdesc_message_t)));
00345 }
00346
00347 delete lc;
00348 }
00349
00350
00351
00352
00353
00354 void
00355 FuseServerClientThread::process_getimagelist_message(FuseNetworkMessage *m)
00356 {
00357 FuseImageListContent *ilm = new FuseImageListContent();
00358
00359 SharedMemoryImageBufferHeader *h = new SharedMemoryImageBufferHeader();
00360 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
00361 SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00362
00363 while ( i != endi ) {
00364 const SharedMemoryImageBufferHeader *ih = dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
00365 if ( ih ) {
00366 ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height());
00367 }
00368
00369 ++i;
00370 }
00371
00372 delete h;
00373
00374 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm));
00375 }
00376
00377
00378
00379
00380
00381 void
00382 FuseServerClientThread::process_getlutlist_message(FuseNetworkMessage *m)
00383 {
00384 FuseLutListContent *llm = new FuseLutListContent();
00385
00386 SharedMemoryLookupTableHeader *h = new SharedMemoryLookupTableHeader();
00387 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h);
00388 SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00389
00390 while ( i != endi ) {
00391 const SharedMemoryLookupTableHeader *lh = dynamic_cast<const SharedMemoryLookupTableHeader *>(*i);
00392 if ( lh ) {
00393 llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell());
00394 }
00395
00396 ++i;
00397 }
00398
00399 delete h;
00400
00401 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm));
00402 }
00403
00404
00405
00406 void
00407 FuseServerClientThread::process_inbound()
00408 {
00409 __inbound_queue->lock();
00410 while ( ! __inbound_queue->empty() ) {
00411 FuseNetworkMessage *m = __inbound_queue->front();
00412
00413 try {
00414 switch (m->type()) {
00415 case FUSE_MT_GREETING:
00416 process_greeting_message(m);
00417 break;
00418 case FUSE_MT_GET_IMAGE:
00419 process_getimage_message(m);
00420 break;
00421 case FUSE_MT_GET_IMAGE_INFO:
00422 process_getimageinfo_message(m);
00423 break;
00424 case FUSE_MT_GET_IMAGE_LIST:
00425 process_getimagelist_message(m);
00426 break;
00427 case FUSE_MT_GET_LUT_LIST:
00428 process_getlutlist_message(m);
00429 break;
00430 case FUSE_MT_GET_LUT:
00431 process_getlut_message(m);
00432 break;
00433 case FUSE_MT_SET_LUT:
00434 process_setlut_message(m);
00435 break;
00436 default:
00437 throw Exception("Unknown message type received\n");
00438 }
00439 } catch (Exception &e) {
00440 e.append("FUSE protocol error");
00441 LibLogger::log_warn("FuseServerClientThread", e);
00442 __fuse_server->connection_died(this);
00443 __alive = false;
00444 }
00445
00446 m->unref();
00447 __inbound_queue->pop();
00448 }
00449 __inbound_queue->unlock();
00450 }
00451
00452
00453 void
00454 FuseServerClientThread::loop()
00455 {
00456 if ( ! __alive ) {
00457 usleep(10000);
00458 return;
00459 }
00460
00461 short p = 0;
00462 try {
00463 p = __socket->poll(10);
00464 } catch (InterruptedException &e) {
00465
00466 return;
00467 }
00468
00469 if ( (p & Socket::POLL_ERR) ||
00470 (p & Socket::POLL_HUP) ||
00471 (p & Socket::POLL_RDHUP)) {
00472 __fuse_server->connection_died(this);
00473 __alive = false;
00474 } else if ( p & Socket::POLL_IN ) {
00475 try {
00476
00477 recv();
00478 process_inbound();
00479 }
00480 catch (...) {
00481 __fuse_server->connection_died(this);
00482 __alive = false;
00483 }
00484 }
00485
00486 if ( __alive ) {
00487 send();
00488 }
00489 }
00490
00491 }