ASSA::Reactor Class Reference

#include <Reactor.h>

List of all members.

Public Member Functions

 Reactor ()
 Constructor.
 ~Reactor ()
 Destructor.
TimerId registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
 Register Timer Event handler with Reactor.
bool registerIOHandler (EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
 Register I/O Event handler with Reactor.
bool removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS)
 Remove Event handler from reactor for either all I/O events or timeout event or both.
bool removeTimerHandler (TimerId id_)
 Remove Timer event from the queue.
bool removeIOHandler (handler_t fd_)
 Remove IO Event handler from reactor.
void waitForEvents (void)
 Main waiting loop that blocks indefinitely processing events.
void waitForEvents (TimeVal *tv_)
 Wait for events for time specified.
void stopReactor (void)
 Stop Reactor's activity.
void deactivate (void)
 Deactivate Reactor.

Private Types

typedef std::map< u_int, EventHandler * > Fd2Eh_Map_Type
 no cloning
typedef Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter

Private Member Functions

 Reactor (const Reactor &)
Reactoroperator= (const Reactor &)
 no cloning
void adjust_maxfdp1 (handler_t fd_, handler_t rmax_, handler_t wmax_, handler_t emax_)
 Adjust maxfdp1 in a portable way (win32 ignores masfd alltogether).
bool handleError (void)
 Handle error in select(2) loop appropriately.
bool dispatch (int minimum_)
 Notify all EventHandlers registered on respecful events occured.
int isAnyReady (void)
 Return number of file descriptors ready accross all sets.
bool checkFDs (void)
 Check mask for bad file descriptors.
void dispatchHandler (FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
 Call handler's callback and, if callback returns negative value, remove it from the Reactor.
void calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_)
 Calculate closest timeout.

Private Attributes

int m_fd_setsize
 Max number of open files per process.
handler_t m_maxfd_plus1
 Max file descriptor number (in all sets) plus 1.
bool m_active
 Flag that indicates whether Reactor is active or had been stopped.
Fd2Eh_Map_Type m_readSet
 Event handlers awaiting on READ_EVENT.
Fd2Eh_Map_Type m_writeSet
 Event handlers awaiting on WRITE_EVENT.
Fd2Eh_Map_Type m_exceptSet
 Event handlers awaiting on EXCEPT_EVENT.
MaskSet m_waitSet
 Handlers to wait for event on.
MaskSet m_readySet
 Handlers that are ready for processing.
TimerQueue m_tqueue
 The queue of Timers.


Detailed Description

Definition at line 57 of file Reactor.h.


Member Typedef Documentation

typedef Fd2Eh_Map_Type::iterator ASSA::Reactor::Fd2Eh_Map_Iter [private]

Definition at line 155 of file Reactor.h.

typedef std::map<u_int, EventHandler*> ASSA::Reactor::Fd2Eh_Map_Type [private]

no cloning

Definition at line 154 of file Reactor.h.


Constructor & Destructor Documentation

Reactor::Reactor (  ) 

Constructor.

Maximum number of sockets supported (per process) Win32 defines it to 64 in winsock2.h.

Initialize winsock2 library

Definition at line 24 of file Reactor.cpp.

References m_fd_setsize, ASSA::REACTTRACE, and trace_with_mask.

00024            : 
00025     m_fd_setsize  (1024), 
00026     m_maxfd_plus1 (0), 
00027     m_active      (true)
00028 {
00029     trace_with_mask("Reactor::Reactor",REACTTRACE);
00030 
00034 #if defined(WIN32)
00035     m_fd_setsize = FD_SETSIZE;
00036 
00037 #else  // POSIX
00038     struct rlimit rlim;
00039     rlim.rlim_max = 0;
00040 
00041     if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00042         m_fd_setsize = rlim.rlim_cur;
00043     }
00044 #endif
00045 
00048 #if defined (WIN32)             
00049     WSADATA data;
00050     WSAStartup (MAKEWORD (2, 2), &data);
00051 #endif
00052 }

Reactor::~Reactor (  ) 

Destructor.

Definition at line 55 of file Reactor.cpp.

References deactivate(), m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.

00056 {   
00057     trace_with_mask("Reactor::~Reactor",REACTTRACE);
00058 
00059     m_readSet.clear   ();
00060     m_writeSet.clear  ();
00061     m_exceptSet.clear ();
00062     deactivate ();
00063 }

