From a6df72a09ef63f4b03bb7f7fb8866640040b4fef Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Thu, 9 Oct 2014 11:22:06 +0200 Subject: Initial bulk erase implementation --- odb/mssql/simple-object-statements.txx | 2 +- odb/mssql/statement.cxx | 387 ++++++++++++++++++++++----------- odb/mssql/statement.hxx | 72 ++++-- odb/mssql/statement.ixx | 33 +++ 4 files changed, 353 insertions(+), 141 deletions(-) diff --git a/odb/mssql/simple-object-statements.txx b/odb/mssql/simple-object-statements.txx index 9edd380..fa1b6e7 100644 --- a/odb/mssql/simple-object-statements.txx +++ b/odb/mssql/simple-object-statements.txx @@ -55,7 +55,7 @@ namespace odb id_column_count, object_traits::batch, sizeof (id_image_type), - 0), + status_), od_ (update_image_bind_ + update_column_count) { image_[0].version = 0; // @@ TODO [0] diff --git a/odb/mssql/statement.cxx b/odb/mssql/statement.cxx index f3456fb..12fc21c 100644 --- a/odb/mssql/statement.cxx +++ b/odb/mssql/statement.cxx @@ -276,42 +276,10 @@ namespace odb } void statement:: - bind_param (bind* b, size_t n, size_t skip, SQLUSMALLINT* status) + bind_param (bind* b, size_t n) { SQLRETURN r; - // Setup row-wise batch operation. We set the actual number of - // rows in the batch in execute(). - // - param_skip_ = skip; - - if (param_skip_ != 0) - { - r = SQLSetStmtAttr (stmt_, - SQL_ATTR_PARAM_BIND_TYPE, - (SQLPOINTER) param_skip_, - 0); - - if (!SQL_SUCCEEDED (r)) - translate_error (r, conn_, stmt_); - - r = SQLSetStmtAttr (stmt_, - SQL_ATTR_PARAMS_PROCESSED_PTR, - (SQLPOINTER) &processed_, - 0); - - if (!SQL_SUCCEEDED (r)) - translate_error (r, conn_, stmt_); - - r = SQLSetStmtAttr (stmt_, - SQL_ATTR_PARAM_STATUS_PTR, - (SQLPOINTER) status, - 0); - - if (!SQL_SUCCEEDED (r)) - translate_error (r, conn_, stmt_); - } - SQLUSMALLINT i (0); for (bind* end (b + n); b != end; ++b) { @@ -551,8 +519,6 @@ namespace odb t->execute (conn_, *this); } - processed_ = 0; - SQLRETURN r (SQLExecute (stmt_)); if (r == SQL_NEED_DATA) @@ -753,6 +719,115 @@ namespace odb } // + // bulk_statement + // + bulk_statement:: + ~bulk_statement () {} + + void bulk_statement:: + init (size_t skip) + { + // Setup row-wise batch operation. We set the actual number of + // rows in the batch in execute(). + // + SQLRETURN r; + + r = SQLSetStmtAttr (stmt_, + SQL_ATTR_PARAM_BIND_TYPE, + (SQLPOINTER) skip, + 0); + + if (!SQL_SUCCEEDED (r)) + translate_error (r, conn_, stmt_); + + r = SQLSetStmtAttr (stmt_, + SQL_ATTR_PARAMS_PROCESSED_PTR, + (SQLPOINTER) &processed_, + 0); + + if (!SQL_SUCCEEDED (r)) + translate_error (r, conn_, stmt_); + + r = SQLSetStmtAttr (stmt_, + SQL_ATTR_PARAM_STATUS_PTR, + (SQLPOINTER) status_, + 0); + + if (!SQL_SUCCEEDED (r)) + translate_error (r, conn_, stmt_); + } + + SQLRETURN bulk_statement:: + execute (size_t n, multiple_exceptions* mex) + { + mex_ = mex; + + if (status_ != 0) + { + SQLRETURN r (SQLSetStmtAttr (stmt_, + SQL_ATTR_PARAMSET_SIZE, + (SQLPOINTER) n, + 0)); + + if (!SQL_SUCCEEDED (r)) + translate_error (r, conn_, stmt_); + + // Some SQL* functions would only update the status in case of + // an error. + // + memset (status_, 0, sizeof (status_[0]) * n); + } + + processed_ = 0; + SQLRETURN r (statement::execute ()); + + // If the statement failed as a whole, assume only the first row + // was attempted (and failed). Otherwise, the documentation says + // that the native client driver keeps processing remaining rows + // even in case of an error. + // + i_ = 0; + n_ = (SQL_SUCCEEDED (r) ? n : 1); + + if (mex_ != 0) + { + mex_->current (i_); + mex_->attempted (processed_); + } + + if (!SQL_SUCCEEDED (r)) + { + if (mex_ != 0) + mex_->fatal (true); // An incomplete batch is always fatal. + + return r; + } + + cerr << "processed: " << processed_ << endl; + + for (unsigned i (0); i != processed_; ++i) + { + cerr << "[" << i << "] " << status_[i] << " "; + + if (status_[i] == SQL_PARAM_ERROR) + cerr << "error "; + else if (status_[i] == SQL_PARAM_SUCCESS || + status_[i] == SQL_PARAM_SUCCESS_WITH_INFO) + cerr << "ok"; + else if (status_[i] == SQL_PARAM_UNUSED) + cerr << "unused"; + else if (status_[i] == SQL_PARAM_DIAG_UNAVAILABLE) + cerr << "unavailable"; + else + cerr << "?"; + + cerr << endl; + } + + return r; + } + + // // select_statement // select_statement:: @@ -907,16 +982,13 @@ namespace odb bool returning_id, bool returning_version, binding* returning) - : statement (conn, - text, statement_insert, - (process ? ¶m : 0), false), - returning_id_ (returning_id), returning_version_ (returning_version), - status_ (param.status) + : bulk_statement (conn, + text, statement_insert, + (process ? ¶m : 0), false, + param.batch, param.skip, param.status), + returning_id_ (returning_id), returning_version_ (returning_version) { - //@@ Just pass binding already! And use batch size, not skip to - // decide if it is batch. - // - bind_param (param.bind, param.count, param.skip, status_); + bind_param (param.bind, param.count); if (returning_id_ || returning_version_) init_result (*returning); @@ -931,14 +1003,14 @@ namespace odb bool returning_version, binding* returning, bool copy_text) - : statement (conn, - text, statement_insert, - (process ? ¶m : 0), false, - copy_text), - returning_id_ (returning_id), returning_version_ (returning_version), - status_ (param.status) + : bulk_statement (conn, + text, statement_insert, + (process ? ¶m : 0), false, + param.batch, param.skip, param.status, + copy_text), + returning_id_ (returning_id), returning_version_ (returning_version) { - bind_param (param.bind, param.count, param.skip, status_); + bind_param (param.bind, param.count); if (returning_id_ || returning_version_) init_result (*returning); @@ -954,7 +1026,7 @@ namespace odb // for one of the inserted columns is supplied at execution // (long data). // - batch_ = strstr (text_, "OUTPUT INSERTED.") == 0 && + text_batch_ = strstr (text_, "OUTPUT INSERTED.") == 0 && strstr (text_, "output inserted.") == 0; SQLRETURN r; @@ -1001,28 +1073,6 @@ namespace odb size_t insert_statement:: execute (size_t n, multiple_exceptions* mex) { - mex_ = mex; - - //@@ TODO Should move to statement, also don't call if no batch (will - // need a flag). - // - { - SQLRETURN r (SQLSetStmtAttr (stmt_, - SQL_ATTR_PARAMSET_SIZE, - (SQLPOINTER) n, - 0)); - - if (!SQL_SUCCEEDED (r)) - translate_error (r, conn_, stmt_); - - // Some SQL* functions would only update the status in case of - // an error. - // - memset (status_, 0, sizeof (status_[0]) * n); - } - - SQLRETURN r (statement::execute ()); - // The batch INSERT works in two different ways, depending on // whether we have the OUTPUT clause. If there is no OUTPUT, // then all the rows are processed inside the SQLExecute() @@ -1034,50 +1084,16 @@ namespace odb // (value in n_). Note that in the OUTPUT case if there is an // error, the processed count seems to jump by 2 for some reason. // - // If the statement failed as a whole, assume only the first row - // was attempted (and failed). Otherwise, the documentation says - // that the native client driver keeps processing remaining rows - // even in case of an error. - // - i_ = 0; - n_ = (SQL_SUCCEEDED (r) ? n : 1); - - if (mex_ != 0) - { - mex_->current (i_); - mex_->attempted (processed_); - } + SQLRETURN r (bulk_statement::execute (n, mex)); + // Statement failed as a whole, assume only one row was attempted. + // if (!SQL_SUCCEEDED (r)) { - if (mex_ != 0) - mex_->fatal (true); // An incomplete batch is always fatal. - fetch (r); return n_; } - cerr << "processed: " << processed_ << endl; - - for (unsigned i (0); i != processed_; ++i) - { - cerr << "[" << i << "] " << status_[i] << " "; - - if (status_[i] == SQL_PARAM_ERROR) - cerr << "error "; - else if (status_[i] == SQL_PARAM_SUCCESS || - status_[i] == SQL_PARAM_SUCCESS_WITH_INFO) - cerr << "ok"; - else if (status_[i] == SQL_PARAM_UNUSED) - cerr << "unused"; - else if (status_[i] == SQL_PARAM_DIAG_UNAVAILABLE) - cerr << "unavailable"; - else - cerr << "?"; - - cerr << endl; - } - if (n == 1) // Note: not n_! fetch (SQL_SUCCESS); // Non-batch case. else @@ -1196,7 +1212,7 @@ namespace odb // if (result_ && (returning_id_ || returning_version_)) { - if (batch_) + if (text_batch_) { r = SQLMoreResults (stmt_); @@ -1237,13 +1253,21 @@ namespace odb // if (i != i_) { + // Multiple exceptions cannot be NULL since this is a batch. + // + mex_->current (++i_); + // Only in case of the OUTPUT clause do we have multiple result sets. // if (returning_id_ || returning_version_) { r = SQLMoreResults (stmt_); - cerr << "more [" << (i_ + 1) << "] " << status_[i_ + 1] << " " + // The actually processed count could have changed (see execute()). + // + mex_->attempted (processed_); + + cerr << "more [" << (i_) << "] " << status_[i_] << " " << SQL_SUCCEEDED (r) << endl; cerr << "more processed: " << processed_ << endl; @@ -1255,15 +1279,8 @@ namespace odb "?????", "multiple result sets expected from an array of parameters"); } - - // The actually processed count could have changed (see execute()). - // Multiple exceptions cannot be NULL since this is a batch. - // - mex_->attempted (processed_); } - mex_->current (++i_); - fetch (status_[i_] == SQL_PARAM_SUCCESS || status_[i_] == SQL_PARAM_SUCCESS_WITH_INFO ? SQL_SUCCESS : SQL_ERROR); @@ -1412,9 +1429,10 @@ namespace odb delete_statement (connection_type& conn, const string& text, binding& param) - : statement (conn, - text, statement_delete, - 0, false) + : bulk_statement (conn, + text, statement_delete, + 0, false, + param.batch, param.skip, param.status) { bind_param (param.bind, param.count); } @@ -1424,14 +1442,128 @@ namespace odb const char* text, binding& param, bool copy_text) - : statement (conn, - text, statement_delete, - 0, false, - copy_text) + : bulk_statement (conn, + text, statement_delete, + 0, false, + param.batch, param.skip, param.status, + copy_text) { bind_param (param.bind, param.count); } + size_t delete_statement:: + execute (size_t n, multiple_exceptions* mex) + { + /* + @@ ?? + + // The batch INSERT works in two different ways, depending on + // whether we have the OUTPUT clause. If there is no OUTPUT, + // then all the rows are processed inside the SQLExecute() + // call. If, however, there is OUTPUT, then the rows are + // processed one at a time as we consume the results with + // the SQLMoreResults() call below. Thus we in effect have + // two counts: the "processed so far" as set by the API + // (SQL_ATTR_PARAMS_PROCESSED_PTR) and the "to be processed" + // (value in n_). Note that in the OUTPUT case if there is an + // error, the processed count seems to jump by 2 for some reason. + // + */ + + SQLRETURN r (bulk_statement::execute (n, mex)); + + // Statement failed as a whole, assume only one row was attempted. + // + if (!SQL_SUCCEEDED (r)) + { + fetch (r); + return n_; + } + + if (n == 1) // Note: not n_! + fetch (SQL_SUCCESS); // Non-batch case. + else + fetch (status_[i_] == SQL_PARAM_SUCCESS || + status_[i_] == SQL_PARAM_SUCCESS_WITH_INFO + ? SQL_SUCCESS : SQL_ERROR); + + return n_; + } + + void delete_statement:: + fetch (SQLRETURN r) + { + /* + @@ + + // SQL_NO_DATA indicates that the statement hasn't affected any rows. + // + if (r == SQL_NO_DATA) + return 0; + + */ + + if (SQL_SUCCEEDED (r)) + { + // Get the number of affected rows. + // + SQLLEN rows; + r = SQLRowCount (stmt_, &rows); + + if (!SQL_SUCCEEDED (r)) + translate_error (r, conn_, stmt_); + + cerr << "fetch: " << rows << endl; + + result_ = static_cast (rows); + } + else + translate_error (r, conn_, stmt_, i_, mex_); // Can return. + } + + unsigned long long delete_statement:: + result (size_t i) + { + assert ((i_ == i || i_ + 1 == i) && i < n_); + + SQLRETURN r; + + // Get to the next result set if necessary. + // + if (i != i_) + { + // Multiple exceptions cannot be NULL since this is a batch. + // + mex_->current (++i_); + + r = SQLMoreResults (stmt_); + + // The actually processed count could have changed (see execute()). + // + mex_->attempted (processed_); + + cerr << "more [" << (i_) << "] " << status_[i_] << " " + << SQL_SUCCEEDED (r) << " " << (r == SQL_NO_DATA) << endl; + + cerr << "more processed: " << processed_ << endl; + + if (r == SQL_NO_DATA) + { + throw database_exception ( + 0, + "?????", + "multiple result sets expected from an array of parameters"); + } + + fetch (status_[i_] == SQL_PARAM_SUCCESS || + status_[i_] == SQL_PARAM_SUCCESS_WITH_INFO + ? SQL_SUCCESS : SQL_ERROR); + } + + return result_; + } + + /* unsigned long long delete_statement:: execute () { @@ -1455,5 +1587,6 @@ namespace odb return static_cast (rows); } + */ } } diff --git a/odb/mssql/statement.hxx b/odb/mssql/statement.hxx index 69d144b..8da71c2 100644 --- a/odb/mssql/statement.hxx +++ b/odb/mssql/statement.hxx @@ -97,8 +97,7 @@ namespace odb protected: void - bind_param (bind*, std::size_t count, - std::size_t skip = 0, SQLUSMALLINT* status = 0); + bind_param (bind*, std::size_t count); // Return the actual number of columns bound. // @@ -126,10 +125,46 @@ namespace odb std::string text_copy_; const char* text_; auto_handle stmt_; + }; + + class LIBODB_MSSQL_EXPORT bulk_statement: public statement + { + public: + virtual + ~bulk_statement () = 0; + + bulk_statement (connection_type&, + const std::string& text, + statement_kind, + const binding* process, + bool optimize, + std::size_t batch, + std::size_t skip, + SQLUSMALLINT* status); + + bulk_statement (connection_type&, + const char* text, + statement_kind, + const binding* process, + bool optimize, + std::size_t batch, + std::size_t skip, + SQLUSMALLINT* status, + bool copy_text); - std::size_t param_skip_; // If 0, no batch is used. - SQLULEN processed_; // Number of batch rows processed. - // @@ TODO could be local var in execute()? Nop. + SQLRETURN + execute (std::size_t n, multiple_exceptions*); + + private: + void + init (std::size_t skip); + + protected: + SQLULEN processed_; // Number of batch rows processed so far. + SQLUSMALLINT* status_; // Row status array. + std::size_t n_; // Actual batch size. + std::size_t i_; // Position in result. + multiple_exceptions* mex_; }; class LIBODB_MSSQL_EXPORT select_statement: public statement @@ -242,7 +277,7 @@ namespace odb select_statement* s_; }; - class LIBODB_MSSQL_EXPORT insert_statement: public statement + class LIBODB_MSSQL_EXPORT insert_statement: public bulk_statement { public: virtual @@ -290,12 +325,8 @@ namespace odb private: bool returning_id_; bool returning_version_; - bool batch_; + bool text_batch_; - multiple_exceptions* mex_; - std::size_t n_; // Batch size. - std::size_t i_; // Position in result. - SQLUSMALLINT* status_; // @@ TODO: move to statement? bool result_; }; @@ -339,7 +370,7 @@ namespace odb SQLLEN version_size_ind_; }; - class LIBODB_MSSQL_EXPORT delete_statement: public statement + class LIBODB_MSSQL_EXPORT delete_statement: public bulk_statement { public: virtual @@ -354,12 +385,27 @@ namespace odb binding& param, bool copy_text = true); + // Return the number of parameter rows (out of n) that were attempted. + // + std::size_t + execute (std::size_t n = 1, multiple_exceptions* = 0); + + // Return the number of rows affected (deleted) by the parameter + // row. Errors are reported by throwing exceptions. + // unsigned long long - execute (); + result (std::size_t i = 0); private: delete_statement (const delete_statement&); delete_statement& operator= (const delete_statement&); + + private: + void + fetch (SQLRETURN); + + private: + unsigned long long result_; }; } } diff --git a/odb/mssql/statement.ixx b/odb/mssql/statement.ixx index b2c091e..66316f4 100644 --- a/odb/mssql/statement.ixx +++ b/odb/mssql/statement.ixx @@ -6,6 +6,39 @@ namespace odb { namespace mssql { + inline bulk_statement:: + bulk_statement (connection_type& c, + const std::string& text, + statement_kind k, + const binding* process, + bool optimize, + std::size_t batch, + std::size_t skip, + SQLUSMALLINT* status) + : statement (c, text, k, process, optimize), + status_ (batch == 1 ? 0 : status) + { + if (batch != 1) + init (skip); + } + + inline bulk_statement:: + bulk_statement (connection_type& c, + const char* text, + statement_kind k, + const binding* process, + bool optimize, + std::size_t batch, + std::size_t skip, + SQLUSMALLINT* status, + bool copy_text) + : statement (c, text, k, process, optimize, copy_text), + status_ (batch == 1 ? 0 : status) + { + if (batch != 1) + init (skip); + } + inline unsigned long long update_statement:: version () { -- cgit v1.1