From 06583147eaa601764a789ae586384e38e05e9015 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Wed, 26 Nov 2014 15:04:56 +0200 Subject: Implement optimistic concurrency support in bulk operations Bulk update and SQL Server ROWVERSION not yet supported. --- odb/relational/mssql/header.cxx | 6 ++ odb/relational/mssql/source.cxx | 12 ++- odb/relational/source.cxx | 226 +++++++++++++++++++++++++++++++++++----- odb/relational/source.hxx | 5 +- odb/relational/validator.cxx | 10 +- 5 files changed, 218 insertions(+), 41 deletions(-) diff --git a/odb/relational/mssql/header.cxx b/odb/relational/mssql/header.cxx index 3bc2cbd..ff16950 100644 --- a/odb/relational/mssql/header.cxx +++ b/odb/relational/mssql/header.cxx @@ -53,6 +53,12 @@ namespace relational os << "static const bool rowversion = " << rv << ";" << endl; + + // Disable bulk update if we have ROWVERSION since we don't + // yet support batch extraction of the version. + // + if (rv && c.count ("bulk-update")) + c.remove ("bulk-update"); } virtual void diff --git a/odb/relational/mssql/source.cxx b/odb/relational/mssql/source.cxx index ffcfa3c..2ce8810 100644 --- a/odb/relational/mssql/source.cxx +++ b/odb/relational/mssql/source.cxx @@ -1118,21 +1118,25 @@ namespace relational } virtual string - optimistic_version_init (semantics::data_member& m) + optimistic_version_init (semantics::data_member& m, bool index) { sql_type t (parse_sql_type (column_type (m), m)); return t.type != sql_type::ROWVERSION ? "1" - : "version (sts.id_image ())"; + : (index + ? "version (sts.id_image (i))" + : "version (sts.id_image ())"); } virtual string - optimistic_version_increment (semantics::data_member& m) + optimistic_version_increment (semantics::data_member& m, bool index) { sql_type t (parse_sql_type (column_type (m), m)); return t.type != sql_type::ROWVERSION ? "1" - : "version (sts.id_image ())"; + : (index + ? "version (sts.id_image (i))" + : "version (sts.id_image ())"); } virtual bool diff --git a/odb/relational/source.cxx b/odb/relational/source.cxx index 6c8204c..fad4ac5 100644 --- a/odb/relational/source.cxx +++ b/odb/relational/source.cxx @@ -1568,6 +1568,41 @@ traverse_object (type& c) } } + // Set the optimistic concurrency version. + // + if (opt != 0) + { + // If we don't have auto id, then obj is a const reference. + // + string obj (auto_id ? "obj" : "const_cast< object_type& > (obj)"); + string init (optimistic_version_init (*opt, true)); + + if (!opt_ma_set->synthesized) + os << "// From " << location_string (opt_ma_set->loc, true) << endl; + + if (opt_ma_set->placeholder ()) + os << opt_ma_set->translate (obj, init) << ";" + << endl; + else + { + // If this member is const and we have a synthesized direct access, + // then cast away constness. Otherwise, we assume that the user- + // provided expression handles this. + // + bool cast (opt_ma_set->direct () && const_type (opt->type ())); + if (cast) + os << "const_cast< version_type& > (" << endl; + + os << opt_ma_set->translate (obj); + + if (cast) + os << ")"; + + os << " = " << init << ";" + << endl; + } + } + // Reset sections: loaded, unchanged. // for (user_sections::iterator i (uss.begin ()); i != uss.end (); ++i) @@ -2315,7 +2350,16 @@ traverse_object (type& c) os << "const id_type& id (" << endl << id_ma->translate ("obj") << ");"; - os << "init (sts.id_image (i), id);" + if (opt != 0) + { + if (!opt_ma_get->synthesized) + os << "// From " << location_string (opt_ma_get->loc, true) << endl; + + os << "const version_type& v (" << endl + << opt_ma_get->translate ("obj") << ");"; + } + + os << "init (sts.id_image (i), id" << (opt != 0 ? ", &v" : "") << ");" //@@ assumption: generate_grow false << "init (sts.image (i), obj, statement_update);" << "}"; @@ -2370,14 +2414,66 @@ traverse_object (type& c) << "mex.insert (i," << endl //@@ assumption: result_unknown << "(r == update_statement::result_unknown)," << endl - << "object_not_persistent ());" + << (opt == 0 ? "object_not_persistent" : "object_changed") << " ());" << "continue;" << "}" << "if (mex.fatal ())" << endl // Don't do any extra work. << "continue;" << endl - << "const object_type& obj (*objs[i]);" - << "callback (db, obj, callback_event::post_update);" + << "const object_type& obj (*objs[i]);"; + + // Update the optimistic concurrency version in the object member. + // + if (opt != 0) + { + // Object is passed as const reference so we need to cast away + // constness. + // + string obj ("const_cast< object_type& > (obj)"); + string inc (optimistic_version_increment (*opt, true)); + + if (!opt_ma_set->synthesized) + os << "// From " << location_string (opt_ma_set->loc, true) << endl; + + if (opt_ma_set->placeholder ()) + { + if (!opt_ma_get->synthesized) + os << "// From " << location_string (opt_ma_get->loc, true) << + endl; + + if (inc == "1") + os << opt_ma_set->translate ( + obj, opt_ma_get->translate ("obj") + " + 1") << ";"; + else + os << opt_ma_set->translate (obj, inc) << ";"; + + os << endl; + } + else + { + // If this member is const and we have a synthesized direct access, + // then cast away constness. Otherwise, we assume that the user- + // provided expression handles this. + // + bool cast (opt_ma_set->direct () && const_type (opt->type ())); + if (cast) + os << "const_cast< version_type& > (" << endl; + + os << opt_ma_set->translate (obj); + + if (cast) + os << ")"; + + if (inc == "1") + os << "++;"; + else + os << " = " << inc << ";"; + + os << endl; + } + } + + os << "callback (db, obj, callback_event::post_update);" << "pointer_cache_traits::update (db, obj);" << "}" // for << "}"; // update() @@ -2893,28 +2989,106 @@ traverse_object (type& c) << "const object_type** objs," << endl << "std::size_t n," << endl << "multiple_exceptions& mex)" - << "{" - << "id_type a[batch];" - << "const id_type* p[batch];" - << endl - << "for (std::size_t i (0); i != n; ++i)" - << "{" - << "const object_type& obj (*objs[i]);" - << "callback (db, obj, callback_event::pre_erase);" - << "a[i] = id (obj);" - << "p[i] = a + i;" - << "}" - << "n = erase (db, p, n, mex);" - << endl - << "if (mex.fatal ())" << endl // Don't do any extra work. - << "return;" - << endl - << "for (std::size_t i (0); i != n; ++i)" - << "{" - << "if (mex[i] == 0)" << endl // No pending exception. - << "callback (db, *objs[i], callback_event::post_erase);" - << "}" // for - << "}"; // erase() + << "{"; + + // In non-optimistic case delegate to erase(id). + // + if (opt == 0) + { + os << "id_type a[batch];" + << "const id_type* p[batch];" + << endl + << "for (std::size_t i (0); i != n; ++i)" + << "{" + << "const object_type& obj (*objs[i]);" + << "callback (db, obj, callback_event::pre_erase);" + << "a[i] = id (obj);" + << "p[i] = a + i;" + << "}" + << "n = erase (db, p, n, mex);" + << endl + << "if (mex.fatal ())" << endl // Don't do any extra work. + << "return;" + << endl + << "for (std::size_t i (0); i != n; ++i)" + << "{" + << "if (mex[i] == 0)" << endl // No pending exception. + << "callback (db, *objs[i], callback_event::post_erase);" + << "}"; // for + } + else + { + os << "using namespace " << db << ";" + << endl + << "ODB_POTENTIALLY_UNUSED (db);" + << endl + << db << "::connection& conn (" << endl + << db << "::transaction::current ().connection ());" + << "statements_type& sts (" << endl + << "conn.statement_cache ().find_object ());" + << endl + << "for (std::size_t i (0); i != n; ++i)" + << "{" + << "const object_type& obj (*objs[i]);" + << "callback (db, obj, callback_event::pre_erase);"; + + if (!id_ma->synthesized) + os << "// From " << location_string (id_ma->loc, true) << endl; + + os << "const id_type& id (" << endl + << id_ma->translate ("obj") << ");"; + + if (!opt_ma_get->synthesized) + os << "// From " << location_string (opt_ma_get->loc, true) << endl; + + os << "const version_type& v (" << endl + << opt_ma_get->translate ("obj") << ");"; + + os << "init (sts.id_image (i), id, &v);" + << "}"; + + os << "binding& idb (sts.id_image_binding ());" + //@@ assumption: generate_grow false + << "if (idb.version == 0)" + << "{" + << "bind (idb.bind, sts.id_image ());" + << "idb.version++;" + << "sts.optimistic_id_image_binding ().version++;" + << "}" + << "delete_statement& st (sts.optimistic_erase_statement ());" + << "n = st.execute (n, mex);" // Set to actual number of attempted. + << endl + << "for (std::size_t i (0); i != n; ++i)" + << "{" + << "unsigned long long r (st.result (i));" // Sets current in mex. + << endl + << "if (mex[i] != 0)" << endl // Pending exception from result(). + << "continue;" + << endl + << "if (r != 1)" + << "{" + << "mex.insert (i," << endl + //@@ assumption: result_unknown + << "(r == delete_statement::result_unknown)," << endl + << "object_changed ());" + << "continue;" + << "}" + << "if (mex.fatal ())" << endl // Don't do any extra work. + << "continue;" + << endl + << "const object_type& obj (*objs[i]);"; + + if (!id_ma->synthesized) + os << "// From " << location_string (id_ma->loc, true) << endl; + + os << "const id_type& id (" << endl + << id_ma->translate ("obj") << ");" + << "pointer_cache_traits::erase (db, id);" + << "callback (db, obj, callback_event::post_erase);" + << "}"; // for + } + + os << "}"; // erase() } // find (id) diff --git a/odb/relational/source.hxx b/odb/relational/source.hxx index 3754fcc..cbdec73 100644 --- a/odb/relational/source.hxx +++ b/odb/relational/source.hxx @@ -5491,7 +5491,7 @@ namespace relational } virtual string - optimistic_version_init (semantics::data_member&) + optimistic_version_init (semantics::data_member&, bool /*index*/ = false) { return "1"; } @@ -5499,7 +5499,8 @@ namespace relational // Returning "1" means increment by one. // virtual string - optimistic_version_increment (semantics::data_member&) + optimistic_version_increment (semantics::data_member&, + bool /*index*/ = false) { return "1"; } diff --git a/odb/relational/validator.cxx b/odb/relational/validator.cxx index 6c3a6fd..33d796d 100644 --- a/odb/relational/validator.cxx +++ b/odb/relational/validator.cxx @@ -409,17 +409,9 @@ namespace relational break; } - if (optimistic (c)) - { - error (l) << "bulk operations on optimistic objects are not " - "supported" << endl; - valid_ = false; - break; - } - bool update (true); - // If we have a change-updated section, then we cannot generate + // Unless we only have manually-updated sections, we cannot generate // the bulk update operation. // user_sections& uss (c.get ("user-sections")); -- cgit v1.1