ASSA::Reactor::Reactor ( const Reactor  )  [private]


Member Function Documentation

void Reactor::adjust_maxfdp1 ( handler_t  fd_,
handler_t  rmax_,
handler_t  wmax_,
handler_t  emax_ 
) [private]

Adjust maxfdp1 in a portable way (win32 ignores masfd alltogether).

Win32 implementation of select() ignores this value altogether.

Definition at line 718 of file Reactor.cpp.

References DL, m_maxfd_plus1, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by removeHandler(), and removeIOHandler().

00722 {
00723 #if !defined (WIN32)  /* POSIX */
00724 
00725     trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
00726 
00727     if (m_maxfd_plus1 == fd_ + 1) {
00728         m_maxfd_plus1 = std::max (rmax_, std::max (wmax_, emax_));
00729 
00730         DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00731     }
00732 #endif
00733 }

void Reactor::calculateTimeout ( TimeVal *&  howlong_,
TimeVal maxwait_ 
) [private]

Calculate closest timeout.

If TimerQueue is not empty, then return smallest of maxtimeout and first in the queue. Otherwise, return maxtimeout.

Parameters:
maxwait_ (in) how long we are expected to wait for event(s).
howlong_ (out) how long we are going to wait.

Definition at line 438 of file Reactor.cpp.

References DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::top(), trace_with_mask, and ASSA::TimeVal::zeroTime().

Referenced by waitForEvents().

00439 {
00440     trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00441 
00442     TimeVal now;
00443     TimeVal tv;
00444 
00445     if (m_tqueue.isEmpty () ) {
00446         howlong_ = maxwait_;
00447         goto done;
00448     }
00449     now = TimeVal::gettimeofday ();
00450     tv = m_tqueue.top ();
00451     
00452     if (tv < now) {
00453         /*--- 
00454           It took too long to get here (fraction of a millisecond), 
00455           and top timer had already expired. In this case,
00456           perform non-blocking select in order to drain the timer queue.
00457           ---*/
00458         *howlong_ = 0;
00459     }
00460     else {  
00461         DL((REACT,"--------- Timer Queue ----------\n"));
00462         m_tqueue.dump();
00463         DL((REACT,"--------------------------------\n"));
00464 
00465         if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00466             *howlong_ = tv - now;
00467         }
00468         else {
00469             *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00470         }
00471     }
00472 
00473  done:
00474     if (howlong_ != NULL) {
00475         DL((REACT,"delay (%f)\n", double (*howlong_) ));
00476     }
00477     else {
00478         DL((REACT,"delay (forever)\n"));
00479     }
00480 }

bool Reactor::checkFDs ( void   )  [private]

Check mask for bad file descriptors.

Returns:
true if any fd(s) were found and removed; false otherwise

Definition at line 334 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, m_fd_setsize, m_readSet, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by handleError().

00335 {
00336     trace_with_mask("Reactor::checkFDs",REACTTRACE);
00337     
00338     bool num_removed = false;
00339     FdSet mask;
00340     timeval poll = { 0, 0 };
00341 
00342     for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
00343         if ( m_readSet[fd] != NULL ) {
00344             mask.setFd (fd);
00345             if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00346                 removeIOHandler (fd);
00347                 num_removed = true;
00348                 DL((REACT,"Detected BAD FD: %d\n", fd ));
00349             }
00350             mask.clear (fd);
00351         }
00352     }
00353     return (num_removed);
00354 }

void ASSA::Reactor::deactivate ( void   )  [inline]

Deactivate Reactor.

This function sets internal flag which notifies Reactor's internal event handling loop to abort its activity. It is mostly used when a *slow* system call is interrupted by the signal handler. The system call will be restarted by OS after control returns from the signal handler. Signal handler (GenServer::handle_signal()) should call this method to delay Reactor's deactivation.

Definition at line 237 of file Reactor.h.

References m_active.

Referenced by ASSA::GenServer::handle_signal(), ASSA::GenServer::stop_service(), and ~Reactor().

00237 {  m_active = false; }

bool Reactor::dispatch ( int  minimum_  )  [private]

Notify all EventHandlers registered on respecful events occured.

Parameters:
minimum_ number of file descriptors ready.

Definition at line 643 of file Reactor.cpp.

