48 #include "BESInterface.h" 50 #include "TheBESKeys.h" 51 #include "BESResponseHandler.h" 52 #include "BESAggFactory.h" 53 #include "BESAggregationServer.h" 54 #include "BESReporterList.h" 55 #include "BESContextManager.h" 57 #include "BESExceptionManager.h" 59 #include "BESDataNames.h" 62 #include "BESStopWatch.h" 63 #include "BESTimeoutError.h" 64 #include "BESInternalError.h" 65 #include "BESInternalFatalError.h" 71 list<p_bes_init> BESInterface::_init_list;
72 list<p_bes_end> BESInterface::_end_list;
74 static jmp_buf timeout_jump;
75 static bool timeout_jump_valid =
false;
86 static volatile int timeout = 0;
88 #define BES_TIMEOUT_KEY "BES.TimeOutInSeconds" 99 static void catch_sig_alarm(
int sig)
101 if (sig == SIGALRM) {
102 LOG(
"Child listener timeout after " << timeout <<
" seconds, exiting." << endl);
107 if (timeout_jump_valid)
108 longjmp(timeout_jump, 1);
113 signal(SIGTERM, SIG_DFL);
119 static void register_signal_handler()
121 struct sigaction act;
122 sigemptyset(&act.sa_mask);
123 sigaddset(&act.sa_mask, SIGALRM);
129 act.sa_handler = catch_sig_alarm;
130 if (sigaction(SIGALRM, &act, 0))
131 throw BESInternalFatalError(
"Could not register a handler to catch alarm/timeout.", __FILE__, __LINE__);
162 static pthread_t alarm_thread;
164 static void* alarm_wait(
void * )
166 BESDEBUG(
"bes",
"Starting: " << __PRETTY_FUNCTION__ << endl);
170 sigemptyset(&sigset);
171 sigaddset(&sigset, SIGALRM);
172 sigprocmask(SIG_BLOCK, &sigset, NULL);
177 int result = sigwait(&sigset, &sig);
179 BESDEBUG(
"bes",
"Fatal error establishing timeout: " << strerror(result) << endl);
180 throw BESInternalFatalError(
string(
"Fatal error establishing timeout: ") + strerror(result), __FILE__, __LINE__);
182 else if (result == 0 && sig == SIGALRM) {
183 BESDEBUG(
"bes",
"Timeout found in " << __PRETTY_FUNCTION__ << endl);
188 oss <<
"While waiting for a timeout, found signal '" << result <<
"' in " << __PRETTY_FUNCTION__ << ends;
189 BESDEBUG(
"bes", oss.str() << endl);
194 static void wait_for_timeout()
196 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ << endl);
198 pthread_attr_t thread_attr;
200 if (pthread_attr_init(&thread_attr) != 0)
202 if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED ) != 0)
203 throw BESInternalFatalError(
"Failed to complete pthread attribute initialization.", __FILE__, __LINE__);
205 int status = pthread_create(&alarm_thread, &thread_attr, alarm_wait, NULL);
211 BESInterface::BESInterface(ostream *output_stream) :
212 _strm(output_stream), _timeout_from_keys(0), _dhi(0), _transmitter(0)
214 if (!output_stream) {
215 throw BESInternalError(
"output stream must be set in order to output responses", __FILE__, __LINE__);
223 string timeout_key_value;
226 istringstream iss(timeout_key_value);
227 iss >> _timeout_from_keys;
231 register_signal_handler();
238 BESInterface::~BESInterface()
280 extern BESStopWatch *bes_timing::elapsedTimeToReadStart;
281 extern BESStopWatch *bes_timing::elapsedTimeToTransmitStart;
283 int BESInterface::execute_request(
const string &from)
285 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ << endl);
288 throw BESInternalError(
"DataHandlerInterface can not be null", __FILE__, __LINE__);
292 if (BESISDEBUG(TIMING_LOG)) {
293 sw.
start(
"BESInterface::execute_request", _dhi->data[REQUEST_ID]);
295 bes_timing::elapsedTimeToReadStart =
new BESStopWatch();
296 bes_timing::elapsedTimeToReadStart->
start(
"TIME_TO_READ_START", _dhi->data[REQUEST_ID]);
298 bes_timing::elapsedTimeToTransmitStart =
new BESStopWatch();
299 bes_timing::elapsedTimeToTransmitStart->
start(
"TIME_TO_TRANSMIT_START", _dhi->data[REQUEST_ID]);
303 throw BESInternalError(
"DataHandlerInterface can not be null", __FILE__, __LINE__);
306 _dhi->set_output_stream(_strm);
307 _dhi->data[REQUEST_FROM] = from;
309 pid_t thepid = getpid();
312 _dhi->data[SERVER_PID] = ss.str();
324 *(BESLog::TheLog()) << _dhi->data[SERVER_PID] <<
" from " << _dhi->data[REQUEST_FROM] <<
" request received" 329 validate_data_request();
331 build_data_request_plan();
334 throw BESInternalError(
"Unable to transmit the response, no transmitter", __FILE__, __LINE__);
343 if (setjmp(timeout_jump) == 0) {
344 timeout_jump_valid =
true;
345 execute_data_request_plan();
347 timeout_jump_valid =
false;
351 oss <<
"BES listener timeout after " << timeout <<
" seconds." << ends;
355 _dhi->executed =
true;
358 timeout_jump_valid =
false;
359 return exception_manager(ex);
361 catch (bad_alloc &e) {
362 timeout_jump_valid =
false;
364 return exception_manager(ex);
366 catch (exception &e) {
367 timeout_jump_valid =
false;
369 return exception_manager(ex);
372 timeout_jump_valid =
false;
373 BESInternalError ex(
"An undefined exception has been thrown", __FILE__, __LINE__);
374 return exception_manager(ex);
377 delete bes_timing::elapsedTimeToReadStart;
378 bes_timing::elapsedTimeToReadStart = 0;
380 delete bes_timing::elapsedTimeToTransmitStart;
381 bes_timing::elapsedTimeToTransmitStart = 0;
389 int BESInterface::finish(
int )
391 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ <<
" ***" << endl);
400 if (_dhi->error_info) {
402 delete _dhi->error_info;
403 _dhi->error_info = 0;
407 status = exception_manager(ex);
409 catch (bad_alloc &) {
410 string serr =
"BES out of memory";
412 status = exception_manager(ex);
415 string serr =
"An undefined exception has been thrown";
417 status = exception_manager(ex);
424 if (_dhi->error_info) {
425 _dhi->error_info->print(cout);
426 delete _dhi->error_info;
427 _dhi->error_info = 0;
437 (*BESLog::TheLog()) <<
"Problem logging status: " << ex.
get_message() << endl;
440 (*BESLog::TheLog()) <<
"Unknown problem logging status" << endl;
447 (*BESLog::TheLog()) <<
"Problem reporting request: " << ex.
get_message() << endl;
450 (*BESLog::TheLog()) <<
"Unknown problem reporting request" << endl;
457 (*BESLog::TheLog()) <<
"Problem ending request: " << ex.
get_message() << endl;
460 (*BESLog::TheLog()) <<
"Unknown problem ending request" << endl;
466 int BESInterface::finish_with_error(
int status)
468 if (!_dhi->error_info) {
470 string serr =
"Finish_with_error called with no error object";
472 status = exception_manager(ex);
475 return finish(status);
478 void BESInterface::add_init_callback(p_bes_init init)
480 _init_list.push_back(init);
491 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::initialize", _dhi->data[REQUEST_ID]);
493 BESDEBUG(
"bes",
"Initializing request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
494 bool do_continue =
true;
495 init_iter i = _init_list.begin();
497 for (; i != _init_list.end() && do_continue ==
true; i++) {
499 do_continue = p(*_dhi);
503 BESDEBUG(
"bes",
"FAILED" << endl);
504 string se =
"Initialization callback failed, exiting";
508 BESDEBUG(
"bes",
"OK" << endl);
537 if (BESISDEBUG(TIMING_LOG))
538 sw.
start(
"BESInterface::execute_data_request_plan(\"" + _dhi->data[DATA_REQUEST] +
"\")",
539 _dhi->data[REQUEST_ID]);
544 string context = BESContextManager::TheManager()->
get_context(
"bes_timeout", found);
546 timeout = strtol(context.c_str(), NULL, 10);
547 VERBOSE(
"Set request timeout to " << timeout <<
" seconds (from context)." << endl);
550 else if (_timeout_from_keys != 0) {
551 timeout = _timeout_from_keys;
552 VERBOSE(
"Set request timeout to " << timeout <<
" seconds (from keys)." << endl);
558 "Executing request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
563 BESDEBUG(
"bes",
"FAILED" << endl);
564 string se =
"The response handler \"" + _dhi->action
565 +
"\" does not exist";
568 BESDEBUG(
"bes",
"OK" << endl);
571 invoke_aggregation();
597 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::invoke_aggregation", _dhi->data[REQUEST_ID]);
599 if (_dhi->data[AGG_CMD] !=
"") {
600 BESDEBUG(
"bes",
"aggregating with: " << _dhi->data[AGG_CMD] <<
" ... "<< endl);
606 BESDEBUG(
"bes",
"FAILED" << endl);
607 string se =
"The aggregation handler " + _dhi->data[AGG_HANDLER] +
"does not exist";
610 BESDEBUG(
"bes",
"OK" << endl);
630 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::transmit_data", _dhi->data[REQUEST_ID]);
632 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Transmitting request: " << _dhi->data[DATA_REQUEST] << endl);
636 if (_dhi->error_info) {
638 _dhi->error_info->print(strm);
639 (*BESLog::TheLog()) << strm.str() << endl;
640 BESDEBUG(
"bes",
" transmitting error info using transmitter ... " << endl << strm.str() << endl);
642 _dhi->error_info->transmit(_transmitter, *_dhi);
644 else if (_dhi->response_handler) {
645 BESDEBUG(
"bes",
" BESInterface::transmit_data() - Response handler " << _dhi->response_handler->get_name() << endl);
647 _dhi->response_handler->transmit(_transmitter, *_dhi);
653 if (_dhi->error_info) {
654 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Transmitting error info using cout ... " << endl);
655 _dhi->error_info->print(cout);
656 delete _dhi->error_info;
657 _dhi->error_info = 0;
660 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Unable to transmit the response ... FAILED " << endl);
662 throw BESInternalError(
"Unable to transmit the response, no transmitter", __FILE__, __LINE__);
667 BESDEBUG(
"bes",
"BESInterface::transmit_data() - OK" << endl);
689 BESDEBUG(
"bes",
"Reporting on request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
691 BESReporterList::TheList()->report(*_dhi);
693 BESDEBUG(
"bes",
"OK" << endl);
696 void BESInterface::add_end_callback(p_bes_end end)
698 _end_list.push_back(end);
708 BESDEBUG(
"bes",
"Ending request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
709 end_iter i = _end_list.begin();
710 for (; i != _end_list.end(); i++) {
717 _dhi->first_container();
718 while (_dhi->container) {
719 BESDEBUG(
"bes",
"Calling BESContainer::release()" << endl);
720 _dhi->container->release();
721 _dhi->next_container();
724 BESDEBUG(
"bes",
"OK" << endl);
731 if (_dhi) _dhi->clean();
761 strm << BESIndent::LMarg <<
"BESInterface::dump - (" << (
void *)
this <<
")" << endl;
764 if (_init_list.size()) {
765 strm << BESIndent::LMarg <<
"termination functions:" << endl;
767 init_iter i = _init_list.begin();
768 for (; i != _init_list.end(); i++) {
771 strm << BESIndent::LMarg << (
void *) (*i) << endl;
773 BESIndent::UnIndent();
776 strm << BESIndent::LMarg <<
"termination functions: none" << endl;
779 if (_end_list.size()) {
780 strm << BESIndent::LMarg <<
"termination functions:" << endl;
782 end_iter i = _end_list.begin();
783 for (; i != _end_list.end(); i++) {
784 strm << BESIndent::LMarg << (
void *) (*i) << endl;
786 BESIndent::UnIndent();
789 strm << BESIndent::LMarg <<
"termination functions: none" << endl;
792 strm << BESIndent::LMarg <<
"data handler interface:" << endl;
795 BESIndent::UnIndent();
798 strm << BESIndent::LMarg <<
"transmitter:" << endl;
800 _transmitter->dump(strm);
801 BESIndent::UnIndent();
804 strm << BESIndent::LMarg <<
"transmitter: not set" << endl;
806 BESIndent::UnIndent();
virtual void dump(ostream &strm) const
dumps information about this object
error thrown if there is a user syntax error in the request or any other user error ...
exception thrown if an internal error is found and is fatal to the BES
exception thrown if inernal error encountered
virtual void initialize()
Initialize the BES object.
virtual std::string get_message()
get the error message for this exception
virtual void execute(BESDataHandlerInterface &dhi)=0
knows how to build a requested response object
virtual int exception_manager(BESError &e)
Manage any exceptions thrown during the whole process.
virtual void aggregate(BESDataHandlerInterface &dhi)=0
aggregate the response object
virtual string get_context(const string &name, bool &found)
retrieve the value of the specified context from the BES
virtual bool start(string name)
virtual void transmit_data()
Transmit the resulting response object.
handler object that knows how to create a specific response object
Abstract exception class for the BES with basic string message.
virtual void report_request()
Report the request and status of the request to BESReporterList::TheList()
virtual void validate_data_request()
Validate the incoming request information.
virtual void clean()
Clean up after the request.
void get_value(const string &s, string &val, bool &found)
Retrieve the value of a given key, if set.
virtual void invoke_aggregation()
Aggregate the resulting response object.
virtual void end_request()
End the BES request.
virtual BESAggregationServer * find_handler(const string &handler_name)
returns the aggregation handler with the given name in the list
virtual void log_status()
Log the status of the request.
virtual int handle_exception(BESError &e, BESDataHandlerInterface &dhi)
Manage any exceptions thrown during the handling of a request.
Abstraction representing mechanism for aggregating data.
virtual void execute_data_request_plan()
Execute the data request plan.
static BESKeys * TheKeys()