aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--odb/pgsql/binding.hxx18
-rw-r--r--odb/pgsql/database.cxx6
-rw-r--r--odb/pgsql/database.hxx27
-rw-r--r--odb/pgsql/database.ixx28
-rw-r--r--odb/pgsql/error.cxx59
-rw-r--r--odb/pgsql/error.hxx18
-rw-r--r--odb/pgsql/error.ixx14
-rw-r--r--odb/pgsql/exceptions.cxx5
-rw-r--r--odb/pgsql/no-id-object-statements.hxx8
-rw-r--r--odb/pgsql/no-id-object-statements.txx8
-rw-r--r--odb/pgsql/pgsql-types.hxx19
-rw-r--r--odb/pgsql/query.hxx10
-rw-r--r--odb/pgsql/simple-object-statements.hxx28
-rw-r--r--odb/pgsql/simple-object-statements.txx36
-rw-r--r--odb/pgsql/statement.cxx690
-rw-r--r--odb/pgsql/statement.hxx70
16 files changed, 896 insertions, 148 deletions
diff --git a/odb/pgsql/binding.hxx b/odb/pgsql/binding.hxx
index 63cf2eb..1adf144 100644
--- a/odb/pgsql/binding.hxx
+++ b/odb/pgsql/binding.hxx
@@ -41,17 +41,27 @@ namespace odb
public:
typedef pgsql::bind bind_type;
- binding (): bind (0), count (0), version (0) {}
+ binding ()
+ : bind (0), count (0), version (0),
+ batch (0), skip (0), status (0) {}
binding (bind_type* b, std::size_t n)
- : bind (b), count (n), version (0)
- {
- }
+ : bind (b), count (n), version (0),
+ batch (0), skip (0), status (0) {}
+
+ binding (bind_type* b, std::size_t n,
+ std::size_t bt, std::size_t s, unsigned long long* st)
+ : bind (b), count (n), version (0),
+ batch (bt), skip (s), status (st) {}
bind_type* bind;
std::size_t count;
std::size_t version;
+ std::size_t batch;
+ std::size_t skip;
+ unsigned long long* status; // Batch status array.
+
private:
binding (const binding&);
binding& operator= (const binding&);
diff --git a/odb/pgsql/database.cxx b/odb/pgsql/database.cxx
index b4d3732..09bf6f0 100644
--- a/odb/pgsql/database.cxx
+++ b/odb/pgsql/database.cxx
@@ -249,10 +249,11 @@ namespace odb
// Bind parameters and results.
//
+ char* pbuf[1] = {const_cast<char*> (name.c_str ())};
size_t psize[1] = {name.size ()};
bool pnull[1] = {false};
bind pbind[1] = {{bind::text,
- const_cast<char*> (name.c_str ()),
+ &pbuf[0],
&psize[0],
psize[0],
&pnull[0],
@@ -302,10 +303,11 @@ namespace odb
bool exists (true);
if (cp == 0 && c.server_version () >= 90400)
{
+ char* pbuf[1] = {const_cast<char*> (table)};
size_t psize[1] = {strlen (table)};
bool pnull[1] = {false};
bind pbind[1] = {{bind::text,
- const_cast<char*> (table),
+ &pbuf[0],
&psize[0],
psize[0],
&pnull[0],
diff --git a/odb/pgsql/database.hxx b/odb/pgsql/database.hxx
index d3b805d..fcb3ad3 100644
--- a/odb/pgsql/database.hxx
+++ b/odb/pgsql/database.hxx
@@ -124,6 +124,13 @@ namespace odb
typename object_traits<T>::id_type
persist (const typename object_traits<T>::pointer_type& obj_ptr);
+ // Bulk persist. Can be a range of references or pointers (including
+ // smart pointers) to objects.
+ //
+ template <typename I>
+ void
+ persist (I begin, I end, bool continue_failed = true);
+
// Load an object. Throw object_not_persistent if not found.
//
template <typename T>
@@ -210,6 +217,13 @@ namespace odb
void
update (const typename object_traits<T>::pointer_type& obj_ptr);
+ // Bulk update. Can be a range of references or pointers (including
+ // smart pointers) to objects.
+ //
+ template <typename I>
+ void
+ update (I begin, I end, bool continue_failed = true);
+
// Update a section of an object. Throws the section_not_loaded
// exception if the section is not loaded. Note also that this
// function does not clear the changed flag if it is set.
@@ -253,6 +267,19 @@ namespace odb
void
erase (const typename object_traits<T>::pointer_type& obj_ptr);
+ // Bulk erase.
+ //
+ template <typename T, typename I>
+ void
+ erase (I id_begin, I id_end, bool continue_failed = true);
+
+ // Can be a range of references or pointers (including smart pointers)
+ // to objects.
+ //
+ template <typename I>
+ void
+ erase (I obj_begin, I obj_end, bool continue_failed = true);
+
// Erase multiple objects matching a query predicate.
//
template <typename T>
diff --git a/odb/pgsql/database.ixx b/odb/pgsql/database.ixx
index ecc94f9..f04c3e6 100644
--- a/odb/pgsql/database.ixx
+++ b/odb/pgsql/database.ixx
@@ -119,6 +119,13 @@ namespace odb
return persist_<T, id_pgsql> (pobj);
}
+ template <typename I>
+ inline void database::
+ persist (I b, I e, bool cont)
+ {
+ persist_<I, id_pgsql> (b, e, cont);
+ }
+
template <typename T>
inline typename object_traits<T>::pointer_type database::
load (const typename object_traits<T>::id_type& id)
@@ -280,6 +287,13 @@ namespace odb
update_<T, id_pgsql> (pobj);
}
+ template <typename I>
+ inline void database::
+ update (I b, I e, bool cont)
+ {
+ update_<I, id_pgsql> (b, e, cont);
+ }
+
template <typename T>
inline void database::
update (const T& obj, const section& s)
@@ -369,6 +383,20 @@ namespace odb
erase_<T, id_pgsql> (pobj);
}
+ template <typename T, typename I>
+ inline void database::
+ erase (I idb, I ide, bool cont)
+ {
+ erase_id_<I, T, id_pgsql> (idb, ide, cont);
+ }
+
+ template <typename I>
+ inline void database::
+ erase (I ob, I oe, bool cont)
+ {
+ erase_object_<I, id_pgsql> (ob, oe, cont);
+ }
+
template <typename T>
inline unsigned long long database::
erase_query ()
diff --git a/odb/pgsql/error.cxx b/odb/pgsql/error.cxx
index 5d34fde..ba8451e 100644
--- a/odb/pgsql/error.cxx
+++ b/odb/pgsql/error.cxx
@@ -15,11 +15,12 @@ namespace odb
namespace pgsql
{
void
- translate_error (connection& c, PGresult* r)
+ translate_error (connection& c, PGresult* r,
+ size_t pos, multiple_exceptions* mex)
{
if (!r)
{
- if (CONNECTION_BAD == PQstatus (c.handle ()))
+ if (PQstatus (c.handle ()) == CONNECTION_BAD)
{
c.mark_failed ();
throw connection_lost ();
@@ -28,50 +29,58 @@ namespace odb
throw bad_alloc ();
}
- string msg;
- {
- // Can be NULL in case of PGRES_BAD_RESPONSE.
- //
- const char* m (PQresultErrorMessage (r));
- msg = (m != 0 ? m : "bad server response");
-
- // Get rid of a trailing newline if there is one.
- //
- string::size_type n (msg.size ());
- if (n != 0 && msg[n - 1] == '\n')
- msg.resize (n - 1);
- }
-
+ // Note that we expect the caller to handle PGRES_PIPELINE_ABORTED since
+ // it's not really an error but rather an indication that no attempt was
+ // made to execute this statement.
+ //
+ string ss;
switch (PQresultStatus (r))
{
case PGRES_BAD_RESPONSE:
{
- throw database_exception (msg);
+ throw database_exception ("bad server response");
}
case PGRES_FATAL_ERROR:
{
- string ss;
- {
- const char* s (PQresultErrorField (r, PG_DIAG_SQLSTATE));
- ss = (s != 0 ? s : "?????");
- }
+ const char* s (PQresultErrorField (r, PG_DIAG_SQLSTATE));
+ ss = (s != 0 ? s : "?????");
// Deadlock detected.
//
if (ss == "40001" || ss == "40P01")
throw deadlock ();
- else if (CONNECTION_BAD == PQstatus (c.handle ()))
+ else if (PQstatus (c.handle ()) == CONNECTION_BAD)
{
c.mark_failed ();
throw connection_lost ();
}
- else
- throw database_exception (ss, msg);
+ break;
}
default:
assert (false);
break;
}
+
+ string msg;
+ {
+ // Can be NULL in case of PGRES_BAD_RESPONSE.
+ //
+ const char* m (PQresultErrorMessage (r));
+ msg = (m != 0 ? m : "bad server response");
+
+ // Get rid of the trailing newline if there is one.
+ //
+ string::size_type n (msg.size ());
+ if (n != 0 && msg[n - 1] == '\n')
+ msg.resize (n - 1);
+ }
+
+ if (mex == 0)
+ throw database_exception (ss, msg);
+ else
+ // In PosgreSQL all errors are fatal.
+ //
+ mex->insert (pos, database_exception (ss, msg), true);
}
}
}
diff --git a/odb/pgsql/error.hxx b/odb/pgsql/error.hxx
index 36ecc44..8d2793d 100644
--- a/odb/pgsql/error.hxx
+++ b/odb/pgsql/error.hxx
@@ -9,23 +9,25 @@
#include <libpq-fe.h>
#include <odb/pgsql/version.hxx>
-#include <odb/pgsql/forward.hxx> // connection
+#include <odb/pgsql/forward.hxx> // connection, multiple_exceptions
#include <odb/pgsql/details/export.hxx>
namespace odb
{
namespace pgsql
{
- // Translate an error condition involving a PGresult*. If r is null, it is
- // assumed that the error was caused due to a bad connection or a memory
- // allocation error.
+ // Translate an error condition involving PGresult* and throw (or return,
+ // in case multiple_exceptions is not NULL) an appropriate exception. If
+ // result is NULL, it is assumed that the error was caused due to a bad
+ // connection or a memory allocation error.
//
LIBODB_PGSQL_EXPORT void
- translate_error (connection& c, PGresult* r);
+ translate_error (connection& c, PGresult* r,
+ std::size_t pos = 0, multiple_exceptions* = 0);
- // Return true if the PGresult is in an error state. If both s and r are
- // non-null, the pointed to value will be populated with the result status.
- // Otherwise, s is ignored.
+ // Return true if PGresult is not NULL and is not in an error state. If
+ // both s and r are non-NULL, the pointed to value will be populated with
+ // the result status. Otherwise, s is ignored.
//
LIBODB_PGSQL_EXPORT bool
is_good_result (PGresult* r, ExecStatusType* s = 0);
diff --git a/odb/pgsql/error.ixx b/odb/pgsql/error.ixx
index 0cda31b..6a010aa 100644
--- a/odb/pgsql/error.ixx
+++ b/odb/pgsql/error.ixx
@@ -5,8 +5,8 @@ namespace odb
{
namespace pgsql
{
- bool
- inline is_good_result (PGresult* r, ExecStatusType* s)
+ inline bool
+ is_good_result (PGresult* r, ExecStatusType* s)
{
if (r != 0)
{
@@ -16,9 +16,13 @@ namespace odb
*s = status;
return
- status != PGRES_BAD_RESPONSE &&
- status != PGRES_NONFATAL_ERROR &&
- status != PGRES_FATAL_ERROR;
+ status != PGRES_BAD_RESPONSE
+ && status != PGRES_NONFATAL_ERROR
+ && status != PGRES_FATAL_ERROR
+#ifdef LIBPQ_HAS_PIPELINING
+ && status != PGRES_PIPELINE_ABORTED
+#endif
+ ;
}
return false;
diff --git a/odb/pgsql/exceptions.cxx b/odb/pgsql/exceptions.cxx
index 01d9ab7..28e7fc4 100644
--- a/odb/pgsql/exceptions.cxx
+++ b/odb/pgsql/exceptions.cxx
@@ -26,7 +26,10 @@ namespace odb
const string& message)
: sqlstate_ (sqlstate), message_ (message)
{
- what_ = sqlstate_ + ": " + message_;
+ if (!sqlstate_.empty ())
+ what_ = sqlstate_ + ": " + message_;
+ else
+ what_ = message_;
}
database_exception::
diff --git a/odb/pgsql/no-id-object-statements.hxx b/odb/pgsql/no-id-object-statements.hxx
index 6e6b53f..baa1b2a 100644
--- a/odb/pgsql/no-id-object-statements.hxx
+++ b/odb/pgsql/no-id-object-statements.hxx
@@ -48,7 +48,10 @@ namespace odb
// Object image.
//
image_type&
- image () {return image_;}
+ image (std::size_t i = 0)
+ {
+ return image_[i];
+ }
// Insert binding.
//
@@ -112,7 +115,8 @@ namespace odb
no_id_object_statements& operator= (const no_id_object_statements&);
private:
- image_type image_;
+ image_type image_[object_traits::batch];
+ unsigned long long status_[object_traits::batch];
// Select binding.
//
diff --git a/odb/pgsql/no-id-object-statements.txx b/odb/pgsql/no-id-object-statements.txx
index ced26ee..0c340ab 100644
--- a/odb/pgsql/no-id-object-statements.txx
+++ b/odb/pgsql/no-id-object-statements.txx
@@ -24,13 +24,17 @@ namespace odb
// select
select_image_binding_ (select_image_bind_, select_column_count),
// insert
- insert_image_binding_ (insert_image_bind_, insert_column_count),
+ insert_image_binding_ (insert_image_bind_,
+ insert_column_count,
+ object_traits::batch,
+ sizeof (image_type),
+ status_),
insert_image_native_binding_ (insert_image_values_,
insert_image_lengths_,
insert_image_formats_,
insert_column_count)
{
- image_.version = 0;
+ image_[0].version = 0; // Only version in the first element used.
select_image_version_ = 0;
insert_image_version_ = 0;
diff --git a/odb/pgsql/pgsql-types.hxx b/odb/pgsql/pgsql-types.hxx
index 93e4870..117a41e 100644
--- a/odb/pgsql/pgsql-types.hxx
+++ b/odb/pgsql/pgsql-types.hxx
@@ -14,8 +14,13 @@ namespace odb
{
namespace pgsql
{
- // The libpq result binding. This data structures is
- // modelled after MYSQL_BIND from MySQL.
+ // The libpq result binding. This data structures is roughly modeled
+ // after MYSQL_BIND from MySQL.
+ //
+ // Types that may need to grow are bound as pointers to pointers to char
+ // array (normally in details::buffer) in order to allow simple offsetting
+ // in bulk operation support. Note that if we were to do the same for
+ // capacity, we could get rid of the buffer growth tracking altogether.
//
struct bind
{
@@ -27,14 +32,14 @@ namespace odb
bigint, // Buffer is long long; size, capacity, truncated are unused.
real, // Buffer is float; size, capacity, truncated are unused.
double_, // Buffer is double; size, capacity, truncated are unused.
- numeric, // Buffer is a char array.
+ numeric, // Buffer is a pointer to pointer to char array.
date, // Buffer is int; size, capacity, truncated are unused.
time, // Buffer is long long; size, capacity, truncated are unused.
timestamp,// Buffer is long long; size, capacity, truncated are unused.
- text, // Buffer is a char array.
- bytea, // Buffer is a char array.
- bit, // Buffer is a char array.
- varbit, // Buffer is a char array.
+ text, // Buffer is a pointer to pointer to char array.
+ bytea, // Buffer is a pointer to pointer to char array.
+ bit, // Buffer is a pointer to char array.
+ varbit, // Buffer is a pointer to pointer to char array.
uuid // Buffer is a 16-byte char array; size capacity, truncated
// are unused. Note: big-endian, in RFC 4122/4.1.2 order.
};
diff --git a/odb/pgsql/query.hxx b/odb/pgsql/query.hxx
index b940261..42182d6 100644
--- a/odb/pgsql/query.hxx
+++ b/odb/pgsql/query.hxx
@@ -1671,7 +1671,7 @@ namespace odb
bind (bind_type* b)
{
b->type = bind::numeric;
- b->buffer = buffer_.data ();
+ b->buffer = buffer_.data_ptr ();
b->capacity = buffer_.capacity ();
b->size = &size_;
}
@@ -1836,7 +1836,7 @@ namespace odb
bind (bind_type* b)
{
b->type = bind::text;
- b->buffer = buffer_.data ();
+ b->buffer = buffer_.data_ptr ();
b->capacity = buffer_.capacity ();
b->size = &size_;
}
@@ -1881,7 +1881,7 @@ namespace odb
bind (bind_type* b)
{
b->type = bind::bytea;
- b->buffer = buffer_.data ();
+ b->buffer = buffer_.data_ptr ();
b->capacity = buffer_.capacity ();
b->size = &size_;
}
@@ -1926,7 +1926,7 @@ namespace odb
bind (bind_type* b)
{
b->type = bind::bit;
- b->buffer = buffer_.data ();
+ b->buffer = buffer_.data_ptr ();
b->capacity = buffer_.capacity ();
b->size = &size_;
}
@@ -1970,7 +1970,7 @@ namespace odb
bind (bind_type* b)
{
b->type = bind::varbit;
- b->buffer = buffer_.data ();
+ b->buffer = buffer_.data_ptr ();
b->capacity = buffer_.capacity ();
b->size = &size_;
}
diff --git a/odb/pgsql/simple-object-statements.hxx b/odb/pgsql/simple-object-statements.hxx
index 64acbe9..086ef5f 100644
--- a/odb/pgsql/simple-object-statements.hxx
+++ b/odb/pgsql/simple-object-statements.hxx
@@ -167,7 +167,8 @@ namespace odb
typedef T object_type;
typedef object_traits_impl<object_type, id_pgsql> object_traits;
- optimistic_data (bind*, char** nv, int* nl, int* nf);
+ optimistic_data (bind*, char** nv, int* nl, int* nf,
+ std::size_t skip, unsigned long long* status);
binding*
id_image_binding () {return &id_image_binding_;}
@@ -190,7 +191,8 @@ namespace odb
template <typename T>
struct optimistic_data<T, false>
{
- optimistic_data (bind*, char**, int*, int*) {}
+ optimistic_data (bind*, char**, int*, int*,
+ std::size_t, unsigned long long*) {}
binding*
id_image_binding () {return 0;}
@@ -301,7 +303,7 @@ namespace odb
// Object image.
//
image_type&
- image () {return image_;}
+ image (std::size_t i = 0) {return images_[i].obj;}
// Insert binding.
//
@@ -348,7 +350,7 @@ namespace odb
// Object id image and binding.
//
id_image_type&
- id_image () {return id_image_;}
+ id_image (std::size_t i = 0) {return images_[i].id;}
std::size_t
id_image_version () const {return id_image_version_;}
@@ -471,8 +473,8 @@ namespace odb
{
return extra_statement_cache_.get (
conn_,
- image_,
- id_image_,
+ images_[0].obj,
+ images_[0].id,
id_image_binding_,
od_.id_image_binding (),
id_image_native_binding_,
@@ -527,7 +529,18 @@ namespace odb
image_type,
id_image_type> extra_statement_cache_;
- image_type image_;
+ // The UPDATE statement uses both the object and id image. Keep them
+ // next to each other so that the same skip distance can be used in
+ // batch binding.
+ //
+ struct images
+ {
+ image_type obj;
+ id_image_type id;
+ };
+
+ images images_[object_traits::batch];
+ unsigned long long status_[object_traits::batch];
// Select binding.
//
@@ -574,7 +587,6 @@ namespace odb
// Id image binding (only used as a parameter). Uses the suffix in
// the update bind.
//
- id_image_type id_image_;
std::size_t id_image_version_;
binding id_image_binding_;
native_binding id_image_native_binding_;
diff --git a/odb/pgsql/simple-object-statements.txx b/odb/pgsql/simple-object-statements.txx
index ad87e73..bb47b43 100644
--- a/odb/pgsql/simple-object-statements.txx
+++ b/odb/pgsql/simple-object-statements.txx
@@ -19,11 +19,15 @@ namespace odb
template <typename T>
optimistic_data<T, true>::
- optimistic_data (bind* b, char** nv, int* nl, int* nf)
+ optimistic_data (bind* b, char** nv, int* nl, int* nf,
+ std::size_t skip, unsigned long long* status)
: id_image_binding_ (
b,
object_traits::id_column_count +
- object_traits::managed_optimistic_column_count),
+ object_traits::managed_optimistic_column_count,
+ object_traits::batch,
+ skip,
+ status),
id_image_native_binding_ (
nv, nl, nf,
object_traits::id_column_count +
@@ -48,7 +52,11 @@ namespace odb
// select
select_image_binding_ (select_image_bind_, select_column_count),
// insert
- insert_image_binding_ (insert_image_bind_, insert_column_count),
+ insert_image_binding_ (insert_image_bind_,
+ insert_column_count,
+ object_traits::batch,
+ sizeof (images),
+ status_),
insert_image_native_binding_ (insert_image_values_,
insert_image_lengths_,
insert_image_formats_,
@@ -56,7 +64,10 @@ namespace odb
// update
update_image_binding_ (update_image_bind_,
update_column_count + id_column_count +
- managed_optimistic_column_count),
+ managed_optimistic_column_count,
+ object_traits::batch,
+ sizeof (images),
+ status_),
update_image_native_binding_ (update_image_values_,
update_image_lengths_,
update_image_formats_,
@@ -64,7 +75,10 @@ namespace odb
managed_optimistic_column_count),
// id
id_image_binding_ (update_image_bind_ + update_column_count,
- id_column_count),
+ id_column_count,
+ object_traits::batch,
+ sizeof (images),
+ status_),
id_image_native_binding_ (
update_image_values_ + update_column_count,
update_image_lengths_ + update_column_count,
@@ -74,15 +88,19 @@ namespace odb
od_ (update_image_bind_ + update_column_count,
update_image_values_ + update_column_count,
update_image_lengths_ + update_column_count,
- update_image_formats_ + update_column_count)
+ update_image_formats_ + update_column_count,
+ sizeof (images),
+ status_)
{
- image_.version = 0;
+ // Only versions in the first element used.
+ //
+ images_[0].obj.version = 0;
+ images_[0].id.version = 0;
+
select_image_version_ = 0;
insert_image_version_ = 0;
update_image_version_ = 0;
update_id_image_version_ = 0;
-
- id_image_.version = 0;
id_image_version_ = 0;
std::memset (insert_image_bind_, 0, sizeof (insert_image_bind_));
diff --git a/odb/pgsql/statement.cxx b/odb/pgsql/statement.cxx
index 977db72..c88e621 100644
--- a/odb/pgsql/statement.cxx
+++ b/odb/pgsql/statement.cxx
@@ -1,16 +1,32 @@
// file : odb/pgsql/statement.cxx
// license : GNU GPL v2; see accompanying LICENSE file
-#include <cstdlib> // std::atol
-#include <cassert>
-#include <sstream> // istringstream
+#include <odb/details/config.hxx> // ODB_CXX11
#include <libpq-fe.h>
+#ifdef LIBPQ_HAS_PIPELINING
+# ifndef _WIN32
+# include <errno.h>
+# include <sys/select.h>
+# endif
+#endif
+
+#include <cstring> // strcmp
+#include <utility> // pair
+#include <cassert>
+
+#ifdef ODB_CXX11
+# include <cstdlib> // strtoull
+#else
+# include <sstream> // istringstream
+#endif
+
#include <odb/tracer.hxx>
#include <odb/pgsql/pgsql-oid.hxx>
#include <odb/pgsql/statement.hxx>
+#include <odb/pgsql/exceptions.hxx>
#include <odb/pgsql/connection.hxx>
#include <odb/pgsql/transaction.hxx>
#include <odb/pgsql/auto-handle.hxx>
@@ -36,11 +52,12 @@ namespace odb
count = static_cast<unsigned long long> (s[0] - '0');
else
{
- // @@ Using stringstream conversion for now. See if we can optimize
- // this (atoll possibly, even though it is not standard).
- //
+#ifdef ODB_CXX11
+ count = strtoull (s, 0, 10);
+#else
istringstream ss (s);
ss >> count;
+#endif
}
return count;
@@ -256,30 +273,39 @@ namespace odb
return text_;
}
+ template <typename T>
+ static inline T*
+ offset (T* base, size_t count, size_t size)
+ {
+ return reinterpret_cast<T*> (
+ reinterpret_cast<char*> (base) + count * size);
+ }
+
void statement::
- bind_param (native_binding& n, const binding& b)
+ bind_param (native_binding& ns, const binding& bs, size_t pos)
{
- assert (n.count == b.count);
+ assert (ns.count == bs.count);
- for (size_t i (0); i < n.count; ++i)
+ for (size_t i (0); i < ns.count; ++i)
{
- const bind& current_bind (b.bind[i]);
+ const bind& b (bs.bind[i]);
- n.formats[i] = 1;
+ ns.formats[i] = 1;
- if (current_bind.buffer == 0 || // Skip NULL entries.
- (current_bind.is_null != 0 && *current_bind.is_null))
+ bool* n (b.is_null != 0 ? offset (b.is_null, pos, bs.skip) : 0);
+
+ if ((n != 0 && *n) || b.buffer == 0) // Handle NULL entries.
{
- n.values[i] = 0;
- n.lengths[i] = 0;
+ ns.values[i] = 0;
+ ns.lengths[i] = 0;
continue;
}
- n.values[i] = static_cast<char*> (current_bind.buffer);
+ ns.values[i] = static_cast<char*> (offset (b.buffer, pos, bs.skip));
size_t l (0);
- switch (current_bind.type)
+ switch (b.type)
{
case bind::boolean_:
{
@@ -325,10 +351,18 @@ namespace odb
case bind::numeric:
case bind::text:
case bind::bytea:
- case bind::bit:
case bind::varbit:
{
- l = *current_bind.size;
+ // In this case b.buffer is a pointer to pointer to buffer so we
+ // need to chase one level.
+ //
+ ns.values[i] = static_cast<char*> (
+ *reinterpret_cast<void**> (ns.values[i]));
+ }
+ // Fall through.
+ case bind::bit:
+ {
+ l = *offset (b.size, pos, bs.skip);
break;
}
case bind::uuid:
@@ -340,54 +374,61 @@ namespace odb
}
}
- n.lengths[i] = static_cast<int> (l);
+ ns.lengths[i] = static_cast<int> (l);
}
}
bool statement::
- bind_result (bind* p,
- size_t count,
+ bind_result (const binding& bs,
PGresult* result,
size_t row,
- bool truncated)
+ bool truncated,
+ size_t pos)
{
bool r (true);
int col_count (PQnfields (result));
int col (0);
- for (size_t i (0); i != count && col != col_count; ++i)
+ for (size_t i (0); i != bs.count && col != col_count; ++i)
{
- const bind& b (p[i]);
+ const bind& b (bs.bind[i]);
if (b.buffer == 0) // Skip NULL entries.
continue;
int c (col++);
- if (truncated && (b.truncated == 0 || !*b.truncated))
- continue;
+ {
+ bool* t (b.truncated != 0 ? offset (b.truncated, pos, bs.skip) : 0);
- if (b.truncated != 0)
- *b.truncated = false;
+ if (truncated && (t == 0 || !*t))
+ continue;
+
+ if (t != 0)
+ *t = false;
+ }
// Check for NULL unless we are reloading a truncated result.
//
if (!truncated)
{
- *b.is_null = PQgetisnull (result, static_cast<int> (row), c) == 1;
+ bool* n (offset (b.is_null, pos, bs.skip));
+
+ *n = PQgetisnull (result, static_cast<int> (row), c) == 1;
- if (*b.is_null)
+ if (*n)
continue;
}
+ void* buf (offset (b.buffer, pos, bs.skip));
+
const char* v (PQgetvalue (result, static_cast<int> (row), c));
switch (b.type)
{
case bind::boolean_:
{
- *static_cast<bool*> (b.buffer) =
- *reinterpret_cast<const bool*> (v);
+ *static_cast<bool*> (buf) = *reinterpret_cast<const bool*> (v);
break;
}
case bind::smallint:
@@ -430,19 +471,19 @@ namespace odb
{
case bind::smallint:
{
- *static_cast<short*> (b.buffer) =
+ *static_cast<short*> (buf) =
endian_traits::hton (static_cast<short> (i));
break;
}
case bind::integer:
{
- *static_cast<int*> (b.buffer) =
+ *static_cast<int*> (buf) =
endian_traits::hton (static_cast<int> (i));
break;
}
case bind::bigint:
{
- *static_cast<long long*> (b.buffer) = endian_traits::hton (i);
+ *static_cast<long long*> (buf) = endian_traits::hton (i);
break;
}
default:
@@ -453,25 +494,23 @@ namespace odb
}
case bind::real:
{
- *static_cast<float*> (b.buffer) =
- *reinterpret_cast<const float*> (v);
+ *static_cast<float*> (buf) = *reinterpret_cast<const float*> (v);
break;
}
case bind::double_:
{
- *static_cast<double*> (b.buffer) =
- *reinterpret_cast<const double*> (v);
+ *static_cast<double*> (buf) = *reinterpret_cast<const double*> (v);
break;
}
case bind::date:
{
- *static_cast<int*> (b.buffer) = *reinterpret_cast<const int*> (v);
+ *static_cast<int*> (buf) = *reinterpret_cast<const int*> (v);
break;
}
case bind::time:
case bind::timestamp:
{
- *static_cast<long long*> (b.buffer) =
+ *static_cast<long long*> (buf) =
*reinterpret_cast<const long long*> (v);
break;
}
@@ -481,24 +520,37 @@ namespace odb
case bind::bit:
case bind::varbit:
{
+ // Currently this is neither supported (due to capacity) nor used
+ // in batches.
+ //
+#ifdef LIBPGSQL_EXTRA_CHECKS
+ assert (pos == 0);
+#endif
+
*b.size = static_cast<size_t> (
PQgetlength (result, static_cast<int> (row), c));
- if (b.capacity < *b.size)
- {
- if (b.truncated)
- *b.truncated = true;
+ if (b.capacity < *b.size)
+ {
+ if (b.truncated)
+ *b.truncated = true;
- r = false;
- continue;
- }
+ r = false;
+ continue;
+ }
+
+ // In these cases b.buffer is a pointer to pointer to buffer so we
+ // need to chase one level.
+ //
+ if (b.type != bind::bit)
+ buf = *static_cast<void**> (buf);
- memcpy (b.buffer, v, *b.size);
- break;
+ memcpy (buf, v, *b.size);
+ break;
}
case bind::uuid:
{
- memcpy (b.buffer, v, 16);
+ memcpy (buf, v, 16);
break;
}
}
@@ -514,6 +566,378 @@ namespace odb
return r;
}
+#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32)
+
+ // Note that this function always marks the connection as failed.
+ //
+ static void
+ translate_connection_error (connection& conn)
+ {
+ const char* m (PQerrorMessage (conn.handle ()));
+
+ if (PQstatus (conn.handle ()) == CONNECTION_BAD)
+ {
+ conn.mark_failed ();
+ throw connection_lost ();
+ }
+ else
+ {
+ conn.mark_failed ();
+ throw database_exception (m != 0 ? m : "bad connection state");
+ }
+ }
+
+ // A RAII object for PGconn's non-blocking pipeline mode.
+ //
+ struct pipeline
+ {
+ connection& conn;
+ int sock;
+
+ explicit
+ pipeline (connection& c)
+ : conn (c)
+ {
+ PGconn* ch (conn.handle ());
+
+ if ((sock = PQsocket (ch)) == -1 ||
+ PQsetnonblocking (ch, 1) == -1 ||
+ PQenterPipelineMode (ch) == 0)
+ {
+ translate_connection_error (conn);
+ }
+ }
+
+ void
+ close (bool throw_ = true)
+ {
+ if (!conn.failed ())
+ {
+ PGconn* ch (conn.handle ());
+
+ if (PQexitPipelineMode (ch) == 0 ||
+ PQsetnonblocking (ch, 0) == -1)
+ {
+ if (throw_)
+ translate_connection_error (conn);
+ else
+ conn.mark_failed ();
+ }
+ }
+ }
+
+ ~pipeline ()
+ {
+ close (false);
+ }
+
+ pair<bool /* read */, bool /* write */>
+ wait (bool write, bool throw_ = true)
+ {
+ fd_set wds;
+ fd_set rds;
+
+ for (;;)
+ {
+ if (write)
+ {
+ FD_ZERO (&wds);
+ FD_SET (sock, &wds);
+ }
+
+ FD_ZERO (&rds);
+ FD_SET (sock, &rds);
+
+ if (select (sock + 1, &rds, write ? &wds : 0, 0, 0) != -1)
+ break;
+
+ if (errno != EINTR)
+ {
+ if (throw_)
+ translate_connection_error (conn);
+ else
+ {
+ conn.mark_failed ();
+ return pair<bool, bool> (false, false);
+ }
+ }
+ }
+
+ return pair<bool, bool> (FD_ISSET (sock, &rds),
+ write && FD_ISSET (sock, &wds));
+ }
+ };
+
+ // A RAII object for recovering from an error in a pipeline.
+ //
+ // Specifically, it reads and discards results until reaching
+ // PGRES_PIPELINE_SYNC.
+ //
+ struct pipeline_recovery
+ {
+ pipeline_recovery (pipeline& pl, bool wdone, bool sync)
+ : pl_ (&pl), wdone_ (wdone), sync_ (sync)
+ {
+ }
+
+ ~pipeline_recovery ()
+ {
+ if (pl_ != 0 && !pl_->conn.failed ())
+ {
+ PGconn* ch (pl_->conn.handle ());
+
+ // This code runs as part of stack unwinding caused by an exception
+ // so if we encounter an error, we "upgrade" the existing exception
+ // by marking the connection as failed.
+ //
+ // The rest is essentially a special version of execute() below.
+ //
+ // Note that on the first iteration we may still have results from
+ // the previous call to PQconsumeInput() (and these results may
+ // be the entire outstanding sequence, in which case calling wait()
+ // will block indefinitely).
+ //
+ for (bool first (true);; first = false)
+ {
+ if (sync_)
+ {
+ assert (!wdone_);
+
+ if (PQpipelineSync (ch) == 0)
+ break;
+
+ sync_ = false;
+ }
+
+ pair<bool, bool> r (false, false);
+
+ if (!first)
+ {
+ r = pl_->wait (!wdone_);
+ if (!r.first && !r.second)
+ break;
+ }
+
+ if (r.first /* read */ || first)
+ {
+ if (r.first && PQconsumeInput (ch) == 0)
+ break;
+
+ while (PQisBusy (ch) == 0)
+ {
+ auto_handle<PGresult> res (PQgetResult (ch));
+
+ // We should only get NULLs as well as PGRES_PIPELINE_ABORTED
+ // finished with PGRES_PIPELINE_SYNC.
+ //
+ if (res != 0)
+ {
+ ExecStatusType stat (PQresultStatus (res));
+
+ if (stat == PGRES_PIPELINE_SYNC)
+ return;
+
+ assert (stat == PGRES_PIPELINE_ABORTED);
+ }
+ }
+ }
+
+ if (r.second /* write */)
+ {
+ int r (PQflush (ch));
+ if (r == -1)
+ break;
+
+ if (r == 0)
+ wdone_ = true;
+ }
+ }
+
+ pl_->conn.mark_failed ();
+ }
+ }
+
+ void
+ cancel ()
+ {
+ pl_ = 0;
+ }
+
+ private:
+ pipeline* pl_;
+ bool wdone_;
+ bool sync_;
+ };
+
+ size_t statement::
+ execute (const binding& param,
+ native_binding& native_param,
+ size_t n,
+ multiple_exceptions& mex,
+ bool (*process) (size_t, PGresult*, bool, void*),
+ void* data)
+ {
+ size_t i (0); // Parameter set being attempted.
+ mex.current (i);
+
+ PGconn* ch (conn_.handle ());
+
+ pipeline pl (conn_);
+
+ // True if we've written and read everything, respectively.
+ //
+ bool wdone (false), rdone (false);
+
+ for (size_t wn (0), rn (0); !rdone; )
+ {
+ // Note that there is a special version of this code above in
+ // ~pipeline_recovery().
+ //
+ pair<bool, bool> r (pl.wait (!wdone));
+
+ // Note that once we start the pipeline, any call that may throw
+ // without marking the connection as failed should be guarded by
+ // pipeline_recovery.
+
+ // Try to minimize the chance of blocking the server by first
+ // processing the result and then sending more queries.
+ //
+ if (r.first /* read */)
+ {
+ if (PQconsumeInput (ch) == 0)
+ translate_connection_error (conn_);
+
+ while (PQisBusy (ch) == 0)
+ {
+ auto_handle<PGresult> res (PQgetResult (ch));
+
+ ExecStatusType stat (PGRES_FATAL_ERROR);
+ bool gr (is_good_result (res, &stat));
+
+ if (stat == PGRES_PIPELINE_SYNC)
+ {
+ assert (wdone && rn == n);
+ rdone = true;
+ break;
+ }
+
+ assert (rn != n);
+ ++rn;
+
+ if (stat != PGRES_PIPELINE_ABORTED)
+ {
+ // translate_error() may throw an exception (e.g., deadlock)
+ // without marking the connection as failed.
+ //
+ {
+ pipeline_recovery plr (pl, wdone, wn != n);
+
+ if (!process (i, res, gr, data))
+ translate_error (conn_, res, i, &mex);
+
+ plr.cancel ();
+ }
+
+ mex.attempted (++i);
+ mex.current (i);
+ }
+ else
+ {
+ // Should we treat PGRES_PIPELINE_ABORTED entries as attempted
+ // or not? While we did issue PQsendQueryPrepared() for them,
+ // the server tells us that it did not attemp to execute them.
+ // So it feels like they should not be treated as attempted.
+ //
+ // Note that for this to fit into out multiple_exceptions model,
+ // such an incomplete batch should be fatal (otherwise we could
+ // end up with unattempted "holes"). This is currently the case
+ // for errors handled by translate_error() but not necessarily
+ // the case for those handled by the process function (e.g.,
+ // duplicate id handled by process_insert_result() below). So in
+ // a somewhat hackish way we assume the error (e.g., duplicate
+ // id) will always be translated to an exception and pre-mark
+ // multiple_exceptions as fatal.
+ //
+ mex.fatal (true);
+ }
+
+ // We get a NULL result after each query result.
+ //
+ {
+ PGresult* end (PQgetResult (ch));
+ assert (end == 0);
+ }
+ }
+ }
+
+ if (r.second /* write */)
+ {
+ // Send queries until we get blocked (write-biased). This feels like
+ // a better overall strategy to keep the server busy compared to
+ // sending one query at a time and then re-checking if there is
+ // anything to read because the results of INSERT/UPDATE/DELETE are
+ // presumably small and quite a few of them can get buffered before
+ // the server gets blocked.
+ //
+ for (;;)
+ {
+ if (wn != n)
+ {
+ bind_param (native_param, param, wn);
+
+ if (PQsendQueryPrepared (ch,
+ name_,
+ static_cast<int> (native_param.count),
+ native_param.values,
+ native_param.lengths,
+ native_param.formats,
+ 1) == 0)
+ translate_connection_error (conn_);
+
+ if (++wn == n)
+ {
+ if (PQpipelineSync (ch) == 0)
+ translate_connection_error (conn_);
+ }
+ }
+
+ // PQflush() result:
+ //
+ // 0 -- success (queue is now empty)
+ // 1 -- blocked
+ // -1 -- error
+ //
+ int r (PQflush (ch));
+ if (r == -1)
+ translate_connection_error (conn_);
+
+ if (r == 0)
+ {
+ if (wn != n)
+ {
+ // If we continue here, then we are write-biased. And if we
+ // break, then we are read-biased.
+ //
+#ifdef LIBPGSQL_READ_BIASED
+ break;
+#else
+ continue;
+#endif
+ }
+
+ wdone = true;
+ }
+
+ break; // Blocked or done.
+ }
+ }
+ }
+
+ pl.close ();
+ return i;
+ }
+#endif
+
//
// select_statement
//
@@ -689,10 +1113,7 @@ namespace odb
return no_data;
assert (current_row_ > 0);
- return bind_result (result_.bind,
- result_.count,
- handle_,
- current_row_ - 1)
+ return bind_result (result_, handle_, current_row_ - 1)
? success
: truncated;
}
@@ -703,11 +1124,7 @@ namespace odb
assert (current_row_ > 0);
assert (current_row_ <= row_count_);
- if (!bind_result (result_.bind,
- result_.count,
- handle_,
- current_row_ - 1,
- true))
+ if (!bind_result (result_, handle_, current_row_ - 1, true))
assert (false);
}
@@ -738,6 +1155,8 @@ namespace odb
native_param_ (native_param),
returning_ (returning)
{
+ if (returning_ != 0)
+ assert (returning_->count == 1);
}
insert_statement::
@@ -759,6 +1178,8 @@ namespace odb
native_param_ (native_param),
returning_ (returning)
{
+ if (returning_ != 0)
+ assert (returning_->count == 1);
}
bool insert_statement::
@@ -792,9 +1213,9 @@ namespace odb
//
if (returning_ == 0 && stat == PGRES_FATAL_ERROR)
{
- string s (PQresultErrorField (h, PG_DIAG_SQLSTATE));
+ const char* ss (PQresultErrorField (h, PG_DIAG_SQLSTATE));
- if (s == "23505")
+ if (ss != 0 && strcmp (ss, "23505") == 0)
return false;
}
@@ -802,11 +1223,76 @@ namespace odb
}
if (returning_ != 0)
- bind_result (returning_->bind, 1, h, 0, false);
+ bind_result (*returning_, h, 0);
+
+ return true;
+ }
+
+#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32)
+
+ struct insert_data
+ {
+ binding& param;
+ binding* returning;
+ };
+
+ static bool
+ process_insert_result (size_t i, PGresult* r, bool gr, void* data)
+ {
+ insert_data& d (*static_cast<insert_data*> (data));
+
+ unsigned long long& s (d.param.status[i]);
+ s = 1;
+
+ if (gr)
+ {
+ // Note that the result can never be truncated.
+ //
+ if (d.returning != 0)
+ statement::bind_result (*d.returning, r, 0, false, i);
+ }
+ else
+ {
+ // An auto-assigned object id should never cause a duplicate
+ // primary key.
+ //
+ if (d.returning == 0 &&
+ r != 0 && PQresultStatus (r) == PGRES_FATAL_ERROR)
+ {
+ // Note that statement::execute() assumes that this will eventually
+ // be translated to an entry in multiple_exceptions.
+ //
+ const char* ss (PQresultErrorField (r, PG_DIAG_SQLSTATE));
+
+ if (ss != 0 && strcmp (ss, "23505") == 0)
+ s = 0;
+ }
+
+ if (s == 1)
+ return false;
+ }
return true;
}
+ size_t insert_statement::
+ execute (size_t n, multiple_exceptions& mex)
+ {
+ {
+ odb::tracer* t;
+ if ((t = conn_.transaction_tracer ()) ||
+ (t = conn_.tracer ()) ||
+ (t = conn_.database ().tracer ()))
+ t->execute (conn_, *this);
+ }
+
+ insert_data d {param_, returning_};
+
+ return statement::execute (
+ param_, native_param_, n, mex, &process_insert_result, &d);
+ }
+#endif
+
//
// update_statement
//
@@ -881,6 +1367,43 @@ namespace odb
return affected_row_count (h);
}
+#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32)
+
+ static bool
+ process_update_result (size_t i, PGresult* r, bool gr, void* data)
+ {
+ binding& param (*static_cast<binding*> (data));
+
+ unsigned long long& s (param.status[i]);
+
+ if (gr)
+ {
+ s = affected_row_count (r);
+ return true;
+ }
+ else
+ {
+ s = update_statement::result_unknown;
+ return false;
+ }
+ }
+
+ size_t update_statement::
+ execute (size_t n, multiple_exceptions& mex)
+ {
+ {
+ odb::tracer* t;
+ if ((t = conn_.transaction_tracer ()) ||
+ (t = conn_.tracer ()) ||
+ (t = conn_.database ().tracer ()))
+ t->execute (conn_, *this);
+ }
+
+ return statement::execute (
+ param_, native_param_, n, mex, &process_update_result, &param_);
+ }
+#endif
+
//
// delete_statement
//
@@ -969,5 +1492,44 @@ namespace odb
return affected_row_count (h);
}
+
+#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32)
+
+ static bool
+ process_delete_result (size_t i, PGresult* r, bool gr, void* data)
+ {
+ binding& param (*static_cast<binding*> (data));
+
+ unsigned long long& s (param.status[i]);
+
+ if (gr)
+ {
+ s = affected_row_count (r);
+ return true;
+ }
+ else
+ {
+ s = delete_statement::result_unknown;
+ return false;
+ }
+ }
+
+ size_t delete_statement::
+ execute (size_t n, multiple_exceptions& mex)
+ {
+ assert (param_ != 0);
+
+ {
+ odb::tracer* t;
+ if ((t = conn_.transaction_tracer ()) ||
+ (t = conn_.tracer ()) ||
+ (t = conn_.database ().tracer ()))
+ t->execute (conn_, *this);
+ }
+
+ return statement::execute (
+ *param_, native_param_, n, mex, &process_delete_result, param_);
+ }
+#endif
}
}
diff --git a/odb/pgsql/statement.hxx b/odb/pgsql/statement.hxx
index b417f1c..139d2d6 100644
--- a/odb/pgsql/statement.hxx
+++ b/odb/pgsql/statement.hxx
@@ -63,22 +63,24 @@ namespace odb
void
deallocate ();
- // Adapt an ODB binding to a native PostgreSQL parameter binding.
+ // Adapt an ODB binding to a native PostgreSQL parameter binding. If pos
+ // is not 0, then bind the parameter set at this position in a batch.
//
static void
- bind_param (native_binding&, const binding&);
+ bind_param (native_binding&, const binding&, std::size_t pos = 0);
// Populate an ODB binding given a PostgreSQL result. If the truncated
// argument is true, then only truncated columns are extracted. Return
// true if all the data was extracted successfully and false if one or
- // more columns were truncated.
+ // more columns were truncated. If pos is not 0, then populate the
+ // parameter set at this position in a batch.
//
static bool
- bind_result (bind*,
- std::size_t count,
+ bind_result (const binding&,
PGresult*,
std::size_t row,
- bool truncated = false);
+ bool truncated = false,
+ std::size_t pos = 0);
protected:
// We keep two versions to take advantage of std::string COW.
@@ -102,6 +104,16 @@ namespace odb
const Oid* types,
std::size_t types_count);
+ // Bulk execute implementation.
+ //
+ std::size_t
+ execute (const binding& param,
+ native_binding& native_param,
+ std::size_t n,
+ multiple_exceptions&,
+ bool (*process) (size_t i, PGresult*, bool good, void* data),
+ void* data);
+
private:
void
init (statement_kind,
@@ -296,6 +308,20 @@ namespace odb
bool
execute ();
+ // Return the number of parameter sets (out of n) that were attempted.
+ //
+ std::size_t
+ execute (std::size_t n, multiple_exceptions&);
+
+ // Return true if successful and false if this row is a duplicate.
+ // All other errors are reported via exceptions.
+ //
+ bool
+ result (std::size_t i)
+ {
+ return param_.status[i] != 0;
+ }
+
private:
insert_statement (const insert_statement&);
insert_statement& operator= (const insert_statement&);
@@ -334,6 +360,22 @@ namespace odb
unsigned long long
execute ();
+ // Return the number of parameter sets (out of n) that were attempted.
+ //
+ std::size_t
+ execute (std::size_t n, multiple_exceptions&);
+
+ // Return the number of rows affected (updated) by the parameter
+ // set. All errors are reported by throwing exceptions.
+ //
+ static const unsigned long long result_unknown = ~0ULL;
+
+ unsigned long long
+ result (std::size_t i)
+ {
+ return param_.status[i];
+ }
+
private:
update_statement (const update_statement&);
update_statement& operator= (const update_statement&);
@@ -376,6 +418,22 @@ namespace odb
unsigned long long
execute ();
+ // Return the number of parameter sets (out of n) that were attempted.
+ //
+ std::size_t
+ execute (std::size_t n, multiple_exceptions&);
+
+ // Return the number of rows affected (deleted) by the parameter
+ // set. All errors are reported by throwing exceptions.
+ //
+ static const unsigned long long result_unknown = ~0ULL;
+
+ unsigned long long
+ result (std::size_t i)
+ {
+ return param_->status[i];
+ }
+
private:
delete_statement (const delete_statement&);
delete_statement& operator= (const delete_statement&);