References ASSA::ASSAERR, dispatchHandler(), DL, ASSA::MaskSet::dump(), EL, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00644 {
00645     trace_with_mask("Reactor::dispatch", REACTTRACE);
00646 
00647     m_tqueue.expire (TimeVal::gettimeofday ());
00648 
00649     if ( ready_ < 0 ) 
00650     {
00651 #if !defined (WIN32)
00652         EL((ASSAERR,"::select(3) error\n"));
00653 #endif
00654         return (false);
00655     }
00656     if ( ready_ == 0 ) {
00657         return (true);
00658     }
00659 
00660     DL((REACT,"Dispatching %d FDs.\n",ready_));
00661     DL((REACT,"m_readySet:\n"));
00662     m_readySet.dump ();
00663 
00664     /*--- Writes first ---*/
00665     dispatchHandler (m_readySet.m_wset, 
00666                      m_writeSet, 
00667                      &EventHandler::handle_write);
00668 
00669     /*--- Exceptions next ---*/
00670     dispatchHandler (m_readySet.m_eset, 
00671                      m_exceptSet, 
00672                      &EventHandler::handle_except);
00673 
00674     /*--- Finally, the Reads ---*/
00675     dispatchHandler (m_readySet.m_rset, 
00676                      m_readSet, 
00677                      &EventHandler::handle_read);
00678 
00679     return (true);
00680 }

void Reactor::dispatchHandler ( FdSet mask_,
Fd2Eh_Map_Type fdSet_,
EH_IO_Callback  callback_ 
) [private]

Call handler's callback and, if callback returns negative value, remove it from the Reactor.

When you have several high data-rate connections sending data at the same time, the one that had connected first would get lower FD number and would get data transfer preference over everybody else who has connected later on.

WIN32 HACK: Without having restarted scan from the beginning, this causes crash due to the fact that firing a callback of EventHandler might have invalidated the iterator (happens with Connector's in a sync mode).

Definition at line 585 of file Reactor.cpp.

References ASSA::FdSet::clear(), DL, ASSA::EventHandler::get_id(), ASSA::FdSet::isSet(), ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.

Referenced by dispatch().

00586 {
00587     trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00588 
00589     int ret = 0;
00590     handler_t fd;
00591     EventHandler* ehp = NULL;
00592     std::string eh_id;
00593 
00594     Fd2Eh_Map_Iter iter = fdSet_.begin ();
00595 
00596     while (iter != fdSet_.end ()) 
00597     {
00598         fd  = (*iter).first;
00599         ehp = (*iter).second;
00600 
00601         if (mask_.isSet (fd) && ehp != NULL) 
00602         {
00603             eh_id = ehp->get_id ();
00604             DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
00605                 eh_id.c_str (), fd));
00606 
00607             ret = (ehp->*callback_) (fd); /* Fire up a callback */
00608 
00609             if (ret == -1) {
00610                 removeIOHandler (fd);
00611             }
00612             else if (ret > 0) {
00613                 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
00614                     ret, fd, eh_id.c_str ()));
00615                 //return;   <-- would starve other connections
00616             }
00617             else {
00618                 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", 
00619                     eh_id.c_str (), fd));
00620                 mask_.clear (fd);
00621             }
00628             iter = fdSet_.begin ();
00629         }
00630         else {
00631             iter++;
00632         }
00633     }
00634 }

bool Reactor::handleError ( void   )  [private]

Handle error in select(2) loop appropriately.

If commanded to stop, do so

Definition at line 358 of file Reactor.cpp.

