From d094c398449be4366d6bfa93ad4fe58a19a159c1 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Wed, 16 Jun 2021 11:28:56 +0200 Subject: Pipeline experiment --- odb/pgsql/statement.cxx | 222 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) (limited to 'odb/pgsql/statement.cxx') diff --git a/odb/pgsql/statement.cxx b/odb/pgsql/statement.cxx index 977db72..b66a062 100644 --- a/odb/pgsql/statement.cxx +++ b/odb/pgsql/statement.cxx @@ -1,16 +1,22 @@ // file : odb/pgsql/statement.cxx // license : GNU GPL v2; see accompanying LICENSE file +#include +#include + #include // std::atol #include #include // istringstream +#include //@@ TMP + #include #include #include #include +#include #include #include #include @@ -761,6 +767,221 @@ namespace odb { } +#if 1 + +#define PIPELINE_SYNC + + bool insert_statement:: + execute () + { + bind_param (native_param_, param_); + + /* + { + odb::tracer* t; + if ((t = conn_.transaction_tracer ()) || + (t = conn_.tracer ()) || + (t = conn_.database ().tracer ())) + t->execute (conn_, *this); + } + */ + + PGconn* conn (conn_.handle ()); + + int sock (PQsocket (conn)); + if (sock == -1) + { + // @@ TODO: bad connection + throw database_exception ("bad connection"); + } + + // @@ TODO RAII (but need to make sure process everything). + // + if (PQsetnonblocking (conn, 1) == -1 || + PQenterPipelineMode (conn) == 0) + { + // @@ TODO: + throw database_exception (PQerrorMessage (conn)); + } + + // True if we've written and read everything, respectively. + // + bool wdone (false), rdone (false); + + for (size_t wn (0), rn (0), n (1); !rdone; ) + { + fd_set wds; + if (!wdone) + { + FD_ZERO (&wds); + FD_SET (sock, &wds); + } + + fd_set rds; + FD_ZERO (&rds); + FD_SET (sock, &rds); + + if (select (sock + 1, &rds, wdone ? 0 : &wds, 0, 0) == -1) + { + if (errno == EINTR) + continue; + + //@@ TODO + throw database_exception ("bad connection"); + } + + // Try to minimize the chance of blocking the server by first + // processing the result and then sending more queries. + // + if (FD_ISSET (sock, &rds)) + { + cerr << "PQconsumeInput" << endl; + + if (PQconsumeInput (conn) == 0) + { + // @@ TODO + throw database_exception (PQerrorMessage (conn)); + } + + while (PQisBusy (conn) == 0) + { + cerr << "PQgetResult" << endl; + + { + auto_handle res (PQgetResult (conn)); + + ExecStatusType stat (PGRES_FATAL_ERROR); + + if (!is_good_result (res, &stat)) + { + //@@ TODO: cannot throw/return until process all results. + + // An auto-assigned object id should never cause a duplicate + // primary key. + // + if (returning_ == 0 && stat == PGRES_FATAL_ERROR) + { + string s (PQresultErrorField (res, PG_DIAG_SQLSTATE)); + + if (s == "23505") + return false; + } + + translate_error (conn_, res); + } + +#ifdef PIPELINE_SYNC + if (stat == PGRES_PIPELINE_SYNC) + { + assert (rn == n); + rdone = true; + break; + } +#endif + + if (returning_ != 0) + bind_result (returning_->bind, 1, res, 0, false); + } + + // We get a NULL result after each query result. + // + { + PGresult* end (PQgetResult (conn)); + assert (end == 0); + } + + ++rn; +#ifndef PIPELINE_SYNC + if (rn == n) + { + rdone = true; + break; + } +#endif + } + } + + if (!wdone && FD_ISSET (sock, &wds)) + { + // Send queries until we get blocked. This feels like a better + // overall strategy to keep the server busy compared to sending one + // query at a time and then re-checking if there is anything to read + // because the results of INSERT/UPDATE/DELETE are presumably small + // and quite a few of them can get buffered before the server gets + // blocked. + // + for (;;) + { + if (wn != n) + { + cerr << "PQsendQueryPrepared" << endl; + + if (PQsendQueryPrepared (conn, + name_, + static_cast (native_param_.count), + native_param_.values, + native_param_.lengths, + native_param_.formats, + 1) == 0) + { + // @@ TODO: probably want to check/mark connection. + // + throw database_exception (PQerrorMessage (conn)); + } + + ++wn; + +#ifdef PIPELINE_SYNC + if (wn == n) + { + cerr << "PQpipelineSync" << endl; + + if (PQpipelineSync (conn) == 0) + { + //@@ TODO + throw database_exception (PQerrorMessage (conn)); + } + } +#endif + } + + // PQflush() result: + // + // 0 -- success (queue is now empty) + // 1 -- blocked + // -1 -- error + // + int r (PQflush (conn)); + if (r == -1) + { + // @@ TODO (no doc PQerrorMessage() is the way to do it). + throw database_exception (PQerrorMessage (conn)); + } + + cerr << "PQflush " << r << endl; + + if (r == 0) + { + if (wn != n) + continue; + + wdone = true; + } + + break; // Blocked or done. + } + } + } + + if (PQexitPipelineMode (conn) == 0 || + PQsetnonblocking (conn, 0) == -1) + { + throw database_exception (PQerrorMessage (conn)); + } + + return true; + } +#else bool insert_statement:: execute () { @@ -806,6 +1027,7 @@ namespace odb return true; } +#endif // // update_statement -- cgit v1.1