diff options
-rw-r--r-- | odb/pgsql/error.hxx | 6 | ||||
-rw-r--r-- | odb/pgsql/error.ixx | 7 | ||||
-rw-r--r-- | odb/pgsql/statement.cxx | 222 |
3 files changed, 229 insertions, 6 deletions
diff --git a/odb/pgsql/error.hxx b/odb/pgsql/error.hxx index 36ecc44..24743ef 100644 --- a/odb/pgsql/error.hxx +++ b/odb/pgsql/error.hxx @@ -23,9 +23,9 @@ namespace odb LIBODB_PGSQL_EXPORT void translate_error (connection& c, PGresult* r); - // Return true if the PGresult is in an error state. If both s and r are - // non-null, the pointed to value will be populated with the result status. - // Otherwise, s is ignored. + // Return true if PGresult is not NULL and is not in an error state. If + // both s and r are non-null, the pointed to value will be populated with + // the result status. Otherwise, s is ignored. // LIBODB_PGSQL_EXPORT bool is_good_result (PGresult* r, ExecStatusType* s = 0); diff --git a/odb/pgsql/error.ixx b/odb/pgsql/error.ixx index 0cda31b..74c0ad7 100644 --- a/odb/pgsql/error.ixx +++ b/odb/pgsql/error.ixx @@ -5,8 +5,8 @@ namespace odb { namespace pgsql { - bool - inline is_good_result (PGresult* r, ExecStatusType* s) + inline bool + is_good_result (PGresult* r, ExecStatusType* s) { if (r != 0) { @@ -18,7 +18,8 @@ namespace odb return status != PGRES_BAD_RESPONSE && status != PGRES_NONFATAL_ERROR && - status != PGRES_FATAL_ERROR; + status != PGRES_FATAL_ERROR && + status != PGRES_PIPELINE_ABORTED; } return false; diff --git a/odb/pgsql/statement.cxx b/odb/pgsql/statement.cxx index 977db72..b66a062 100644 --- a/odb/pgsql/statement.cxx +++ b/odb/pgsql/statement.cxx @@ -1,16 +1,22 @@ // file : odb/pgsql/statement.cxx // license : GNU GPL v2; see accompanying LICENSE file +#include <errno.h> +#include <sys/select.h> + #include <cstdlib> // std::atol #include <cassert> #include <sstream> // istringstream +#include <iostream> //@@ TMP + #include <libpq-fe.h> #include <odb/tracer.hxx> #include <odb/pgsql/pgsql-oid.hxx> #include <odb/pgsql/statement.hxx> +#include <odb/pgsql/exceptions.hxx> #include <odb/pgsql/connection.hxx> #include <odb/pgsql/transaction.hxx> #include <odb/pgsql/auto-handle.hxx> @@ -761,6 +767,221 @@ namespace odb { } +#if 1 + +#define PIPELINE_SYNC + + bool insert_statement:: + execute () + { + bind_param (native_param_, param_); + + /* + { + odb::tracer* t; + if ((t = conn_.transaction_tracer ()) || + (t = conn_.tracer ()) || + (t = conn_.database ().tracer ())) + t->execute (conn_, *this); + } + */ + + PGconn* conn (conn_.handle ()); + + int sock (PQsocket (conn)); + if (sock == -1) + { + // @@ TODO: bad connection + throw database_exception ("bad connection"); + } + + // @@ TODO RAII (but need to make sure process everything). + // + if (PQsetnonblocking (conn, 1) == -1 || + PQenterPipelineMode (conn) == 0) + { + // @@ TODO: + throw database_exception (PQerrorMessage (conn)); + } + + // True if we've written and read everything, respectively. + // + bool wdone (false), rdone (false); + + for (size_t wn (0), rn (0), n (1); !rdone; ) + { + fd_set wds; + if (!wdone) + { + FD_ZERO (&wds); + FD_SET (sock, &wds); + } + + fd_set rds; + FD_ZERO (&rds); + FD_SET (sock, &rds); + + if (select (sock + 1, &rds, wdone ? 0 : &wds, 0, 0) == -1) + { + if (errno == EINTR) + continue; + + //@@ TODO + throw database_exception ("bad connection"); + } + + // Try to minimize the chance of blocking the server by first + // processing the result and then sending more queries. + // + if (FD_ISSET (sock, &rds)) + { + cerr << "PQconsumeInput" << endl; + + if (PQconsumeInput (conn) == 0) + { + // @@ TODO + throw database_exception (PQerrorMessage (conn)); + } + + while (PQisBusy (conn) == 0) + { + cerr << "PQgetResult" << endl; + + { + auto_handle<PGresult> res (PQgetResult (conn)); + + ExecStatusType stat (PGRES_FATAL_ERROR); + + if (!is_good_result (res, &stat)) + { + //@@ TODO: cannot throw/return until process all results. + + // An auto-assigned object id should never cause a duplicate + // primary key. + // + if (returning_ == 0 && stat == PGRES_FATAL_ERROR) + { + string s (PQresultErrorField (res, PG_DIAG_SQLSTATE)); + + if (s == "23505") + return false; + } + + translate_error (conn_, res); + } + +#ifdef PIPELINE_SYNC + if (stat == PGRES_PIPELINE_SYNC) + { + assert (rn == n); + rdone = true; + break; + } +#endif + + if (returning_ != 0) + bind_result (returning_->bind, 1, res, 0, false); + } + + // We get a NULL result after each query result. + // + { + PGresult* end (PQgetResult (conn)); + assert (end == 0); + } + + ++rn; +#ifndef PIPELINE_SYNC + if (rn == n) + { + rdone = true; + break; + } +#endif + } + } + + if (!wdone && FD_ISSET (sock, &wds)) + { + // Send queries until we get blocked. This feels like a better + // overall strategy to keep the server busy compared to sending one + // query at a time and then re-checking if there is anything to read + // because the results of INSERT/UPDATE/DELETE are presumably small + // and quite a few of them can get buffered before the server gets + // blocked. + // + for (;;) + { + if (wn != n) + { + cerr << "PQsendQueryPrepared" << endl; + + if (PQsendQueryPrepared (conn, + name_, + static_cast<int> (native_param_.count), + native_param_.values, + native_param_.lengths, + native_param_.formats, + 1) == 0) + { + // @@ TODO: probably want to check/mark connection. + // + throw database_exception (PQerrorMessage (conn)); + } + + ++wn; + +#ifdef PIPELINE_SYNC + if (wn == n) + { + cerr << "PQpipelineSync" << endl; + + if (PQpipelineSync (conn) == 0) + { + //@@ TODO + throw database_exception (PQerrorMessage (conn)); + } + } +#endif + } + + // PQflush() result: + // + // 0 -- success (queue is now empty) + // 1 -- blocked + // -1 -- error + // + int r (PQflush (conn)); + if (r == -1) + { + // @@ TODO (no doc PQerrorMessage() is the way to do it). + throw database_exception (PQerrorMessage (conn)); + } + + cerr << "PQflush " << r << endl; + + if (r == 0) + { + if (wn != n) + continue; + + wdone = true; + } + + break; // Blocked or done. + } + } + } + + if (PQexitPipelineMode (conn) == 0 || + PQsetnonblocking (conn, 0) == -1) + { + throw database_exception (PQerrorMessage (conn)); + } + + return true; + } +#else bool insert_statement:: execute () { @@ -806,6 +1027,7 @@ namespace odb return true; } +#endif // // update_statement |