key = $data['key']; } if(isset($data['cas'])) { $this->cas = $data['cas']; } if(isset($data['value'])) { if(is_string($data['value'])) { $this->values = json_decode($data['value'], true); } else { $this->values = $data['value']; } } } } /** * This class allows you to do transactions based on two-phase commits. * * You can pass in an array of documents and the SDK makes sure to either * store them correctly or return an exception that you can catch and * act upon. * * It also adds some convenience methods that make the actual implementation * easier to read. */ class MyCouchbase extends Couchbase { /** * Holds a list of documents that should be deleted on rollback. */ protected $_deleteOnRollback = array(); /** * Holds the pre-transaction state of the documents that have been change. */ protected $_tmpCache = array(); /** * Implements the commit logic with a more concise syntax than the * simple example. * * @param array $docs An array of documents with the key as the key * and a json_encodable value as the document. * @param array $options An array of options that can be passed on. * - `full`: If set to true, it assumes that the passed * docs represent the full payload and not just * a subset. Defaults to false and will be merged. * - `create`: If true, the documents will be created * if they don't exist (but all together). Defaults * to true. */ public function commit($docs, $options = array()) { $defaults = array('full' => false, 'create' => true); $config = $options + $defaults; if(empty($docs)) { return true; } // STEP 0: Convert array do CouchbaseDocument for easier handling foreach($docs as $key => $document) { if(is_array($document)) { $docs[$key] = new CouchbaseDocument(array( 'key' => $key, 'value' => $document )); } } // STEP 1: Insert Transaction Document $transactionId = $this->increment('transaction:counter', 1, true); $transactionKey = "transaction:${transactionId}"; $transactionDocument = array( 'id' => $transactionId, 'docs' => array_keys($docs), 'state' => 'initial' ); $this->set($transactionKey, json_encode($transactionDocument)); $transaction = $this->_get($transactionKey); if(!$transaction) { throw new TransactionException("Could not insert transaction document"); } try { // STEP 1.1: Put Transaction in pending mode $transaction->values['state'] = 'pending'; $this->cas($transaction->cas, $transaction->key, json_encode($transaction->values)); $transaction = $this->_get($transaction->key); // STEP 2: Insert documents if configured to do so. $this->_deleteOnRollback[$transactionKey] = array(); if($config['create']) { foreach($docs as $key => $document) { if($this->get($key) === null) { $document->values['transactions'] = array($transaction->key); $this->set($key, json_encode($document->values)); $this->_deleteOnRollback[$transactionKey][] = $key; } } } // STEP 3: Read all current documents $current = array(); foreach(array_keys($docs) as $key) { $result = $this->_get($key); if(!$result) { throw new TransactionException("Could not read document before update."); } $current[$key] = $result; $this->_tmpBuffer[$transactionKey][$key] = $result; } if(count($current) != count($docs)) { throw new TransactionException("Document count mismatch."); } // STEP 4: Update/Merge them via CAS foreach($docs as $key => $document) { if($config['full'] === true) { $current[$key]->values = $document->values; } else { $current[$key]->values = $document->values + $current[$key]->values; } if(!isset($current[$key]->values['transactions'])) { $current[$key]->values['transactions'] = array(); } $current[$key]->values['transactions'] += array($transaction->key); if(!$this->cas($current[$key]->cas, $current[$key]->key, json_encode($current[$key]->values))) { throw new TransactionException("Could not update the document."); } } // STEP 5: Switch transaction to commited state $transaction->values['state'] = 'committed'; $this->cas($transaction->cas, $transaction->key, json_encode($transaction->values)); $transaction = $this->_get($transaction->key); // STEP 6: Remove transaction from documents foreach(array_keys($docs) as $key) { $document = $this->_get($key); $document->values['transactions'] = array_diff($document->values['transactions'], array($transactionKey)); if(!$this->cas($document->cas, $document->key, json_encode($document->values))) { throw new TransactionException("Could not remove transaction from document."); } } //throw new TransactionException("*** TEST EXCEPTION ***"); // STEP 7: Put transaction state to "done" $transaction->values['state'] = 'done'; $this->cas($transaction->cas, $transaction->key, json_encode($transaction->values)); $transaction = $this->_get($transaction->key); } catch(Exception $e) { // Read current transaction state $transaction = $this->_get($transactionKey); $state = $transaction->values['state']; // STEP 2: Do revert tasks depending on the state if($state == 'pending' || $state == 'committed') { // set transaction state to "cancelling" $transaction->values['state'] = 'cancelling'; $this->cas($transaction->cas, $transaction->key, json_encode($transaction->values)); $transaction = $this->_get($transaction->key); foreach($this->_tmpBuffer[$transactionKey] as $key => $document) { $this->set($document->key, json_encode($document->values)); } foreach($this->_deleteOnRollback[$transactionKey] as $key) { $this->delete($key); } } // set transaction state to "cancelled" $transaction->values['state'] = 'cancelled'; $this->cas($transaction->cas, $transaction->key, json_encode($transaction->values)); $transaction = $this->_get($transaction->key); throw new TransactionException("Transaction failed, recovered state.", null, $e); } } /** * Helper method to return the data needed and populate it with CouchbaseDocument * objects for easer management. */ protected function _get($key) { $return = null; $this->getDelayed(array($key), true, function($conn, $data) use(&$return) { $return = new CouchbaseDocument($data); }); return $return; } /** * Overrides the initial implementation to provide an additional third * param to create the document if it doesn't exist yet. */ public function increment($key, $offset, $create = false) { if($create) { if($this->get($key) === null) { $this->set($key, 0); } } return parent::increment($key, $offset); } } ?> array('balance' => 800), 'matt' => array('balance' => 80) ); $cb->commit($docs); ?>