aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--odb/mysql/connection-factory.cxx156
-rw-r--r--odb/mysql/connection-factory.hxx90
-rw-r--r--odb/mysql/database.cxx10
3 files changed, 251 insertions, 5 deletions
diff --git a/odb/mysql/connection-factory.cxx b/odb/mysql/connection-factory.cxx
index 64fdc57..db7a702 100644
--- a/odb/mysql/connection-factory.cxx
+++ b/odb/mysql/connection-factory.cxx
@@ -3,12 +3,45 @@
// copyright : Copyright (c) 2009-2010 Code Synthesis Tools CC
// license : GNU GPL v2; see accompanying LICENSE file
+#include <mysql/mysql.h>
+#include <mysql/errmsg.h> // CR_UNKNOWN_ERROR
+
#include <odb/mysql/connection-factory.hxx>
+#include <odb/mysql/exceptions.hxx>
+
+#include <odb/details/tls.hxx>
+#include <odb/details/lock.hxx>
+
+using namespace std;
namespace odb
{
+ using namespace details;
+
namespace mysql
{
+ namespace
+ {
+ struct mysql_init
+ {
+ mysql_init ()
+ {
+ if (my_thread_init ())
+ {
+ throw database_exception (
+ CR_UNKNOWN_ERROR, "?????", "thread initialization failed");
+ }
+ }
+
+ ~mysql_init ()
+ {
+ my_thread_end ();
+ }
+ };
+
+ static ODB_TLS_OBJECT (mysql_init) mysql_init_;
+ }
+
//
// connection_factory
//
@@ -25,6 +58,8 @@ namespace odb
shared_ptr<connection> new_connection_factory::
connect ()
{
+ tls_get (mysql_init_);
+
return shared_ptr<connection> (new (shared) connection (*db_));
}
@@ -33,5 +68,126 @@ namespace odb
{
db_ = &db;
}
+
+ //
+ // connection_pool_factory
+ //
+
+ connection_pool_factory::
+ ~connection_pool_factory ()
+ {
+ // Wait for all the connections currently in use to return to
+ // the pool.
+ //
+ lock l (mutex_);
+ while (in_use_ != 0)
+ {
+ waiters_++;
+ cond_.wait ();
+ waiters_--;
+ }
+ }
+
+ shared_ptr<connection> connection_pool_factory::
+ connect ()
+ {
+ tls_get (mysql_init_);
+
+ lock l (mutex_);
+
+ while (true)
+ {
+ // See if we have a spare connection.
+ //
+ if (connections_.size () != 0)
+ {
+ shared_ptr<pooled_connection> c (connections_.back ());
+ c->pool_ = this;
+ connections_.pop_back ();
+ in_use_++;
+ return c;
+ }
+
+ // See if we can create a new one.
+ //
+ if(max_ == 0 || in_use_ < max_)
+ {
+ shared_ptr<pooled_connection> c (
+ new (shared) pooled_connection (*db_, this));
+ in_use_++;
+ return c;
+ }
+
+ // Wait until someone releases a connection.
+ //
+ waiters_++;
+ cond_.wait ();
+ waiters_--;
+ }
+ }
+
+ void connection_pool_factory::
+ database (database_type& db)
+ {
+ tls_get (mysql_init_);
+
+ db_ = &db;
+
+ if (min_ > 0)
+ {
+ connections_.reserve (min_);
+
+ for(size_t i (0); i < min_; ++i)
+ {
+ connections_.push_back (
+ shared_ptr<pooled_connection> (
+ new (shared) pooled_connection (*db_, 0)));
+ }
+ }
+ }
+
+ void connection_pool_factory::
+ release (pooled_connection* c)
+ {
+ c->pool_ = 0;
+ lock l (mutex_);
+
+ // Determine if we need to keep or free this connection.
+ //
+ bool keep (waiters_ != 0 ||
+ min_ == 0 ||
+ (connections_.size () + in_use_ <= min_));
+
+ in_use_--;
+
+ if (keep)
+ connections_.push_back (
+ shared_ptr<pooled_connection> (inc_ref (c)));
+
+ if (waiters_ != 0)
+ cond_.signal ();
+ }
+
+ //
+ // connection_pool_factory::pooled_connection
+ //
+
+ connection_pool_factory::pooled_connection::
+ pooled_connection (database_type& db, connection_pool_factory* pool)
+ : connection (db), pool_ (pool)
+ {
+ callback_.arg = this;
+ callback_.zero_counter = &zero_counter;
+ shared_base::callback_ = &callback_;
+ }
+
+ void connection_pool_factory::pooled_connection::
+ zero_counter (void* arg)
+ {
+ pooled_connection* c (static_cast<pooled_connection*> (arg));
+
+ if (c->pool_)
+ c->pool_->release (c);
+ }
}
}
diff --git a/odb/mysql/connection-factory.hxx b/odb/mysql/connection-factory.hxx
index ef86224..bb439a8 100644
--- a/odb/mysql/connection-factory.hxx
+++ b/odb/mysql/connection-factory.hxx
@@ -6,12 +6,18 @@
#ifndef ODB_MYSQL_CONNECTION_FACTORY_HXX
#define ODB_MYSQL_CONNECTION_FACTORY_HXX
+#include <vector>
+#include <cstddef> // std::size_t
+
#include <odb/shared-ptr.hxx>
#include <odb/mysql/version.hxx>
#include <odb/mysql/forward.hxx>
#include <odb/mysql/connection.hxx>
+#include <odb/details/mutex.hxx>
+#include <odb/details/condition.hxx>
+
namespace odb
{
namespace mysql
@@ -47,8 +53,92 @@ namespace odb
database (database_type&);
private:
+ new_connection_factory (const new_connection_factory&);
+ new_connection_factory& operator= (const new_connection_factory&);
+
+ private:
database_type* db_;
};
+
+ class connection_pool_factory: public connection_factory
+ {
+ public:
+ // The max_connections argument specifies the maximum number of
+ // concurrent connections this pool will maintain. If this value
+ // is 0 then the pool will create a new connection every time all
+ // of the existing connections are in use.
+ //
+ // The min_connections argument specifies the minimum number of
+ // connections that should be maintained by the pool. If the
+ // number of connections maintained by the pool exceeds this
+ // number and there are no active waiters for a new connection,
+ // then the pool will release the excess connections. If this
+ // value is 0 then the pool will maintain all the connections
+ // that were ever created.
+ //
+ connection_pool_factory (std::size_t max_connections = 0,
+ std::size_t min_connections = 0)
+ : max_ (max_connections),
+ min_ (min_connections),
+ in_use_ (0),
+ waiters_ (0),
+ db_ (0),
+ cond_ (mutex_)
+ {
+ }
+
+ virtual shared_ptr<connection>
+ connect ();
+
+ virtual void
+ database (database_type&);
+
+ virtual
+ ~connection_pool_factory ();
+
+ private:
+ connection_pool_factory (const connection_pool_factory&);
+ connection_pool_factory& operator= (const connection_pool_factory&);
+
+ private:
+ class pooled_connection: public connection
+ {
+ public:
+ // NULL pool value indicates that the connection is not in use.
+ //
+ pooled_connection (database_type&, connection_pool_factory*);
+
+ private:
+ static void
+ zero_counter (void*);
+
+ private:
+ friend class connection_pool_factory;
+
+ shared_base::refcount_callback callback_;
+ connection_pool_factory* pool_;
+ };
+
+ friend class pooled_connection;
+ typedef std::vector<shared_ptr<pooled_connection> > connections;
+
+ private:
+ void
+ release (pooled_connection*);
+
+ private:
+ const std::size_t max_;
+ const std::size_t min_;
+
+ std::size_t in_use_; // Number of connections currently in use.
+ std::size_t waiters_; // Number of threads waiting for a connection.
+
+ database_type* db_;
+ connections connections_;
+
+ details::mutex mutex_;
+ details::condition cond_;
+ };
}
}
diff --git a/odb/mysql/database.cxx b/odb/mysql/database.cxx
index 8ad4bd3..b73e48f 100644
--- a/odb/mysql/database.cxx
+++ b/odb/mysql/database.cxx
@@ -40,7 +40,7 @@ namespace odb
factory_ (factory)
{
if (factory_.get () == 0)
- factory_.reset (new new_connection_factory ());
+ factory_.reset (new connection_pool_factory ());
factory_->database (*this);
}
@@ -66,7 +66,7 @@ namespace odb
factory_ (factory)
{
if (factory_.get () == 0)
- factory_.reset (new new_connection_factory ());
+ factory_.reset (new connection_pool_factory ());
factory_->database (*this);
}
@@ -92,7 +92,7 @@ namespace odb
factory_ (factory)
{
if (factory_.get () == 0)
- factory_.reset (new new_connection_factory ());
+ factory_.reset (new connection_pool_factory ());
factory_->database (*this);
}
@@ -118,7 +118,7 @@ namespace odb
factory_ (factory)
{
if (factory_.get () == 0)
- factory_.reset (new new_connection_factory ());
+ factory_.reset (new connection_pool_factory ());
factory_->database (*this);
}
@@ -144,7 +144,7 @@ namespace odb
factory_ (factory)
{
if (factory_.get () == 0)
- factory_.reset (new new_connection_factory ());
+ factory_.reset (new connection_pool_factory ());
factory_->database (*this);
}