aboutsummaryrefslogtreecommitdiff
path: root/odb/pgsql/statement.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'odb/pgsql/statement.cxx')
-rw-r--r--odb/pgsql/statement.cxx690
1 files changed, 626 insertions, 64 deletions
diff --git a/odb/pgsql/statement.cxx b/odb/pgsql/statement.cxx
index 977db72..c88e621 100644
--- a/odb/pgsql/statement.cxx
+++ b/odb/pgsql/statement.cxx
@@ -1,16 +1,32 @@
// file : odb/pgsql/statement.cxx
// license : GNU GPL v2; see accompanying LICENSE file
-#include <cstdlib> // std::atol
-#include <cassert>
-#include <sstream> // istringstream
+#include <odb/details/config.hxx> // ODB_CXX11
#include <libpq-fe.h>
+#ifdef LIBPQ_HAS_PIPELINING
+# ifndef _WIN32
+# include <errno.h>
+# include <sys/select.h>
+# endif
+#endif
+
+#include <cstring> // strcmp
+#include <utility> // pair
+#include <cassert>
+
+#ifdef ODB_CXX11
+# include <cstdlib> // strtoull
+#else
+# include <sstream> // istringstream
+#endif
+
#include <odb/tracer.hxx>
#include <odb/pgsql/pgsql-oid.hxx>
#include <odb/pgsql/statement.hxx>
+#include <odb/pgsql/exceptions.hxx>
#include <odb/pgsql/connection.hxx>
#include <odb/pgsql/transaction.hxx>
#include <odb/pgsql/auto-handle.hxx>
@@ -36,11 +52,12 @@ namespace odb
count = static_cast<unsigned long long> (s[0] - '0');
else
{
- // @@ Using stringstream conversion for now. See if we can optimize
- // this (atoll possibly, even though it is not standard).
- //
+#ifdef ODB_CXX11
+ count = strtoull (s, 0, 10);
+#else
istringstream ss (s);
ss >> count;
+#endif
}
return count;
@@ -256,30 +273,39 @@ namespace odb
return text_;
}
+ template <typename T>
+ static inline T*
+ offset (T* base, size_t count, size_t size)
+ {
+ return reinterpret_cast<T*> (
+ reinterpret_cast<char*> (base) + count * size);
+ }
+
void statement::
- bind_param (native_binding& n, const binding& b)
+ bind_param (native_binding& ns, const binding& bs, size_t pos)
{
- assert (n.count == b.count);
+ assert (ns.count == bs.count);
- for (size_t i (0); i < n.count; ++i)
+ for (size_t i (0); i < ns.count; ++i)
{
- const bind& current_bind (b.bind[i]);
+ const bind& b (bs.bind[i]);
- n.formats[i] = 1;
+ ns.formats[i] = 1;
- if (current_bind.buffer == 0 || // Skip NULL entries.
- (current_bind.is_null != 0 && *current_bind.is_null))
+ bool* n (b.is_null != 0 ? offset (b.is_null, pos, bs.skip) : 0);
+
+ if ((n != 0 && *n) || b.buffer == 0) // Handle NULL entries.
{
- n.values[i] = 0;
- n.lengths[i] = 0;
+ ns.values[i] = 0;
+ ns.lengths[i] = 0;
continue;
}
- n.values[i] = static_cast<char*> (current_bind.buffer);
+ ns.values[i] = static_cast<char*> (offset (b.buffer, pos, bs.skip));
size_t l (0);
- switch (current_bind.type)
+ switch (b.type)
{
case bind::boolean_:
{
@@ -325,10 +351,18 @@ namespace odb
case bind::numeric:
case bind::text:
case bind::bytea:
- case bind::bit:
case bind::varbit:
{
- l = *current_bind.size;
+ // In this case b.buffer is a pointer to pointer to buffer so we
+ // need to chase one level.
+ //
+ ns.values[i] = static_cast<char*> (
+ *reinterpret_cast<void**> (ns.values[i]));
+ }
+ // Fall through.
+ case bind::bit:
+ {
+ l = *offset (b.size, pos, bs.skip);
break;
}
case bind::uuid:
@@ -340,54 +374,61 @@ namespace odb
}
}
- n.lengths[i] = static_cast<int> (l);
+ ns.lengths[i] = static_cast<int> (l);
}
}
bool statement::
- bind_result (bind* p,
- size_t count,
+ bind_result (const binding& bs,
PGresult* result,
size_t row,
- bool truncated)
+ bool truncated,
+ size_t pos)
{
bool r (true);
int col_count (PQnfields (result));
int col (0);
- for (size_t i (0); i != count && col != col_count; ++i)
+ for (size_t i (0); i != bs.count && col != col_count; ++i)
{
- const bind& b (p[i]);
+ const bind& b (bs.bind[i]);
if (b.buffer == 0) // Skip NULL entries.
continue;
int c (col++);
- if (truncated && (b.truncated == 0 || !*b.truncated))
- continue;
+ {
+ bool* t (b.truncated != 0 ? offset (b.truncated, pos, bs.skip) : 0);
- if (b.truncated != 0)
- *b.truncated = false;
+ if (truncated && (t == 0 || !*t))
+ continue;
+
+ if (t != 0)
+ *t = false;
+ }
// Check for NULL unless we are reloading a truncated result.
//
if (!truncated)
{
- *b.is_null = PQgetisnull (result, static_cast<int> (row), c) == 1;
+ bool* n (offset (b.is_null, pos, bs.skip));
+
+ *n = PQgetisnull (result, static_cast<int> (row), c) == 1;
- if (*b.is_null)
+ if (*n)
continue;
}
+ void* buf (offset (b.buffer, pos, bs.skip));
+
const char* v (PQgetvalue (result, static_cast<int> (row), c));
switch (b.type)
{
case bind::boolean_:
{
- *static_cast<bool*> (b.buffer) =
- *reinterpret_cast<const bool*> (v);
+ *static_cast<bool*> (buf) = *reinterpret_cast<const bool*> (v);
break;
}
case bind::smallint:
@@ -430,19 +471,19 @@ namespace odb
{
case bind::smallint:
{
- *static_cast<short*> (b.buffer) =
+ *static_cast<short*> (buf) =
endian_traits::hton (static_cast<short> (i));
break;
}
case bind::integer:
{
- *static_cast<int*> (b.buffer) =
+ *static_cast<int*> (buf) =
endian_traits::hton (static_cast<int> (i));
break;
}
case bind::bigint:
{
- *static_cast<long long*> (b.buffer) = endian_traits::hton (i);
+ *static_cast<long long*> (buf) = endian_traits::hton (i);
break;
}
default:
@@ -453,25 +494,23 @@ namespace odb
}
case bind::real:
{
- *static_cast<float*> (b.buffer) =
- *reinterpret_cast<const float*> (v);
+ *static_cast<float*> (buf) = *reinterpret_cast<const float*> (v);
break;
}
case bind::double_:
{
- *static_cast<double*> (b.buffer) =
- *reinterpret_cast<const double*> (v);
+ *static_cast<double*> (buf) = *reinterpret_cast<const double*> (v);
break;
}
case bind::date:
{
- *static_cast<int*> (b.buffer) = *reinterpret_cast<const int*> (v);
+ *static_cast<int*> (buf) = *reinterpret_cast<const int*> (v);
break;
}
case bind::time:
case bind::timestamp:
{
- *static_cast<long long*> (b.buffer) =
+ *static_cast<long long*> (buf) =
*reinterpret_cast<const long long*> (v);
break;
}
@@ -481,24 +520,37 @@ namespace odb
case bind::bit:
case bind::varbit:
{
+ // Currently this is neither supported (due to capacity) nor used
+ // in batches.
+ //
+#ifdef LIBPGSQL_EXTRA_CHECKS
+ assert (pos == 0);
+#endif
+
*b.size = static_cast<size_t> (
PQgetlength (result, static_cast<int> (row), c));
- if (b.capacity < *b.size)
- {
- if (b.truncated)
- *b.truncated = true;
+ if (b.capacity < *b.size)
+ {
+ if (b.truncated)
+ *b.truncated = true;
- r = false;
- continue;
- }
+ r = false;
+ continue;
+ }
+
+ // In these cases b.buffer is a pointer to pointer to buffer so we
+ // need to chase one level.
+ //
+ if (b.type != bind::bit)
+ buf = *static_cast<void**> (buf);
- memcpy (b.buffer, v, *b.size);
- break;
+ memcpy (buf, v, *b.size);
+ break;
}
case bind::uuid:
{
- memcpy (b.buffer, v, 16);
+ memcpy (buf, v, 16);
break;
}
}
@@ -514,6 +566,378 @@ namespace odb
return r;
}
+#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32)
+
+ // Note that this function always marks the connection as failed.
+ //
+ static void
+ translate_connection_error (connection& conn)
+ {
+ const char* m (PQerrorMessage (conn.handle ()));
+
+ if (PQstatus (conn.handle ()) == CONNECTION_BAD)
+ {
+ conn.mark_failed ();
+ throw connection_lost ();
+ }
+ else
+ {
+ conn.mark_failed ();
+ throw database_exception (m != 0 ? m : "bad connection state");
+ }
+ }
+
+ // A RAII object for PGconn's non-blocking pipeline mode.
+ //
+ struct pipeline
+ {
+ connection& conn;
+ int sock;
+
+ explicit
+ pipeline (connection& c)
+ : conn (c)
+ {
+ PGconn* ch (conn.handle ());
+
+ if ((sock = PQsocket (ch)) == -1 ||
+ PQsetnonblocking (ch, 1) == -1 ||
+ PQenterPipelineMode (ch) == 0)
+ {
+ translate_connection_error (conn);
+ }
+ }
+
+ void
+ close (bool throw_ = true)
+ {
+ if (!conn.failed ())
+ {
+ PGconn* ch (conn.handle ());
+
+ if (PQexitPipelineMode (ch) == 0 ||
+ PQsetnonblocking (ch, 0) == -1)
+ {
+ if (throw_)
+ translate_connection_error (conn);
+ else
+ conn.mark_failed ();
+ }
+ }
+ }
+
+ ~pipeline ()
+ {
+ close (false);
+ }
+
+ pair<bool /* read */, bool /* write */>
+ wait (bool write, bool throw_ = true)
+ {
+ fd_set wds;
+ fd_set rds;
+
+ for (;;)
+ {
+ if (write)
+ {
+ FD_ZERO (&wds);
+ FD_SET (sock, &wds);
+ }
+
+ FD_ZERO (&rds);
+ FD_SET (sock, &rds);
+
+ if (select (sock + 1, &rds, write ? &wds : 0, 0, 0) != -1)
+ break;
+
+ if (errno != EINTR)
+ {
+ if (throw_)
+ translate_connection_error (conn);
+ else
+ {
+ conn.mark_failed ();
+ return pair<bool, bool> (false, false);
+ }
+ }
+ }
+
+ return pair<bool, bool> (FD_ISSET (sock, &rds),
+ write && FD_ISSET (sock, &wds));
+ }
+ };
+
+ // A RAII object for recovering from an error in a pipeline.
+ //
+ // Specifically, it reads and discards results until reaching
+ // PGRES_PIPELINE_SYNC.
+ //
+ struct pipeline_recovery
+ {
+ pipeline_recovery (pipeline& pl, bool wdone, bool sync)
+ : pl_ (&pl), wdone_ (wdone), sync_ (sync)
+ {
+ }
+
+ ~pipeline_recovery ()
+ {
+ if (pl_ != 0 && !pl_->conn.failed ())
+ {
+ PGconn* ch (pl_->conn.handle ());
+
+ // This code runs as part of stack unwinding caused by an exception
+ // so if we encounter an error, we "upgrade" the existing exception
+ // by marking the connection as failed.
+ //
+ // The rest is essentially a special version of execute() below.
+ //
+ // Note that on the first iteration we may still have results from
+ // the previous call to PQconsumeInput() (and these results may
+ // be the entire outstanding sequence, in which case calling wait()
+ // will block indefinitely).
+ //
+ for (bool first (true);; first = false)
+ {
+ if (sync_)
+ {
+ assert (!wdone_);
+
+ if (PQpipelineSync (ch) == 0)
+ break;
+
+ sync_ = false;
+ }
+
+ pair<bool, bool> r (false, false);
+
+ if (!first)
+ {
+ r = pl_->wait (!wdone_);
+ if (!r.first && !r.second)
+ break;
+ }
+
+ if (r.first /* read */ || first)
+ {
+ if (r.first && PQconsumeInput (ch) == 0)
+ break;
+
+ while (PQisBusy (ch) == 0)
+ {
+ auto_handle<PGresult> res (PQgetResult (ch));
+
+ // We should only get NULLs as well as PGRES_PIPELINE_ABORTED
+ // finished with PGRES_PIPELINE_SYNC.
+ //
+ if (res != 0)
+ {
+ ExecStatusType stat (PQresultStatus (res));
+
+ if (stat == PGRES_PIPELINE_SYNC)
+ return;
+
+ assert (stat == PGRES_PIPELINE_ABORTED);
+ }
+ }
+ }
+
+ if (r.second /* write */)
+ {
+ int r (PQflush (ch));
+ if (r == -1)
+ break;
+
+ if (r == 0)
+ wdone_ = true;
+ }
+ }
+
+ pl_->conn.mark_failed ();
+ }
+ }
+
+ void
+ cancel ()
+ {
+ pl_ = 0;
+ }
+
+ private:
+ pipeline* pl_;
+ bool wdone_;
+ bool sync_;
+ };
+
+ size_t statement::
+ execute (const binding& param,
+ native_binding& native_param,
+ size_t n,
+ multiple_exceptions& mex,
+ bool (*process) (size_t, PGresult*, bool, void*),
+ void* data)
+ {
+ size_t i (0); // Parameter set being attempted.
+ mex.current (i);
+
+ PGconn* ch (conn_.handle ());
+
+ pipeline pl (conn_);
+
+ // True if we've written and read everything, respectively.
+ //
+ bool wdone (false), rdone (false);
+
+ for (size_t wn (0), rn (0); !rdone; )
+ {
+ // Note that there is a special version of this code above in
+ // ~pipeline_recovery().
+ //
+ pair<bool, bool> r (pl.wait (!wdone));
+
+ // Note that once we start the pipeline, any call that may throw
+ // without marking the connection as failed should be guarded by
+ // pipeline_recovery.
+
+ // Try to minimize the chance of blocking the server by first
+ // processing the result and then sending more queries.
+ //
+ if (r.first /* read */)
+ {
+ if (PQconsumeInput (ch) == 0)
+ translate_connection_error (conn_);
+
+ while (PQisBusy (ch) == 0)
+ {
+ auto_handle<PGresult> res (PQgetResult (ch));
+
+ ExecStatusType stat (PGRES_FATAL_ERROR);
+ bool gr (is_good_result (res, &stat));
+
+ if (stat == PGRES_PIPELINE_SYNC)
+ {
+ assert (wdone && rn == n);
+ rdone = true;
+ break;
+ }
+
+ assert (rn != n);
+ ++rn;
+
+ if (stat != PGRES_PIPELINE_ABORTED)
+ {
+ // translate_error() may throw an exception (e.g., deadlock)
+ // without marking the connection as failed.
+ //
+ {
+ pipeline_recovery plr (pl, wdone, wn != n);
+
+ if (!process (i, res, gr, data))
+ translate_error (conn_, res, i, &mex);
+
+ plr.cancel ();
+ }
+
+ mex.attempted (++i);
+ mex.current (i);
+ }
+ else
+ {
+ // Should we treat PGRES_PIPELINE_ABORTED entries as attempted
+ // or not? While we did issue PQsendQueryPrepared() for them,
+ // the server tells us that it did not attemp to execute them.
+ // So it feels like they should not be treated as attempted.
+ //
+ // Note that for this to fit into out multiple_exceptions model,
+ // such an incomplete batch should be fatal (otherwise we could
+ // end up with unattempted "holes"). This is currently the case
+ // for errors handled by translate_error() but not necessarily
+ // the case for those handled by the process function (e.g.,
+ // duplicate id handled by process_insert_result() below). So in
+ // a somewhat hackish way we assume the error (e.g., duplicate
+ // id) will always be translated to an exception and pre-mark
+ // multiple_exceptions as fatal.
+ //
+ mex.fatal (true);
+ }
+
+ // We get a NULL result after each query result.
+ //
+ {
+ PGresult* end (PQgetResult (ch));
+ assert (end == 0);
+ }
+ }
+ }
+
+ if (r.second /* write */)
+ {
+ // Send queries until we get blocked (write-biased). This feels like
+ // a better overall strategy to keep the server busy compared to
+ // sending one query at a time and then re-checking if there is
+ // anything to read because the results of INSERT/UPDATE/DELETE are
+ // presumably small and quite a few of them can get buffered before
+ // the server gets blocked.
+ //
+ for (;;)
+ {
+ if (wn != n)
+ {
+ bind_param (native_param, param, wn);
+
+ if (PQsendQueryPrepared (ch,
+ name_,
+ static_cast<int> (native_param.count),
+ native_param.values,
+ native_param.lengths,
+ native_param.formats,
+ 1) == 0)
+ translate_connection_error (conn_);
+
+ if (++wn == n)
+ {
+ if (PQpipelineSync (ch) == 0)
+ translate_connection_error (conn_);
+ }
+ }
+
+ // PQflush() result:
+ //
+ // 0 -- success (queue is now empty)
+ // 1 -- blocked
+ // -1 -- error
+ //
+ int r (PQflush (ch));
+ if (r == -1)
+ translate_connection_error (conn_);
+
+ if (r == 0)
+ {
+ if (wn != n)
+ {
+ // If we continue here, then we are write-biased. And if we
+ // break, then we are read-biased.
+ //
+#ifdef LIBPGSQL_READ_BIASED
+ break;
+#else
+ continue;
+#endif
+ }
+
+ wdone = true;
+ }
+
+ break; // Blocked or done.
+ }
+ }
+ }
+
+ pl.close ();
+ return i;
+ }
+#endif
+
//
// select_statement
//
@@ -689,10 +1113,7 @@ namespace odb
return no_data;
assert (current_row_ > 0);
- return bind_result (result_.bind,
- result_.count,
- handle_,
- current_row_ - 1)
+ return bind_result (result_, handle_, current_row_ - 1)
? success
: truncated;
}
@@ -703,11 +1124,7 @@ namespace odb
assert (current_row_ > 0);
assert (current_row_ <= row_count_);
- if (!bind_result (result_.bind,
- result_.count,
- handle_,
- current_row_ - 1,
- true))
+ if (!bind_result (result_, handle_, current_row_ - 1, true))
assert (false);
}
@@ -738,6 +1155,8 @@ namespace odb
native_param_ (native_param),
returning_ (returning)
{
+ if (returning_ != 0)
+ assert (returning_->count == 1);
}
insert_statement::
@@ -759,6 +1178,8 @@ namespace odb
native_param_ (native_param),
returning_ (returning)
{
+ if (returning_ != 0)
+ assert (returning_->count == 1);
}
bool insert_statement::
@@ -792,9 +1213,9 @@ namespace odb
//
if (returning_ == 0 && stat == PGRES_FATAL_ERROR)
{
- string s (PQresultErrorField (h, PG_DIAG_SQLSTATE));
+ const char* ss (PQresultErrorField (h, PG_DIAG_SQLSTATE));
- if (s == "23505")
+ if (ss != 0 && strcmp (ss, "23505") == 0)
return false;
}
@@ -802,11 +1223,76 @@ namespace odb
}
if (returning_ != 0)
- bind_result (returning_->bind, 1, h, 0, false);
+ bind_result (*returning_, h, 0);
+
+ return true;
+ }
+
+#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32)
+
+ struct insert_data
+ {
+ binding& param;
+ binding* returning;
+ };
+
+ static bool
+ process_insert_result (size_t i, PGresult* r, bool gr, void* data)
+ {
+ insert_data& d (*static_cast<insert_data*> (data));
+
+ unsigned long long& s (d.param.status[i]);
+ s = 1;
+
+ if (gr)
+ {
+ // Note that the result can never be truncated.
+ //
+ if (d.returning != 0)
+ statement::bind_result (*d.returning, r, 0, false, i);
+ }
+ else
+ {
+ // An auto-assigned object id should never cause a duplicate
+ // primary key.
+ //
+ if (d.returning == 0 &&
+ r != 0 && PQresultStatus (r) == PGRES_FATAL_ERROR)
+ {
+ // Note that statement::execute() assumes that this will eventually
+ // be translated to an entry in multiple_exceptions.
+ //
+ const char* ss (PQresultErrorField (r, PG_DIAG_SQLSTATE));
+
+ if (ss != 0 && strcmp (ss, "23505") == 0)
+ s = 0;
+ }
+
+ if (s == 1)
+ return false;
+ }
return true;
}
+ size_t insert_statement::
+ execute (size_t n, multiple_exceptions& mex)
+ {
+ {
+ odb::tracer* t;
+ if ((t = conn_.transaction_tracer ()) ||
+ (t = conn_.tracer ()) ||
+ (t = conn_.database ().tracer ()))
+ t->execute (conn_, *this);
+ }
+
+ insert_data d {param_, returning_};
+
+ return statement::execute (
+ param_, native_param_, n, mex, &process_insert_result, &d);
+ }
+#endif
+
//
// update_statement
//
@@ -881,6 +1367,43 @@ namespace odb
return affected_row_count (h);
}
+#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32)
+
+ static bool
+ process_update_result (size_t i, PGresult* r, bool gr, void* data)
+ {
+ binding& param (*static_cast<binding*> (data));
+
+ unsigned long long& s (param.status[i]);
+
+ if (gr)
+ {
+ s = affected_row_count (r);
+ return true;
+ }
+ else
+ {
+ s = update_statement::result_unknown;
+ return false;
+ }
+ }
+
+ size_t update_statement::
+ execute (size_t n, multiple_exceptions& mex)
+ {
+ {
+ odb::tracer* t;
+ if ((t = conn_.transaction_tracer ()) ||
+ (t = conn_.tracer ()) ||
+ (t = conn_.database ().tracer ()))
+ t->execute (conn_, *this);
+ }
+
+ return statement::execute (
+ param_, native_param_, n, mex, &process_update_result, &param_);
+ }
+#endif
+
//
// delete_statement
//
@@ -969,5 +1492,44 @@ namespace odb
return affected_row_count (h);
}
+
+#if defined(LIBPQ_HAS_PIPELINING) && !defined(_WIN32)
+
+ static bool
+ process_delete_result (size_t i, PGresult* r, bool gr, void* data)
+ {
+ binding& param (*static_cast<binding*> (data));
+
+ unsigned long long& s (param.status[i]);
+
+ if (gr)
+ {
+ s = affected_row_count (r);
+ return true;
+ }
+ else
+ {
+ s = delete_statement::result_unknown;
+ return false;
+ }
+ }
+
+ size_t delete_statement::
+ execute (size_t n, multiple_exceptions& mex)
+ {
+ assert (param_ != 0);
+
+ {
+ odb::tracer* t;
+ if ((t = conn_.transaction_tracer ()) ||
+ (t = conn_.tracer ()) ||
+ (t = conn_.database ().tracer ()))
+ t->execute (conn_, *this);
+ }
+
+ return statement::execute (
+ *param_, native_param_, n, mex, &process_delete_result, param_);
+ }
+#endif
}
}