From 7b51842728b6ee99945afe401fca317c703a12d9 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Wed, 16 Jun 2021 11:28:56 +0200 Subject: Add support for bulk operations using pipeline mode in libpq 14 --- odb/pgsql/statement.cxx | 690 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 626 insertions(+), 64 deletions(-) (limited to 'odb/pgsql/statement.cxx') diff --git a/odb/pgsql/statement.cxx b/odb/pgsql/statement.cxx index 977db72..c88e621 100644 --- a/odb/pgsql/statement.cxx +++ b/odb/pgsql/statement.cxx @@ -1,16 +1,32 @@ // file : odb/pgsql/statement.cxx // license : GNU GPL v2; see accompanying LICENSE file -#include // std::atol -#include -#include // istringstream +#include // ODB_CXX11 #include +#ifdef LIBPQ_HAS_PIPELINING +# ifndef _WIN32 +# include +# include +# endif +#endif + +#include // strcmp +#include // pair +#include + +#ifdef ODB_CXX11 +# include // strtoull +#else +# include // istringstream +#endif + #include #include #include +#include #include #include #include @@ -36,11 +52,12 @@ namespace odb count = static_cast (s[0] - '0'); else { - // @@ Using stringstream conversion for now. See if we can optimize - // this (atoll possibly, even though it is not standard). - // +#ifdef ODB_CXX11 + count = strtoull (s, 0, 10); +#else istringstream ss (s); ss >> count; +#endif } return count; @@ -256,30 +273,39 @@ namespace odb return text_; } + template + static inline T* + offset (T* base, size_t count, size_t size) + { + return reinterpret_cast ( + reinterpret_cast (base) + count * size); + } + void statement:: - bind_param (native_binding& n, const binding& b) + bind_param (native_binding& ns, const binding& bs, size_t pos) { - assert (n.count == b.count); + assert (ns.count == bs.count); - for (size_t i (0); i < n.count; ++i) + for (size_t i (0); i < ns.count; ++i) { - const bind& current_bind (b.bind[i]); + const bind& b (bs.bind[i]); - n.formats[i] = 1; + ns.formats[i] = 1; - if (current_bind.buffer == 0 || // Skip NULL entries. 
- (current_bind.is_null != 0 && *current_bind.is_null)) + bool* n (b.is_null != 0 ? offset (b.is_null, pos, bs.skip) : 0); + + if ((n != 0 && *n) || b.buffer == 0) // Handle NULL entries. { - n.values[i] = 0; - n.lengths[i] = 0; + ns.values[i] = 0; + ns.lengths[i] = 0; continue; } - n.values[i] = static_cast (current_bind.buffer); + ns.values[i] = static_cast (offset (b.buffer, pos, bs.skip)); size_t l (0); - switch (current_bind.type) + switch (b.type) { case bind::boolean_: { @@ -325,10 +351,18 @@ namespace odb case bind::numeric: case bind::text: case bind::bytea: - case bind::bit: case bind::varbit: { - l = *current_bind.size; + // In this case b.buffer is a pointer to pointer to buffer so we + // need to chase one level. + // + ns.values[i] = static_cast ( + *reinterpret_cast (ns.values[i])); + } + // Fall through. + case bind::bit: + { + l = *offset (b.size, pos, bs.skip); break; } case bind::uuid: @@ -340,54 +374,61 @@ namespace odb } } - n.lengths[i] = static_cast (l); + ns.lengths[i] = static_cast (l); } } bool statement:: - bind_result (bind* p, - size_t count, + bind_result (const binding& bs, PGresult* result, size_t row, - bool truncated) + bool truncated, + size_t pos) { bool r (true); int col_count (PQnfields (result)); int col (0); - for (size_t i (0); i != count && col != col_count; ++i) + for (size_t i (0); i != bs.count && col != col_count; ++i) { - const bind& b (p[i]); + const bind& b (bs.bind[i]); if (b.buffer == 0) // Skip NULL entries. continue; int c (col++); - if (truncated && (b.truncated == 0 || !*b.truncated)) - continue; + { + bool* t (b.truncated != 0 ? offset (b.truncated, pos, bs.skip) : 0); - if (b.truncated != 0) - *b.truncated = false; + if (truncated && (t == 0 || !*t)) + continue; + + if (t != 0) + *t = false; + } // Check for NULL unless we are reloading a truncated result. 
// if (!truncated) { - *b.is_null = PQgetisnull (result, static_cast (row), c) == 1; + bool* n (offset (b.is_null, pos, bs.skip)); + + *n = PQgetisnull (result, static_cast (row), c) == 1; - if (*b.is_null) + if (*n) continue; } + void* buf (offset (b.buffer, pos, bs.skip)); + const char* v (PQgetvalue (result, static_cast (row), c)); switch (b.type) { case bind::boolean_: { - *static_cast (b.buffer) = - *reinterpret_cast (v); + *static_cast (buf) = *reinterpret_cast (v); break; } case bind::smallint: @@ -430,19 +471,19 @@ namespace odb { case bind::smallint: { - *static_cast (b.buffer) = + *static_cast (buf) = endian_traits::hton (static_cast (i)); break; } case bind::integer: { - *static_cast (b.buffer) = + *static_cast (buf) = endian_traits::hton (static_cast (i)); break; } case bind::bigint: { - *static_cast (b.buffer) = endian_traits::hton (i); + *static_cast (buf) = endian_traits::hton (i); break; } default: @@ -453,25 +494,23 @@ namespace odb } case bind::real: { - *static_cast (b.buffer) = - *reinterpret_cast (v); + *static_cast (buf) = *reinterpret_cast (v); break; } case bind::double_: { - *static_cast (b.buffer) = - *reinterpret_cast (v); + *static_cast (buf) = *reinterpret_cast (v); break; } case bind::date: { - *static_cast (b.buffer) = *reinterpret_cast (v); + *static_cast (buf) = *reinterpret_cast (v); break; } case bind::time: case bind::timestamp: { - *static_cast (b.buffer) = + *static_cast (buf) = *reinterpret_cast (v); break; } @@ -481,24 +520,37 @@ namespace odb case bind::bit: case bind::varbit: { + // Currently this is neither supported (due to capacity) nor used + // in batches. 
+ // +#ifdef LIBPGSQL_EXTRA_CHECKS + assert (pos == 0); +#endif + *b.size = static_cast ( PQgetlength (result, static_cast (row), c)); - if (b.capacity < *b.size) - { - if (b.truncated) - *b.truncated = true; + if (b.capacity < *b.size) + { + if (b.truncated) + *b.truncated = true; - r = false; - continue; - } + r = false; + continue; + } + + // In these cases b.buffer is a pointer to pointer to buffer so we + // need to chase one level. + // + if (b.type != bind::bit) + buf = *static_cast (buf); - memcpy (b.buffer, v, *b.size); - break; + memcpy (buf, v, *b.size); + break; } case bind::uuid: { - memcpy (b.buffer, v, 16); + memcpy (buf, v, 16); break; } } @@ -514,6 +566,378 @@ namespace odb return r; } +#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32) + + // Note that this function always marks the connection as failed. + // + static void + translate_connection_error (connection& conn) + { + const char* m (PQerrorMessage (conn.handle ())); + + if (PQstatus (conn.handle ()) == CONNECTION_BAD) + { + conn.mark_failed (); + throw connection_lost (); + } + else + { + conn.mark_failed (); + throw database_exception (m != 0 ? m : "bad connection state"); + } + } + + // A RAII object for PGconn's non-blocking pipeline mode. 
+ // + struct pipeline + { + connection& conn; + int sock; + + explicit + pipeline (connection& c) + : conn (c) + { + PGconn* ch (conn.handle ()); + + if ((sock = PQsocket (ch)) == -1 || + PQsetnonblocking (ch, 1) == -1 || + PQenterPipelineMode (ch) == 0) + { + translate_connection_error (conn); + } + } + + void + close (bool throw_ = true) + { + if (!conn.failed ()) + { + PGconn* ch (conn.handle ()); + + if (PQexitPipelineMode (ch) == 0 || + PQsetnonblocking (ch, 0) == -1) + { + if (throw_) + translate_connection_error (conn); + else + conn.mark_failed (); + } + } + } + + ~pipeline () + { + close (false); + } + + pair + wait (bool write, bool throw_ = true) + { + fd_set wds; + fd_set rds; + + for (;;) + { + if (write) + { + FD_ZERO (&wds); + FD_SET (sock, &wds); + } + + FD_ZERO (&rds); + FD_SET (sock, &rds); + + if (select (sock + 1, &rds, write ? &wds : 0, 0, 0) != -1) + break; + + if (errno != EINTR) + { + if (throw_) + translate_connection_error (conn); + else + { + conn.mark_failed (); + return pair (false, false); + } + } + } + + return pair (FD_ISSET (sock, &rds), + write && FD_ISSET (sock, &wds)); + } + }; + + // A RAII object for recovering from an error in a pipeline. + // + // Specifically, it reads and discards results until reaching + // PGRES_PIPELINE_SYNC. + // + struct pipeline_recovery + { + pipeline_recovery (pipeline& pl, bool wdone, bool sync) + : pl_ (&pl), wdone_ (wdone), sync_ (sync) + { + } + + ~pipeline_recovery () + { + if (pl_ != 0 && !pl_->conn.failed ()) + { + PGconn* ch (pl_->conn.handle ()); + + // This code runs as part of stack unwinding caused by an exception + // so if we encounter an error, we "upgrade" the existing exception + // by marking the connection as failed. + // + // The rest is essentially a special version of execute() below. 
+ // + // Note that on the first iteration we may still have results from + // the previous call to PQconsumeInput() (and these results may + // be the entire outstanding sequence, in which case calling wait() + // will block indefinitely). + // + for (bool first (true);; first = false) + { + if (sync_) + { + assert (!wdone_); + + if (PQpipelineSync (ch) == 0) + break; + + sync_ = false; + } + + pair r (false, false); + + if (!first) + { + r = pl_->wait (!wdone_); + if (!r.first && !r.second) + break; + } + + if (r.first /* read */ || first) + { + if (r.first && PQconsumeInput (ch) == 0) + break; + + while (PQisBusy (ch) == 0) + { + auto_handle res (PQgetResult (ch)); + + // We should only get NULLs as well as PGRES_PIPELINE_ABORTED + // finished with PGRES_PIPELINE_SYNC. + // + if (res != 0) + { + ExecStatusType stat (PQresultStatus (res)); + + if (stat == PGRES_PIPELINE_SYNC) + return; + + assert (stat == PGRES_PIPELINE_ABORTED); + } + } + } + + if (r.second /* write */) + { + int r (PQflush (ch)); + if (r == -1) + break; + + if (r == 0) + wdone_ = true; + } + } + + pl_->conn.mark_failed (); + } + } + + void + cancel () + { + pl_ = 0; + } + + private: + pipeline* pl_; + bool wdone_; + bool sync_; + }; + + size_t statement:: + execute (const binding& param, + native_binding& native_param, + size_t n, + multiple_exceptions& mex, + bool (*process) (size_t, PGresult*, bool, void*), + void* data) + { + size_t i (0); // Parameter set being attempted. + mex.current (i); + + PGconn* ch (conn_.handle ()); + + pipeline pl (conn_); + + // True if we've written and read everything, respectively. + // + bool wdone (false), rdone (false); + + for (size_t wn (0), rn (0); !rdone; ) + { + // Note that there is a special version of this code above in + // ~pipeline_recovery(). + // + pair r (pl.wait (!wdone)); + + // Note that once we start the pipeline, any call that may throw + // without marking the connection as failed should be guarded by + // pipeline_recovery. 
+ + // Try to minimize the chance of blocking the server by first + // processing the result and then sending more queries. + // + if (r.first /* read */) + { + if (PQconsumeInput (ch) == 0) + translate_connection_error (conn_); + + while (PQisBusy (ch) == 0) + { + auto_handle res (PQgetResult (ch)); + + ExecStatusType stat (PGRES_FATAL_ERROR); + bool gr (is_good_result (res, &stat)); + + if (stat == PGRES_PIPELINE_SYNC) + { + assert (wdone && rn == n); + rdone = true; + break; + } + + assert (rn != n); + ++rn; + + if (stat != PGRES_PIPELINE_ABORTED) + { + // translate_error() may throw an exception (e.g., deadlock) + // without marking the connection as failed. + // + { + pipeline_recovery plr (pl, wdone, wn != n); + + if (!process (i, res, gr, data)) + translate_error (conn_, res, i, &mex); + + plr.cancel (); + } + + mex.attempted (++i); + mex.current (i); + } + else + { + // Should we treat PGRES_PIPELINE_ABORTED entries as attempted + // or not? While we did issue PQsendQueryPrepared() for them, + // the server tells us that it did not attempt to execute them. + // So it feels like they should not be treated as attempted. + // + // Note that for this to fit into our multiple_exceptions model, + // such an incomplete batch should be fatal (otherwise we could + // end up with unattempted "holes"). This is currently the case + // for errors handled by translate_error() but not necessarily + // the case for those handled by the process function (e.g., + // duplicate id handled by process_insert_result() below). So in + // a somewhat hackish way we assume the error (e.g., duplicate + // id) will always be translated to an exception and pre-mark + // multiple_exceptions as fatal. + // + mex.fatal (true); + } + + // We get a NULL result after each query result. + // + { + PGresult* end (PQgetResult (ch)); + assert (end == 0); + } + } + } + + if (r.second /* write */) + { + // Send queries until we get blocked (write-biased).
This feels like + // a better overall strategy to keep the server busy compared to + // sending one query at a time and then re-checking if there is + // anything to read because the results of INSERT/UPDATE/DELETE are + // presumably small and quite a few of them can get buffered before + // the server gets blocked. + // + for (;;) + { + if (wn != n) + { + bind_param (native_param, param, wn); + + if (PQsendQueryPrepared (ch, + name_, + static_cast (native_param.count), + native_param.values, + native_param.lengths, + native_param.formats, + 1) == 0) + translate_connection_error (conn_); + + if (++wn == n) + { + if (PQpipelineSync (ch) == 0) + translate_connection_error (conn_); + } + } + + // PQflush() result: + // + // 0 -- success (queue is now empty) + // 1 -- blocked + // -1 -- error + // + int r (PQflush (ch)); + if (r == -1) + translate_connection_error (conn_); + + if (r == 0) + { + if (wn != n) + { + // If we continue here, then we are write-biased. And if we + // break, then we are read-biased. + // +#ifdef LIBPGSQL_READ_BIASED + break; +#else + continue; +#endif + } + + wdone = true; + } + + break; // Blocked or done. + } + } + } + + pl.close (); + return i; + } +#endif + // // select_statement // @@ -689,10 +1113,7 @@ namespace odb return no_data; assert (current_row_ > 0); - return bind_result (result_.bind, - result_.count, - handle_, - current_row_ - 1) + return bind_result (result_, handle_, current_row_ - 1) ? 
success : truncated; } @@ -703,11 +1124,7 @@ namespace odb assert (current_row_ > 0); assert (current_row_ <= row_count_); - if (!bind_result (result_.bind, - result_.count, - handle_, - current_row_ - 1, - true)) + if (!bind_result (result_, handle_, current_row_ - 1, true)) assert (false); } @@ -738,6 +1155,8 @@ namespace odb native_param_ (native_param), returning_ (returning) { + if (returning_ != 0) + assert (returning_->count == 1); } insert_statement:: @@ -759,6 +1178,8 @@ namespace odb native_param_ (native_param), returning_ (returning) { + if (returning_ != 0) + assert (returning_->count == 1); } bool insert_statement:: @@ -792,9 +1213,9 @@ namespace odb // if (returning_ == 0 && stat == PGRES_FATAL_ERROR) { - string s (PQresultErrorField (h, PG_DIAG_SQLSTATE)); + const char* ss (PQresultErrorField (h, PG_DIAG_SQLSTATE)); - if (s == "23505") + if (ss != 0 && strcmp (ss, "23505") == 0) return false; } @@ -802,11 +1223,76 @@ namespace odb } if (returning_ != 0) - bind_result (returning_->bind, 1, h, 0, false); + bind_result (*returning_, h, 0); + + return true; + } + +#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32) + + struct insert_data + { + binding& param; + binding* returning; + }; + + static bool + process_insert_result (size_t i, PGresult* r, bool gr, void* data) + { + insert_data& d (*static_cast (data)); + + unsigned long long& s (d.param.status[i]); + s = 1; + + if (gr) + { + // Note that the result can never be truncated. + // + if (d.returning != 0) + statement::bind_result (*d.returning, r, 0, false, i); + } + else + { + // An auto-assigned object id should never cause a duplicate + // primary key. + // + if (d.returning == 0 && + r != 0 && PQresultStatus (r) == PGRES_FATAL_ERROR) + { + // Note that statement::execute() assumes that this will eventually + // be translated to an entry in multiple_exceptions. 
+ // + const char* ss (PQresultErrorField (r, PG_DIAG_SQLSTATE)); + + if (ss != 0 && strcmp (ss, "23505") == 0) + s = 0; + } + + if (s == 1) + return false; + } return true; } + size_t insert_statement:: + execute (size_t n, multiple_exceptions& mex) + { + { + odb::tracer* t; + if ((t = conn_.transaction_tracer ()) || + (t = conn_.tracer ()) || + (t = conn_.database ().tracer ())) + t->execute (conn_, *this); + } + + insert_data d {param_, returning_}; + + return statement::execute ( + param_, native_param_, n, mex, &process_insert_result, &d); + } +#endif + // // update_statement // @@ -881,6 +1367,43 @@ namespace odb return affected_row_count (h); } +#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32) + + static bool + process_update_result (size_t i, PGresult* r, bool gr, void* data) + { + binding& param (*static_cast (data)); + + unsigned long long& s (param.status[i]); + + if (gr) + { + s = affected_row_count (r); + return true; + } + else + { + s = update_statement::result_unknown; + return false; + } + } + + size_t update_statement:: + execute (size_t n, multiple_exceptions& mex) + { + { + odb::tracer* t; + if ((t = conn_.transaction_tracer ()) || + (t = conn_.tracer ()) || + (t = conn_.database ().tracer ())) + t->execute (conn_, *this); + } + + return statement::execute ( + param_, native_param_, n, mex, &process_update_result, ¶m_); + } +#endif + // // delete_statement // @@ -969,5 +1492,44 @@ namespace odb return affected_row_count (h); } + +#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32) + + static bool + process_delete_result (size_t i, PGresult* r, bool gr, void* data) + { + binding& param (*static_cast (data)); + + unsigned long long& s (param.status[i]); + + if (gr) + { + s = affected_row_count (r); + return true; + } + else + { + s = delete_statement::result_unknown; + return false; + } + } + + size_t delete_statement:: + execute (size_t n, multiple_exceptions& mex) + { + assert (param_ != 0); + + { + odb::tracer* t; + if ((t = 
conn_.transaction_tracer ()) || + (t = conn_.tracer ()) || + (t = conn_.database ().tracer ())) + t->execute (conn_, *this); + } + + return statement::execute ( + *param_, native_param_, n, mex, &process_delete_result, param_); + } +#endif } } -- cgit v1.1