Hi Stuart, thanks a lot for the insightful review. I think I have addressed everything except the single-non-TPC-store use case, for which I need some clarification, see below.

=== modified file 'NEWS'

> 'for now is only supported by psycopg2' is better written as
> 'is only supported by PostgreSQL'

Fixed.

=== modified file 'storm/store.py'

> Should xid be an optional parameter, and we generate one using a
> UUID if it isn't provided?

I don't think this is a good idea, for the reason you mention. Note that in the DB API it's *not* optional either. Doing a two-phase commit when you have only one store doesn't make sense, I think, and having to write something like:

    xid = store1.begin()
    xid.branch_qualifier = "store2"
    store2.begin(xid)

doesn't feel good or convenient.

=== added file 'storm/xid.py'

+class Xid(object):
+    """
+    Represent a transaction identifier compliant with the XA specification.
+    """
+
+    def __init__(self, format_id, global_transaction_id, branch_qualifier):
+        self.format_id = format_id
+        self.global_transaction_id = global_transaction_id
+        self.branch_qualifier = branch_qualifier

> Should any or all of the constructor's parameters be optional?

I don't really see a way we could provide sane defaults or generate them if they are not provided, but I'm open to ideas. In particular, global_transaction_id is expected to be shared by all the Xid objects that you build for the various stores that you have (each store should have a different branch_qualifier, though).

=== modified file 'storm/zope/zstorm.py'

+from random import randint

+        if global_transaction_id is None:
+            # The global transaction doesn't have an ID yet, let's create
+            # one in a way that it will be unique
+            global_transaction_id = "_storm_%032x" % randint(0, 2 ** 128)
+            txn.__storm_transaction_id__ = global_transaction_id

> You should be using a UUID here rather than randint. Random
> methods try to provide a random distribution.
> UUIDs guarantee (as much as possible) to provide unique values.
> We are after the UUID behavior here.

Good point, fixed.

+            xid = Xid(0, global_transaction_id, zstorm.get_name(store))
+            store.begin(xid)

> This seems good. We get a unique global transaction id each
> transaction that is shared between stores, helping us recover
> transactions lost during TPC failure. I think store names will
> always be valid XID components, but we might want to check
> this.

They should be: according to the XA specification they are just strings. Note that PostgreSQL doesn't really follow the XA specification; it just has the notion of a "transaction_id" that must be less than 200 bytes long. The psycopg2 driver will just compose the global_transaction_id and branch_qualifier to build the transaction_id.

> In any case, psycopg2 should raise an exception if it is
> given invalid input and it is nice we have a meaningful name.

Right.

> However, for backends that don't support TPC we are
> moving the actual commit from happening in the commit() method
> to the tpc_finish() method.

Uhm, the branch is *not* moving that. The StoreDataManager.commit method reads:

    def commit(self, txn):
        if self._store._tpc:
            self._store.prepare()
        else:
            try:
                self._store.commit()
            finally:
                self._hook_register_transaction_event()

so it explicitly keeps the old behavior, and we *do* commit there the stores that are not set for TPC. Perhaps you mean that the branch *should* change this? That would indeed support the use case you point out, of a single store not capable of TPC while all the others are set for TPC.

By the way, note that according to the documentation of IDataManager, committing the changes in tpc_finish() is actually the correct behavior, whereas the one we have now for stores not set for TPC (running Store.commit in StoreDataManager.commit) is not:

    def commit(transaction):
        """Commit modifications to registered objects.
        Save changes to be made persistent if the transaction commits
        (if tpc_finish is called later). If tpc_abort is called later,
        changes must not persist.

        This includes conflict detection and handling. If no conflicts
        or errors occur, the data manager should be prepared to make
        the changes persist when tpc_finish is called.
        """

    def tpc_finish(transaction):
        """Indicate confirmation that the transaction is done.

        Make all changes to objects modified by this transaction
        persist.

        transaction is the ITransaction instance associated with the
        transaction being committed.

        This should never fail. If this raises an exception, the
        database is not expected to maintain consistency; it's a
        serious error.
        """

So if we want to support the use case you mention, I can change the current behavior and run store.commit() in tpc_finish() for stores not set for TPC, and change sortKey() to have non-TPC stores commit first. This would change the current behavior (though in a way more compliant with the IDataManager contract), but I can't think of bad consequences.

=== modified file 'tests/databases/base.py'

+class TwoPhaseCommitTest(object):
+
+    def test_begin(self):
+        """
+        begin() starts a transaction that can be ended with a two-phase
+        commit.
+        """
+        xid = Xid(0, "foo", "bar")
+        self.connection.begin(xid)
+        self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
+        self.connection.prepare()
+        self.connection.commit()
+        self.connection.rollback()
+        result = self.connection.execute("SELECT id FROM test WHERE id=30")
+        self.assertTrue(result.get_one())

> We are missing a test for rollback after prepare (begin, execute,
> prepare, rollback, confirm changes were rolled back).

Yeah, I'd consider it a test at the edge between a unit test and a functional test, as neither Storm nor my branch has (or needs) any specific logic for that to work; we rely on the backend. But I agree it's nice to have such a test, so I added it.
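To make the sortKey() idea concrete, here is a minimal sketch of how non-TPC stores could be made to commit first. The class and attribute names (StoreDataManager, _uses_tpc) are illustrative, not the actual branch code; the only assumption is that the transaction machinery invokes data managers in sortKey() order:

```python
# Illustrative sketch: order data managers so that stores not capable of
# TPC commit before the TPC-capable ones. The transaction machinery sorts
# registered data managers by their sortKey() return value.
class StoreDataManager(object):

    def __init__(self, store_name, uses_tpc):
        self._store_name = store_name
        self._uses_tpc = uses_tpc

    def sortKey(self):
        # "a..." sorts before "b...", so non-TPC stores come first and
        # their real commit runs before any tpc_finish() of TPC stores.
        prefix = "b" if self._uses_tpc else "a"
        return "%s_store_%s" % (prefix, self._store_name)


managers = [StoreDataManager("pg", True), StoreDataManager("sqlite", False)]
ordered = sorted(managers, key=lambda dm: dm.sortKey())
# the non-TPC "sqlite" store now precedes the TPC-capable "pg" store
```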
> We are missing tests for mixing tpc and standard modes:
>   execute, commit, begin, execute, prepare, commit, execute
>   execute, rollback, begin, execute, prepare, rollback, execute

Added them as well.

+    def test_begin_inside_a_two_phase_transaction(self):
+        """
+        begin() can't be used if a two-phase transaction has already
+        started.
+        """
+        xid = Xid(0, "foo", "bar")
+        self.connection.begin(xid)
+        self.assertRaises(ProgrammingError, self.connection.begin, xid)

> You should generate a fresh xid before the second begin()
> call. There is a chance this failed because you are attempting to
> use a non-unique xid. Both cases are probably worth testing.

Good point, fixed. I just didn't add a test for reusing the same xid, as PostgreSQL seems happy with passing the same transaction_id string to two different PREPARE TRANSACTION commands in two different transactions. It feels like questionable behavior, but apparently that's the way it is. It's surely not something we want to rely on or suggest users rely on.

+class TwoPhaseCommitDisconnectionTest(object):
+
+    def test_begin_after_rollback_with_disconnection_error(self):
+        """
+        If a rollback fails because of a disconnection error, the two-phase
+        transaction should be properly reset.
+        """
+        xid = Xid(0, "foo", "bar")
+        self.connection.begin(xid)
+        self.connection.execute("SELECT 1")
+        self.proxy.stop()
+        self.connection.rollback()
+        self.proxy.start()
+        self.connection.begin(xid)
+        result = self.connection.execute("SELECT 1")
+        self.assertTrue(result.get_one())

> You seem to be relying on the fact that self.connection.begin(xid)
> will fail if the previous transaction using that xid was not
> rolled back. This is fragile, because that transaction may not
> have been rolled back yet;

Yes, I'm relying on it, but I think it's safe, because here we are not testing backend-related behavior, but rather Storm behavior.
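The Storm-level behavior these tests rely on boils down to a small state machine around the two-phase flag. Here is a self-contained sketch with assumed names (FakeConnection, _two_phase_transaction mirroring the real attribute), not the actual storm.database code:

```python
# Simplified sketch of the connection-level state that makes a second
# begin() fail inside an ongoing two-phase transaction, and that must be
# reset by rollback() even when the backend is unreachable.
class ProgrammingError(Exception):
    pass


class FakeConnection(object):

    def __init__(self):
        self._two_phase_transaction = False

    def begin(self, xid):
        if self._two_phase_transaction:
            raise ProgrammingError(
                "begin() cannot be used inside a two-phase transaction")
        self._two_phase_transaction = True

    def rollback(self):
        # Reset the flag unconditionally, so a later begin() can start a
        # fresh two-phase transaction even after a disconnection error.
        self._two_phase_transaction = False


conn = FakeConnection()
conn.begin("xid-1")
try:
    conn.begin("xid-2")       # refused: already in a two-phase transaction
except ProgrammingError:
    second_begin_failed = True
conn.rollback()
conn.begin("xid-2")           # works again after the rollback
```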
In particular, if you comment out the self._two_phase_transaction = False line here (in Connection.rollback):

    except Error, exc:
        if self.is_disconnection_error(exc):
            self._raw_connection = None
            self._state = STATE_RECONNECT
            self._two_phase_transaction = False

then this test will fail, because the second begin() will fail regardless of the xid and of what the backend does. Hope I'm not missing something.

> at least in the case of PostgreSQL
> it might take some time for the server to notice that the connection
> was dropped and roll it back. This might only be working because
> the only statement issued was a SELECT. I think you should generate
> a fresh xid for the second begin() to avoid this potential problem.

That's nevertheless a good idea, so I fixed it.

+    def test_begin_after_with_statement_disconnection_error_and_rollback(self):
+        """
+        The two-phase transaction state is properly reset if a disconnection
+        happens before the rollback.
+        """
+        xid = Xid(0, "foo", "bar")
+        self.connection.begin(xid)
+        self.proxy.close()
+        self.assertRaises(DisconnectionError,
+                          self.connection.execute, "SELECT 1")
+        self.connection.rollback()
+        self.proxy.start()
+        self.connection.begin(xid)
+        result = self.connection.execute("SELECT 1")
+        self.assertTrue(result.get_one())

> Here too.

Fixed.

> And a test to demonstrate how things fail if you try to reuse an
> uncommitted xid could be beneficial. It does fail, doesn't it?

It doesn't fail, for the reason above: afaict PostgreSQL seems happy to reuse any transaction_id. I think Storm should discourage doing so and consider the resulting behavior undefined. Nobody should rely on the fact that it doesn't fail, so I'm a bit reluctant to add such a test.

=== modified file 'tests/databases/postgres.py'

+class PostgresTest(DatabaseTest, TwoPhaseCommitTest, TestHelper):

> We are running the TwoPhaseCommitTests with PostgreSQL. Shouldn't we
> also be running them under the other backends?
> I think all the previous
> tests should pass with the other backends despite the fact they don't
> actually do TPC.

So the reason I didn't add "dummy" implementations of begin/prepare for the backends that don't support TPC is basically this one (from the second paragraph on):

https://bugs.launchpad.net/storm/+bug/132485/comments/4

That being said, we could add an UnsupportedTwoPhaseCommitTests mixin to the non-TPC backend test classes, and raise a NotSupportedError in begin/prepare for those backends.

> We should have a test that TPC is actually happening with PostgreSQL.
> I think this is important as I don't think we have anything else
> ensuring we actually get around to invoking the real connection.tpc_*
> methods. To do this we need to 'begin, update, prepare'. Then open
> a new psycopg2 connection and make the connection.tpc_commit(xid)
> call. That should succeed, and if you force the original connection
> to reconnect the changes should be visible.

Sounds good. I ended up adding a Connection.recover() method, and optional xid parameters to Connection.commit() and Connection.rollback(), in order to support the full story at the Connection level, and test it in unit tests. We might want to expose this at the Store level too.

=== modified file 'tests/zope/zstorm.py'

+    def test_transaction_with_two_phase_commit(self):
+        """
+        Two stores in two-phase-commit mode joining the same transaction
+        share the same global transaction ID.
+        """

> This docstring is incorrect; it is a copy of the previous test's
> docstring.

Fixed.
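The recover/commit-from-a-second-connection story can be sketched without a database. This is a toy model with assumed names (the PREPARED dict stands in for the server's prepared-transaction state), not the actual Storm or psycopg2 API:

```python
# Toy model of two-phase commit recovery: one connection prepares a
# transaction, then a second connection lists pending xids via recover()
# and commits one of them, as a recovery tool would after a crash.
PREPARED = {}  # xid -> pending changes, stands in for server-side state


class Connection(object):

    def __init__(self, storage):
        self.storage = storage  # committed data, shared "database table"
        self._xid = None
        self._pending = []

    def begin(self, xid):
        self._xid = xid

    def execute(self, value):
        self._pending.append(value)

    def prepare(self):
        # Persist the pending changes under the xid; after this point any
        # connection can finish the transaction.
        PREPARED[self._xid] = list(self._pending)
        self._pending = []

    def recover(self):
        # Return the xids of all prepared-but-unfinished transactions.
        return list(PREPARED)

    def commit(self, xid):
        self.storage.extend(PREPARED.pop(xid))


storage = []
conn1 = Connection(storage)
conn1.begin("xid-1")
conn1.execute("row")
conn1.prepare()

# A different connection finishes the transaction.
conn2 = Connection(storage)
assert "xid-1" in conn2.recover()
conn2.commit("xid-1")
```

With psycopg2 the equivalent steps go through the driver's TPC methods (tpc_begin, tpc_prepare, tpc_recover, tpc_commit) against a real server.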