References ASSA::ASSAERR, checkFDs(), DL, EL, m_active, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00359 {
00360     trace_with_mask("Reactor::handleError",REACTTRACE);
00361 
00364     if ( !m_active ) {
00365         DL((REACT,"Received cmd to stop Reactor\n"));
00366         return (false);
00367     }
00368 
00369     /*---
00370       TODO: If select(2) returns before time expires, with
00371       a descriptor ready or with EINTR, timeval is not
00372       going to be updated with number of seconds remaining.
00373       This is true for all systems except Linux, which will
00374       do so. Therefore, to restart correctly in case of
00375       EINTR, we ought to take time measurement before and
00376       after select, and try to select() for remaining time.
00377     
00378       For now, we restart with the initial timing value.
00379       ---*/
00380     /*---
00381       BSD kernel never restarts select(2). SVR4 will restart if
00382       the SA_RESTART flag is specified when the signal handler
00383       for the signal delivered is installed. This means taht for
00384       portability, we must handle signal interrupts.
00385       ---*/
00386 
00387     if ( errno == EINTR ) {
00388         EL((REACT,"EINTR: interrupted select(2)\n"));
00389         /*
00390           If I was sitting in select(2) and received SIGTERM,
00391           the signal handler would have set m_active to 'false',
00392           and this function would have returned 'false' as above.
00393           For any other non-critical signals (USR1,...),
00394           we retry select.
00395         */
00396         return (true);
00397     }
00398     /*
00399       EBADF - bad file number. One of the file descriptors does
00400       not reference an open file to open(), close(), ioctl().
00401       This can happen if user closed fd and forgot to remove
00402       handler from Reactor.
00403     */
00404     if ( errno == EBADF ) {
00405         DL((REACT,"EBADF: bad file descriptor\n"));
00406         return (checkFDs ());
00407     }
00408     /*
00409       Any other error from select
00410     */
00411 #if defined (WIN32) 
00412     DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
00413 #else
00414     EL((ASSAERR,"select(3) error\n"));
00415 #endif
00416     return (false);
00417 }

int Reactor::isAnyReady ( void   )  [private]

Return number of file descriptors ready accross all sets.

Definition at line 421 of file Reactor.cpp.

References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by waitForEvents().

00422 {
00423     trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00424 
00425     int n = m_readySet.m_rset.numSet () +
00426         m_readySet.m_wset.numSet () +
00427         m_readySet.m_eset.numSet ();
00428 
00429     if ( n > 0 ) {
00430         DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00431         m_readySet.dump ();
00432     }
00433     return (n);
00434 }

Reactor& ASSA::Reactor::operator= ( const Reactor  )  [private]

no cloning

bool Reactor::registerIOHandler ( EventHandler eh_,
handler_t  fd_,
EventType  et_ = RWE_EVENTS 
)

Register I/O Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters:
eh_ Pointer to the EventHandler
fd_ File descriptor
et_ Event Type
Returns:
true if success, false if error

Definition at line 93 of file Reactor.cpp.

References ASSA::ASSAERR, Assure_return, DL, ASSA::MaskSet::dump(), ASSA::ends(), ASSA::EventHandler::get_id(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_maxfd_plus1, m_readSet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::RemoteLogger::log_open(), and ASSA::Acceptor< SERVICE_HANDLER, PEER_ACCEPTOR >::open().

00094 {
00095     trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00096 
00097     std::ostringstream msg;
00098     Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00099 
00100     if (isReadEvent (et_)) 
00101     {
00102         if (!m_waitSet.m_rset.setFd (fd_)) 
00103         {
00104             DL((ASSAERR,"readset: fd %d out of range\n", fd_));
00105             return (false);
00106         }
00107         m_readSet[fd_] = eh_;
00108         msg << "READ_EVENT";
00109     }
00110 
00111     if (isWriteEvent (et_)) 
00112     {
00113         if (!m_waitSet.m_wset.setFd (fd_)) 
00114         {
00115             DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
00116             return (false);
00117         }
00118         m_writeSet[fd_] = eh_;
00119         msg << " WRITE_EVENT";
00120     }
00121 
00122     if (isExceptEvent (et_)) 
00123     {
00124         if (!m_waitSet.m_eset.setFd (fd_)) 
00125         {
00126             DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
00127             return (false);
00128         }
00129         m_exceptSet[fd_] = eh_;
00130         msg << " EXCEPT_EVENT";
00131     }
00132     msg << std::ends;
00133 
00134     DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", 
00135         eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
00136 
00137 #if !defined (WIN32)
00138     if (m_maxfd_plus1 < fd_+1) {
00139         m_maxfd_plus1 = fd_+1;
00140         DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00141     }
00142 #endif
00143 
00144     DL((REACT,"Modified waitSet:\n"));
00145     m_waitSet.dump ();
00146 
00147     return (true);
00148 }

TimerId Reactor::registerTimerHandler ( EventHandler eh_,
const TimeVal tv_,
const std::string &  name_ = "<unknown>" 
)

Register Timer Event handler with Reactor.

Reactor will dispatch appropriate callback when event of EventType is received.

Parameters:
eh_ Pointer to the EventHandler
tv_ Timeout value
name_ Name of the timer
Returns:
Timer ID that can be used to cancel timer and find out its name.

Definition at line 67 of file Reactor.cpp.

References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), and ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().

