aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBoris Kolpackov <boris@codesynthesis.com>2021-06-16 11:28:56 +0200
committerBoris Kolpackov <boris@codesynthesis.com>2021-06-16 11:28:56 +0200
commitd094c398449be4366d6bfa93ad4fe58a19a159c1 (patch)
tree12d492c1b34bd9334e5fbe5c290c3f2d09ede617
parentc6019273a7564be9ed772d45aa95e09aaff37a23 (diff)
Pipeline experimentbulk
-rw-r--r--odb/pgsql/error.hxx6
-rw-r--r--odb/pgsql/error.ixx7
-rw-r--r--odb/pgsql/statement.cxx222
3 files changed, 229 insertions, 6 deletions
diff --git a/odb/pgsql/error.hxx b/odb/pgsql/error.hxx
index 36ecc44..24743ef 100644
--- a/odb/pgsql/error.hxx
+++ b/odb/pgsql/error.hxx
@@ -23,9 +23,9 @@ namespace odb
LIBODB_PGSQL_EXPORT void
translate_error (connection& c, PGresult* r);
- // Return true if the PGresult is in an error state. If both s and r are
- // non-null, the pointed to value will be populated with the result status.
- // Otherwise, s is ignored.
+ // Return true if PGresult is not NULL and is not in an error state. If
+ // both s and r are non-null, the pointed to value will be populated with
+ // the result status. Otherwise, s is ignored.
//
LIBODB_PGSQL_EXPORT bool
is_good_result (PGresult* r, ExecStatusType* s = 0);
diff --git a/odb/pgsql/error.ixx b/odb/pgsql/error.ixx
index 0cda31b..74c0ad7 100644
--- a/odb/pgsql/error.ixx
+++ b/odb/pgsql/error.ixx
@@ -5,8 +5,8 @@ namespace odb
{
namespace pgsql
{
- bool
- inline is_good_result (PGresult* r, ExecStatusType* s)
+ inline bool
+ is_good_result (PGresult* r, ExecStatusType* s)
{
if (r != 0)
{
@@ -18,7 +18,8 @@ namespace odb
return
status != PGRES_BAD_RESPONSE &&
status != PGRES_NONFATAL_ERROR &&
- status != PGRES_FATAL_ERROR;
+ status != PGRES_FATAL_ERROR &&
+ status != PGRES_PIPELINE_ABORTED;
}
return false;
diff --git a/odb/pgsql/statement.cxx b/odb/pgsql/statement.cxx
index 977db72..b66a062 100644
--- a/odb/pgsql/statement.cxx
+++ b/odb/pgsql/statement.cxx
@@ -1,16 +1,22 @@
// file : odb/pgsql/statement.cxx
// license : GNU GPL v2; see accompanying LICENSE file
+#include <errno.h>
+#include <sys/select.h>
+
#include <cstdlib> // std::atol
#include <cassert>
#include <sstream> // istringstream
+#include <iostream> //@@ TMP
+
#include <libpq-fe.h>
#include <odb/tracer.hxx>
#include <odb/pgsql/pgsql-oid.hxx>
#include <odb/pgsql/statement.hxx>
+#include <odb/pgsql/exceptions.hxx>
#include <odb/pgsql/connection.hxx>
#include <odb/pgsql/transaction.hxx>
#include <odb/pgsql/auto-handle.hxx>
@@ -761,6 +767,221 @@ namespace odb
{
}
+#if 1
+
+#define PIPELINE_SYNC
+
+ bool insert_statement::
+ execute ()
+ {
+ bind_param (native_param_, param_);
+
+ /*
+ {
+ odb::tracer* t;
+ if ((t = conn_.transaction_tracer ()) ||
+ (t = conn_.tracer ()) ||
+ (t = conn_.database ().tracer ()))
+ t->execute (conn_, *this);
+ }
+ */
+
+ PGconn* conn (conn_.handle ());
+
+ int sock (PQsocket (conn));
+ if (sock == -1)
+ {
+ // @@ TODO: bad connection
+ throw database_exception ("bad connection");
+ }
+
+ // @@ TODO RAII (but need to make sure process everything).
+ //
+ if (PQsetnonblocking (conn, 1) == -1 ||
+ PQenterPipelineMode (conn) == 0)
+ {
+ // @@ TODO:
+ throw database_exception (PQerrorMessage (conn));
+ }
+
+ // True if we've written and read everything, respectively.
+ //
+ bool wdone (false), rdone (false);
+
+ for (size_t wn (0), rn (0), n (1); !rdone; )
+ {
+ fd_set wds;
+ if (!wdone)
+ {
+ FD_ZERO (&wds);
+ FD_SET (sock, &wds);
+ }
+
+ fd_set rds;
+ FD_ZERO (&rds);
+ FD_SET (sock, &rds);
+
+ if (select (sock + 1, &rds, wdone ? 0 : &wds, 0, 0) == -1)
+ {
+ if (errno == EINTR)
+ continue;
+
+ //@@ TODO
+ throw database_exception ("bad connection");
+ }
+
+ // Try to minimize the chance of blocking the server by first
+ // processing the result and then sending more queries.
+ //
+ if (FD_ISSET (sock, &rds))
+ {
+ cerr << "PQconsumeInput" << endl;
+
+ if (PQconsumeInput (conn) == 0)
+ {
+ // @@ TODO
+ throw database_exception (PQerrorMessage (conn));
+ }
+
+ while (PQisBusy (conn) == 0)
+ {
+ cerr << "PQgetResult" << endl;
+
+ {
+ auto_handle<PGresult> res (PQgetResult (conn));
+
+ ExecStatusType stat (PGRES_FATAL_ERROR);
+
+ if (!is_good_result (res, &stat))
+ {
+ //@@ TODO: cannot throw/return until process all results.
+
+ // An auto-assigned object id should never cause a duplicate
+ // primary key.
+ //
+ if (returning_ == 0 && stat == PGRES_FATAL_ERROR)
+ {
+ string s (PQresultErrorField (res, PG_DIAG_SQLSTATE));
+
+ if (s == "23505")
+ return false;
+ }
+
+ translate_error (conn_, res);
+ }
+
+#ifdef PIPELINE_SYNC
+ if (stat == PGRES_PIPELINE_SYNC)
+ {
+ assert (rn == n);
+ rdone = true;
+ break;
+ }
+#endif
+
+ if (returning_ != 0)
+ bind_result (returning_->bind, 1, res, 0, false);
+ }
+
+ // We get a NULL result after each query result.
+ //
+ {
+ PGresult* end (PQgetResult (conn));
+ assert (end == 0);
+ }
+
+ ++rn;
+#ifndef PIPELINE_SYNC
+ if (rn == n)
+ {
+ rdone = true;
+ break;
+ }
+#endif
+ }
+ }
+
+ if (!wdone && FD_ISSET (sock, &wds))
+ {
+ // Send queries until we get blocked. This feels like a better
+ // overall strategy to keep the server busy compared to sending one
+ // query at a time and then re-checking if there is anything to read
+ // because the results of INSERT/UPDATE/DELETE are presumably small
+ // and quite a few of them can get buffered before the server gets
+ // blocked.
+ //
+ for (;;)
+ {
+ if (wn != n)
+ {
+ cerr << "PQsendQueryPrepared" << endl;
+
+ if (PQsendQueryPrepared (conn,
+ name_,
+ static_cast<int> (native_param_.count),
+ native_param_.values,
+ native_param_.lengths,
+ native_param_.formats,
+ 1) == 0)
+ {
+ // @@ TODO: probably want to check/mark connection.
+ //
+ throw database_exception (PQerrorMessage (conn));
+ }
+
+ ++wn;
+
+#ifdef PIPELINE_SYNC
+ if (wn == n)
+ {
+ cerr << "PQpipelineSync" << endl;
+
+ if (PQpipelineSync (conn) == 0)
+ {
+ //@@ TODO
+ throw database_exception (PQerrorMessage (conn));
+ }
+ }
+#endif
+ }
+
+ // PQflush() result:
+ //
+ // 0 -- success (queue is now empty)
+ // 1 -- blocked
+ // -1 -- error
+ //
+ int r (PQflush (conn));
+ if (r == -1)
+ {
+ // @@ TODO (no doc PQerrorMessage() is the way to do it).
+ throw database_exception (PQerrorMessage (conn));
+ }
+
+ cerr << "PQflush " << r << endl;
+
+ if (r == 0)
+ {
+ if (wn != n)
+ continue;
+
+ wdone = true;
+ }
+
+ break; // Blocked or done.
+ }
+ }
+ }
+
+ if (PQexitPipelineMode (conn) == 0 ||
+ PQsetnonblocking (conn, 0) == -1)
+ {
+ throw database_exception (PQerrorMessage (conn));
+ }
+
+ return true;
+ }
+#else
bool insert_statement::
execute ()
{
@@ -806,6 +1027,7 @@ namespace odb
return true;
}
+#endif
//
// update_statement