OpenSync 0.22
|
00001 /* 00002 * libosengine - A synchronization engine for the opensync framework 00003 * Copyright (C) 2004-2005 Armin Bauer <armin.bauer@opensync.org> 00004 * 00005 * This library is free software; you can redistribute it and/or 00006 * modify it under the terms of the GNU Lesser General Public 00007 * License as published by the Free Software Foundation; either 00008 * version 2.1 of the License, or (at your option) any later version. 00009 * 00010 * This library is distributed in the hope that it will be useful, 00011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00013 * Lesser General Public License for more details. 00014 * 00015 * You should have received a copy of the GNU Lesser General Public 00016 * License along with this library; if not, write to the Free Software 00017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00018 * 00019 */ 00020 00021 #include <fcntl.h> 00022 #include <sys/poll.h> 00023 00024 #include "opensync.h" 00025 #include "opensync_internals.h" 00026 00027 #include <sys/time.h> 00028 #include <signal.h> 00029 00030 typedef struct OSyncPendingMessage { 00031 long long int id1; 00032 int id2; 00034 OSyncMessageHandler callback; 00036 gpointer user_data; 00037 } OSyncPendingMessage; 00038 00046 00047 static 00048 gboolean _incoming_prepare(GSource *source, gint *timeout_) 00049 { 00050 *timeout_ = 1; 00051 return FALSE; 00052 } 00053 00054 static 00055 gboolean _incoming_check(GSource *source) 00056 { 00057 OSyncQueue *queue = *((OSyncQueue **)(source + 1)); 00058 if (g_async_queue_length(queue->incoming) > 0) 00059 return TRUE; 00060 00061 return FALSE; 00062 } 00063 00064 /* This function is called from the master thread. The function dispatched incoming data from 00065 * the remote end */ 00066 static 00067 gboolean _incoming_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) 00068 { 00069 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, user_data); 00070 OSyncQueue *queue = user_data; 00071 00072 OSyncMessage *message = NULL; 00073 while ((message = g_async_queue_try_pop(queue->incoming))) { 00074 /* We check of the message is a reply to something */ 00075 if (message->cmd == OSYNC_MESSAGE_REPLY || message->cmd == OSYNC_MESSAGE_ERRORREPLY) { 00076 00077 /* Search for the pending reply. We have to lock the 00078 * list since another thread might be duing the updates */ 00079 g_mutex_lock(queue->pendingLock); 00080 00081 OSyncPendingMessage *found = NULL; 00082 00083 GList *p = NULL; 00084 for (p = queue->pendingReplies; p; p = p->next) { 00085 OSyncPendingMessage *pending = p->data; 00086 00087 if (pending->id1 == message->id1 && pending->id2 == message->id2) { 00088 00089 /* Get the pending message from the queue */ 00090 queue->pendingReplies = g_list_remove(queue->pendingReplies, pending); 00091 found = pending; 00092 break; 00093 } 00094 } 00095 g_mutex_unlock(queue->pendingLock); 00096 00097 if (found) { 00098 /* Call the callback of the pending message and free the message */ 00099 osync_assert(found->callback); 00100 found->callback(message, found->user_data); 00101 00102 g_free(found); 00103 } else 00104 osync_trace(TRACE_INTERNAL, "%s: No pending message for %lld:%d", __func__, message->id1, message->id2); 00105 00106 } else 00107 queue->message_handler(message, queue->user_data); 00108 00109 osync_message_unref(message); 00110 } 00111 00112 osync_trace(TRACE_EXIT, "%s: Done dispatching", __func__); 00113 return TRUE; 00114 } 00115 00116 static void _osync_queue_stop_incoming(OSyncQueue *queue) 00117 { 00118 if (queue->incoming_source) { 00119 g_source_destroy(queue->incoming_source); 00120 queue->incoming_source = NULL; 00121 } 00122 00123 if (queue->incomingContext) { 00124 g_main_context_unref(queue->incomingContext); 00125 queue->incomingContext = NULL; 00126 } 00127 00128 if (queue->incoming_functions) { 00129 g_free(queue->incoming_functions); 00130 queue->incoming_functions = NULL; 00131 } 00132 } 00133 00134 static 00135 gboolean _queue_prepare(GSource *source, gint *timeout_) 00136 { 00137 *timeout_ = 1; 00138 return FALSE; 00139 } 00140 00141 static 00142 gboolean _queue_check(GSource *source) 00143 { 00144 OSyncQueue *queue = *((OSyncQueue **)(source + 1)); 00145 if (g_async_queue_length(queue->outgoing) > 0) 00146 return TRUE; 00147 return FALSE; 00148 } 00149 00150 int _osync_queue_write_data(OSyncQueue *queue, const void *vptr, size_t n, OSyncError **error) 00151 { 00152 ssize_t nwritten = 0; 00153 00154 while (n > 0) { 00155 if ((nwritten = write(queue->fd, vptr, n)) <= 0) { 00156 if (errno == EINTR) 00157 nwritten = 0; /* and call write() again */ 00158 else { 00159 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to write IPC data: %i: %s", errno, strerror(errno)); 00160 return (-1); /* error */ 00161 } 00162 } 00163 00164 n -= nwritten; 00165 vptr += nwritten; 00166 } 00167 return (nwritten); 00168 } 00169 00170 osync_bool _osync_queue_write_long_long_int(OSyncQueue *queue, const long long int message, OSyncError **error) 00171 { 00172 if (_osync_queue_write_data(queue, &message, sizeof(long long int), error) < 0) 00173 return FALSE; 00174 00175 return TRUE; 00176 } 00177 00178 osync_bool _osync_queue_write_int(OSyncQueue *queue, const int message, OSyncError **error) 00179 { 00180 if (_osync_queue_write_data(queue, &message, sizeof(int), error) < 0) 00181 return FALSE; 00182 00183 return TRUE; 00184 } 00185 00186 /* This function sends the data to the remote side. If there is an error, it sends an error 00187 * message to the incoming queue */ 00188 static 00189 gboolean _queue_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) 00190 { 00191 OSyncQueue *queue = user_data; 00192 OSyncError *error = NULL; 00193 00194 OSyncMessage *message = NULL; 00195 00196 while ((message = g_async_queue_try_pop(queue->outgoing))) { 00197 /* Check if the queue is connected */ 00198 if (!queue->connected) { 00199 osync_error_set(&error, OSYNC_ERROR_GENERIC, "Trying to send to a queue thats not connected"); 00200 goto error; 00201 } 00202 00203 /*FIXME: review usage of osync_marshal_get_size_message() */ 00204 if (!_osync_queue_write_int(queue, message->buffer->len + osync_marshal_get_size_message(message), &error)) 00205 goto error; 00206 00207 if (!_osync_queue_write_int(queue, message->cmd, &error)) 00208 goto error; 00209 00210 if (!_osync_queue_write_long_long_int(queue, message->id1, &error)) 00211 goto error; 00212 00213 if (!_osync_queue_write_int(queue, message->id2, &error)) 00214 goto error; 00215 00216 if (message->buffer->len) { 00217 int sent = 0; 00218 do { 00219 int written = _osync_queue_write_data(queue, message->buffer->data + sent, message->buffer->len - sent, &error); 00220 if (written < 0) 00221 goto error; 00222 00223 sent += written; 00224 } while (sent < message->buffer->len); 00225 } 00226 00227 osync_message_unref(message); 00228 } 00229 00230 return TRUE; 00231 00232 error: 00233 if (message) 00234 osync_message_unref(message); 00235 00236 if (error) { 00237 message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error); 00238 if (message) { 00239 osync_marshal_error(message, error); 00240 g_async_queue_push(queue->incoming, message); 00241 } 00242 00243 osync_error_free(&error); 00244 } 00245 return FALSE; 00246 } 00247 00248 static 00249 gboolean _source_prepare(GSource *source, gint *timeout_) 00250 { 00251 *timeout_ = 1; 00252 return FALSE; 00253 } 00254 00255 static 00256 int _osync_queue_read_data(OSyncQueue *queue, void *vptr, size_t n, OSyncError **error) 00257 { 00258 size_t nleft; 00259 ssize_t nread = 0; 00260 00261 nleft = n; 00262 while (n > 0) { 00263 if ((nread = read(queue->fd, vptr, nleft)) < 0) { 00264 if (errno == EINTR) 00265 nread = 0; /* and call read() again */ 00266 else { 00267 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read IPC data: %i: %s", errno, strerror(errno)); 00268 return (-1); 00269 } 00270 } else if (nread == 0) 00271 break; /* EOF */ 00272 00273 nleft -= nread; 00274 vptr += nread; 00275 } 00276 return (n - nleft); /* return >= 0 */ 00277 } 00278 00279 static 00280 osync_bool _osync_queue_read_int(OSyncQueue *queue, int *message, OSyncError **error) 00281 { 00282 int read = _osync_queue_read_data(queue, message, sizeof(int), error); 00283 00284 if (read < 0) 00285 return FALSE; 00286 00287 if (read != sizeof(int)) { 00288 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF"); 00289 return FALSE; 00290 } 00291 00292 return TRUE; 00293 } 00294 00295 static 00296 osync_bool _osync_queue_read_long_long_int(OSyncQueue *queue, long long int *message, OSyncError **error) 00297 { 00298 int read = _osync_queue_read_data(queue, message, sizeof(long long int), error); 00299 00300 if (read < 0) 00301 return FALSE; 00302 00303 if (read != sizeof(long long int)) { 00304 osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF"); 00305 return FALSE; 00306 } 00307 00308 return TRUE; 00309 } 00310 00311 static 00312 gboolean _source_check(GSource *source) 00313 { 00314 OSyncQueue *queue = *((OSyncQueue **)(source + 1)); 00315 OSyncMessage *message = NULL; 00316 OSyncError *error = NULL; 00317 00318 if (queue->connected == FALSE) { 00319 /* Ok. so we arent connected. lets check if there are pending replies. We cannot 00320 * receive any data on the pipe, therefore, any pending replies will never 00321 * be answered. So we return error messages for all of them. */ 00322 if (queue->pendingReplies) { 00323 g_mutex_lock(queue->pendingLock); 00324 osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Broken Pipe"); 00325 GList *p = NULL; 00326 for (p = queue->pendingReplies; p; p = p->next) { 00327 OSyncPendingMessage *pending = p->data; 00328 00329 message = osync_message_new(OSYNC_MESSAGE_ERRORREPLY, 0, NULL); 00330 if (message) { 00331 osync_marshal_error(message, error); 00332 00333 message->id1 = pending->id1; 00334 message->id2 = pending->id2; 00335 00336 g_async_queue_push(queue->incoming, message); 00337 } 00338 } 00339 00340 osync_error_free(&error); 00341 g_mutex_unlock(queue->pendingLock); 00342 } 00343 00344 return FALSE; 00345 } 00346 00347 switch (osync_queue_poll(queue)) { 00348 case OSYNC_QUEUE_EVENT_NONE: 00349 return FALSE; 00350 case OSYNC_QUEUE_EVENT_READ: 00351 return TRUE; 00352 case OSYNC_QUEUE_EVENT_HUP: 00353 case OSYNC_QUEUE_EVENT_ERROR: 00354 queue->connected = FALSE; 00355 00356 /* Now we can send the hup message, and wake up the consumer thread so 00357 * it can pickup the messages in the incoming queue */ 00358 message = osync_message_new(OSYNC_MESSAGE_QUEUE_HUP, 0, &error); 00359 if (!message) 00360 goto error; 00361 00362 g_async_queue_push(queue->incoming, message); 00363 00364 if (queue->incomingContext) 00365 g_main_context_wakeup(queue->incomingContext); 00366 return FALSE; 00367 } 00368 00369 return FALSE; 00370 00371 error: 00372 message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error); 00373 if (message) { 00374 osync_marshal_error(message, error); 00375 g_async_queue_push(queue->incoming, message); 00376 } 00377 osync_error_free(&error); 00378 return FALSE; 00379 } 00380 00381 /* This function reads from the file descriptor and inserts incoming data into the 00382 * incoming queue */ 00383 static 00384 gboolean _source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) 00385 { 00386 OSyncQueue *queue = user_data; 00387 OSyncMessage *message = NULL; 00388 OSyncError *error = NULL; 00389 00390 do { 00391 int size = 0; 00392 int cmd = 0; 00393 long long int id1 = 0; 00394 int id2 = 0; 00395 00396 if (!_osync_queue_read_int(queue, &size, &error)) 00397 goto error; 00398 00399 if (!_osync_queue_read_int(queue, &cmd, &error)) 00400 goto error; 00401 00402 if (!_osync_queue_read_long_long_int(queue, &id1, &error)) 00403 goto error; 00404 00405 if (!_osync_queue_read_int(queue, &id2, &error)) 00406 goto error; 00407 00408 message = osync_message_new(cmd, size, &error); 00409 if (!message) 00410 goto error; 00411 00412 message->id1 = id1; 00413 message->id2 = id2; 00414 00415 if (size) { 00416 int read = 0; 00417 do { 00418 int inc = _osync_queue_read_data(queue, message->buffer->data + read, size - read, &error); 00419 00420 if (inc < 0) 00421 goto error_free_message; 00422 00423 if (inc == 0) { 00424 osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Encountered EOF while data was missing"); 00425 goto error_free_message; 00426 } 00427 00428 read += inc; 00429 } while (read < size); 00430 } 00431 00432 g_async_queue_push(queue->incoming, message); 00433 00434 if (queue->incomingContext) 00435 g_main_context_wakeup(queue->incomingContext); 00436 } while (_source_check(queue->read_source)); 00437 00438 return TRUE; 00439 00440 error_free_message: 00441 osync_message_unref(message); 00442 error: 00443 if (error) { 00444 message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error); 00445 if (message) { 00446 osync_marshal_error(message, error); 00447 g_async_queue_push(queue->incoming, message); 00448 } 00449 00450 osync_error_free(&error); 00451 } 00452 00453 return FALSE; 00454 } 00455 00461 OSyncQueue *osync_queue_new(const char *name, OSyncError **error) 00462 { 00463 osync_trace(TRACE_ENTRY, "%s(%s, %p)", __func__, name, error); 00464 00465 OSyncQueue *queue = osync_try_malloc0(sizeof(OSyncQueue), error); 00466 if (!queue) 00467 goto error; 00468 00469 if (name) 00470 queue->name = g_strdup(name); 00471 queue->fd = -1; 00472 00473 if (!g_thread_supported ()) 00474 g_thread_init (NULL); 00475 00476 queue->pendingLock = g_mutex_new(); 00477 00478 queue->context = g_main_context_new(); 00479 00480 queue->outgoing = g_async_queue_new(); 00481 queue->incoming = g_async_queue_new(); 00482 00483 osync_trace(TRACE_EXIT, "%s: %p", __func__, queue); 00484 return queue; 00485 00486 error: 00487 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00488 return NULL; 00489 } 00490 00491 /* Creates anonymous pipes which dont have to be created and are automatically connected. 00492 * 00493 * Lets assume parent wants to send, child wants to receive 00494 * 00495 * osync_queue_new_pipes() 00496 * fork() 00497 * 00498 * Parent: 00499 * connect(write_queue) 00500 * disconnect(read_queue) 00501 * 00502 * Child: 00503 * connect(read_queue) 00504 * close(write_queue) 00505 * 00506 * 00507 * */ 00508 osync_bool osync_queue_new_pipes(OSyncQueue **read_queue, OSyncQueue **write_queue, OSyncError **error) 00509 { 00510 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, read_queue, write_queue, error); 00511 00512 *read_queue = osync_queue_new(NULL, error); 00513 if (!*read_queue) 00514 goto error; 00515 00516 *write_queue = osync_queue_new(NULL, error); 00517 if (!*write_queue) 00518 goto error_free_read_queue; 00519 00520 int filedes[2]; 00521 00522 if (pipe(filedes) < 0) { 00523 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create pipes"); 00524 goto error_free_write_queue; 00525 } 00526 00527 (*read_queue)->fd = filedes[0]; 00528 (*write_queue)->fd = filedes[1]; 00529 00530 osync_trace(TRACE_EXIT, "%s", __func__); 00531 return TRUE; 00532 00533 error_free_write_queue: 00534 osync_queue_free(*write_queue); 00535 error_free_read_queue: 00536 osync_queue_free(*read_queue); 00537 error: 00538 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00539 return FALSE; 00540 } 00541 00542 void osync_queue_free(OSyncQueue *queue) 00543 { 00544 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, queue); 00545 OSyncMessage *message = NULL; 00546 OSyncPendingMessage *pending = NULL; 00547 00548 g_mutex_free(queue->pendingLock); 00549 00550 g_main_context_unref(queue->context); 00551 00552 _osync_queue_stop_incoming(queue); 00553 00554 while ((message = g_async_queue_try_pop(queue->incoming))) { 00555 osync_message_unref(message); 00556 } 00557 g_async_queue_unref(queue->incoming); 00558 00559 while ((message = g_async_queue_try_pop(queue->outgoing))) { 00560 osync_message_unref(message); 00561 } 00562 g_async_queue_unref(queue->outgoing); 00563 00564 while (queue->pendingReplies) { 00565 pending = queue->pendingReplies->data; 00566 g_free(pending); 00567 queue->pendingReplies = g_list_remove(queue->pendingReplies, pending); 00568 } 00569 00570 if (queue->name) 00571 g_free(queue->name); 00572 00573 g_free(queue); 00574 00575 osync_trace(TRACE_EXIT, "%s", __func__); 00576 } 00577 00578 osync_bool osync_queue_exists(OSyncQueue *queue) 00579 { 00580 return g_file_test(queue->name, G_FILE_TEST_EXISTS) ? TRUE : FALSE; 00581 } 00582 00583 osync_bool osync_queue_create(OSyncQueue *queue, OSyncError **error) 00584 { 00585 if (mkfifo(queue->name, 0600) != 0) { 00586 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create fifo"); 00587 return FALSE; 00588 } 00589 00590 return TRUE; 00591 } 00592 00593 osync_bool osync_queue_remove(OSyncQueue *queue, OSyncError **error) 00594 { 00595 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error); 00596 00597 if (unlink(queue->name) != 0) { 00598 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to remove queue"); 00599 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00600 return FALSE; 00601 } 00602 00603 osync_trace(TRACE_EXIT, "%s", __func__); 00604 return TRUE; 00605 } 00606 00607 static osync_bool __osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, osync_bool nonblocking, OSyncError **error) 00608 { 00609 osync_assert(queue); 00610 osync_assert(queue->connected == FALSE); 00611 OSyncQueue **queueptr = NULL; 00612 00613 queue->type = type; 00614 00615 if (queue->fd == -1) { 00616 /* First, open the queue with the flags provided by the user */ 00617 int fd = open(queue->name, (type == OSYNC_QUEUE_SENDER ? O_WRONLY : O_RDONLY) | (nonblocking ? O_NONBLOCK : 0)); 00618 if (fd == -1) { 00619 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to open fifo"); 00620 goto error; 00621 } 00622 queue->fd = fd; 00623 00624 int oldflags = fcntl(queue->fd, F_GETFD); 00625 if (oldflags == -1) { 00626 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to get fifo flags"); 00627 goto error_close; 00628 } 00629 if (fcntl(queue->fd, F_SETFD, oldflags|FD_CLOEXEC) == -1) { 00630 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to set fifo flags"); 00631 goto error_close; 00632 } 00633 } 00634 00635 queue->connected = TRUE; 00636 signal(SIGPIPE, SIG_IGN); 00637 00638 /* now we start a thread which handles reading/writing of the queue */ 00639 queue->thread = osync_thread_new(queue->context, error); 00640 00641 if (!queue->thread) 00642 goto error; 00643 00644 queue->write_functions = g_malloc0(sizeof(GSourceFuncs)); 00645 queue->write_functions->prepare = _queue_prepare; 00646 queue->write_functions->check = _queue_check; 00647 queue->write_functions->dispatch = _queue_dispatch; 00648 queue->write_functions->finalize = NULL; 00649 00650 queue->write_source = g_source_new(queue->write_functions, sizeof(GSource) + sizeof(OSyncQueue *)); 00651 queueptr = (OSyncQueue **)(queue->write_source + 1); 00652 *queueptr = queue; 00653 g_source_set_callback(queue->write_source, NULL, queue, NULL); 00654 g_source_attach(queue->write_source, queue->context); 00655 g_main_context_ref(queue->context); 00656 00657 queue->read_functions = g_malloc0(sizeof(GSourceFuncs)); 00658 queue->read_functions->prepare = _source_prepare; 00659 queue->read_functions->check = _source_check; 00660 queue->read_functions->dispatch = _source_dispatch; 00661 queue->read_functions->finalize = NULL; 00662 00663 queue->read_source = g_source_new(queue->read_functions, sizeof(GSource) + sizeof(OSyncQueue *)); 00664 queueptr = (OSyncQueue **)(queue->read_source + 1); 00665 *queueptr = queue; 00666 g_source_set_callback(queue->read_source, NULL, queue, NULL); 00667 g_source_attach(queue->read_source, queue->context); 00668 g_main_context_ref(queue->context); 00669 00670 osync_thread_start(queue->thread); 00671 00672 return TRUE; 00673 00674 error_close: 00675 close(queue->fd); 00676 error: 00677 return FALSE; 00678 } 00679 00680 00681 osync_bool osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error) 00682 { 00683 return __osync_queue_connect(queue, type, FALSE, error); 00684 } 00685 00686 osync_bool osync_queue_try_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error) 00687 { 00688 return __osync_queue_connect(queue, type, TRUE, error); 00689 } 00690 00691 osync_bool osync_queue_disconnect(OSyncQueue *queue, OSyncError **error) 00692 { 00693 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error); 00694 osync_assert(queue); 00695 00696 if (queue->thread) { 00697 osync_thread_stop(queue->thread); 00698 osync_thread_free(queue->thread); 00699 queue->thread = NULL; 00700 } 00701 00702 //g_source_unref(queue->write_source); 00703 00704 if (queue->write_functions) 00705 g_free(queue->write_functions); 00706 00707 //g_source_unref(queue->read_source); 00708 00709 _osync_queue_stop_incoming(queue); 00710 00711 /* We have to empty the incoming queue if we disconnect the queue. Otherwise, the 00712 * consumer threads might try to pick up messages even after we are done. */ 00713 OSyncMessage *message = NULL; 00714 while ((message = g_async_queue_try_pop(queue->incoming))) { 00715 osync_message_unref(message); 00716 } 00717 00718 if (close(queue->fd) != 0) { 00719 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to close queue"); 00720 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00721 return FALSE; 00722 } 00723 00724 queue->fd = -1; 00725 queue->connected = FALSE; 00726 00727 osync_trace(TRACE_EXIT, "%s", __func__); 00728 return TRUE; 00729 } 00730 00731 osync_bool osync_queue_is_connected(OSyncQueue *queue) 00732 { 00733 osync_assert(queue); 00734 return queue->connected; 00735 } 00736 00746 void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data) 00747 { 00748 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, handler, user_data); 00749 00750 queue->message_handler = handler; 00751 queue->user_data = user_data; 00752 00753 osync_trace(TRACE_EXIT, "%s", __func__); 00754 } 00755 00766 void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context) 00767 { 00768 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, context); 00769 00770 queue->incoming_functions = g_malloc0(sizeof(GSourceFuncs)); 00771 queue->incoming_functions->prepare = _incoming_prepare; 00772 queue->incoming_functions->check = _incoming_check; 00773 queue->incoming_functions->dispatch = _incoming_dispatch; 00774 queue->incoming_functions->finalize = NULL; 00775 00776 queue->incoming_source = g_source_new(queue->incoming_functions, sizeof(GSource) + sizeof(OSyncQueue *)); 00777 OSyncQueue **queueptr = (OSyncQueue **)(queue->incoming_source + 1); 00778 *queueptr = queue; 00779 g_source_set_callback(queue->incoming_source, NULL, queue, NULL); 00780 g_source_attach(queue->incoming_source, context); 00781 queue->incomingContext = context; 00782 // For the source 00783 g_main_context_ref(context); 00784 00785 //To unref it later 00786 g_main_context_ref(context); 00787 00788 osync_trace(TRACE_EXIT, "%s", __func__); 00789 } 00790 00791 osync_bool osync_queue_dispatch(OSyncQueue *queue, OSyncError **error) 00792 { 00793 _incoming_dispatch(NULL, NULL, queue); 00794 return TRUE; 00795 } 00796 00797 OSyncQueueEvent osync_queue_poll(OSyncQueue *queue) 00798 { 00799 struct pollfd pfd; 00800 pfd.fd = queue->fd; 00801 pfd.events = POLLIN; 00802 00803 /* Here we poll on the queue. If we read on the queue, we either receive a 00804 * POLLIN or POLLHUP. Since we cannot write to the queue, we can block pretty long here. 00805 * 00806 * If we are sending, we can only receive a POLLERR which means that the remote side has 00807 * disconnected. Since we mainly dispatch the write IO, we dont want to block here. */ 00808 int ret = poll(&pfd, 1, queue->type == OSYNC_QUEUE_SENDER ? 0 : 100); 00809 00810 if (ret < 0 && errno == EINTR) 00811 return OSYNC_QUEUE_EVENT_NONE; 00812 00813 if (ret == 0) 00814 return OSYNC_QUEUE_EVENT_NONE; 00815 00816 if (pfd.revents & POLLERR) 00817 return OSYNC_QUEUE_EVENT_ERROR; 00818 else if (pfd.revents & POLLHUP) 00819 return OSYNC_QUEUE_EVENT_HUP; 00820 else if (pfd.revents & POLLIN) 00821 return OSYNC_QUEUE_EVENT_READ; 00822 00823 return OSYNC_QUEUE_EVENT_ERROR; 00824 } 00825 00827 OSyncMessage *osync_queue_get_message(OSyncQueue *queue) 00828 { 00829 return g_async_queue_pop(queue->incoming); 00830 } 00831 00832 void gen_id(long long int *part1, int *part2) 00833 { 00834 struct timeval tv; 00835 struct timezone tz; 00836 00837 gettimeofday(&tv, &tz); 00838 00839 long long int now = tv.tv_sec * 1000000 + tv.tv_usec; 00840 00841 int rnd = (int)random(); 00842 rnd = rnd << 16 | getpid(); 00843 00844 *part1 = now; 00845 *part2 = rnd; 00846 } 00847 00848 osync_bool osync_queue_send_message(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, OSyncError **error) 00849 { 00850 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, queue, replyqueue, message, error); 00851 00852 if (message->callback) { 00853 osync_assert(replyqueue); 00854 OSyncPendingMessage *pending = osync_try_malloc0(sizeof(OSyncPendingMessage), error); 00855 if (!pending) 00856 goto error; 00857 00858 gen_id(&(message->id1), &(message->id2)); 00859 pending->id1 = message->id1; 00860 pending->id2 = message->id2; 00861 00862 pending->callback = message->callback; 00863 pending->user_data = message->user_data; 00864 00865 g_mutex_lock(replyqueue->pendingLock); 00866 replyqueue->pendingReplies = g_list_append(replyqueue->pendingReplies, pending); 00867 g_mutex_unlock(replyqueue->pendingLock); 00868 } 00869 00870 osync_message_ref(message); 00871 g_async_queue_push(queue->outgoing, message); 00872 00873 g_main_context_wakeup(queue->context); 00874 00875 osync_trace(TRACE_EXIT, "%s", __func__); 00876 return TRUE; 00877 00878 error: 00879 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00880 return FALSE; 00881 } 00882 00883 osync_bool osync_queue_send_message_with_timeout(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, int timeout, OSyncError **error) 00884 { 00885 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, message, error); 00886 00887 /*TODO: add timeout handling */ 00888 00889 osync_bool ret = osync_queue_send_message(queue, replyqueue, message, error); 00890 00891 osync_trace(ret ? TRACE_EXIT : TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00892 return ret; 00893 } 00894 00895 osync_bool osync_queue_is_alive(OSyncQueue *queue) 00896 { 00897 00898 if (!osync_queue_try_connect(queue, OSYNC_QUEUE_SENDER, NULL)) { 00899 return FALSE; 00900 } 00901 00902 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_NOOP, 0, NULL); 00903 if (!message) { 00904 return FALSE; 00905 } 00906 00907 if (!osync_queue_send_message(queue, NULL, message, NULL)) { 00908 return FALSE; 00909 } 00910 00911 osync_queue_disconnect(queue, NULL); 00912 00913 return TRUE; 00914 }