00070 {
00071     trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00072     Assure_return (eh_);
00073 
00074     TimeVal now (TimeVal::gettimeofday());
00075     TimeVal t (now + timeout_);
00076 
00077     DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",  
00078         timeout_.sec(),timeout_.msec()));
00079     DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00080     DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00081 
00082     TimerId tid =  m_tqueue.insert (eh_, t, timeout_, name_);
00083 
00084     DL((REACT,"---Modified Timer Queue----\n"));
00085     m_tqueue.dump();
00086     DL((REACT,"---------------------------\n"));
00087 
00088     return (tid);
00089 }

bool Reactor::removeHandler ( EventHandler eh_,
EventType  et_ = ALL_EVENTS 
)

Remove Event handler from reactor for either all I/O events or timeout event or both.

If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.

Parameters:
eh_ Pointer to the EventHandler
et_ Event Type to remove. Default will remove Event Handler for all events.
Returns:
true if success, false if wasn't registered for any events.

Definition at line 173 of file Reactor.cpp.

References adjust_maxfdp1(), ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_timeout(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(), ASSA::RemoteLogger::log_close(), and stopReactor().

00174 {
00175     trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00176 
00177     bool ret = false;
00178     handler_t fd;
00179     handler_t rfdmax;
00180     handler_t wfdmax;
00181     handler_t efdmax;
00182     Fd2Eh_Map_Iter iter;
00183 
00184     rfdmax = wfdmax = efdmax = 0;
00185 
00186     if (eh_ == NULL) {
00187         return false;
00188     }
00189 
00190     if (isTimeoutEvent (event_)) {
00191         ret = m_tqueue.remove (eh_);
00192         ret = true;
00193     }
00194 
00195     if (isReadEvent (event_)) {
00196         iter = m_readSet.begin ();
00197         while (iter != m_readSet.end ()) {
00198             if ((*iter).second == eh_) {
00199                 fd = (*iter).first;
00200                 m_readSet.erase (iter);
00201                 m_waitSet.m_rset.clear (fd);
00202                 ret = true;
00203                 break;
00204             }
00205             rfdmax = fd;
00206             iter++;
00207         }
00208     } 
00209     
00210     if (isWriteEvent (event_)) {
00211         iter = m_writeSet.begin ();
00212         while (iter != m_writeSet.end ()) {
00213             if ((*iter).second == eh_) {
00214                 fd = (*iter).first;
00215                 m_writeSet.erase (iter);
00216                 m_waitSet.m_wset.clear (fd);
00217                 ret = true;
00218                 break;
00219             }
00220             wfdmax = fd;
00221             iter++;
00222         }
00223     }
00224 
00225     if (isExceptEvent (event_)) {
00226         iter = m_exceptSet.begin ();
00227         while (iter != m_exceptSet.end ()) {
00228             if ((*iter).second == eh_) {
00229                 fd = (*iter).first;
00230                 m_exceptSet.erase (iter);
00231                 m_waitSet.m_eset.clear (fd);
00232                 ret = true;
00233                 break;
00234             }
00235             efdmax = fd;
00236             iter++;
00237         }
00238     }
00239 
00240     if (ret == true) {
00241         DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
00242         eh_->handle_close (fd);
00243     }
00244 
00245     adjust_maxfdp1 (fd, rfdmax, wfdmax, efdmax);
00246 
00247     DL((REACT,"Modifies waitSet:\n"));
00248     m_waitSet.dump ();
00249 
00250     return (ret);
00251 }

bool Reactor::removeIOHandler ( handler_t  fd_  ) 

Remove IO Event handler from reactor.

This will remove handler from receiving all I/O events.

Parameters:
fd_ File descriptor
Returns:
true on success, false if fd_ is out of range

We clear m_readySet mask here as well, because if we don't, it will be erroneously used by isAnyReady() before select().

Definition at line 255 of file Reactor.cpp.

References adjust_maxfdp1(), Assure_return, ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::is_valid_handler(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.

Referenced by checkFDs(), and dispatchHandler().

00256 {
00257     trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00258 
00259     bool ret = false;
00260     EventHandler*  ehp = NULL;
00261     Fd2Eh_Map_Iter iter;
00262 
00263     handler_t      rfdmax;
00264     handler_t      wfdmax;
00265     handler_t      efdmax;
00266 
00267     rfdmax = wfdmax = efdmax = 0;
00268 
00269     Assure_return (ASSA::is_valid_handler (fd_));
00270 
00271     DL((REACT,"Removing handler for fd=%d\n",fd_));
00272 
00277     if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) 
00278     {
00279         ehp = (*iter).second;
00280         m_readSet.erase (iter);
00281         m_waitSet.m_rset.clear (fd_);
00282         m_readySet.m_rset.clear (fd_);
00283         if (m_readSet.size () > 0) {
00284             iter = m_readSet.end ();
00285             iter--;
00286             rfdmax = (*iter).first;
00287         }
00288         ret = true;
00289     }
00290 
00291     if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) 
00292     {
00293         ehp = (*iter).second;
00294         m_writeSet.erase (iter);
00295         m_waitSet.m_wset.clear (fd_);
00296         m_readySet.m_wset.clear (fd_);
00297         if (m_writeSet.size () > 0) {
00298             iter = m_writeSet.end ();
00299             iter--;
00300             wfdmax = (*iter).first;
00301         }
00302         ret = true;
00303     }
00304 
00305     if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) 
00306     {
00307         ehp = (*iter).second;
00308         m_exceptSet.erase (iter);
00309         m_waitSet.m_eset.clear (fd_);
00310         m_readySet.m_eset.clear (fd_);
00311         if (m_exceptSet.size () > 0) {
00312             iter = m_exceptSet.end ();
00313             iter--;
00314             efdmax = (*iter).first;
00315         }
00316         ret = true;
00317     }
00318 
00319     if (ret == true && ehp != NULL) {
00320         DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
00321         ehp->handle_close (fd_);
00322     }
00323 
00324     adjust_maxfdp1 (fd_, rfdmax, wfdmax, efdmax);
00325 
00326     DL((REACT,"Modifies waitSet:\n"));
00327     m_waitSet.dump ();
00328 
00329     return (ret);
00330 }

