From 0f9cfacd6cc45f78f1453a8eeb7ffa542dc5dc48 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Wed, 15 Jul 2015 18:43:03 +0200 Subject: Implement SQLite incremental BLOB/TEXT I/O --- odb/sqlite/statement.cxx | 163 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 145 insertions(+), 18 deletions(-) (limited to 'odb/sqlite/statement.cxx') diff --git a/odb/sqlite/statement.cxx b/odb/sqlite/statement.cxx index 3bd5e4c..f7ce632 100644 --- a/odb/sqlite/statement.cxx +++ b/odb/sqlite/statement.cxx @@ -42,6 +42,12 @@ namespace odb } void statement:: + clear () + { + reset (); + } + + void statement:: init (const char* text, std::size_t text_size, statement_kind sk, @@ -49,8 +55,6 @@ namespace odb bool optimize) { active_ = false; - prev_ = 0; - next_ = this; string tmp; if (proc != 0) @@ -58,23 +62,25 @@ namespace odb switch (sk) { case statement_select: - process_select (text, + process_select (tmp, + text, &proc->bind->buffer, proc->count, sizeof (bind), '"', '"', - optimize, - tmp); + optimize); break; case statement_insert: - process_insert (text, + process_insert (tmp, + text, &proc->bind->buffer, proc->count, sizeof (bind), '?', - tmp); + '$'); break; case statement_update: - process_update (text, + process_update (tmp, + text, &proc->bind->buffer, proc->count, sizeof (bind), '?', - tmp); + '$'); break; case statement_delete: case statement_generic: @@ -102,9 +108,10 @@ namespace odb { // Temporarily store the statement text in prev_ so that // text() which may be called by the tracer can access it. + // Dirty but efficient. // #if SQLITE_VERSION_NUMBER >= 3005003 - prev_ = reinterpret_cast (const_cast (text)); + prev_ = reinterpret_cast (const_cast (text)); #endif t->prepare (conn_, *this); #if SQLITE_VERSION_NUMBER >= 3005003 @@ -159,10 +166,11 @@ namespace odb #endif } - void statement:: + bool statement:: bind_param (const bind* p, size_t n) { int e (SQLITE_OK); + bool r (false); // SQLite parameters are counted from 1. // @@ -231,11 +239,25 @@ namespace odb SQLITE_STATIC); break; } + case bind::stream: + { +#if SQLITE_VERSION_NUMBER >= 3004000 + e = sqlite3_bind_zeroblob (stmt_, + c, + static_cast (*b.size)); + r = true; +#else + assert (false); +#endif + break; + } } } if (e != SQLITE_OK) translate_error (e, conn_); + + return r; } bool statement:: @@ -254,6 +276,9 @@ namespace odb int c (col++); + if (b.type == bind::stream) + col++; // Skip ROWID value that follows. + if (truncated && (b.truncated == 0 || !*b.truncated)) continue; @@ -320,6 +345,27 @@ namespace odb memcpy (b.buffer, d, *b.size); break; } + case bind::stream: + { + stream_buffers& sb (*static_cast (b.buffer)); + + // SQLite documentation states that these are valid until the + // statement is finalized (or reprepared). For our case, we + // only need it to stay alive until we call set_value() which + // we do while executing the statement (i.e., we don't copy + // images for later processing). + // + sb.db.in = sqlite3_column_database_name (stmt_, c); + sb.table.in = sqlite3_column_table_name (stmt_, c); + sb.column.in = sqlite3_column_origin_name (stmt_, c); + + // The ROWID comes in the following column. + // + sb.rowid.in = static_cast ( + sqlite3_column_int64 (stmt_, c + 1)); + + break; + } } } @@ -333,6 +379,61 @@ namespace odb return r; } + void statement:: + stream_param (const bind* p, size_t n, const stream_data& d) + { + // Code similar to bind_param(). + // + for (size_t i (0), j (1); i < n; ++i) + { + const bind& b (p[i]); + + if (b.buffer == 0) // Skip NULL entries. + continue; + + int c (static_cast (j++)); + + if ((b.is_null != 0 && *b.is_null) || b.type != bind::stream) + continue; + + // Get column name. + // + const char* col (sqlite3_bind_parameter_name (stmt_, c)); + assert (col != 0); // Statement doesn't contain column name. + + stream_buffers& sb (*static_cast (b.buffer)); + + *sb.db.out = d.db; + *sb.table.out = d.table; + *sb.column.out = col + 1; // Skip '$'. + *sb.rowid.out = d.rowid; + } + } + + inline void + update_hook (void* v, const char* db, const char* table, long long rowid) + { + statement::stream_data& d (*static_cast (v)); + d.db = db; + d.table = table; + d.rowid = rowid; + } + + extern "C" void + odb_sqlite_update_hook (void* v, + int, + const char* db, + const char* table, +#if SQLITE_VERSION_NUMBER >= 3005000 + sqlite3_int64 rowid +#else + sqlite_int64 rowid +#endif + ) + { + update_hook (v, db, table, static_cast (rowid)); + } + // generic_statement // @@ -357,7 +458,7 @@ namespace odb generic_statement:: generic_statement (connection_type& conn, const char* text, - std::size_t text_size) + size_t text_size) : statement (conn, text, text_size, statement_generic, 0, false), @@ -640,12 +741,16 @@ namespace odb t->execute (conn_, *this); } - bind_param (param_.bind, param_.count); + sqlite3* h (conn_.handle ()); + bool stream (bind_param (param_.bind, param_.count)); + + stream_data sd; + if (stream) + sqlite3_update_hook (h, &odb_sqlite_update_hook, &sd); int e; #ifdef LIBODB_SQLITE_HAVE_UNLOCK_NOTIFY - sqlite3* h (conn_.handle ()); while ((e = sqlite3_step (stmt_)) == SQLITE_LOCKED) { if (sqlite3_extended_errcode (h) != SQLITE_LOCKED_SHAREDCACHE) @@ -658,6 +763,9 @@ namespace odb e = sqlite3_step (stmt_); #endif + if (stream) + sqlite3_update_hook (h, 0, 0); // Clear the hook. + // sqlite3_step() will return a detailed error code only if we used // sqlite3_prepare_v2(). Otherwise, sqlite3_reset() returns the // error. @@ -684,6 +792,11 @@ namespace odb translate_error (e, conn_); } + // Stream parameters, if any. + // + if (stream) + stream_param (param_.bind, param_.count, sd); + if (returning_ != 0) { bind& b (returning_->bind[0]); @@ -691,7 +804,7 @@ namespace odb *b.is_null = false; *static_cast (b.buffer) = static_cast ( - sqlite3_last_insert_rowid (conn_.handle ())); + sqlite3_last_insert_rowid (h)); } return true; @@ -735,10 +848,14 @@ namespace odb t->execute (conn_, *this); } - bind_param (param_.bind, param_.count); + sqlite3* h (conn_.handle ()); + bool stream (bind_param (param_.bind, param_.count)); + + stream_data sd; + if (stream) + sqlite3_update_hook (h, &odb_sqlite_update_hook, &sd); int e; - sqlite3* h (conn_.handle ()); #ifdef LIBODB_SQLITE_HAVE_UNLOCK_NOTIFY while ((e = sqlite3_step (stmt_)) == SQLITE_LOCKED) @@ -753,6 +870,9 @@ namespace odb e = sqlite3_step (stmt_); #endif + if (stream) + sqlite3_update_hook (h, 0, 0); // Clear the hook. + // sqlite3_step() will return a detailed error code only if we used // sqlite3_prepare_v2(). Otherwise, sqlite3_reset() returns the // error. @@ -768,7 +888,14 @@ namespace odb #endif translate_error (e, conn_); - return static_cast (sqlite3_changes (h)); + int r (sqlite3_changes (h)); + + // Stream parameters, if any. + // + if (stream && r != 0) + stream_param (param_.bind, param_.count, sd); + + return static_cast (r); } // delete_statement -- cgit v1.1