From 7b51842728b6ee99945afe401fca317c703a12d9 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Wed, 16 Jun 2021 11:28:56 +0200 Subject: Add support for bulk operations using pipeline mode in libpq 14 --- odb/pgsql/binding.hxx | 18 +- odb/pgsql/database.cxx | 6 +- odb/pgsql/database.hxx | 27 ++ odb/pgsql/database.ixx | 28 ++ odb/pgsql/error.cxx | 59 +-- odb/pgsql/error.hxx | 18 +- odb/pgsql/error.ixx | 14 +- odb/pgsql/exceptions.cxx | 5 +- odb/pgsql/no-id-object-statements.hxx | 8 +- odb/pgsql/no-id-object-statements.txx | 8 +- odb/pgsql/pgsql-types.hxx | 19 +- odb/pgsql/query.hxx | 10 +- odb/pgsql/simple-object-statements.hxx | 28 +- odb/pgsql/simple-object-statements.txx | 36 +- odb/pgsql/statement.cxx | 690 ++++++++++++++++++++++++++++++--- odb/pgsql/statement.hxx | 70 +++- 16 files changed, 896 insertions(+), 148 deletions(-) diff --git a/odb/pgsql/binding.hxx b/odb/pgsql/binding.hxx index 63cf2eb..1adf144 100644 --- a/odb/pgsql/binding.hxx +++ b/odb/pgsql/binding.hxx @@ -41,17 +41,27 @@ namespace odb public: typedef pgsql::bind bind_type; - binding (): bind (0), count (0), version (0) {} + binding () + : bind (0), count (0), version (0), + batch (0), skip (0), status (0) {} binding (bind_type* b, std::size_t n) - : bind (b), count (n), version (0) - { - } + : bind (b), count (n), version (0), + batch (0), skip (0), status (0) {} + + binding (bind_type* b, std::size_t n, + std::size_t bt, std::size_t s, unsigned long long* st) + : bind (b), count (n), version (0), + batch (bt), skip (s), status (st) {} bind_type* bind; std::size_t count; std::size_t version; + std::size_t batch; + std::size_t skip; + unsigned long long* status; // Batch status array. + private: binding (const binding&); binding& operator= (const binding&); diff --git a/odb/pgsql/database.cxx b/odb/pgsql/database.cxx index b4d3732..09bf6f0 100644 --- a/odb/pgsql/database.cxx +++ b/odb/pgsql/database.cxx @@ -249,10 +249,11 @@ namespace odb // Bind parameters and results. 
// + char* pbuf[1] = {const_cast (name.c_str ())}; size_t psize[1] = {name.size ()}; bool pnull[1] = {false}; bind pbind[1] = {{bind::text, - const_cast (name.c_str ()), + &pbuf[0], &psize[0], psize[0], &pnull[0], @@ -302,10 +303,11 @@ namespace odb bool exists (true); if (cp == 0 && c.server_version () >= 90400) { + char* pbuf[1] = {const_cast (table)}; size_t psize[1] = {strlen (table)}; bool pnull[1] = {false}; bind pbind[1] = {{bind::text, - const_cast (table), + &pbuf[0], &psize[0], psize[0], &pnull[0], diff --git a/odb/pgsql/database.hxx b/odb/pgsql/database.hxx index d3b805d..fcb3ad3 100644 --- a/odb/pgsql/database.hxx +++ b/odb/pgsql/database.hxx @@ -124,6 +124,13 @@ namespace odb typename object_traits::id_type persist (const typename object_traits::pointer_type& obj_ptr); + // Bulk persist. Can be a range of references or pointers (including + // smart pointers) to objects. + // + template + void + persist (I begin, I end, bool continue_failed = true); + // Load an object. Throw object_not_persistent if not found. // template @@ -210,6 +217,13 @@ namespace odb void update (const typename object_traits::pointer_type& obj_ptr); + // Bulk update. Can be a range of references or pointers (including + // smart pointers) to objects. + // + template + void + update (I begin, I end, bool continue_failed = true); + // Update a section of an object. Throws the section_not_loaded // exception if the section is not loaded. Note also that this // function does not clear the changed flag if it is set. @@ -253,6 +267,19 @@ namespace odb void erase (const typename object_traits::pointer_type& obj_ptr); + // Bulk erase. + // + template + void + erase (I id_begin, I id_end, bool continue_failed = true); + + // Can be a range of references or pointers (including smart pointers) + // to objects. + // + template + void + erase (I obj_begin, I obj_end, bool continue_failed = true); + // Erase multiple objects matching a query predicate. 
// template diff --git a/odb/pgsql/database.ixx b/odb/pgsql/database.ixx index ecc94f9..f04c3e6 100644 --- a/odb/pgsql/database.ixx +++ b/odb/pgsql/database.ixx @@ -119,6 +119,13 @@ namespace odb return persist_ (pobj); } + template + inline void database:: + persist (I b, I e, bool cont) + { + persist_ (b, e, cont); + } + template inline typename object_traits::pointer_type database:: load (const typename object_traits::id_type& id) @@ -280,6 +287,13 @@ namespace odb update_ (pobj); } + template + inline void database:: + update (I b, I e, bool cont) + { + update_ (b, e, cont); + } + template inline void database:: update (const T& obj, const section& s) @@ -369,6 +383,20 @@ namespace odb erase_ (pobj); } + template + inline void database:: + erase (I idb, I ide, bool cont) + { + erase_id_ (idb, ide, cont); + } + + template + inline void database:: + erase (I ob, I oe, bool cont) + { + erase_object_ (ob, oe, cont); + } + template inline unsigned long long database:: erase_query () diff --git a/odb/pgsql/error.cxx b/odb/pgsql/error.cxx index 5d34fde..ba8451e 100644 --- a/odb/pgsql/error.cxx +++ b/odb/pgsql/error.cxx @@ -15,11 +15,12 @@ namespace odb namespace pgsql { void - translate_error (connection& c, PGresult* r) + translate_error (connection& c, PGresult* r, + size_t pos, multiple_exceptions* mex) { if (!r) { - if (CONNECTION_BAD == PQstatus (c.handle ())) + if (PQstatus (c.handle ()) == CONNECTION_BAD) { c.mark_failed (); throw connection_lost (); @@ -28,50 +29,58 @@ namespace odb throw bad_alloc (); } - string msg; - { - // Can be NULL in case of PGRES_BAD_RESPONSE. - // - const char* m (PQresultErrorMessage (r)); - msg = (m != 0 ? m : "bad server response"); - - // Get rid of a trailing newline if there is one. 
- // - string::size_type n (msg.size ()); - if (n != 0 && msg[n - 1] == '\n') - msg.resize (n - 1); - } - + // Note that we expect the caller to handle PGRES_PIPELINE_ABORTED since + // it's not really an error but rather an indication that no attempt was + // made to execute this statement. + // + string ss; switch (PQresultStatus (r)) { case PGRES_BAD_RESPONSE: { - throw database_exception (msg); + throw database_exception ("bad server response"); } case PGRES_FATAL_ERROR: { - string ss; - { - const char* s (PQresultErrorField (r, PG_DIAG_SQLSTATE)); - ss = (s != 0 ? s : "?????"); - } + const char* s (PQresultErrorField (r, PG_DIAG_SQLSTATE)); + ss = (s != 0 ? s : "?????"); // Deadlock detected. // if (ss == "40001" || ss == "40P01") throw deadlock (); - else if (CONNECTION_BAD == PQstatus (c.handle ())) + else if (PQstatus (c.handle ()) == CONNECTION_BAD) { c.mark_failed (); throw connection_lost (); } - else - throw database_exception (ss, msg); + break; } default: assert (false); break; } + + string msg; + { + // Can be NULL in case of PGRES_BAD_RESPONSE. + // + const char* m (PQresultErrorMessage (r)); + msg = (m != 0 ? m : "bad server response"); + + // Get rid of the trailing newline if there is one. + // + string::size_type n (msg.size ()); + if (n != 0 && msg[n - 1] == '\n') + msg.resize (n - 1); + } + + if (mex == 0) + throw database_exception (ss, msg); + else + // In PostgreSQL all errors are fatal. + // + mex->insert (pos, database_exception (ss, msg), true); } } } diff --git a/odb/pgsql/error.hxx b/odb/pgsql/error.hxx index 36ecc44..8d2793d 100644 --- a/odb/pgsql/error.hxx +++ b/odb/pgsql/error.hxx @@ -9,23 +9,25 @@ #include #include -#include // connection +#include // connection, multiple_exceptions #include namespace odb { namespace pgsql { - // Translate an error condition involving a PGresult*. If r is null, it is - // assumed that the error was caused due to a bad connection or a memory - // allocation error. 
+ // Translate an error condition involving PGresult* and throw (or return, + // in case multiple_exceptions is not NULL) an appropriate exception. If + // result is NULL, it is assumed that the error was caused due to a bad + // connection or a memory allocation error. // LIBODB_PGSQL_EXPORT void - translate_error (connection& c, PGresult* r); + translate_error (connection& c, PGresult* r, + std::size_t pos = 0, multiple_exceptions* = 0); - // Return true if the PGresult is in an error state. If both s and r are - // non-null, the pointed to value will be populated with the result status. - // Otherwise, s is ignored. + // Return true if PGresult is not NULL and is not in an error state. If + // both s and r are non-NULL, the pointed to value will be populated with + // the result status. Otherwise, s is ignored. // LIBODB_PGSQL_EXPORT bool is_good_result (PGresult* r, ExecStatusType* s = 0); diff --git a/odb/pgsql/error.ixx b/odb/pgsql/error.ixx index 0cda31b..6a010aa 100644 --- a/odb/pgsql/error.ixx +++ b/odb/pgsql/error.ixx @@ -5,8 +5,8 @@ namespace odb { namespace pgsql { - bool - inline is_good_result (PGresult* r, ExecStatusType* s) + inline bool + is_good_result (PGresult* r, ExecStatusType* s) { if (r != 0) { @@ -16,9 +16,13 @@ namespace odb *s = status; return - status != PGRES_BAD_RESPONSE && - status != PGRES_NONFATAL_ERROR && - status != PGRES_FATAL_ERROR; + status != PGRES_BAD_RESPONSE + && status != PGRES_NONFATAL_ERROR + && status != PGRES_FATAL_ERROR +#ifdef LIBPQ_HAS_PIPELINING + && status != PGRES_PIPELINE_ABORTED +#endif + ; } return false; diff --git a/odb/pgsql/exceptions.cxx b/odb/pgsql/exceptions.cxx index 01d9ab7..28e7fc4 100644 --- a/odb/pgsql/exceptions.cxx +++ b/odb/pgsql/exceptions.cxx @@ -26,7 +26,10 @@ namespace odb const string& message) : sqlstate_ (sqlstate), message_ (message) { - what_ = sqlstate_ + ": " + message_; + if (!sqlstate_.empty ()) + what_ = sqlstate_ + ": " + message_; + else + what_ = message_; } database_exception:: 
diff --git a/odb/pgsql/no-id-object-statements.hxx b/odb/pgsql/no-id-object-statements.hxx index 6e6b53f..baa1b2a 100644 --- a/odb/pgsql/no-id-object-statements.hxx +++ b/odb/pgsql/no-id-object-statements.hxx @@ -48,7 +48,10 @@ namespace odb // Object image. // image_type& - image () {return image_;} + image (std::size_t i = 0) + { + return image_[i]; + } // Insert binding. // @@ -112,7 +115,8 @@ namespace odb no_id_object_statements& operator= (const no_id_object_statements&); private: - image_type image_; + image_type image_[object_traits::batch]; + unsigned long long status_[object_traits::batch]; // Select binding. // diff --git a/odb/pgsql/no-id-object-statements.txx b/odb/pgsql/no-id-object-statements.txx index ced26ee..0c340ab 100644 --- a/odb/pgsql/no-id-object-statements.txx +++ b/odb/pgsql/no-id-object-statements.txx @@ -24,13 +24,17 @@ namespace odb // select select_image_binding_ (select_image_bind_, select_column_count), // insert - insert_image_binding_ (insert_image_bind_, insert_column_count), + insert_image_binding_ (insert_image_bind_, + insert_column_count, + object_traits::batch, + sizeof (image_type), + status_), insert_image_native_binding_ (insert_image_values_, insert_image_lengths_, insert_image_formats_, insert_column_count) { - image_.version = 0; + image_[0].version = 0; // Only version in the first element used. select_image_version_ = 0; insert_image_version_ = 0; diff --git a/odb/pgsql/pgsql-types.hxx b/odb/pgsql/pgsql-types.hxx index 93e4870..117a41e 100644 --- a/odb/pgsql/pgsql-types.hxx +++ b/odb/pgsql/pgsql-types.hxx @@ -14,8 +14,13 @@ namespace odb { namespace pgsql { - // The libpq result binding. This data structures is - // modelled after MYSQL_BIND from MySQL. + // The libpq result binding. This data structure is roughly modeled + // after MYSQL_BIND from MySQL. 
+ // + // Types that may need to grow are bound as pointers to pointers to char + // array (normally in details::buffer) in order to allow simple offsetting + // in bulk operation support. Note that if we were to do the same for + // capacity, we could get rid of the buffer growth tracking altogether. // struct bind { @@ -27,14 +32,14 @@ namespace odb bigint, // Buffer is long long; size, capacity, truncated are unused. real, // Buffer is float; size, capacity, truncated are unused. double_, // Buffer is double; size, capacity, truncated are unused. - numeric, // Buffer is a char array. + numeric, // Buffer is a pointer to pointer to char array. date, // Buffer is int; size, capacity, truncated are unused. time, // Buffer is long long; size, capacity, truncated are unused. timestamp,// Buffer is long long; size, capacity, truncated are unused. - text, // Buffer is a char array. - bytea, // Buffer is a char array. - bit, // Buffer is a char array. - varbit, // Buffer is a char array. + text, // Buffer is a pointer to pointer to char array. + bytea, // Buffer is a pointer to pointer to char array. + bit, // Buffer is a pointer to char array. + varbit, // Buffer is a pointer to pointer to char array. uuid // Buffer is a 16-byte char array; size capacity, truncated // are unused. Note: big-endian, in RFC 4122/4.1.2 order. 
}; diff --git a/odb/pgsql/query.hxx b/odb/pgsql/query.hxx index b940261..42182d6 100644 --- a/odb/pgsql/query.hxx +++ b/odb/pgsql/query.hxx @@ -1671,7 +1671,7 @@ namespace odb bind (bind_type* b) { b->type = bind::numeric; - b->buffer = buffer_.data (); + b->buffer = buffer_.data_ptr (); b->capacity = buffer_.capacity (); b->size = &size_; } @@ -1836,7 +1836,7 @@ namespace odb bind (bind_type* b) { b->type = bind::text; - b->buffer = buffer_.data (); + b->buffer = buffer_.data_ptr (); b->capacity = buffer_.capacity (); b->size = &size_; } @@ -1881,7 +1881,7 @@ namespace odb bind (bind_type* b) { b->type = bind::bytea; - b->buffer = buffer_.data (); + b->buffer = buffer_.data_ptr (); b->capacity = buffer_.capacity (); b->size = &size_; } @@ -1926,7 +1926,7 @@ namespace odb bind (bind_type* b) { b->type = bind::bit; - b->buffer = buffer_.data (); + b->buffer = buffer_.data_ptr (); b->capacity = buffer_.capacity (); b->size = &size_; } @@ -1970,7 +1970,7 @@ namespace odb bind (bind_type* b) { b->type = bind::varbit; - b->buffer = buffer_.data (); + b->buffer = buffer_.data_ptr (); b->capacity = buffer_.capacity (); b->size = &size_; } diff --git a/odb/pgsql/simple-object-statements.hxx b/odb/pgsql/simple-object-statements.hxx index 64acbe9..086ef5f 100644 --- a/odb/pgsql/simple-object-statements.hxx +++ b/odb/pgsql/simple-object-statements.hxx @@ -167,7 +167,8 @@ namespace odb typedef T object_type; typedef object_traits_impl object_traits; - optimistic_data (bind*, char** nv, int* nl, int* nf); + optimistic_data (bind*, char** nv, int* nl, int* nf, + std::size_t skip, unsigned long long* status); binding* id_image_binding () {return &id_image_binding_;} @@ -190,7 +191,8 @@ namespace odb template struct optimistic_data { - optimistic_data (bind*, char**, int*, int*) {} + optimistic_data (bind*, char**, int*, int*, + std::size_t, unsigned long long*) {} binding* id_image_binding () {return 0;} @@ -301,7 +303,7 @@ namespace odb // Object image. 
// image_type& - image () {return image_;} + image (std::size_t i = 0) {return images_[i].obj;} // Insert binding. // @@ -348,7 +350,7 @@ namespace odb // Object id image and binding. // id_image_type& - id_image () {return id_image_;} + id_image (std::size_t i = 0) {return images_[i].id;} std::size_t id_image_version () const {return id_image_version_;} @@ -471,8 +473,8 @@ namespace odb { return extra_statement_cache_.get ( conn_, - image_, - id_image_, + images_[0].obj, + images_[0].id, id_image_binding_, od_.id_image_binding (), id_image_native_binding_, @@ -527,7 +529,18 @@ namespace odb image_type, id_image_type> extra_statement_cache_; - image_type image_; + // The UPDATE statement uses both the object and id image. Keep them + // next to each other so that the same skip distance can be used in + // batch binding. + // + struct images + { + image_type obj; + id_image_type id; + }; + + images images_[object_traits::batch]; + unsigned long long status_[object_traits::batch]; // Select binding. // @@ -574,7 +587,6 @@ namespace odb // Id image binding (only used as a parameter). Uses the suffix in // the update bind. 
// - id_image_type id_image_; std::size_t id_image_version_; binding id_image_binding_; native_binding id_image_native_binding_; diff --git a/odb/pgsql/simple-object-statements.txx b/odb/pgsql/simple-object-statements.txx index ad87e73..bb47b43 100644 --- a/odb/pgsql/simple-object-statements.txx +++ b/odb/pgsql/simple-object-statements.txx @@ -19,11 +19,15 @@ namespace odb template optimistic_data:: - optimistic_data (bind* b, char** nv, int* nl, int* nf) + optimistic_data (bind* b, char** nv, int* nl, int* nf, + std::size_t skip, unsigned long long* status) : id_image_binding_ ( b, object_traits::id_column_count + - object_traits::managed_optimistic_column_count), + object_traits::managed_optimistic_column_count, + object_traits::batch, + skip, + status), id_image_native_binding_ ( nv, nl, nf, object_traits::id_column_count + @@ -48,7 +52,11 @@ namespace odb // select select_image_binding_ (select_image_bind_, select_column_count), // insert - insert_image_binding_ (insert_image_bind_, insert_column_count), + insert_image_binding_ (insert_image_bind_, + insert_column_count, + object_traits::batch, + sizeof (images), + status_), insert_image_native_binding_ (insert_image_values_, insert_image_lengths_, insert_image_formats_, @@ -56,7 +64,10 @@ namespace odb // update update_image_binding_ (update_image_bind_, update_column_count + id_column_count + - managed_optimistic_column_count), + managed_optimistic_column_count, + object_traits::batch, + sizeof (images), + status_), update_image_native_binding_ (update_image_values_, update_image_lengths_, update_image_formats_, @@ -64,7 +75,10 @@ namespace odb managed_optimistic_column_count), // id id_image_binding_ (update_image_bind_ + update_column_count, - id_column_count), + id_column_count, + object_traits::batch, + sizeof (images), + status_), id_image_native_binding_ ( update_image_values_ + update_column_count, update_image_lengths_ + update_column_count, @@ -74,15 +88,19 @@ namespace odb od_ (update_image_bind_ + 
update_column_count, update_image_values_ + update_column_count, update_image_lengths_ + update_column_count, - update_image_formats_ + update_column_count) + update_image_formats_ + update_column_count, + sizeof (images), + status_) { - image_.version = 0; + // Only versions in the first element used. + // + images_[0].obj.version = 0; + images_[0].id.version = 0; + select_image_version_ = 0; insert_image_version_ = 0; update_image_version_ = 0; update_id_image_version_ = 0; - - id_image_.version = 0; id_image_version_ = 0; std::memset (insert_image_bind_, 0, sizeof (insert_image_bind_)); diff --git a/odb/pgsql/statement.cxx b/odb/pgsql/statement.cxx index 977db72..c88e621 100644 --- a/odb/pgsql/statement.cxx +++ b/odb/pgsql/statement.cxx @@ -1,16 +1,32 @@ // file : odb/pgsql/statement.cxx // license : GNU GPL v2; see accompanying LICENSE file -#include // std::atol -#include -#include // istringstream +#include // ODB_CXX11 #include +#ifdef LIBPQ_HAS_PIPELINING +# ifndef _WIN32 +# include +# include +# endif +#endif + +#include // strcmp +#include // pair +#include + +#ifdef ODB_CXX11 +# include // strtoull +#else +# include // istringstream +#endif + #include #include #include +#include #include #include #include @@ -36,11 +52,12 @@ namespace odb count = static_cast (s[0] - '0'); else { - // @@ Using stringstream conversion for now. See if we can optimize - // this (atoll possibly, even though it is not standard). 
- // +#ifdef ODB_CXX11 + count = strtoull (s, 0, 10); +#else istringstream ss (s); ss >> count; +#endif } return count; @@ -256,30 +273,39 @@ namespace odb return text_; } + template + static inline T* + offset (T* base, size_t count, size_t size) + { + return reinterpret_cast ( + reinterpret_cast (base) + count * size); + } + void statement:: - bind_param (native_binding& n, const binding& b) + bind_param (native_binding& ns, const binding& bs, size_t pos) { - assert (n.count == b.count); + assert (ns.count == bs.count); - for (size_t i (0); i < n.count; ++i) + for (size_t i (0); i < ns.count; ++i) { - const bind& current_bind (b.bind[i]); + const bind& b (bs.bind[i]); - n.formats[i] = 1; + ns.formats[i] = 1; - if (current_bind.buffer == 0 || // Skip NULL entries. - (current_bind.is_null != 0 && *current_bind.is_null)) + bool* n (b.is_null != 0 ? offset (b.is_null, pos, bs.skip) : 0); + + if ((n != 0 && *n) || b.buffer == 0) // Handle NULL entries. { - n.values[i] = 0; - n.lengths[i] = 0; + ns.values[i] = 0; + ns.lengths[i] = 0; continue; } - n.values[i] = static_cast (current_bind.buffer); + ns.values[i] = static_cast (offset (b.buffer, pos, bs.skip)); size_t l (0); - switch (current_bind.type) + switch (b.type) { case bind::boolean_: { @@ -325,10 +351,18 @@ namespace odb case bind::numeric: case bind::text: case bind::bytea: - case bind::bit: case bind::varbit: { - l = *current_bind.size; + // In this case b.buffer is a pointer to pointer to buffer so we + // need to chase one level. + // + ns.values[i] = static_cast ( + *reinterpret_cast (ns.values[i])); + } + // Fall through. 
+ case bind::bit: + { + l = *offset (b.size, pos, bs.skip); break; } case bind::uuid: @@ -340,54 +374,61 @@ namespace odb } } - n.lengths[i] = static_cast (l); + ns.lengths[i] = static_cast (l); } } bool statement:: - bind_result (bind* p, - size_t count, + bind_result (const binding& bs, PGresult* result, size_t row, - bool truncated) + bool truncated, + size_t pos) { bool r (true); int col_count (PQnfields (result)); int col (0); - for (size_t i (0); i != count && col != col_count; ++i) + for (size_t i (0); i != bs.count && col != col_count; ++i) { - const bind& b (p[i]); + const bind& b (bs.bind[i]); if (b.buffer == 0) // Skip NULL entries. continue; int c (col++); - if (truncated && (b.truncated == 0 || !*b.truncated)) - continue; + { + bool* t (b.truncated != 0 ? offset (b.truncated, pos, bs.skip) : 0); - if (b.truncated != 0) - *b.truncated = false; + if (truncated && (t == 0 || !*t)) + continue; + + if (t != 0) + *t = false; + } // Check for NULL unless we are reloading a truncated result. 
// if (!truncated) { - *b.is_null = PQgetisnull (result, static_cast (row), c) == 1; + bool* n (offset (b.is_null, pos, bs.skip)); + + *n = PQgetisnull (result, static_cast (row), c) == 1; - if (*b.is_null) + if (*n) continue; } + void* buf (offset (b.buffer, pos, bs.skip)); + const char* v (PQgetvalue (result, static_cast (row), c)); switch (b.type) { case bind::boolean_: { - *static_cast (b.buffer) = - *reinterpret_cast (v); + *static_cast (buf) = *reinterpret_cast (v); break; } case bind::smallint: @@ -430,19 +471,19 @@ namespace odb { case bind::smallint: { - *static_cast (b.buffer) = + *static_cast (buf) = endian_traits::hton (static_cast (i)); break; } case bind::integer: { - *static_cast (b.buffer) = + *static_cast (buf) = endian_traits::hton (static_cast (i)); break; } case bind::bigint: { - *static_cast (b.buffer) = endian_traits::hton (i); + *static_cast (buf) = endian_traits::hton (i); break; } default: @@ -453,25 +494,23 @@ namespace odb } case bind::real: { - *static_cast (b.buffer) = - *reinterpret_cast (v); + *static_cast (buf) = *reinterpret_cast (v); break; } case bind::double_: { - *static_cast (b.buffer) = - *reinterpret_cast (v); + *static_cast (buf) = *reinterpret_cast (v); break; } case bind::date: { - *static_cast (b.buffer) = *reinterpret_cast (v); + *static_cast (buf) = *reinterpret_cast (v); break; } case bind::time: case bind::timestamp: { - *static_cast (b.buffer) = + *static_cast (buf) = *reinterpret_cast (v); break; } @@ -481,24 +520,37 @@ namespace odb case bind::bit: case bind::varbit: { + // Currently this is neither supported (due to capacity) nor used + // in batches. 
+ // +#ifdef LIBPGSQL_EXTRA_CHECKS + assert (pos == 0); +#endif + *b.size = static_cast ( PQgetlength (result, static_cast (row), c)); - if (b.capacity < *b.size) - { - if (b.truncated) - *b.truncated = true; + if (b.capacity < *b.size) + { + if (b.truncated) + *b.truncated = true; - r = false; - continue; - } + r = false; + continue; + } + + // In these cases b.buffer is a pointer to pointer to buffer so we + // need to chase one level. + // + if (b.type != bind::bit) + buf = *static_cast (buf); - memcpy (b.buffer, v, *b.size); - break; + memcpy (buf, v, *b.size); + break; } case bind::uuid: { - memcpy (b.buffer, v, 16); + memcpy (buf, v, 16); break; } } @@ -514,6 +566,378 @@ namespace odb return r; } +#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32) + + // Note that this function always marks the connection as failed. + // + static void + translate_connection_error (connection& conn) + { + const char* m (PQerrorMessage (conn.handle ())); + + if (PQstatus (conn.handle ()) == CONNECTION_BAD) + { + conn.mark_failed (); + throw connection_lost (); + } + else + { + conn.mark_failed (); + throw database_exception (m != 0 ? m : "bad connection state"); + } + } + + // A RAII object for PGconn's non-blocking pipeline mode. 
+ // + struct pipeline + { + connection& conn; + int sock; + + explicit + pipeline (connection& c) + : conn (c) + { + PGconn* ch (conn.handle ()); + + if ((sock = PQsocket (ch)) == -1 || + PQsetnonblocking (ch, 1) == -1 || + PQenterPipelineMode (ch) == 0) + { + translate_connection_error (conn); + } + } + + void + close (bool throw_ = true) + { + if (!conn.failed ()) + { + PGconn* ch (conn.handle ()); + + if (PQexitPipelineMode (ch) == 0 || + PQsetnonblocking (ch, 0) == -1) + { + if (throw_) + translate_connection_error (conn); + else + conn.mark_failed (); + } + } + } + + ~pipeline () + { + close (false); + } + + pair + wait (bool write, bool throw_ = true) + { + fd_set wds; + fd_set rds; + + for (;;) + { + if (write) + { + FD_ZERO (&wds); + FD_SET (sock, &wds); + } + + FD_ZERO (&rds); + FD_SET (sock, &rds); + + if (select (sock + 1, &rds, write ? &wds : 0, 0, 0) != -1) + break; + + if (errno != EINTR) + { + if (throw_) + translate_connection_error (conn); + else + { + conn.mark_failed (); + return pair (false, false); + } + } + } + + return pair (FD_ISSET (sock, &rds), + write && FD_ISSET (sock, &wds)); + } + }; + + // A RAII object for recovering from an error in a pipeline. + // + // Specifically, it reads and discards results until reaching + // PGRES_PIPELINE_SYNC. + // + struct pipeline_recovery + { + pipeline_recovery (pipeline& pl, bool wdone, bool sync) + : pl_ (&pl), wdone_ (wdone), sync_ (sync) + { + } + + ~pipeline_recovery () + { + if (pl_ != 0 && !pl_->conn.failed ()) + { + PGconn* ch (pl_->conn.handle ()); + + // This code runs as part of stack unwinding caused by an exception + // so if we encounter an error, we "upgrade" the existing exception + // by marking the connection as failed. + // + // The rest is essentially a special version of execute() below. 
+ // + // Note that on the first iteration we may still have results from + // the previous call to PQconsumeInput() (and these results may + // be the entire outstanding sequence, in which case calling wait() + // will block indefinitely). + // + for (bool first (true);; first = false) + { + if (sync_) + { + assert (!wdone_); + + if (PQpipelineSync (ch) == 0) + break; + + sync_ = false; + } + + pair r (false, false); + + if (!first) + { + r = pl_->wait (!wdone_); + if (!r.first && !r.second) + break; + } + + if (r.first /* read */ || first) + { + if (r.first && PQconsumeInput (ch) == 0) + break; + + while (PQisBusy (ch) == 0) + { + auto_handle res (PQgetResult (ch)); + + // We should only get NULLs as well as PGRES_PIPELINE_ABORTED + // finished with PGRES_PIPELINE_SYNC. + // + if (res != 0) + { + ExecStatusType stat (PQresultStatus (res)); + + if (stat == PGRES_PIPELINE_SYNC) + return; + + assert (stat == PGRES_PIPELINE_ABORTED); + } + } + } + + if (r.second /* write */) + { + int r (PQflush (ch)); + if (r == -1) + break; + + if (r == 0) + wdone_ = true; + } + } + + pl_->conn.mark_failed (); + } + } + + void + cancel () + { + pl_ = 0; + } + + private: + pipeline* pl_; + bool wdone_; + bool sync_; + }; + + size_t statement:: + execute (const binding& param, + native_binding& native_param, + size_t n, + multiple_exceptions& mex, + bool (*process) (size_t, PGresult*, bool, void*), + void* data) + { + size_t i (0); // Parameter set being attempted. + mex.current (i); + + PGconn* ch (conn_.handle ()); + + pipeline pl (conn_); + + // True if we've written and read everything, respectively. + // + bool wdone (false), rdone (false); + + for (size_t wn (0), rn (0); !rdone; ) + { + // Note that there is a special version of this code above in + // ~pipeline_recovery(). + // + pair r (pl.wait (!wdone)); + + // Note that once we start the pipeline, any call that may throw + // without marking the connection as failed should be guarded by + // pipeline_recovery. 
+ + // Try to minimize the chance of blocking the server by first + // processing the result and then sending more queries. + // + if (r.first /* read */) + { + if (PQconsumeInput (ch) == 0) + translate_connection_error (conn_); + + while (PQisBusy (ch) == 0) + { + auto_handle res (PQgetResult (ch)); + + ExecStatusType stat (PGRES_FATAL_ERROR); + bool gr (is_good_result (res, &stat)); + + if (stat == PGRES_PIPELINE_SYNC) + { + assert (wdone && rn == n); + rdone = true; + break; + } + + assert (rn != n); + ++rn; + + if (stat != PGRES_PIPELINE_ABORTED) + { + // translate_error() may throw an exception (e.g., deadlock) + // without marking the connection as failed. + // + { + pipeline_recovery plr (pl, wdone, wn != n); + + if (!process (i, res, gr, data)) + translate_error (conn_, res, i, &mex); + + plr.cancel (); + } + + mex.attempted (++i); + mex.current (i); + } + else + { + // Should we treat PGRES_PIPELINE_ABORTED entries as attempted + // or not? While we did issue PQsendQueryPrepared() for them, + // the server tells us that it did not attempt to execute them. + // So it feels like they should not be treated as attempted. + // + // Note that for this to fit into our multiple_exceptions model, + // such an incomplete batch should be fatal (otherwise we could + // end up with unattempted "holes"). This is currently the case + // for errors handled by translate_error() but not necessarily + // the case for those handled by the process function (e.g., + // duplicate id handled by process_insert_result() below). So in + // a somewhat hackish way we assume the error (e.g., duplicate + // id) will always be translated to an exception and pre-mark + // multiple_exceptions as fatal. + // + mex.fatal (true); + } + + // We get a NULL result after each query result. + // + { + PGresult* end (PQgetResult (ch)); + assert (end == 0); + } + } + } + + if (r.second /* write */) + { + // Send queries until we get blocked (write-biased). 
This feels like + // a better overall strategy to keep the server busy compared to + // sending one query at a time and then re-checking if there is + // anything to read because the results of INSERT/UPDATE/DELETE are + // presumably small and quite a few of them can get buffered before + // the server gets blocked. + // + for (;;) + { + if (wn != n) + { + bind_param (native_param, param, wn); + + if (PQsendQueryPrepared (ch, + name_, + static_cast (native_param.count), + native_param.values, + native_param.lengths, + native_param.formats, + 1) == 0) + translate_connection_error (conn_); + + if (++wn == n) + { + if (PQpipelineSync (ch) == 0) + translate_connection_error (conn_); + } + } + + // PQflush() result: + // + // 0 -- success (queue is now empty) + // 1 -- blocked + // -1 -- error + // + int r (PQflush (ch)); + if (r == -1) + translate_connection_error (conn_); + + if (r == 0) + { + if (wn != n) + { + // If we continue here, then we are write-biased. And if we + // break, then we are read-biased. + // +#ifdef LIBPGSQL_READ_BIASED + break; +#else + continue; +#endif + } + + wdone = true; + } + + break; // Blocked or done. + } + } + } + + pl.close (); + return i; + } +#endif + // // select_statement // @@ -689,10 +1113,7 @@ namespace odb return no_data; assert (current_row_ > 0); - return bind_result (result_.bind, - result_.count, - handle_, - current_row_ - 1) + return bind_result (result_, handle_, current_row_ - 1) ? 
success : truncated; } @@ -703,11 +1124,7 @@ namespace odb assert (current_row_ > 0); assert (current_row_ <= row_count_); - if (!bind_result (result_.bind, - result_.count, - handle_, - current_row_ - 1, - true)) + if (!bind_result (result_, handle_, current_row_ - 1, true)) assert (false); } @@ -738,6 +1155,8 @@ namespace odb native_param_ (native_param), returning_ (returning) { + if (returning_ != 0) + assert (returning_->count == 1); } insert_statement:: @@ -759,6 +1178,8 @@ namespace odb native_param_ (native_param), returning_ (returning) { + if (returning_ != 0) + assert (returning_->count == 1); } bool insert_statement:: @@ -792,9 +1213,9 @@ namespace odb // if (returning_ == 0 && stat == PGRES_FATAL_ERROR) { - string s (PQresultErrorField (h, PG_DIAG_SQLSTATE)); + const char* ss (PQresultErrorField (h, PG_DIAG_SQLSTATE)); - if (s == "23505") + if (ss != 0 && strcmp (ss, "23505") == 0) return false; } @@ -802,11 +1223,76 @@ namespace odb } if (returning_ != 0) - bind_result (returning_->bind, 1, h, 0, false); + bind_result (*returning_, h, 0); + + return true; + } + +#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32) + + struct insert_data + { + binding& param; + binding* returning; + }; + + static bool + process_insert_result (size_t i, PGresult* r, bool gr, void* data) + { + insert_data& d (*static_cast<insert_data*> (data)); + + unsigned long long& s (d.param.status[i]); + s = 1; + + if (gr) + { + // Note that the result can never be truncated. + // + if (d.returning != 0) + statement::bind_result (*d.returning, r, 0, false, i); + } + else + { + // An auto-assigned object id should never cause a duplicate + // primary key. + // + if (d.returning == 0 && + r != 0 && PQresultStatus (r) == PGRES_FATAL_ERROR) + { + // Note that statement::execute() assumes that this will eventually + // be translated to an entry in multiple_exceptions.
+ // + const char* ss (PQresultErrorField (r, PG_DIAG_SQLSTATE)); + + if (ss != 0 && strcmp (ss, "23505") == 0) + s = 0; + } + + if (s == 1) + return false; + } return true; } + size_t insert_statement:: + execute (size_t n, multiple_exceptions& mex) + { + { + odb::tracer* t; + if ((t = conn_.transaction_tracer ()) || + (t = conn_.tracer ()) || + (t = conn_.database ().tracer ())) + t->execute (conn_, *this); + } + + insert_data d {param_, returning_}; + + return statement::execute ( + param_, native_param_, n, mex, &process_insert_result, &d); + } +#endif + // // update_statement // @@ -881,6 +1367,43 @@ namespace odb return affected_row_count (h); } +#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32) + + static bool + process_update_result (size_t i, PGresult* r, bool gr, void* data) + { + binding& param (*static_cast<binding*> (data)); + + unsigned long long& s (param.status[i]); + + if (gr) + { + s = affected_row_count (r); + return true; + } + else + { + s = update_statement::result_unknown; + return false; + } + } + + size_t update_statement:: + execute (size_t n, multiple_exceptions& mex) + { + { + odb::tracer* t; + if ((t = conn_.transaction_tracer ()) || + (t = conn_.tracer ()) || + (t = conn_.database ().tracer ())) + t->execute (conn_, *this); + } + + return statement::execute ( + param_, native_param_, n, mex, &process_update_result, &param_); + } +#endif + // // delete_statement // @@ -969,5 +1492,44 @@ namespace odb return affected_row_count (h); } + +#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32) + + static bool + process_delete_result (size_t i, PGresult* r, bool gr, void* data) + { + binding& param (*static_cast<binding*> (data)); + + unsigned long long& s (param.status[i]); + + if (gr) + { + s = affected_row_count (r); + return true; + } + else + { + s = delete_statement::result_unknown; + return false; + } + } + + size_t delete_statement:: + execute (size_t n, multiple_exceptions& mex) + { + assert (param_ != 0); + + { + odb::tracer* t; + if ((t =
conn_.transaction_tracer ()) || + (t = conn_.tracer ()) || + (t = conn_.database ().tracer ())) + t->execute (conn_, *this); + } + + return statement::execute ( + *param_, native_param_, n, mex, &process_delete_result, param_); + } +#endif } } diff --git a/odb/pgsql/statement.hxx b/odb/pgsql/statement.hxx index b417f1c..139d2d6 100644 --- a/odb/pgsql/statement.hxx +++ b/odb/pgsql/statement.hxx @@ -63,22 +63,24 @@ namespace odb void deallocate (); - // Adapt an ODB binding to a native PostgreSQL parameter binding. + // Adapt an ODB binding to a native PostgreSQL parameter binding. If pos + // is not 0, then bind the parameter set at this position in a batch. // static void - bind_param (native_binding&, const binding&); + bind_param (native_binding&, const binding&, std::size_t pos = 0); // Populate an ODB binding given a PostgreSQL result. If the truncated // argument is true, then only truncated columns are extracted. Return // true if all the data was extracted successfully and false if one or - // more columns were truncated. + // more columns were truncated. If pos is not 0, then populate the + // parameter set at this position in a batch. // static bool - bind_result (bind*, - std::size_t count, + bind_result (const binding&, PGresult*, std::size_t row, - bool truncated = false); + bool truncated = false, + std::size_t pos = 0); protected: // We keep two versions to take advantage of std::string COW. @@ -102,6 +104,16 @@ namespace odb const Oid* types, std::size_t types_count); + // Bulk execute implementation. + // + std::size_t + execute (const binding& param, + native_binding& native_param, + std::size_t n, + multiple_exceptions&, + bool (*process) (size_t i, PGresult*, bool good, void* data), + void* data); + private: void init (statement_kind, @@ -296,6 +308,20 @@ namespace odb bool execute (); + // Return the number of parameter sets (out of n) that were attempted. 
+ // + std::size_t + execute (std::size_t n, multiple_exceptions&); + + // Return true if successful and false if this row is a duplicate. + // All other errors are reported via exceptions. + // + bool + result (std::size_t i) + { + return param_.status[i] != 0; + } + private: insert_statement (const insert_statement&); insert_statement& operator= (const insert_statement&); @@ -334,6 +360,22 @@ namespace odb unsigned long long execute (); + // Return the number of parameter sets (out of n) that were attempted. + // + std::size_t + execute (std::size_t n, multiple_exceptions&); + + // Return the number of rows affected (updated) by the parameter + // set. All errors are reported by throwing exceptions. + // + static const unsigned long long result_unknown = ~0ULL; + + unsigned long long + result (std::size_t i) + { + return param_.status[i]; + } + private: update_statement (const update_statement&); update_statement& operator= (const update_statement&); @@ -376,6 +418,22 @@ namespace odb unsigned long long execute (); + // Return the number of parameter sets (out of n) that were attempted. + // + std::size_t + execute (std::size_t n, multiple_exceptions&); + + // Return the number of rows affected (deleted) by the parameter + // set. All errors are reported by throwing exceptions. + // + static const unsigned long long result_unknown = ~0ULL; + + unsigned long long + result (std::size_t i) + { + return param_->status[i]; + } + private: delete_statement (const delete_statement&); delete_statement& operator= (const delete_statement&); -- cgit v1.1