From a243907cfe4f39f9b74eea88a41bf535a42158ce Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Wed, 18 Aug 2010 17:56:18 +0200 Subject: Add connection_pool_factory, use it as the default --- odb/mysql/connection-factory.cxx | 156 +++++++++++++++++++++++++++++++++++++++ odb/mysql/connection-factory.hxx | 90 ++++++++++++++++++++++ odb/mysql/database.cxx | 10 +-- 3 files changed, 251 insertions(+), 5 deletions(-) diff --git a/odb/mysql/connection-factory.cxx b/odb/mysql/connection-factory.cxx index 64fdc57..db7a702 100644 --- a/odb/mysql/connection-factory.cxx +++ b/odb/mysql/connection-factory.cxx @@ -3,12 +3,45 @@ // copyright : Copyright (c) 2009-2010 Code Synthesis Tools CC // license : GNU GPL v2; see accompanying LICENSE file +#include +#include // CR_UNKNOWN_ERROR + #include +#include + +#include +#include + +using namespace std; namespace odb { + using namespace details; + namespace mysql { + namespace + { + struct mysql_init + { + mysql_init () + { + if (my_thread_init ()) + { + throw database_exception ( + CR_UNKNOWN_ERROR, "?????", "thread initialization failed"); + } + } + + ~mysql_init () + { + my_thread_end (); + } + }; + + static ODB_TLS_OBJECT (mysql_init) mysql_init_; + } + // // connection_factory // @@ -25,6 +58,8 @@ namespace odb shared_ptr new_connection_factory:: connect () { + tls_get (mysql_init_); + return shared_ptr (new (shared) connection (*db_)); } @@ -33,5 +68,126 @@ namespace odb { db_ = &db; } + + // + // connection_pool_factory + // + + connection_pool_factory:: + ~connection_pool_factory () + { + // Wait for all the connections currently in use to return to + // the pool. + // + lock l (mutex_); + while (in_use_ != 0) + { + waiters_++; + cond_.wait (); + waiters_--; + } + } + + shared_ptr connection_pool_factory:: + connect () + { + tls_get (mysql_init_); + + lock l (mutex_); + + while (true) + { + // See if we have a spare connection. + // + if (connections_.size () != 0) + { + shared_ptr c (connections_.back ()); + c->pool_ = this; + connections_.pop_back (); + in_use_++; + return c; + } + + // See if we can create a new one. + // + if(max_ == 0 || in_use_ < max_) + { + shared_ptr c ( + new (shared) pooled_connection (*db_, this)); + in_use_++; + return c; + } + + // Wait until someone releases a connection. + // + waiters_++; + cond_.wait (); + waiters_--; + } + } + + void connection_pool_factory:: + database (database_type& db) + { + tls_get (mysql_init_); + + db_ = &db; + + if (min_ > 0) + { + connections_.reserve (min_); + + for(size_t i (0); i < min_; ++i) + { + connections_.push_back ( + shared_ptr ( + new (shared) pooled_connection (*db_, 0))); + } + } + } + + void connection_pool_factory:: + release (pooled_connection* c) + { + c->pool_ = 0; + lock l (mutex_); + + // Determine if we need to keep or free this connection. + // + bool keep (waiters_ != 0 || + min_ == 0 || + (connections_.size () + in_use_ <= min_)); + + in_use_--; + + if (keep) + connections_.push_back ( + shared_ptr (inc_ref (c))); + + if (waiters_ != 0) + cond_.signal (); + } + + // + // connection_pool_factory::pooled_connection + // + + connection_pool_factory::pooled_connection:: + pooled_connection (database_type& db, connection_pool_factory* pool) + : connection (db), pool_ (pool) + { + callback_.arg = this; + callback_.zero_counter = &zero_counter; + shared_base::callback_ = &callback_; + } + + void connection_pool_factory::pooled_connection:: + zero_counter (void* arg) + { + pooled_connection* c (static_cast (arg)); + + if (c->pool_) + c->pool_->release (c); + } } } diff --git a/odb/mysql/connection-factory.hxx b/odb/mysql/connection-factory.hxx index ef86224..bb439a8 100644 --- a/odb/mysql/connection-factory.hxx +++ b/odb/mysql/connection-factory.hxx @@ -6,12 +6,18 @@ #ifndef ODB_MYSQL_CONNECTION_FACTORY_HXX #define ODB_MYSQL_CONNECTION_FACTORY_HXX +#include +#include // std::size_t + #include #include #include #include +#include +#include + namespace odb { namespace mysql @@ -47,8 +53,92 @@ namespace odb database (database_type&); private: + new_connection_factory (const new_connection_factory&); + new_connection_factory& operator= (const new_connection_factory&); + + private: database_type* db_; }; + + class connection_pool_factory: public connection_factory + { + public: + // The max_connections argument specifies the maximum number of + // concurrent connections this pool will maintain. If this value + // is 0 then the pool will create a new connection every time all + // of the existing connections are in use. + // + // The min_connections argument specifies the minimum number of + // connections that should be maintained by the pool. If the + // number of connections maintained by the pool exceeds this + // number and there are no active waiters for a new connection, + // then the pool will release the excess connections. If this + // value is 0 then the pool will maintain all the connections + // that were ever created. + // + connection_pool_factory (std::size_t max_connections = 0, + std::size_t min_connections = 0) + : max_ (max_connections), + min_ (min_connections), + in_use_ (0), + waiters_ (0), + db_ (0), + cond_ (mutex_) + { + } + + virtual shared_ptr + connect (); + + virtual void + database (database_type&); + + virtual + ~connection_pool_factory (); + + private: + connection_pool_factory (const connection_pool_factory&); + connection_pool_factory& operator= (const connection_pool_factory&); + + private: + class pooled_connection: public connection + { + public: + // NULL pool value indicates that the connection is not in use. + // + pooled_connection (database_type&, connection_pool_factory*); + + private: + static void + zero_counter (void*); + + private: + friend class connection_pool_factory; + + shared_base::refcount_callback callback_; + connection_pool_factory* pool_; + }; + + friend class pooled_connection; + typedef std::vector > connections; + + private: + void + release (pooled_connection*); + + private: + const std::size_t max_; + const std::size_t min_; + + std::size_t in_use_; // Number of connections currently in use. + std::size_t waiters_; // Number of threads waiting for a connection. + + database_type* db_; + connections connections_; + + details::mutex mutex_; + details::condition cond_; + }; } } diff --git a/odb/mysql/database.cxx b/odb/mysql/database.cxx index 8ad4bd3..b73e48f 100644 --- a/odb/mysql/database.cxx +++ b/odb/mysql/database.cxx @@ -40,7 +40,7 @@ namespace odb factory_ (factory) { if (factory_.get () == 0) - factory_.reset (new new_connection_factory ()); + factory_.reset (new connection_pool_factory ()); factory_->database (*this); } @@ -66,7 +66,7 @@ namespace odb factory_ (factory) { if (factory_.get () == 0) - factory_.reset (new new_connection_factory ()); + factory_.reset (new connection_pool_factory ()); factory_->database (*this); } @@ -92,7 +92,7 @@ namespace odb factory_ (factory) { if (factory_.get () == 0) - factory_.reset (new new_connection_factory ()); + factory_.reset (new connection_pool_factory ()); factory_->database (*this); } @@ -118,7 +118,7 @@ namespace odb factory_ (factory) { if (factory_.get () == 0) - factory_.reset (new new_connection_factory ()); + factory_.reset (new connection_pool_factory ()); factory_->database (*this); } @@ -144,7 +144,7 @@ namespace odb factory_ (factory) { if (factory_.get () == 0) - factory_.reset (new new_connection_factory ()); + factory_.reset (new connection_pool_factory ()); factory_->database (*this); } -- cgit v1.1