bool Reactor::removeTimerHandler ( TimerId  id_  ) 

Remove Timer event from the queue.

This removes particular event.

Parameters:
id_ Timer Id returned by registerTimer.
Returns:
true if timer found and removed; false otherwise

Definition at line 152 of file Reactor.cpp.

References ASSA::ASSAERR, DL, ASSA::TimerQueue::dump(), EL, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write().

00153 {
00154     trace_with_mask("Reactor::removeTimer",REACTTRACE);
00155     bool ret;
00156 
00157     if ((ret = m_tqueue.remove (tid_))) {
00158         DL((REACT,"---Modified Timer Queue----\n"));
00159         m_tqueue.dump();
00160         DL((REACT,"---------------------------\n"));
00161     }
00162     else {
00163         EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00164     }
00165     return (ret);
00166 }

void Reactor::stopReactor ( void   ) 

Stop Reactor's activity.

This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from method other then EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls.

Definition at line 684 of file Reactor.cpp.

References m_active, m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, removeHandler(), and trace_with_mask.

00685 { 
00686     trace_with_mask("Reactor::stopReactor", REACTTRACE);
00687 
00688     m_active = false; 
00689 
00690     Fd2Eh_Map_Iter iter;
00691     EventHandler* ehp;
00692 
00693     while (m_readSet.size () > 0) {
00694         iter = m_readSet.begin ();
00695         ehp = (*iter).second;
00696         removeHandler (ehp);
00697     }
00698 
00699     while (m_writeSet.size () > 0) {
00700         iter = m_writeSet.begin ();
00701         ehp = (*iter).second;
00702         removeHandler (ehp);
00703     }
00704 
00705     while (m_exceptSet.size () > 0) {
00706         iter = m_exceptSet.begin ();
00707         ehp = (*iter).second;
00708         removeHandler (ehp);
00709     }
00710 }

void Reactor::waitForEvents ( TimeVal tv_  ) 

Wait for events for time specified.

Passing NULL replicates behavior of waitForEvents(void). Passing tv_ {0, 0} will cause non-blocking polling for all events. This method blocks up to tv_ time interval processing event. If an event occurs, it will process event(s) and return. tv_ time is adjusted by substracting time spent in event processing.

Parameters:
tv_ [RW] is time to wait for.

Definition at line 512 of file Reactor.cpp.

References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd_plus1, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::MaskSet::reset(), ASSA::MaskSet::sync(), and trace_with_mask.

