Code review comment for lp://staging/~free.ekanayaka/storm/tpc-support

Revision history for this message
Stuart Bishop (stub) wrote :

=== modified file 'NEWS'
+- Add support for two-phase commits, using the DB API version 2.0, which
+ for now is only supported by psycopg2.

'for now is only supported by psycopg2' is better written as 'is only supported by PostgreSQL'. It isn't so much a driver issue, but that PostgreSQL is the only backend that provides the TPC feature... and not our fault :-)

=== modified file 'storm/store.py'
+ def begin(self, xid):
+ """Start a new two-phase transaction.

Should xid be an optional parameter, and we generate one using a UUID if it isn't provided? If we do make xid optional, the xid needs to be returned from this method. Optional parameter means people can be lazy, but this may not be a good thing as ideally your global transaction id should be shared between all the TPC participants, making it easier to resolve a failed TPC dance.

=== added file 'storm/xid.py'
+class Xid(object):
+ """
+ Represent a transaction identifier compliant with the XA specification.
+ """
+
+ def __init__(self, format_id, global_transaction_id, branch_qualifier):
+ self.format_id = format_id
+ self.global_transaction_id = global_transaction_id
+ self.branch_qualifier = branch_qualifier

Should any or all of the constructor's parameters be optional?

=== modified file 'storm/zope/zstorm.py'

+from random import randint

+ if global_transaction_id is None:
+ # The the global transaction doesn't have an ID yet, let's create
+ # one in a way that it will be unique
+ global_transaction_id = "_storm_%032x" % randint(0, 2 ** 128)
+ txn.__storm_transaction_id__ = global_transaction_id

You should be using a UUID here rather than randint. Random methods try to provide a random distribution. UUIDs guarantee (as much as possible) to provide unique value. We are after the UUID behavior here.

+ xid = Xid(0, global_transaction_id, zstorm.get_name(store))
+ store.begin(xid)

This seems good. We get a unique global transaction id each transaction that is shared between stores, helping us result transactions lost during TPC failure. I think store names will always be valid XID components, but we might want to check this. In any case, psycopg2 should raise an exception if it is given invalid input and it is nice we have a meaningful name.

The commit/tpc_* method changes look correct for PostgreSQL. However, for backends that don't support TPC we are moving the actual commit from happening in the commit() method to the tpc_finish() method. This is not ideal, but maybe it is good enough.

The use case here is if your application is mixing PostgreSQL and SQLLite. Provided you have only a single SQLLite Store, you can actually get TPC reliability. You first need to prepare() all TPC capable backends, then commit() the single TPC incapable backend, then tpc_commit() the TPC capable backends.

I think we can do this easily enough if we added an attribute to the Store that lets us check if that Store is TPC capable. We then change the sortKey in zstorm.py to list TPC capable Stores first in the commit order, and then adjust the logic in the commit/tpc_* methods to only call the TPC methods if the backend is TPC capable.The sortOrder was created for this, back with Zope2, where the ZODB supported TPC but the none of the relational databases did. I think the rdb's all got 'zz_' prefixes to ensure they got committed last.

(and ignore the case where there are multiple TPC incapable stores - there are no guarantees there)

=== modified file 'tests/databases/base.py'

+class TwoPhaseCommitTest(object):
+
+ def test_begin(self):
+ """
+ begin() starts a transaction that can be ended with a two-phase commit.
+ """
+ xid = Xid(0, "foo", "bar")
+ self.connection.begin(xid)
+ self.connection.execute("INSERT INTO test VALUES (30, 'Title 30')")
+ self.connection.prepare()
+ self.connection.commit()
+ self.connection.rollback()
+ result = self.connection.execute("SELECT id FROM test WHERE id=30")
+ self.assertTrue(result.get_one())

We are missing a test for rollback after prepare (begin, execute, prepare, rollback, confirm changes were rolled back).

We are missing tests for mixing tpc and standard modes:
    execute, commit, begin, execute, prepare, commit, execute
    execute, rollback, begin, execute, prepare, rollback, execute

+ def test_begin_inside_a_two_phase_transaction(self):
+ """
+ begin() can't be used if a two-phase transaction has already started.
+ """
+ xid = Xid(0, "foo", "bar")
+ self.connection.begin(xid)
+ self.assertRaises(ProgrammingError, self.connection.begin, xid)

You should generate a fresh xid before the second begin() call. There is a chance this failed because you are attempting to use a non-unique xid. Both cases are probably worth testing.

+class TwoPhaseCommitDisconnectionTest(object):
+
+ def test_begin_after_rollback_with_disconnection_error(self):
+ """
+ If a rollback fails because of a disconnection error, the two-phase
+ transaction should be properly reset.
+ """
+ xid = Xid(0, "foo", "bar")
+ self.connection.begin(xid)
+ self.connection.execute("SELECT 1")
+ self.proxy.stop()
+ self.connection.rollback()
+ self.proxy.start()
+ self.connection.begin(xid)
+ result = self.connection.execute("SELECT 1")
+ self.assertTrue(result.get_one())

You seem to be relying on the fact that self.connection.begin(xid) will fail if the previous transaction using that xid was not rolled back. This is fragile, because that transaction may not have been rolled back yet; at least in the case of PostgreSQL it might take some time for the server to notice that the connection was dropped and roll it back. This might only be working because the only statement issued was a SELECT. I think you should generate a fresh xid for the second begin() to avoid this potential problem.

+ def test_begin_after_with_statement_disconnection_error_and_rollback(self):
+ """
+ The two-phase transaction state is properly reset if a disconnection
+ happens before the rollback.
+ """
+ xid = Xid(0, "foo", "bar")
+ self.connection.begin(xid)
+ self.proxy.close()
+ self.assertRaises(DisconnectionError,
+ self.connection.execute, "SELECT 1")
+ self.connection.rollback()
+ self.proxy.start()
+ self.connection.begin(xid)
+ result = self.connection.execute("SELECT 1")
+ self.assertTrue(result.get_one())

Here too.

And a test to demonstrate how things fail if you try to reuse an uncommitted xid could be beneficial. It does fail, doesn't it?

=== modified file 'tests/databases/postgres.py'

+class PostgresTest(DatabaseTest, TwoPhaseCommitTest, TestHelper):

We are running the TwoPhaseCommitTests with PostgreSQL. Shouldn't we also be running them under the other backends? I think all the previous tests should pass with the other backends despite the fact they don't actually do TPC.

We should have a test that TPC is actually happening with PostgreSQL. I think this is important as I don't think we have anything else ensuring we actually get around to invoking the real connection.tpc_* methods. To do this we need to 'begin, update, prepare'. Then open a new psycopg2 connection and make the connection.tpc_commit(xid) call. That should succeed, and if you force the original connection to reconnect the changes should be visible.

=== modified file 'tests/zope/zstorm.py'

+ def test_transaction_with_two_phase_commit(self):
+ """
+ Two stores in two-phase-commit mode joining the same transaction share
+ the same global transaction ID.
+ """

This docstring is incorrect; it is a copy of the previous test's docstring.

« Back to merge proposal