include/dmlite/cpp/utils/poolcontainer.h

Go to the documentation of this file.
00001 /// @file    include/dmlite/cpp/utils/poolcontainer.h
00002 /// @brief   Pooling
00003 /// @author  Alejandro Álvarez Ayllón <aalvarez@cern.ch>
00004 #ifndef DMLITE_CPP_UTILS_POOLCONTAINER_H
00005 #define DMLITE_CPP_UTILS_POOLCONTAINER_H
00006 
00007 #include <boost/thread/mutex.hpp>
00008 #include <boost/thread/condition.hpp>
00009 #include <boost/date_time/posix_time/posix_time.hpp>
00010 #include <map>
00011 #include <syslog.h>
00012 #include <queue>
00013 #include "../exceptions.h"
00014 
00015 namespace dmlite {
00016 
00017   /// Classes implementing this interface creates the actual element
00018   /// since the pool is agnosstic
00019   template <class E>
00020   class PoolElementFactory {
00021    public:
00022     /// Destructor
00023     virtual ~PoolElementFactory() {};
00024 
00025     /// Creates an element
00026     virtual E create() = 0;
00027 
00028     /// Destroys an element
00029     virtual void destroy(E) = 0;
00030 
00031     /// Check it is still valid
00032     virtual bool isValid(E) = 0;
00033   };
00034 
00035 
00036   /// Implements a pool of whichever resource
00037   template <class E>
00038   class PoolContainer {
00039    public:
00040     /// Constructor
00041     /// @param factory The factory to use when spawning a new resource.
00042     /// @param n       The number of resources to keep in the pool. Up to 2*n slots can be created without penalty (but only n will be pooled)
00043     PoolContainer(PoolElementFactory<E>* factory, int n): max_(n), factory_(factory), freeSlots_(2*n)
00044     {
00045     }
00046 
00047     /// Destructor
00048     ~PoolContainer()
00049     {
00050       boost::mutex::scoped_lock lock(mutex_);
00051       // Free 'free'
00052       while (free_.size() > 0) {
00053         E e = free_.front();
00054         free_.pop_front();
00055         factory_->destroy(e);
00056       }
00057       // Freeing used is dangerous, as we might block if the client code
00058       // forgot about something. Assume the memory leak :(
00059       if (used_.size() > 0) {
00060         syslog(LOG_USER | LOG_WARNING, "%ld used elements from a pool not released on destruction!", (long)used_.size());
00061       }
00062     }
00063 
00064     /// Acquires a free resource.
00065     E  acquire(bool block = true)
00066     {
00067       bool found = false; 
00068       E e;
00069       
00070       { // lock scope
00071         boost::mutex::scoped_lock lock(mutex_);
00072       
00073         // Wait for one free
00074         if (!block && (freeSlots_ == 0)) {
00075           throw DmException(DMLITE_SYSERR(EBUSY),
00076                             std::string("No resources available"));
00077         }
00078 
00079         boost::system_time const timeout = boost::get_system_time() + boost::posix_time::seconds(60);
00080       
00081         while (freeSlots_ < 1) {
00082           if (boost::get_system_time() >= timeout) {
00083             syslog(LOG_USER | LOG_WARNING, "Timeout...%d seconds", 60);
00084             break;
00085           }
00086           available_.timed_wait(lock, timeout);
00087         }
00088 
00089         // If there is any in the queue, give one from there
00090         if (free_.size() > 0) {
00091           e = free_.front();
00092           free_.pop_front();
00093           // May have expired! In this case the element as to be destroyed,
00094           // and the conclusion is that we have not found a good one in the pool
00095           if (!factory_->isValid(e)) {
00096             factory_->destroy(e);
00097           }
00098           else
00099             found = true;
00100         }
00101       
00102       } // lock
00103 
00104       // We create a new element out of the lock. This may help for elements that need other elements
00105       // of the same type to be constructed (sigh)
00106       if (!found) 
00107         e = factory_->create();
00108     
00109       { // lock scope (again, sigh)
00110         boost::mutex::scoped_lock lock(mutex_);
00111         // Keep track of used
00112         used_.insert(std::pair<E, unsigned>(e, 1));
00113 
00114         // Note that in case of timeout freeSlots_ can become negative
00115         --freeSlots_;
00116       }
00117       return e;
00118     }
00119 
00120     /// Increases the reference count of a resource.
00121     E acquire(E e)
00122     {
00123       boost::mutex::scoped_lock lock(mutex_);
00124 
00125       // Make sure it is there
00126       typename std::map<E, unsigned>::const_iterator i = used_.find(e);
00127       if (i == used_.end()) {
00128         throw DmException(DMLITE_SYSERR(EINVAL), std::string("The resource has not been locked previously!"));
00129       }
00130 
00131       // Increase
00132       used_[e]++;
00133 
00134       // End
00135       return e;
00136     }
00137 
00138     /// Releases a resource
00139     /// @param e The resource to release.
00140     /// @return  The reference count after releasing.
00141     unsigned release(E e)
00142     {
00143       boost::mutex::scoped_lock lock(mutex_);
00144       // Decrease reference count
00145       unsigned remaining = --used_[e];
00146       // No one else using it (hopefully...)
00147       if (used_[e] == 0) {
00148         // Remove from used
00149         used_.erase(e);
00150         // If the free size is less than the maximum, push to free and notify
00151         if ((long)free_.size() < max_) {
00152           free_.push_back(e);
00153         }
00154         else {
00155           // If we are fine, destroy
00156           factory_->destroy(e);
00157         }
00158       }
00159       available_.notify_one();
00160       ++freeSlots_;
00161 
00162       return remaining;
00163     }
00164 
00165     /// Count the number of instances
00166     unsigned refCount(E e)
00167     {
00168       typename std::map<E, unsigned>::const_iterator i = used_.find(e);
00169       if (i == used_.end())
00170         return 0;
00171       return used_[e];
00172     }
00173 
00174     /// Change the pool size
00175     /// @param ns The new size.
00176     void resize(int ns)
00177     {
00178       // The resizing will be done as we get requests
00179       boost::mutex::scoped_lock lock(mutex_);
00180       max_ = ns;
00181 
00182 
00183       freeSlots_ = 2*max_ - used_.size();
00184       // Increment the semaphore size if needed
00185       // Take into account the used
00186       if (freeSlots_ > 0)
00187         available_.notify_all();
00188     }
00189 
00190    private:
00191     // The max count of pooled instances
00192     int max_;
00193 
00194     PoolElementFactory<E> *factory_;
00195 
00196     std::deque<E>         free_;
00197     std::map<E, unsigned> used_;
00198     int freeSlots_;
00199 
00200     boost::mutex              mutex_;
00201     boost::condition_variable available_;
00202   };
00203 
00204   /// Convenience class that releases a resource on destruction
00205   template <class E>
00206   class PoolGrabber {
00207    public:
00208     PoolGrabber(PoolContainer<E>& pool, bool block = true): pool_(pool)
00209     {
00210       element_ = pool_.acquire(block);
00211     }
00212 
00213     ~PoolGrabber() {
00214       pool_.release(element_);
00215     }
00216 
00217     operator E ()
00218     {
00219       return element_;
00220     }
00221 
00222    private:
00223     PoolContainer<E>& pool_;
00224     E element_;
00225   };
00226 };
00227 
00228 #endif // DMLITE_CPP_UTILS_POOLCONTAINER_H

Generated on 4 May 2016 for dmlite by  doxygen 1.4.7