00513 {
00514     trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00515 
00516     TimerCountdown traceTime (tv_);
00517     DL((REACT,"======================================\n"));
00518 
00519     /*--- Expire all stale Timers ---*/
00520     m_tqueue.expire (TimeVal::gettimeofday ());
00521 
00522     /* Test to see if Reactor has been deactivated as a result
00523      * of processing done by any TimerHandlers.
00524      */
00525     if (!m_active) {
00526         return;
00527     }
00528 
00529     int      nReady;
00530     TimeVal  delay;
00531     TimeVal* dlp = &delay;
00532 
00533     /*---
00534       In case if not all data have been processed by the EventHandler,
00535       and EventHandler stated so in its callback's return value
00536       to dispatcher (), it will be called again. This way 
00537       underlying file/socket stream can efficiently utilize its
00538       buffering mechaninsm.
00539       ---*/
00540     if ((nReady = isAnyReady ())) {
00541         DL((REACT,"isAnyReady returned: %d\n",nReady));
00542         dispatch (nReady);
00543         return;
00544     }
00545 
00546     DL((REACT,"=== m_waitSet ===\n"));
00547     m_waitSet.dump ();
00548 
00549     do {
00550         m_readySet.reset ();
00551         DL ((REACT,"m_readySet after reset():\n"));
00552         m_readySet.dump ();
00553 
00554         m_readySet = m_waitSet;
00555         DL ((REACT,"m_readySet after assign:\n"));
00556         m_readySet.dump ();
00557 
00558         calculateTimeout (dlp, tv_);
00559 
00560         nReady = ::select (m_maxfd_plus1, 
00561                            &m_readySet.m_rset,
00562                            &m_readySet.m_wset, 
00563                            &m_readySet.m_eset, 
00564                            dlp);
00565         DL((REACT,"::select() returned: %d\n",nReady));
00566 
00567         m_readySet.sync ();
00568         DL ((REACT,"m_readySet after select:\n"));
00569         m_readySet.dump ();
00570 
00571     } 
00572     while (nReady < 0 && handleError ());
00573 
00574     dispatch (nReady);
00575 }

void Reactor::waitForEvents ( void   ) 

Main waiting loop that blocks indefinitely processing events.

Definition at line 487 of file Reactor.cpp.

References m_active.

Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().

00488 {
00489     while ( m_active ) {
00490         waitForEvents ((TimeVal*) NULL);
00491     }
00492 }


Member Data Documentation

bool ASSA::Reactor::m_active [private]

Flag that indicates whether Reactor is active or had been stopped.

Definition at line 212 of file Reactor.h.

Referenced by deactivate(), handleError(), stopReactor(), and waitForEvents().

Fd2Eh_Map_Type ASSA::Reactor::m_exceptSet [private]

Event handlers awaiting on EXCEPT_EVENT.

Definition at line 221 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

int ASSA::Reactor::m_fd_setsize [private]

Max number of open files per process.

This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit.

Definition at line 203 of file Reactor.h.

Referenced by checkFDs(), and Reactor().

handler_t ASSA::Reactor::m_maxfd_plus1 [private]

Max file descriptor number (in all sets) plus 1.

This value is ignored by WIN32 implementation of select()

Definition at line 209 of file Reactor.h.

Referenced by adjust_maxfdp1(), registerIOHandler(), and waitForEvents().

Fd2Eh_Map_Type ASSA::Reactor::m_readSet [private]

Event handlers awaiting on READ_EVENT.

Definition at line 215 of file Reactor.h.

Referenced by checkFDs(), dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().

MaskSet ASSA::Reactor::m_readySet [private]

Handlers that are ready for processing.

Definition at line 227 of file Reactor.h.

Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents().

TimerQueue ASSA::Reactor::m_tqueue [private]

The queue of Timers.

Definition at line 230 of file Reactor.h.

Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents().

MaskSet ASSA::Reactor::m_waitSet [private]

Handlers to wait for event on.

Definition at line 224 of file Reactor.h.

Referenced by registerIOHandler(), removeHandler(), removeIOHandler(), and waitForEvents().

Fd2Eh_Map_Type ASSA::Reactor::m_writeSet [private]

Event handlers awaiting on WRITE_EVENT.

Definition at line 218 of file Reactor.h.

Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().


The documentation for this class was generated from the following files:
Generated on Sun Nov 5 12:14:53 2006 for libassa by  doxygen 1.4.7