aboutsummaryrefslogtreecommitdiff
path: root/pgsql/bulk/driver.cxx
diff options
context:
space:
mode:
authorBoris Kolpackov <boris@codesynthesis.com>2022-07-26 08:47:25 +0200
committerBoris Kolpackov <boris@codesynthesis.com>2022-07-26 08:47:25 +0200
commit59e1be91b58c5d8591fcfa0b103ac6ed167a45fe (patch)
tree071bf02283d19633de673db29e9fb01938d2f4db /pgsql/bulk/driver.cxx
parentcc08639dd0da91a8be97f883cf8b90c757f79071 (diff)
Release version 2.5.0-b.23v2.5.0-b.23
Diffstat (limited to 'pgsql/bulk/driver.cxx')
-rw-r--r--pgsql/bulk/driver.cxx340
1 files changed, 340 insertions, 0 deletions
diff --git a/pgsql/bulk/driver.cxx b/pgsql/bulk/driver.cxx
new file mode 100644
index 0000000..fd629b8
--- /dev/null
+++ b/pgsql/bulk/driver.cxx
@@ -0,0 +1,340 @@
+// file : pgsql/savepoint/driver.cxx
+// license : GNU GPL v2; see accompanying LICENSE file
+
+// Test transaction savepoints.
+//
+
+#include <libpq-fe.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+#include <stddef.h>
+#include <assert.h>
+#include <sys/select.h>
+
+// Note: hack.
+//
+#include <arpa/inet.h>
+#define htonll(x) ((((long long)htonl(x)) << 32) + htonl((x) >> 32))
+
+static const size_t columns = 3;
+
+struct data
+{
+ long long id;
+ long long idata;
+ const char* sdata;
+};
+
+static char* values[columns];
+static int lengths[columns];
+static int formats[columns] = {1, 1, 1};
+
+static const unsigned int types[columns] = {
+ 20, // int8
+ 20, // int8
+ 25 // text
+};
+
+static void
+init (const struct data* d)
+{
+ values[0] = (char*)&d->id;
+ lengths[0] = sizeof (d->id);
+
+ values[1] = (char*)&d->idata;
+ lengths[1] = sizeof (d->idata);
+
+ values[2] = (char*)d->sdata;
+ lengths[2] = strlen (d->sdata);
+}
+
+static void
+execute (PGconn* conn, const struct data* ds, size_t n)
+{
+ int sock = PQsocket (conn);
+ assert (sock != -1);
+
+ if (PQsetnonblocking (conn, 1) == -1 ||
+ PQenterPipelineMode (conn) == 0)
+ assert (false);
+
+ // True if we've written and read everything, respectively.
+ //
+ bool wdone = false;
+ bool rdone = false;
+
+ size_t wn = 0;
+ size_t rn = 0;
+
+ while (!rdone)
+ {
+ fd_set wds;
+ if (!wdone)
+ {
+ FD_ZERO (&wds);
+ FD_SET (sock, &wds);
+ }
+
+ fd_set rds;
+ FD_ZERO (&rds);
+ FD_SET (sock, &rds);
+
+ if (select (sock + 1, &rds, wdone ? NULL : &wds, NULL, NULL) == -1)
+ {
+ if (errno == EINTR)
+ continue;
+
+ assert (false);
+ }
+
+ // Try to minimize the chance of blocking the server by first processing
+ // the result and then sending more queries.
+ //
+ if (FD_ISSET (sock, &rds))
+ {
+ if (PQconsumeInput (conn) == 0)
+ assert (false);
+
+ while (wn > rn && PQisBusy (conn) == 0)
+ {
+ //fprintf (stderr, "PQgetResult %zu\n", rn);
+
+ PGresult* res = PQgetResult (conn);
+ assert (res != NULL);
+ ExecStatusType stat = PQresultStatus (res);
+
+ if (stat == PGRES_PIPELINE_SYNC)
+ {
+ assert (wdone && rn == n);
+ PQclear (res);
+ rdone = true;
+ break;
+ }
+
+ if (stat == PGRES_FATAL_ERROR)
+ {
+ const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);
+
+ if (strcmp (s, "23505") == 0)
+ fprintf (stderr, "duplicate id at %zu\n", rn);
+ }
+
+ PQclear (res);
+ assert (rn != n);
+ ++rn;
+
+ // We get a NULL result after each query result.
+ //
+ {
+ PGresult* end = PQgetResult (conn);
+ assert (end == NULL);
+ }
+ }
+ }
+
+ if (!wdone && FD_ISSET (sock, &wds))
+ {
+ // Send queries until we get blocked (write-biased). This feels like
+ // a better overall strategy to keep the server busy compared to
+ // sending one query at a time and then re-checking if there is
+ // anything to read because the results of INSERT/UPDATE/DELETE are
+ // presumably small and quite a few of them can get buffered before
+ // the server gets blocked.
+ //
+ for (;;)
+ {
+ if (wn < n)
+ {
+ //fprintf (stderr, "PQsendQueryPrepared %zu\n", wn);
+
+ init (ds + wn);
+
+ if (PQsendQueryPrepared (conn,
+ "persist_object",
+ (int)(columns),
+ values,
+ lengths,
+ formats,
+ 1) == 0)
+ assert (false);
+
+ if (++wn == n)
+ {
+ if (PQpipelineSync (conn) == 0)
+ assert (false);
+
+ ++wn;
+ }
+ }
+
+ // PQflush() result:
+ //
+ // 0 -- success (queue is now empty)
+ // 1 -- blocked
+ // -1 -- error
+ //
+ int r = PQflush (conn);
+ assert (r != -1);
+
+ if (r == 0)
+ {
+ if (wn < n)
+ {
+ // If we continue here, then we are write-biased. And if we
+ // break, then we are read-biased.
+ //
+#if 0
+ break;
+#else
+ continue;
+#endif
+ }
+
+ wdone = true;
+ }
+
+ break; // Blocked or done.
+ }
+ }
+ }
+
+ if (PQexitPipelineMode (conn) == 0 ||
+ PQsetnonblocking (conn, 0) == -1)
+ assert (false);
+}
+
+static void
+test (PGconn* conn)
+{
+ const size_t batch = 500;
+ struct data ds[batch];
+
+ for (size_t i = 0; i != batch; ++i)
+ {
+ ds[i].id = htonll (i == batch / 2 ? i - 1 : i); // Cause duplicate PK.
+ ds[i].idata = htonll (i);
+ ds[i].sdata = "abc";
+ }
+
+ // Prepare the statement.
+ //
+ {
+ PGresult* res = PQprepare (
+ conn,
+ "persist_object",
+ "INSERT INTO \"pgsql_bulk_object\" "
+ "(\"id\", "
+ "\"idata\", "
+ "\"sdata\") "
+ "VALUES "
+ "($1, $2, $3)",
+ (int)(columns),
+ types);
+ assert (PQresultStatus (res) == PGRES_COMMAND_OK);
+ PQclear (res);
+ }
+
+ // Begin transaction.
+ //
+ {
+ PGresult* res = PQexec (conn, "begin");
+ assert (PQresultStatus (res) == PGRES_COMMAND_OK);
+ PQclear (res);
+ }
+
+ execute (conn, ds, batch);
+
+ // Commit transaction.
+ //
+ {
+ PGresult* res = PQexec (conn, "commit");
+ assert (PQresultStatus (res) == PGRES_COMMAND_OK);
+ PQclear (res);
+ }
+}
+
+#include <memory> // std::auto_ptr
+#include <cassert>
+#include <iostream>
+
+#include <vector>
+
+#include <odb/pgsql/database.hxx>
+#include <odb/pgsql/transaction.hxx>
+
+#include <common/common.hxx>
+
+#include "test.hxx"
+#include "test-odb.hxx"
+
+using namespace std;
+namespace pgsql = odb::pgsql;
+using namespace pgsql;
+
+int
+main (int argc, char* argv[])
+{
+ try
+ {
+ auto_ptr<database> db (create_specific_database<database> (argc, argv));
+
+ connection_ptr cn (db->connection ());
+
+ if (false)
+ {
+ PGconn* conn (cn->handle ());
+ test (conn);
+ }
+
+ {
+ const unsigned long n (500);
+
+ vector<object> os;
+
+ for (unsigned long i (0); i != n; ++i)
+ {
+ os.push_back (object {i, i, string (i, 'x')});
+
+ if (i == n / 2)
+ os.push_back (object {i, i, to_string (i)});
+ }
+
+ {
+ transaction t (cn->begin ());
+ db->persist (os.begin (), os.end ());
+ t.commit ();
+ }
+
+ {
+ transaction t (cn->begin ());
+ db->find<object> (2);
+ t.commit ();
+ }
+
+ for (unsigned long i (0); i != n; ++i)
+ {
+ //assert (os[i].id == i + 1);
+ os[i].idata++;
+ }
+
+ {
+ transaction t (cn->begin ());
+ db->update (os.begin (), os.end ());
+ t.commit ();
+ }
+
+ {
+ transaction t (cn->begin ());
+ db->erase (os.begin (), os.end ());
+ t.commit ();
+ }
+ }
+ }
+ catch (const odb::exception& e)
+ {
+ cerr << e.what () << endl;
+ return 1;
+ }
+}