IBSimu 1.0.4

scheduler.hpp

Go to the documentation of this file.
00001 
00005 /* Copyright (c) 2005-2009 Taneli Kalvas. All rights reserved.
00006  *
00007  * You can redistribute this software and/or modify it under the terms
00008  * of the GNU General Public License as published by the Free Software
00009  * Foundation; either version 2 of the License, or (at your option)
00010  * any later version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but
00013  * WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00015  * General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with this library (file "COPYING" included in the package);
00019  * if not, write to the Free Software Foundation, Inc., 51 Franklin
00020  * Street, Fifth Floor, Boston, MA 02110-1301 USA
00021  * 
00022  * If you have questions about your rights to use or distribute this
00023  * software, please contact Berkeley Lab's Technology Transfer
00024  * Department at TTD@lbl.gov. Other questions, comments and bug
00025  * reports should be sent directly to the author via email at
00026  * taneli.kalvas@jyu.fi.
00027  * 
00028  * NOTICE. This software was developed under partial funding from the
00029  * U.S.  Department of Energy.  As such, the U.S. Government has been
00030  * granted for itself and others acting on its behalf a paid-up,
00031  * nonexclusive, irrevocable, worldwide license in the Software to
00032  * reproduce, prepare derivative works, and perform publicly and
00033  * display publicly.  Beginning five (5) years after the date
00034  * permission to assert copyright is obtained from the U.S. Department
00035  * of Energy, and subject to any subsequent five (5) year renewals,
00036  * the U.S. Government is granted for itself and others acting on its
00037  * behalf a paid-up, nonexclusive, irrevocable, worldwide license in
00038  * the Software to reproduce, prepare derivative works, distribute
00039  * copies to the public, perform publicly and display publicly, and to
00040  * permit others to do so.
00041  */
00042 
00043 #ifndef SCHEDULER_HPP
00044 #define SCHEDULER_HPP 1
00045 
00046 
00047 #include <pthread.h>
00048 #include <iostream>
00049 #include <vector>
00050 #include <deque>
00051 //#include <sys/time.h>
00052 
00053 
00054 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
00055 
00056 
00083 template <class Solv, class Prob, class Err>
00084 class Scheduler {
00085 
00086     class Consumer {
00087 
00088         /*
00089         enum consumer_status_e {
00090             CONSUMER_CREATED = 0,
00091             CONSUMER_RUNNING,
00092             CONSUMER_FINISHED
00093         };
00094         */
00095 
00096         //pthread_mutex_t      _mutex;            //!< \brief Mutex for active check
00097         pthread_t            _thread;
00098         Solv                *_solver;
00099         Scheduler           *_scheduler;
00100         //struct timeval       _t0;
00101         //std::vector<struct timeval> _t;
00102     
00103         void *consumer_main( void ) {
00104             Prob *p;
00105             //struct timeval t;
00106             
00107             //pthread_mutex_lock( &_mutex );
00108             //_status = CONSUMER_RUNNING;
00109             //pthread_mutex_unlock( &_mutex );
00110 
00111             while( (p = _scheduler->get_next_problem()) ) {
00112                 try {
00113                     //gettimeofday( &t, NULL );
00114                     //_t.push_back( t );
00115                     (*_solver)( p, *_scheduler );
00116                     //gettimeofday( &t, NULL );
00117                     //_t.push_back( t );
00118                 } catch( Err e ) {
00119                     //std::cout << "on_error\n";
00120                     // Handle error and stop solving
00121                     _scheduler->on_error( e, p );
00122                     break;
00123                 };
00124                 _scheduler->put_solved_problem( p );
00125             }
00126       
00127             //std::cout << "Exiting consumer\n";
00128             //pthread_mutex_lock( &_mutex );
00129             //_status = CONSUMER_FINISHED;
00130             //pthread_mutex_unlock( &_mutex );
00131             return( NULL );
00132         }
00133     
00134     public:
00135 
00136         static void *consumer_entry( void *data ) {
00137             Consumer *consumer = (Consumer *)data;
00138             return( consumer->consumer_main() );
00139         }
00140 
00141         Consumer( Solv *solver, Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) { 
00142 
00143             //pthread_mutex_init( &_mutex, NULL );
00144             //std::cout << "Start\n";
00145             //gettimeofday( &_t0, NULL );
00146         }
00147 
00148         ~Consumer() {
00149             //pthread_mutex_lock( &cout_mutex );
00150             //std::cout << "End\n";
00151             //for( size_t a = 0; a < _t.size(); a++ ) {
00152             //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 
00153             //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n";
00154             //a++;
00155             //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 
00156             //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n\n\n";
00157             //}
00158             //pthread_mutex_unlock( &cout_mutex );
00159         }
00160 
00161         void run( void ) {
00162             pthread_create( &_thread, NULL, consumer_entry, (void *)this );
00163         }
00164 
00165         void join( void ) {
00166             //pthread_mutex_lock( &_mutex );
00167             //if( _status == CONSUMER_FINISHED ) {
00168             //pthread_mutex_unlock( &_mutex );
00169             //return;
00170             //} else if( _status == CONSUMER_CREATED ) {
00171             //
00172             //}
00173             //pthread_mutex_unlock( &_mutex );
00174             pthread_join( _thread, NULL );
00175         }
00176 
00177     };
00178 
00179 
00180     pthread_mutex_t         _mutex;            
00181     pthread_cond_t          _scheduler_cond;   
00182     pthread_cond_t          _producer_cond;    
00183     pthread_cond_t          _consumer_cond;    
00184 
00185     size_t                  _problems_in_c;    
00186     size_t                  _problems_out_c;   
00187     size_t                  _problems_err_c;   
00188     std::deque<Prob*>       _problems_in;      
00189     std::deque<Prob*>       _problems_out;     
00190 
00191     pthread_t               _scheduler_thread; 
00192     std::vector<Consumer *> _consumers;        
00193 
00194     bool                    _running;          
00195     bool                    _error;            
00196     bool                    _done;             
00197     bool                    _finish;           
00198     std::vector<Err>        _err;              
00199     std::vector<Prob *>     _prob;             
00200 
00201 
00207     void on_error( Err &e, Prob *p ) {
00208         pthread_mutex_lock( &_mutex );
00209         _err.push_back( e );
00210         _prob.push_back( p );
00211         _problems_err_c++;
00212         _error = true;
00213         pthread_cond_broadcast( &_scheduler_cond );
00214         pthread_mutex_unlock( &_mutex );
00215     }
00216 
00217 
00218     Prob *get_next_problem( void ) {
00219         Prob *ret;
00220         pthread_mutex_lock( &_mutex );
00221     
00222         if( _done || _error ) {
00223             pthread_mutex_unlock( &_mutex );
00224             return( NULL );
00225         }
00226 
00227         if( _problems_in.empty() ) {
00228             // Signal producer that problems are spent
00229             pthread_cond_signal( &_scheduler_cond );
00230             while( _problems_in.empty() ) {
00231                 // Wait for new problems
00232                 pthread_cond_wait( &_consumer_cond, &_mutex );
00233                 if( _done || _error ) {
00234                     pthread_mutex_unlock( &_mutex );
00235                     return( NULL );
00236                 }
00237             }
00238         }
00239 
00240         // Return next problem
00241         ret = _problems_in.front();
00242         _problems_in.pop_front();
00243         pthread_mutex_unlock( &_mutex );
00244         return( ret );
00245     }
00246 
00247 
00248     void put_solved_problem( Prob *p ) {
00249         pthread_mutex_lock( &_mutex );
00250         _problems_out_c++;
00251         //std::cout << "put_solved_problem(): " << _problems_out_c << "\n";     
00252         _problems_out.push_back( p );
00253         pthread_mutex_unlock( &_mutex );
00254     }
00255 
00256 
00257     void *scheduler_main( void ) {
00258 
00259         // Moved from
00260         for( size_t a = 0; a < _consumers.size(); a++ )
00261             _consumers[a]->run();
00262 
00263         pthread_mutex_lock( &_mutex );
00264 
00265         while( 1 ) {
00266             // Wait until all consumers are done with all problems or error occurs
00267             while( !(_problems_in.empty() || _done || _error) ) {
00268                 //std::cout << "scheduler_main(): scheduler_cond wait 1\n";
00269                 pthread_cond_wait( &_scheduler_cond, &_mutex );
00270             }
00271 
00272             if( (_finish && _problems_in_c == _problems_out_c+_problems_err_c) || 
00273                 _done || _error )
00274                 break;
00275 
00276             // Problems temporarily done
00277             pthread_cond_wait( &_scheduler_cond, &_mutex );
00278             //std::cout << "scheduler_main(): prob_in = " << _problems_in_c
00279             //<< " prob_out = " << _problems_out_c << "\n";
00280             //std::cout << "scheduler_main(): scheduler_cond wait 2\n";
00281 
00282             // Signal consumers to wake up
00283             pthread_cond_broadcast( &_consumer_cond );
00284         }
00285 
00286         // Broadcast done
00287         _done = true;
00288         pthread_cond_broadcast( &_consumer_cond );
00289         pthread_mutex_unlock( &_mutex );
00290 
00291         // Join all consumers
00292         //std::cout << "scheduler_main(): Scheduler waiting in join\n";
00293         for( size_t a = 0; a < _consumers.size(); a++ )
00294             _consumers[a]->join();
00295 
00296         pthread_cond_broadcast( &_producer_cond );
00297         //std::cout << "scheduler_main(): Exiting scheduler\n";
00298         _running = false;
00299         return( NULL );
00300     }
00301 
00302 
00303 
00304 
00305     static void *scheduler_entry( void *data ) {
00306         Scheduler *scheduler = (Scheduler *)data;
00307         return( scheduler->scheduler_main() );
00308     }
00309 
00310 
00311 public:
00312 
00313 
00319     Scheduler( std::vector<Solv *> s )
00320         : _problems_in_c(0), _problems_out_c(0), _problems_err_c(0), _running(false) {
00321 
00322         pthread_mutex_init( &_mutex, NULL );
00323         pthread_cond_init( &_scheduler_cond, NULL );
00324         pthread_cond_init( &_consumer_cond, NULL );
00325         pthread_cond_init( &_producer_cond, NULL );
00326 
00327         // Create consumer threads
00328         for( size_t a = 0; a < s.size(); a++ )
00329             _consumers.push_back( new Consumer( s[a], this ) );
00330     }
00331 
00332 
00335     ~Scheduler() {
00336         finish();
00337         pthread_join( _scheduler_thread, NULL );
00338 
00339         pthread_mutex_destroy( &_mutex );
00340         pthread_cond_destroy( &_scheduler_cond );
00341         pthread_cond_destroy( &_consumer_cond );
00342         pthread_cond_destroy( &_producer_cond );
00343 
00344         // Delete consumer threads
00345         for( size_t a = 0; a < _consumers.size(); a++ )
00346             delete _consumers[a];
00347     }
00348 
00349 
00355     template <class Cont>
00356     size_t get_solved_problems( Cont &c ) {
00357         pthread_mutex_lock( &_mutex );
00358         size_t r = _problems_out.size();
00359         while( !_problems_out.empty() ) {
00360             c.push_back( _problems_out.front() );
00361             _problems_out.pop_front();
00362         }
00363         pthread_mutex_unlock( &_mutex );
00364         return( r );
00365     }
00366 
00367     
00370     bool is_error( void ) {
00371         // No mutex needed for one bit read
00372         return( _error );
00373     }
00374 
00375 
00378     bool is_running( void ) {
00379         // No mutex needed for one bit read
00380         return( _running );
00381     }
00382 
00383 
00390     template <class Cont1, class Cont2>
00391     size_t get_errors( Cont1 &e, Cont2 &p ) {
00392         pthread_mutex_lock( &_mutex );
00393         size_t r = _err.size();
00394         for( size_t a = 0; a < _err.size(); a++ ) {
00395             e.push_back( _err[a] );
00396             p.push_back( _prob[a] );
00397         }
00398         _err.clear();
00399         _prob.clear();
00400         pthread_mutex_unlock( &_mutex );
00401         return( r );
00402     }    
00403 
00410     void run( void ) {
00411 
00412         if( _running )
00413             return;
00414         _running = true;
00415         _error = false;
00416         _done = false;
00417         _finish = false;
00418         _err.clear();
00419         _prob.clear();
00420         pthread_create( &_scheduler_thread, NULL, scheduler_entry, (void *)this );
00421     }
00422 
00423 
00426     void add_problem( Prob *p ) {
00427 
00428         pthread_mutex_lock( &_mutex );
00429         _problems_in_c++;
00430         _problems_in.push_back( p );
00431         pthread_cond_broadcast( &_scheduler_cond );     
00432         pthread_mutex_unlock( &_mutex );
00433     }
00434 
00435 
00438     void add_problems( std::vector<Prob *> p ) {
00439 
00440         pthread_mutex_lock( &_mutex );
00441         _problems_in_c += p.size();
00442         _problems_in.insert( _problems_in.end(), p.begin(), p.end() );
00443         pthread_cond_broadcast( &_scheduler_cond );     
00444         pthread_mutex_unlock( &_mutex );
00445     }
00446 
00447 
00453     bool finish( void ) {
00454         if( _finish )
00455             return( true );
00456         if( !_running )
00457             return( false );
00458 
00459         pthread_mutex_lock( &_mutex );
00460         _finish = true;
00461         //std::cout << "finish(): scheduler_cond broadcast\n";
00462         pthread_cond_broadcast( &_scheduler_cond );
00463 
00464         //std::cout << "finish(): producer_cond wait\n";
00465         pthread_cond_wait( &_producer_cond, &_mutex );
00466         pthread_mutex_unlock( &_mutex );
00467 
00468         if( _error )
00469             return( false );
00470         return( true );
00471     }
00472 
00473 
00474     friend class Consumer;
00475 };
00476 
00477 
00478 
00479 #endif
00480 
00481 
00482 
00483 
00484 
00485 
00486 
00487 
00488 
00489 
00490 
00491 
00492 
00493 
00494 
00495 
00496 
00497 
00498