diff --git a/src/k2/cmd/ycsb/data.h b/src/k2/cmd/ycsb/data.h index 35678516..afeb8a50 100644 --- a/src/k2/cmd/ycsb/data.h +++ b/src/k2/cmd/ycsb/data.h @@ -145,7 +145,7 @@ class YCSBData{ }; // function to write the given YCSB Data row -seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool erase = false, ExistencePrecondition precondition = ExistencePrecondition::None) +seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool erase = false, ExistencePrecondition precondition = ExistencePrecondition::None, bool isonehot=false) { dto::SKVRecord skv_record(YCSBData::collectionName, YCSBData::schema); // create SKV record @@ -153,7 +153,7 @@ seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool eras skv_record.serializeNext(field); // add fields to SKV record } - return txn.write(skv_record, erase, precondition).then([] (WriteResult&& result) { + return txn.write(skv_record, erase, precondition, isonehot).then([] (WriteResult&& result) { if (!result.status.is2xxOK() && result.status.code!=412 && result.status.code!=404) { // k2::Statuses::S412_Precondition_Failed for write with erase=false and key already exists and k2::Statuses::S404_Not_Found for write with erase=true and key does not exist K2LOG_D(log::ycsb, "writeRow failed and is retryable: {}", result.status); return seastar::make_exception_future(std::runtime_error("writeRow failed!")); @@ -165,7 +165,7 @@ seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool eras // function to update the partial fields for a YCSB Data row seastar::future -partialUpdateRow(uint32_t keyid, std::vector fieldValues, std::vector fieldsToUpdate, K2TxnHandle& txn) { +partialUpdateRow(uint32_t keyid, std::vector fieldValues, std::vector fieldsToUpdate, K2TxnHandle& txn, bool isonehot=false) { dto::SKVRecord skv_record(YCSBData::collectionName, YCSBData::schema); // create SKV record @@ -182,8 +182,8 @@ partialUpdateRow(uint32_t keyid, std::vector fieldValues, std::vector(skv_record, std::move(fieldsToUpdate)).then([] (PartialUpdateResult&& result) { - if (!result.status.is2xxOK() && result.status.code!=412 && result.status.code!=404) { // 412 for precondition of exists not satisfied and 404 for key not found + return txn.partialUpdate(skv_record, std::move(fieldsToUpdate), dto::Key(), isonehot).then([] (PartialUpdateResult&& result) { + if (!result.status.is2xxOK() && result.status.code!=412 && result.status.code!=404) { // 412 for precondition of Exists not satisfied and 404 for key not found K2LOG_D(log::ycsb, "partialUpdateRow failed: {}", result.status); return seastar::make_exception_future(std::runtime_error("partialUpdateRow failed!")); } diff --git a/src/k2/cmd/ycsb/transactions.h b/src/k2/cmd/ycsb/transactions.h index 7dec3439..3fd408c5 100644 --- a/src/k2/cmd/ycsb/transactions.h +++ b/src/k2/cmd/ycsb/transactions.h @@ -90,7 +90,7 @@ class YCSBTxn { YCSBTxn(RandomContext& random, K23SIClient& client, std::shared_ptr requestDist, std::shared_ptr scanLengthDist) : - _random(random), _client(client), _failed(false), _requestDist(requestDist), _scanLengthDist(scanLengthDist) {} + _random(random), _client(client), _failed(false), _requestDist(requestDist), _scanLengthDist(scanLengthDist), _onehot(false) {} seastar::future attempt() { K2TxnOptions options{}; @@ -164,6 +164,9 @@ class YCSBTxn { fut.ignore_ready_future(); K2LOG_D(log::ycsb, "Txn finished"); + if(_onehot) // one hot transaction and succeeded + return seastar::make_ready_future(EndResult(Statuses::S200_OK("one hot transaction ended successfully"))); + return _txn.end(true); }).then_wrapped([this] (auto&& fut) { if (fut.failed()) { @@ -216,7 +219,8 @@ class YCSBTxn { cur++; } - return partialUpdateRow(_keyid,std::move(fieldValues),std::move(fieldsToUpdate),_txn).discard_result(); + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return partialUpdateRow(_keyid,std::move(fieldValues),std::move(fieldsToUpdate),_txn, _onehot).discard_result(); } seastar::future<> scanOperation(){ @@ -269,7 +273,8 @@ class YCSBTxn { if(_requestDistName()!="latest") { K2LOG_D(log::ycsb, "Insert operation started for keyid {}", _keyid); YCSBData row(_keyid, _random); // generate row - return writeRow(row, _txn).discard_result(); + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return writeRow(row, _txn, false, ExistencePrecondition::None, _onehot).discard_result(); } _keyid = _requestDist->getMaxValue()+1; @@ -277,7 +282,9 @@ class YCSBTxn { //handle latest distribution separately to identify insert fails and increment latest known record max key value K2LOG_D(log::ycsb, "Insert operation started for keyid {}", _keyid); - return writeRow(row, _txn, false, ExistencePrecondition::NotExists) // if record with given key already exists must return error so we set precondition to Exists + + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return writeRow(row, _txn, false, ExistencePrecondition::NotExists, _onehot) // if record with given key already exists must return error so we set precondition to NotExists .then_wrapped([this] (auto&& fut) { if (fut.failed()) { fut.ignore_ready_future(); @@ -301,7 +308,8 @@ class YCSBTxn { seastar::future<> deleteOperation(){ K2LOG_D(log::ycsb, "Delete operation started for keyid {}", _keyid); YCSBData row(_keyid); // generate row - return writeRow(row, _txn, true).discard_result(); + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return writeRow(row, _txn, true, ExistencePrecondition::None, _onehot).discard_result(); } Query _query_scan; @@ -317,6 +325,7 @@ class YCSBTxn { std::shared_ptr _requestDist; // Request distribution for selecting keys std::shared_ptr _scanLengthDist; // Request distribution for selecting length of scan uint64_t _insertMissesLatest{0}; // number of inserts missed when request distribution is Latest distribution + bool _onehot; // set to true if one hot transaction private: ConfigVar _ops_per_txn{"ops_per_txn"}; @@ -324,4 +333,5 @@ class YCSBTxn { ConfigVar _num_fields{"num_fields"}; ConfigVar _max_fields_update{"max_fields_update"}; ConfigVar _requestDistName{"request_dist"}; + ConfigVar _isonehot{"isonehot"}; }; diff --git a/src/k2/cmd/ycsb/ycsb_client.cpp b/src/k2/cmd/ycsb/ycsb_client.cpp index dda72839..33e61c37 100644 --- a/src/k2/cmd/ycsb/ycsb_client.cpp +++ b/src/k2/cmd/ycsb/ycsb_client.cpp @@ -446,7 +446,8 @@ int main(int argc, char** argv) {; ("delete_proportion",bpo::value()->default_value(0), "Delete Proportion") ("max_scan_length",bpo::value()->default_value(10), "Maximum scan length") ("max_fields_update",bpo::value()->default_value(1), "Maximum number of fields to update") - ("ops_per_txn",bpo::value()->default_value(1), "The number of operations per transaction"); + ("ops_per_txn",bpo::value()->default_value(1), "The number of operations per transaction") + ("isonehot",bpo::value()->default_value(true),"Whether to use one hot transactions (for transactions with one write/update) or not"); app.addApplet(); app.addApplet(); diff --git a/src/k2/dto/K23SI.h b/src/k2/dto/K23SI.h index 60f2aa4f..199dbaad 100644 --- a/src/k2/dto/K23SI.h +++ b/src/k2/dto/K23SI.h @@ -214,17 +214,18 @@ struct K23SIWriteRequest { Key key; // the key for the write SKVRecord::Storage value; // the value of the write std::vector fieldsForPartialUpdate; // if size() > 0 then this is a partial update + bool isonehot = false; // flag for one hot transactions K23SIWriteRequest() = default; K23SIWriteRequest(PVID _pvid, String cname, K23SI_MTR _mtr, Key _trh, String _trhCollection, bool _isDelete, bool _designateTRH, ExistencePrecondition _precondition, uint64_t id, Key _key, SKVRecord::Storage _value, - std::vector _fields) : + std::vector _fields, bool _isonehot=false) : pvid(std::move(_pvid)), collectionName(std::move(cname)), mtr(std::move(_mtr)), trh(std::move(_trh)), trhCollection(std::move(_trhCollection)), isDelete(_isDelete), designateTRH(_designateTRH), precondition(_precondition), request_id(id), - key(std::move(_key)), value(std::move(_value)), fieldsForPartialUpdate(std::move(_fields)) {} + key(std::move(_key)), value(std::move(_value)), fieldsForPartialUpdate(std::move(_fields)), isonehot(_isonehot) {} - K2_PAYLOAD_FIELDS(pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate); - K2_DEF_FMT(K23SIWriteRequest, pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate); + K2_PAYLOAD_FIELDS(pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate, isonehot); + K2_DEF_FMT(K23SIWriteRequest, pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate, isonehot); }; struct K23SIWriteResponse { diff --git a/src/k2/module/k23si/Module.cpp b/src/k2/module/k23si/Module.cpp index e74933cf..d6cd99b2 100644 --- a/src/k2/module/k23si/Module.cpp +++ b/src/k2/module/k23si/Module.cpp @@ -1108,11 +1108,35 @@ K23SIPartitionModule::_processWrite(dto::K23SIWriteRequest&& request, FastDeadli } } + //endRequest created for calling EndTransaction for one hot transactions. + dto::K23SITxnEndRequest endRequest{request.pvid, + request.trhCollection, + request.trh, + request.mtr, + dto::EndAction::Commit, + {} + }; + + bool isonehot = request.isonehot; + auto collection = request.collection; // all checks passed - we're ready to place this WI as the latest version auto status = _createWI(std::move(request), vset); K2LOG_D(log::skvsvr, "WI creation with status {}", status); - return RPCResponse(std::move(status), dto::K23SIWriteResponse{}); + + // not one hot transaction or WI creation failed + if(!isonehot || !status.is2xxOK()) { + return RPCResponse(std::move(status), dto::K23SIWriteResponse{}); + } + + endRequest.writeRanges[collection].insert(_partition().keyRangeV); // add keyRangeV of current partition (TRH) + + // End one hot transaction + return handleTxnEnd(std::move(endRequest)) + .then( [this] (auto&& response) { + auto& [endstatus, k2response] = response; + return RPCResponse(std::move(endstatus), dto::K23SIWriteResponse{}); // Return status after ending one hot transaction + }); } Status diff --git a/src/k2/module/k23si/client/k23si_client.cpp b/src/k2/module/k23si/client/k23si_client.cpp index 2994a67f..abafb1ae 100644 --- a/src/k2/module/k23si/client/k23si_client.cpp +++ b/src/k2/module/k23si/client/k23si_client.cpp @@ -120,6 +120,7 @@ seastar::future> K2TxnHandle::read(dto::Key key, Stri _client->read_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest @@ -156,7 +157,7 @@ seastar::future> K2TxnHandle::read(dto::Key key, Stri std::unique_ptr K2TxnHandle::_makeWriteRequest(dto::SKVRecord& record, bool erase, - dto::ExistencePrecondition precondition) { + dto::ExistencePrecondition precondition, bool isonehot=false) { for (const String& key : record.partitionKeys) { if (key == "") { throw K23SIClientException("Partition key field not set for write request"); @@ -187,12 +188,13 @@ std::unique_ptr K2TxnHandle::_makeWriteRequest(dto::SKVR _client->write_ops, key, record.storage.share(), - std::vector() + std::vector(), + isonehot ); } std::unique_ptr K2TxnHandle::_makePartialUpdateRequest(dto::SKVRecord& record, - std::vector fieldsForPartialUpdate, dto::Key&& key) { + std::vector fieldsForPartialUpdate, dto::Key&& key, bool isonehot=false) { bool isTRH = !_trh_key.has_value(); if (isTRH) { _trh_key = key; @@ -211,7 +213,8 @@ std::unique_ptr K2TxnHandle::_makePartialUpdateRequest(d _client->write_ops, std::move(key), record.storage.share(), - fieldsForPartialUpdate + fieldsForPartialUpdate, + isonehot }); } @@ -277,8 +280,8 @@ seastar::future K2TxnHandle::end(bool shouldCommit) { return _heartbeat_timer.stop().then([this, s=std::move(status)] () { // TODO get min transaction time from TSO client auto time_spent = Clock::now() - _start_time; - if (time_spent < 50us) { - auto sleep = 50us - time_spent; + if (time_spent < 5us) { + auto sleep = 5us - time_spent; return seastar::sleep(sleep).then([s=std::move(s)] () { return seastar::make_ready_future(EndResult(std::move(s))); }); @@ -556,6 +559,7 @@ seastar::future K2TxnHandle::query(Query& query) { _client->query_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest diff --git a/src/k2/module/k23si/client/k23si_client.h b/src/k2/module/k23si/client/k23si_client.h index 43617a32..cdbf2b82 100644 --- a/src/k2/module/k23si/client/k23si_client.h +++ b/src/k2/module/k23si/client/k23si_client.h @@ -180,7 +180,7 @@ class K2TxnHandle { std::unique_ptr _makeReadRequest(const dto::Key& key, const String& collectionName) const; std::unique_ptr _makeWriteRequest(dto::SKVRecord& record, bool erase, - dto::ExistencePrecondition precondition); + dto::ExistencePrecondition precondition, bool isonehot); template std::unique_ptr _makeReadRequest(const T& user_record) const { @@ -191,7 +191,7 @@ class K2TxnHandle { } std::unique_ptr _makePartialUpdateRequest(dto::SKVRecord& record, - std::vector fieldsForPartialUpdate, dto::Key&& key); + std::vector fieldsForPartialUpdate, dto::Key&& key, bool isonehot); void _prepareQueryRequest(Query& query); @@ -247,6 +247,7 @@ class K2TxnHandle { _client->read_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest @@ -269,30 +270,39 @@ class K2TxnHandle { template seastar::future write(T& record, bool erase=false, - ExistencePrecondition precondition=ExistencePrecondition::None) { + ExistencePrecondition precondition=ExistencePrecondition::None, bool isonehot=false) { if (!_valid) { return seastar::make_exception_future(K23SIClientException("Invalid use of K2TxnHandle")); } if (_failed) { return seastar::make_ready_future(WriteResult(_failed_status, dto::K23SIWriteResponse())); } + if (isonehot) { + // this must be the only operation in the transaction + if (_total_ops>0) { + return seastar::make_exception_future(K23SIClientException("Invalid use of one hot transaction")); + } + // User is not allowed to call anything else on this TxnHandle after this operation since one hot transactions call end() on server side + _valid = false; + } std::unique_ptr request; if constexpr (std::is_same()) { - request = _makeWriteRequest(record, erase, precondition); + request = _makeWriteRequest(record, erase, precondition, isonehot); } else { SKVRecord skv_record(record.collectionName, record.schema); record.__writeFields(skv_record); - request = _makeWriteRequest(skv_record, erase, precondition); + request = _makeWriteRequest(skv_record, erase, precondition, isonehot); } _client->write_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest (_options.deadline, *request). - then([this, request=std::move(request)] (auto&& response) { + then([this, request=std::move(request), isonehot=std::move(isonehot)] (auto&& response) { auto& [status, k2response] = response; _registerRangeForWrite(status, *request); @@ -300,21 +310,39 @@ class K2TxnHandle { _checkResponseStatus(status); _ongoing_ops--; - if (status.is2xxOK() && !_heartbeat_timer.isArmed()) { + k2::Duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn + + if (!isonehot && status.is2xxOK() && !_heartbeat_timer.isArmed()) { K2ASSERT(log::skvclient, _cpo_client->collections.find(_trh_collection) != _cpo_client->collections.end(), "collection not present after successful write"); K2LOG_D(log::skvclient, "Starting hb, mtr={}", _mtr); _heartbeat_interval = _cpo_client->collections[_trh_collection]->collection.metadata.heartbeatDeadline / 2; _makeHeartbeatTimer(); _heartbeat_timer.armPeriodic(_heartbeat_interval); } + else if(isonehot) { //handle one hot transactions separately and similar to what is done in end() + if (status.is2xxOK()) { + _client->successful_txns++; + } + //note we need not end heartbeat timer because we never started it for one hot transactions. + auto time_spent = Clock::now() - _start_time; + if (time_spent < 5us) { + sleep = 5us - time_spent; + } + } - return seastar::make_ready_future(WriteResult(std::move(status), std::move(k2response))); + if(sleep==0us) { + return seastar::make_ready_future(WriteResult(std::move(status), std::move(k2response))); + } + return seastar::sleep(std::move(sleep)) + .then([status=std::move(status),k2response=std::move(k2response)] () mutable { + return seastar::make_ready_future(WriteResult(std::move(status), std::move(k2response))); + }); }); } template seastar::future partialUpdate(T& record, std::vector fieldsName, - dto::Key key=dto::Key()) { + dto::Key key=dto::Key(), bool isonehot=false) { std::vector fieldsForPartialUpdate; bool find = false; for (std::size_t i = 0; i < fieldsName.size(); ++i) { @@ -330,21 +358,31 @@ class K2TxnHandle { PartialUpdateResult(dto::K23SIStatus::BadParameter("error parameter: fieldsForPartialUpdate")) ); } - return partialUpdate(record, std::move(fieldsForPartialUpdate), std::move(key)); + return partialUpdate(record, std::move(fieldsForPartialUpdate), std::move(key), isonehot); } template seastar::future partialUpdate(T& record, std::vector fieldsForPartialUpdate, - dto::Key key=dto::Key()) { + dto::Key key=dto::Key(), bool isonehot=false) { if (!_valid) { return seastar::make_exception_future(K23SIClientException("Invalid use of K2TxnHandle")); } if (_failed) { return seastar::make_ready_future(PartialUpdateResult(_failed_status)); } + if (isonehot) { + // this must be the only operation in the transaction + if (_total_ops>0) { + return seastar::make_exception_future(K23SIClientException("Invalid use of one hot transaction")); + } + // User is not allowed to call anything else on this TxnHandle after this operation since one hot transactions call end() on server side + _valid = false; + } + _client->write_ops++; _ongoing_ops++; + _total_ops++; std::unique_ptr request; if constexpr (std::is_same()) { @@ -352,7 +390,7 @@ class K2TxnHandle { key = record.getKey(); } - request = _makePartialUpdateRequest(record, fieldsForPartialUpdate, std::move(key)); + request = _makePartialUpdateRequest(record, fieldsForPartialUpdate, std::move(key), isonehot); } else { SKVRecord skv_record(record.collectionName, record.schema); record.__writeFields(skv_record); @@ -360,7 +398,7 @@ class K2TxnHandle { key = skv_record.getKey(); } - request = _makePartialUpdateRequest(skv_record, fieldsForPartialUpdate, std::move(key)); + request = _makePartialUpdateRequest(skv_record, fieldsForPartialUpdate, std::move(key), isonehot); } if (!request) { return seastar::make_ready_future ( @@ -370,7 +408,7 @@ class K2TxnHandle { return _cpo_client->partitionRequest (_options.deadline, *request). - then([this, request=std::move(request)] (auto&& response) { + then([this, request=std::move(request), isonehot=std::move(isonehot)] (auto&& response) { auto& [status, k2response] = response; _registerRangeForWrite(status, *request); @@ -378,15 +416,33 @@ class K2TxnHandle { _checkResponseStatus(status); _ongoing_ops--; - if (status.is2xxOK() && !_heartbeat_timer.isArmed()) { + k2::Duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn + + if (!isonehot && status.is2xxOK() && !_heartbeat_timer.isArmed()) { K2ASSERT(log::skvclient, _cpo_client->collections.find(_trh_collection) != _cpo_client->collections.end(), "collection not present after successful partial update"); K2LOG_D(log::skvclient, "Starting hb, mtr={}", _mtr) _heartbeat_interval = _cpo_client->collections[_trh_collection]->collection.metadata.heartbeatDeadline / 2; _makeHeartbeatTimer(); _heartbeat_timer.armPeriodic(_heartbeat_interval); } + else if(isonehot) { //handle one hot transactions separately and similar to what is done in end() + if (status.is2xxOK()) { + _client->successful_txns++; + } + //note we need not end heartbeat timer because we never started it for one hot transactions. + auto time_spent = Clock::now() - _start_time; + if (time_spent < 5us) { + sleep = 5us - time_spent; + } + } - return seastar::make_ready_future(PartialUpdateResult(std::move(status))); + if(sleep==0us) { + return seastar::make_ready_future(PartialUpdateResult(std::move(status))); + } + return seastar::sleep(std::move(sleep)) + .then([status=std::move(status)] () { + return seastar::make_ready_future(PartialUpdateResult(std::move(status))); + }); }); } @@ -414,7 +470,7 @@ class K2TxnHandle { Duration _txn_end_deadline; TimePoint _start_time; uint64_t _ongoing_ops = 0; // Used to track if there are operations in flight when end() is called - + uint64_t _total_ops = 0; // Used to track number of operations in the txn and is used to determine if transaction is one hot Duration _heartbeat_interval; PeriodicTimer _heartbeat_timer;