From 3f97f94d1e810bcd19042ebf92e55685ea68827d Mon Sep 17 00:00:00 2001 From: ammuv Date: Mon, 9 Aug 2021 23:03:35 +0000 Subject: [PATCH 1/5] added one hot transactions --- src/k2/dto/K23SI.h | 9 ++- src/k2/module/k23si/Module.cpp | 37 ++++++++- src/k2/module/k23si/client/k23si_client.cpp | 12 ++- src/k2/module/k23si/client/k23si_client.h | 88 +++++++++++++++++---- 4 files changed, 117 insertions(+), 29 deletions(-) diff --git a/src/k2/dto/K23SI.h b/src/k2/dto/K23SI.h index 60f2aa4f..199dbaad 100644 --- a/src/k2/dto/K23SI.h +++ b/src/k2/dto/K23SI.h @@ -214,17 +214,18 @@ struct K23SIWriteRequest { Key key; // the key for the write SKVRecord::Storage value; // the value of the write std::vector fieldsForPartialUpdate; // if size() > 0 then this is a partial update + bool isonehot = false; // flag for one hot transactions K23SIWriteRequest() = default; K23SIWriteRequest(PVID _pvid, String cname, K23SI_MTR _mtr, Key _trh, String _trhCollection, bool _isDelete, bool _designateTRH, ExistencePrecondition _precondition, uint64_t id, Key _key, SKVRecord::Storage _value, - std::vector _fields) : + std::vector _fields, bool _isonehot=false) : pvid(std::move(_pvid)), collectionName(std::move(cname)), mtr(std::move(_mtr)), trh(std::move(_trh)), trhCollection(std::move(_trhCollection)), isDelete(_isDelete), designateTRH(_designateTRH), precondition(_precondition), request_id(id), - key(std::move(_key)), value(std::move(_value)), fieldsForPartialUpdate(std::move(_fields)) {} + key(std::move(_key)), value(std::move(_value)), fieldsForPartialUpdate(std::move(_fields)), isonehot(_isonehot) {} - K2_PAYLOAD_FIELDS(pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate); - K2_DEF_FMT(K23SIWriteRequest, pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate); + K2_PAYLOAD_FIELDS(pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate, isonehot); + K2_DEF_FMT(K23SIWriteRequest, pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate, isonehot); }; struct K23SIWriteResponse { diff --git a/src/k2/module/k23si/Module.cpp b/src/k2/module/k23si/Module.cpp index e74933cf..64c97ab6 100644 --- a/src/k2/module/k23si/Module.cpp +++ b/src/k2/module/k23si/Module.cpp @@ -1108,11 +1108,40 @@ K23SIPartitionModule::_processWrite(dto::K23SIWriteRequest&& request, FastDeadli } } + // build write_ranges for Transaction end request + std::unordered_map> write_ranges; // will be filled in later if WI is created successfully + + //endRequest created for calling EndTransaction for one hot transactions. + dto::K23SITxnEndRequest endRequest{request.pvid, + request.trhCollection, + request.trh, + request.mtr, + dto::EndAction::Commit, + std::move(write_ranges) + }; + + return seastar::do_with((dto::K23SITxnEndRequest)std::move(endRequest), (bool)request.isonehot, + [this, &request] (dto::K23SITxnEndRequest& endRequest, bool& isonehot) { + // all checks passed - we're ready to place this WI as the latest version + auto& vset = _indexer[request.key]; + auto status = _createWI(std::move(request), vset); + K2LOG_D(log::skvsvr, "WI creation with status {}", status); + + // not one hot transaction or WI creation failed + if(!isonehot || !status.is2xxOK()) { + return RPCResponse(std::move(status), dto::K23SIWriteResponse{}); + } - // all checks passed - we're ready to place this WI as the latest version - auto status = _createWI(std::move(request), vset); - K2LOG_D(log::skvsvr, "WI creation with status {}", status); - return RPCResponse(std::move(status), dto::K23SIWriteResponse{}); + endRequest.writeRanges[request.collectionName].insert(_partition().keyRangeV); // add keyRangeV of current partition (TRH) + + // End one hot transaction + return handleTxnEnd(std::move(endRequest)) + .then( [this] (auto&& response) { + auto& [endstatus, k2response] = response; + return RPCResponse(std::move(endstatus), dto::K23SIWriteResponse{}); // Return status after ending one hot transaction + }); + + }); } Status diff --git a/src/k2/module/k23si/client/k23si_client.cpp b/src/k2/module/k23si/client/k23si_client.cpp index 2994a67f..f00f675d 100644 --- a/src/k2/module/k23si/client/k23si_client.cpp +++ b/src/k2/module/k23si/client/k23si_client.cpp @@ -120,6 +120,7 @@ seastar::future> K2TxnHandle::read(dto::Key key, Stri _client->read_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest @@ -156,7 +157,7 @@ seastar::future> K2TxnHandle::read(dto::Key key, Stri std::unique_ptr K2TxnHandle::_makeWriteRequest(dto::SKVRecord& record, bool erase, - dto::ExistencePrecondition precondition) { + dto::ExistencePrecondition precondition, bool isonehot=false) { for (const String& key : record.partitionKeys) { if (key == "") { throw K23SIClientException("Partition key field not set for write request"); @@ -187,12 +188,13 @@ std::unique_ptr K2TxnHandle::_makeWriteRequest(dto::SKVR _client->write_ops, key, record.storage.share(), - std::vector() + std::vector(), + isonehot ); } std::unique_ptr K2TxnHandle::_makePartialUpdateRequest(dto::SKVRecord& record, - std::vector fieldsForPartialUpdate, dto::Key&& key) { + std::vector fieldsForPartialUpdate, dto::Key&& key, bool isonehot=false) { bool isTRH = !_trh_key.has_value(); if (isTRH) { _trh_key = key; @@ -211,7 +213,8 @@ std::unique_ptr K2TxnHandle::_makePartialUpdateRequest(d _client->write_ops, std::move(key), record.storage.share(), - fieldsForPartialUpdate + fieldsForPartialUpdate, + isonehot }); } @@ -556,6 +559,7 @@ seastar::future K2TxnHandle::query(Query& query) { _client->query_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest diff --git a/src/k2/module/k23si/client/k23si_client.h b/src/k2/module/k23si/client/k23si_client.h index 43617a32..9a0e3ca6 100644 --- a/src/k2/module/k23si/client/k23si_client.h +++ b/src/k2/module/k23si/client/k23si_client.h @@ -180,7 +180,7 @@ class K2TxnHandle { std::unique_ptr _makeReadRequest(const dto::Key& key, const String& collectionName) const; std::unique_ptr _makeWriteRequest(dto::SKVRecord& record, bool erase, - dto::ExistencePrecondition precondition); + dto::ExistencePrecondition precondition, bool isonehot); template std::unique_ptr _makeReadRequest(const T& user_record) const { @@ -191,7 +191,7 @@ class K2TxnHandle { } std::unique_ptr _makePartialUpdateRequest(dto::SKVRecord& record, - std::vector fieldsForPartialUpdate, dto::Key&& key); + std::vector fieldsForPartialUpdate, dto::Key&& key, bool isonehot); void _prepareQueryRequest(Query& query); @@ -247,6 +247,7 @@ class K2TxnHandle { _client->read_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest @@ -269,30 +270,39 @@ class K2TxnHandle { template seastar::future write(T& record, bool erase=false, - ExistencePrecondition precondition=ExistencePrecondition::None) { + ExistencePrecondition precondition=ExistencePrecondition::None, bool isonehot=false) { if (!_valid) { return seastar::make_exception_future(K23SIClientException("Invalid use of K2TxnHandle")); } if (_failed) { return seastar::make_ready_future(WriteResult(_failed_status, dto::K23SIWriteResponse())); } + if (isonehot) { + // this must be the only operation in the transaction + if (_total_ops>0) { + return seastar::make_exception_future(K23SIClientException("Invalid use of one hot transaction")); + } + // User is not allowed to call anything else on this TxnHandle after this operation since one hot transactions call end() on server side + _valid = false; + } std::unique_ptr request; if constexpr (std::is_same()) { - request = _makeWriteRequest(record, erase, precondition); + request = _makeWriteRequest(record, erase, precondition, isonehot); } else { SKVRecord skv_record(record.collectionName, record.schema); record.__writeFields(skv_record); - request = _makeWriteRequest(skv_record, erase, precondition); + request = _makeWriteRequest(skv_record, erase, precondition, isonehot); } _client->write_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest (_options.deadline, *request). - then([this, request=std::move(request)] (auto&& response) { + then([this, request=std::move(request), isonehot=std::move(isonehot)] (auto&& response) { auto& [status, k2response] = response; _registerRangeForWrite(status, *request); @@ -300,21 +310,38 @@ class K2TxnHandle { _checkResponseStatus(status); _ongoing_ops--; - if (status.is2xxOK() && !_heartbeat_timer.isArmed()) { + k2::CachedSteadyClock::duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn + + if (!isonehot && status.is2xxOK() && !_heartbeat_timer.isArmed()) { K2ASSERT(log::skvclient, _cpo_client->collections.find(_trh_collection) != _cpo_client->collections.end(), "collection not present after successful write"); K2LOG_D(log::skvclient, "Starting hb, mtr={}", _mtr); _heartbeat_interval = _cpo_client->collections[_trh_collection]->collection.metadata.heartbeatDeadline / 2; _makeHeartbeatTimer(); _heartbeat_timer.armPeriodic(_heartbeat_interval); } + else if(isonehot) { //handle one hot transactions separately and similar to what is done in end() + if (status.is2xxOK()) { + _client->successful_txns++; + } + //note we need not end heartbeat timer because we never started it for one hot transactions. + auto time_spent = Clock::now() - _start_time; + if (time_spent < 50us) { + sleep = 50us - time_spent; + } + } - return seastar::make_ready_future(WriteResult(std::move(status), std::move(k2response))); + return seastar::do_with( (k2::CachedSteadyClock::duration)sleep, (dto::K23SIWriteResponse)std::move(k2response), [this,sleep=std::move(sleep),status=std::move(status)] (k2::CachedSteadyClock::duration& dur, dto::K23SIWriteResponse& k2response) { + return seastar::sleep(std::move(dur)) + .then([status=std::move(status),&k2response] () -> seastar::future { + return seastar::make_ready_future(WriteResult(std::move(status), std::move(k2response))); + }); + }); }); } template seastar::future partialUpdate(T& record, std::vector fieldsName, - dto::Key key=dto::Key()) { + dto::Key key=dto::Key(), bool isonehot=false) { std::vector fieldsForPartialUpdate; bool find = false; for (std::size_t i = 0; i < fieldsName.size(); ++i) { @@ -330,21 +357,31 @@ class K2TxnHandle { PartialUpdateResult(dto::K23SIStatus::BadParameter("error parameter: fieldsForPartialUpdate")) ); } - return partialUpdate(record, std::move(fieldsForPartialUpdate), std::move(key)); + return partialUpdate(record, std::move(fieldsForPartialUpdate), std::move(key), isonehot); } template seastar::future partialUpdate(T& record, std::vector fieldsForPartialUpdate, - dto::Key key=dto::Key()) { + dto::Key key=dto::Key(), bool isonehot=false) { if (!_valid) { return seastar::make_exception_future(K23SIClientException("Invalid use of K2TxnHandle")); } if (_failed) { return seastar::make_ready_future(PartialUpdateResult(_failed_status)); } + if (isonehot) { + // this must be the only operation in the transaction + if (_total_ops>0) { + return seastar::make_exception_future(K23SIClientException("Invalid use of one hot transaction")); + } + // User is not allowed to call anything else on this TxnHandle after this operation since one hot transactions call end() on server side + _valid = false; + } + _client->write_ops++; _ongoing_ops++; + _total_ops++; std::unique_ptr request; if constexpr (std::is_same()) { @@ -352,7 +389,7 @@ class K2TxnHandle { key = record.getKey(); } - request = _makePartialUpdateRequest(record, fieldsForPartialUpdate, std::move(key)); + request = _makePartialUpdateRequest(record, fieldsForPartialUpdate, std::move(key), isonehot); } else { SKVRecord skv_record(record.collectionName, record.schema); record.__writeFields(skv_record); @@ -360,7 +397,7 @@ class K2TxnHandle { key = skv_record.getKey(); } - request = _makePartialUpdateRequest(skv_record, fieldsForPartialUpdate, std::move(key)); + request = _makePartialUpdateRequest(skv_record, fieldsForPartialUpdate, std::move(key), isonehot); } if (!request) { return seastar::make_ready_future ( @@ -370,7 +407,7 @@ class K2TxnHandle { return _cpo_client->partitionRequest (_options.deadline, *request). - then([this, request=std::move(request)] (auto&& response) { + then([this, request=std::move(request), isonehot=std::move(isonehot)] (auto&& response) { auto& [status, k2response] = response; _registerRangeForWrite(status, *request); @@ -378,15 +415,32 @@ class K2TxnHandle { _checkResponseStatus(status); _ongoing_ops--; - if (status.is2xxOK() && !_heartbeat_timer.isArmed()) { + k2::CachedSteadyClock::duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn + + if (!isonehot && status.is2xxOK() && !_heartbeat_timer.isArmed()) { K2ASSERT(log::skvclient, _cpo_client->collections.find(_trh_collection) != _cpo_client->collections.end(), "collection not present after successful partial update"); K2LOG_D(log::skvclient, "Starting hb, mtr={}", _mtr) _heartbeat_interval = _cpo_client->collections[_trh_collection]->collection.metadata.heartbeatDeadline / 2; _makeHeartbeatTimer(); _heartbeat_timer.armPeriodic(_heartbeat_interval); } + else if(isonehot) { //handle one hot transactions separately and similar to what is done in end() + if (status.is2xxOK()) { + _client->successful_txns++; + } + //note we need not end heartbeat timer because we never started it for one hot transactions. + auto time_spent = Clock::now() - _start_time; + if (time_spent < 50us) { + sleep = 50us - time_spent; + } + } - return seastar::make_ready_future(PartialUpdateResult(std::move(status))); + return seastar::do_with( (k2::CachedSteadyClock::duration)sleep, [sleep=std::move(sleep),status=std::move(status)] (k2::CachedSteadyClock::duration& dur) { + return seastar::sleep(std::move(dur)) + .then([status=std::move(status)] () -> seastar::future { + return seastar::make_ready_future(PartialUpdateResult(std::move(status))); + }); + }); }); } @@ -414,7 +468,7 @@ class K2TxnHandle { Duration _txn_end_deadline; TimePoint _start_time; uint64_t _ongoing_ops = 0; // Used to track if there are operations in flight when end() is called - + uint64_t _total_ops = 0; // Used to track number of operations in the txn and is used to determine if transaction is one hot Duration _heartbeat_interval; PeriodicTimer _heartbeat_timer; From 4207cf7a9f6e921637da97893d9fd6296418eecb Mon Sep 17 00:00:00 2001 From: ammuv Date: Wed, 11 Aug 2021 22:11:41 +0000 Subject: [PATCH 2/5] made changes to YCSB load and error codes and added one hot transactions to YCSB --- src/k2/cmd/ycsb/data.h | 12 +++++----- src/k2/cmd/ycsb/dataload.h | 25 +++++++++++---------- src/k2/cmd/ycsb/transactions.h | 24 ++++++++++++++------ src/k2/cmd/ycsb/ycsb_client.cpp | 3 ++- src/k2/module/k23si/client/k23si_client.cpp | 4 ++-- src/k2/module/k23si/client/k23si_client.h | 8 +++---- 6 files changed, 44 insertions(+), 32 deletions(-) diff --git a/src/k2/cmd/ycsb/data.h b/src/k2/cmd/ycsb/data.h index 314dc261..afeb8a50 100644 --- a/src/k2/cmd/ycsb/data.h +++ b/src/k2/cmd/ycsb/data.h @@ -145,7 +145,7 @@ class YCSBData{ }; // function to write the given YCSB Data row -seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool erase = false, ExistencePrecondition precondition = ExistencePrecondition::None) +seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool erase = false, ExistencePrecondition precondition = ExistencePrecondition::None, bool isonehot=false) { dto::SKVRecord skv_record(YCSBData::collectionName, YCSBData::schema); // create SKV record @@ -153,8 +153,8 @@ seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool eras skv_record.serializeNext(field); // add fields to SKV record } - return txn.write(skv_record, erase, precondition).then([] (WriteResult&& result) { - if (!result.status.is2xxOK() && result.status.code!=403 && result.status.code!=404) { // 403 for write with erase=false and key already exists or 404 for write with erase=true and key does not exist + return txn.write(skv_record, erase, precondition, isonehot).then([] (WriteResult&& result) { + if (!result.status.is2xxOK() && result.status.code!=412 && result.status.code!=404) { // k2::Statuses::S412_Precondition_Failed for write with erase=false and key already exists and k2::Statuses::S404_Not_Found for write with erase=true and key does not exist K2LOG_D(log::ycsb, "writeRow failed and is retryable: {}", result.status); return seastar::make_exception_future(std::runtime_error("writeRow failed!")); } @@ -165,7 +165,7 @@ seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool eras // function to update the partial fields for a YCSB Data row seastar::future -partialUpdateRow(uint32_t keyid, std::vector fieldValues, std::vector fieldsToUpdate, K2TxnHandle& txn) { +partialUpdateRow(uint32_t keyid, std::vector fieldValues, std::vector fieldsToUpdate, K2TxnHandle& txn, bool isonehot=false) { dto::SKVRecord skv_record(YCSBData::collectionName, YCSBData::schema); // create SKV record @@ -182,8 +182,8 @@ partialUpdateRow(uint32_t keyid, std::vector fieldValues, std::vector(skv_record, std::move(fieldsToUpdate)).then([] (PartialUpdateResult&& result) { - if (!result.status.is2xxOK() && result.status.code!=404) { // 404 for key not found + return txn.partialUpdate(skv_record, std::move(fieldsToUpdate), dto::Key(), isonehot).then([] (PartialUpdateResult&& result) { + if (!result.status.is2xxOK() && result.status.code!=412 && result.status.code!=404) { // 412 for precondition of Exists not satisfied and 404 for key not found K2LOG_D(log::ycsb, "partialUpdateRow failed: {}", result.status); return seastar::make_exception_future(std::runtime_error("partialUpdateRow failed!")); } diff --git a/src/k2/cmd/ycsb/dataload.h b/src/k2/cmd/ycsb/dataload.h index 8985910a..8446030f 100644 --- a/src/k2/cmd/ycsb/dataload.h +++ b/src/k2/cmd/ycsb/dataload.h @@ -47,21 +47,21 @@ class DataLoader { options.syncFinalize = true; double propSkip = (double)_num_inserts()/(_num_records()+_num_inserts()); // proportion of records to Skip while loading - RandomContext random_context(0,{propSkip, 1-propSkip}); + _random = RandomContext {0,{propSkip, 1-propSkip}}; K2LOG_D(log::ycsb, "pipeline depth and data load per txn ={}, {}", pipeline_depth, _writes_per_load_txn()); - return seastar::do_with((size_t)startIdxShard, [this, options, pipeline_depth, &client, random_context, startIdxShard, endIdxShard] (size_t& start_idx){ + return seastar::do_with((size_t)startIdxShard, [this, options, pipeline_depth, &client, startIdxShard, endIdxShard] (size_t& start_idx){ return seastar::do_until( [this, &start_idx, endIdxShard] { return start_idx>=endIdxShard; }, - [this, options, pipeline_depth, &client, &start_idx, random_context, endIdxShard] { + [this, options, pipeline_depth, &client, &start_idx, endIdxShard] { std::vector> futures; for (int i=0; i < pipeline_depth; ++i) { - futures.push_back(client.beginTxn(options).then([this, i, start_idx, random_context, endIdxShard] (K2TxnHandle&& t) { + futures.push_back(client.beginTxn(options).then([this, i, start_idx, endIdxShard] (K2TxnHandle&& t) { K2LOG_D(log::ycsb, "txn begin in load data"); - return seastar::do_with(std::move(t), [this, i, start_idx, random_context, endIdxShard] (K2TxnHandle& txn) { + return seastar::do_with(std::move(t), [this, i, start_idx, endIdxShard] (K2TxnHandle& txn) { size_t idx = start_idx + i*_writes_per_load_txn(); - return insertDataLoop(txn, idx, random_context, endIdxShard); + return insertDataLoop(txn, idx, endIdxShard); }); })); } @@ -72,18 +72,18 @@ class DataLoader { } private: - seastar::future<> insertDataLoop(K2TxnHandle& txn, size_t start_idx, RandomContext random_context, size_t endIdxShard) + seastar::future<> insertDataLoop(K2TxnHandle& txn, size_t start_idx, size_t endIdxShard) { K2LOG_D(log::ycsb, "Starting transaction, start_idx is {}", start_idx); - return seastar::do_with((size_t)0, std::move(random_context), [this, &txn, start_idx, endIdxShard] (size_t& current_size, RandomContext& random_c) { + return seastar::do_with((size_t)0, [this, &txn, start_idx, endIdxShard] (size_t& current_size) { return seastar::do_until( [this, ¤t_size, start_idx, endIdxShard] { return ((current_size >= _writes_per_load_txn()) || ((start_idx + current_size)>= endIdxShard)); }, - [this, ¤t_size, &txn, start_idx, &random_c] () { - uint8_t isLoad = random_c.BiasedInt(); - if(isLoad || (_requestDistName()=="latest" && (start_idx + current_size)<_num_records())){ // load record with prob = _num_records / _num_keys or if latest load all records uptil num_records() + [this, ¤t_size, &txn, start_idx] () { + uint8_t isLoad = _random.BiasedInt(); + if((!(_requestDistName()=="latest") && isLoad) || (_requestDistName()=="latest" && (start_idx + current_size)<_num_records())){ // load record with prob = _num_records / _num_keys or if latest load all records uptil num_records() K2LOG_D(log::ycsb, "Record being loaded now in this txn is {}", start_idx + current_size); - YCSBData row(start_idx + current_size, random_c); // generate row + YCSBData row(start_idx + current_size, _random); // generate row ++current_size; return writeRow(row, txn).discard_result(); @@ -107,6 +107,7 @@ class DataLoader { }); } + RandomContext _random; ConfigVar _writes_per_load_txn{"writes_per_load_txn"}; ConfigVar _num_records{"num_records"}; ConfigVar _num_inserts{"num_records_insert"}; diff --git a/src/k2/cmd/ycsb/transactions.h b/src/k2/cmd/ycsb/transactions.h index 6092a865..3fd408c5 100644 --- a/src/k2/cmd/ycsb/transactions.h +++ b/src/k2/cmd/ycsb/transactions.h @@ -90,7 +90,7 @@ class YCSBTxn { YCSBTxn(RandomContext& random, K23SIClient& client, std::shared_ptr requestDist, std::shared_ptr scanLengthDist) : - _random(random), _client(client), _failed(false), _requestDist(requestDist), _scanLengthDist(scanLengthDist) {} + _random(random), _client(client), _failed(false), _requestDist(requestDist), _scanLengthDist(scanLengthDist), _onehot(false) {} seastar::future attempt() { K2TxnOptions options{}; @@ -164,6 +164,9 @@ class YCSBTxn { fut.ignore_ready_future(); K2LOG_D(log::ycsb, "Txn finished"); + if(_onehot) // one hot transaction and succeeded + return seastar::make_ready_future(EndResult(Statuses::S200_OK("one hot transaction ended successfully"))); + return _txn.end(true); }).then_wrapped([this] (auto&& fut) { if (fut.failed()) { @@ -216,7 +219,8 @@ class YCSBTxn { cur++; } - return partialUpdateRow(_keyid,std::move(fieldValues),std::move(fieldsToUpdate),_txn).discard_result(); + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return partialUpdateRow(_keyid,std::move(fieldValues),std::move(fieldsToUpdate),_txn, _onehot).discard_result(); } seastar::future<> scanOperation(){ @@ -269,7 +273,8 @@ class YCSBTxn { if(_requestDistName()!="latest") { K2LOG_D(log::ycsb, "Insert operation started for keyid {}", _keyid); YCSBData row(_keyid, _random); // generate row - return writeRow(row, _txn).discard_result(); + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return writeRow(row, _txn, false, ExistencePrecondition::None, _onehot).discard_result(); } _keyid = _requestDist->getMaxValue()+1; @@ -277,7 +282,9 @@ class YCSBTxn { //handle latest distribution separately to identify insert fails and increment latest known record max key value K2LOG_D(log::ycsb, "Insert operation started for keyid {}", _keyid); - return writeRow(row, _txn, false, ExistencePrecondition::Exists) // if record with given key already exists must return error so we set precondition to Exists + + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return writeRow(row, _txn, false, ExistencePrecondition::NotExists, _onehot) // if record with given key already exists must return error so we set precondition to NotExists .then_wrapped([this] (auto&& fut) { if (fut.failed()) { fut.ignore_ready_future(); @@ -285,11 +292,11 @@ class YCSBTxn { } WriteResult result = fut.get0(); - if (result.status.is2xxOK() || result.status.code == 403) { // key inserted or already exists + if (result.status.is2xxOK() || result.status.code == 412) { // key inserted or already exists if(_requestDist->getMaxValue() < _keyid) { // update bounds of distribution _requestDist->updateBounds(0,_keyid); } - if(result.status.code == 403) { // insert missed because record exists + if(result.status.code == 412) { // insert missed because record exists _insertMissesLatest++; // increment misses counter } } @@ -301,7 +308,8 @@ class YCSBTxn { seastar::future<> deleteOperation(){ K2LOG_D(log::ycsb, "Delete operation started for keyid {}", _keyid); YCSBData row(_keyid); // generate row - return writeRow(row, _txn, true).discard_result(); + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return writeRow(row, _txn, true, ExistencePrecondition::None, _onehot).discard_result(); } Query _query_scan; @@ -317,6 +325,7 @@ class YCSBTxn { std::shared_ptr _requestDist; // Request distribution for selecting keys std::shared_ptr _scanLengthDist; // Request distribution for selecting length of scan uint64_t _insertMissesLatest{0}; // number of inserts missed when request distribution is Latest distribution + bool _onehot; // set to true if one hot transaction private: ConfigVar _ops_per_txn{"ops_per_txn"}; @@ -324,4 +333,5 @@ class YCSBTxn { ConfigVar _num_fields{"num_fields"}; ConfigVar _max_fields_update{"max_fields_update"}; ConfigVar _requestDistName{"request_dist"}; + ConfigVar _isonehot{"isonehot"}; }; diff --git a/src/k2/cmd/ycsb/ycsb_client.cpp b/src/k2/cmd/ycsb/ycsb_client.cpp index dda72839..33e61c37 100644 --- a/src/k2/cmd/ycsb/ycsb_client.cpp +++ b/src/k2/cmd/ycsb/ycsb_client.cpp @@ -446,7 +446,8 @@ int main(int argc, char** argv) {; ("delete_proportion",bpo::value()->default_value(0), "Delete Proportion") ("max_scan_length",bpo::value()->default_value(10), "Maximum scan length") ("max_fields_update",bpo::value()->default_value(1), "Maximum number of fields to update") - ("ops_per_txn",bpo::value()->default_value(1), "The number of operations per transaction"); + ("ops_per_txn",bpo::value()->default_value(1), "The number of operations per transaction") + ("isonehot",bpo::value()->default_value(true),"Whether to use one hot transactions (for transactions with one write/update) or not"); app.addApplet(); app.addApplet(); diff --git a/src/k2/module/k23si/client/k23si_client.cpp b/src/k2/module/k23si/client/k23si_client.cpp index f00f675d..abafb1ae 100644 --- a/src/k2/module/k23si/client/k23si_client.cpp +++ b/src/k2/module/k23si/client/k23si_client.cpp @@ -280,8 +280,8 @@ seastar::future K2TxnHandle::end(bool shouldCommit) { return _heartbeat_timer.stop().then([this, s=std::move(status)] () { // TODO get min transaction time from TSO client auto time_spent = Clock::now() - _start_time; - if (time_spent < 50us) { - auto sleep = 50us - time_spent; + if (time_spent < 5us) { + auto sleep = 5us - time_spent; return seastar::sleep(sleep).then([s=std::move(s)] () { return seastar::make_ready_future(EndResult(std::move(s))); }); diff --git a/src/k2/module/k23si/client/k23si_client.h b/src/k2/module/k23si/client/k23si_client.h index 9a0e3ca6..873a1280 100644 --- a/src/k2/module/k23si/client/k23si_client.h +++ b/src/k2/module/k23si/client/k23si_client.h @@ -325,8 +325,8 @@ class K2TxnHandle { } //note we need not end heartbeat timer because we never started it for one hot transactions. auto time_spent = Clock::now() - _start_time; - if (time_spent < 50us) { - sleep = 50us - time_spent; + if (time_spent < 5us) { + sleep = 5us - time_spent; } } @@ -430,8 +430,8 @@ class K2TxnHandle { } //note we need not end heartbeat timer because we never started it for one hot transactions. auto time_spent = Clock::now() - _start_time; - if (time_spent < 50us) { - sleep = 50us - time_spent; + if (time_spent < 5us) { + sleep = 5us - time_spent; } } From 623e006847aba76838768faad24106931eede1fc Mon Sep 17 00:00:00 2001 From: ammuv Date: Mon, 9 Aug 2021 23:03:35 +0000 Subject: [PATCH 3/5] added one hot transactions --- src/k2/dto/K23SI.h | 9 ++- src/k2/module/k23si/Module.cpp | 37 ++++++++- src/k2/module/k23si/client/k23si_client.cpp | 12 ++- src/k2/module/k23si/client/k23si_client.h | 88 +++++++++++++++++---- 4 files changed, 117 insertions(+), 29 deletions(-) diff --git a/src/k2/dto/K23SI.h b/src/k2/dto/K23SI.h index 60f2aa4f..199dbaad 100644 --- a/src/k2/dto/K23SI.h +++ b/src/k2/dto/K23SI.h @@ -214,17 +214,18 @@ struct K23SIWriteRequest { Key key; // the key for the write SKVRecord::Storage value; // the value of the write std::vector fieldsForPartialUpdate; // if size() > 0 then this is a partial update + bool isonehot = false; // flag for one hot transactions K23SIWriteRequest() = default; K23SIWriteRequest(PVID _pvid, String cname, K23SI_MTR _mtr, Key _trh, String _trhCollection, bool _isDelete, bool _designateTRH, ExistencePrecondition _precondition, uint64_t id, Key _key, SKVRecord::Storage _value, - std::vector _fields) : + std::vector _fields, bool _isonehot=false) : pvid(std::move(_pvid)), collectionName(std::move(cname)), mtr(std::move(_mtr)), trh(std::move(_trh)), trhCollection(std::move(_trhCollection)), isDelete(_isDelete), designateTRH(_designateTRH), precondition(_precondition), request_id(id), - key(std::move(_key)), value(std::move(_value)), fieldsForPartialUpdate(std::move(_fields)) {} + key(std::move(_key)), value(std::move(_value)), fieldsForPartialUpdate(std::move(_fields)), isonehot(_isonehot) {} - K2_PAYLOAD_FIELDS(pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate); - K2_DEF_FMT(K23SIWriteRequest, pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate); + K2_PAYLOAD_FIELDS(pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate, isonehot); + K2_DEF_FMT(K23SIWriteRequest, pvid, collectionName, mtr, trh, trhCollection, isDelete, designateTRH, precondition, request_id, key, value, fieldsForPartialUpdate, isonehot); }; struct K23SIWriteResponse { diff --git a/src/k2/module/k23si/Module.cpp b/src/k2/module/k23si/Module.cpp index e74933cf..64c97ab6 100644 --- a/src/k2/module/k23si/Module.cpp +++ b/src/k2/module/k23si/Module.cpp @@ -1108,11 +1108,40 @@ K23SIPartitionModule::_processWrite(dto::K23SIWriteRequest&& request, FastDeadli } } + // build write_ranges for Transaction end request + std::unordered_map> write_ranges; // will be filled in later if WI is created successfully + + //endRequest created for calling EndTransaction for one hot transactions. + dto::K23SITxnEndRequest endRequest{request.pvid, + request.trhCollection, + request.trh, + request.mtr, + dto::EndAction::Commit, + std::move(write_ranges) + }; + + return seastar::do_with((dto::K23SITxnEndRequest)std::move(endRequest), (bool)request.isonehot, + [this, &request] (dto::K23SITxnEndRequest& endRequest, bool& isonehot) { + // all checks passed - we're ready to place this WI as the latest version + auto& vset = _indexer[request.key]; + auto status = _createWI(std::move(request), vset); + K2LOG_D(log::skvsvr, "WI creation with status {}", status); + + // not one hot transaction or WI creation failed + if(!isonehot || !status.is2xxOK()) { + return RPCResponse(std::move(status), dto::K23SIWriteResponse{}); + } - // all checks passed - we're ready to place this WI as the latest version - auto status = _createWI(std::move(request), vset); - K2LOG_D(log::skvsvr, "WI creation with status {}", status); - return RPCResponse(std::move(status), dto::K23SIWriteResponse{}); + endRequest.writeRanges[request.collectionName].insert(_partition().keyRangeV); // add keyRangeV of current partition (TRH) + + // End one hot transaction + return handleTxnEnd(std::move(endRequest)) + .then( [this] (auto&& response) { + auto& [endstatus, k2response] = response; + return RPCResponse(std::move(endstatus), dto::K23SIWriteResponse{}); // Return status after ending one hot transaction + }); + + }); } Status diff --git a/src/k2/module/k23si/client/k23si_client.cpp b/src/k2/module/k23si/client/k23si_client.cpp index 2994a67f..f00f675d 100644 --- a/src/k2/module/k23si/client/k23si_client.cpp +++ b/src/k2/module/k23si/client/k23si_client.cpp @@ -120,6 +120,7 @@ seastar::future> K2TxnHandle::read(dto::Key key, Stri _client->read_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest @@ -156,7 +157,7 @@ seastar::future> K2TxnHandle::read(dto::Key key, Stri std::unique_ptr K2TxnHandle::_makeWriteRequest(dto::SKVRecord& record, bool erase, - dto::ExistencePrecondition precondition) { + dto::ExistencePrecondition precondition, bool isonehot=false) { for (const String& key : record.partitionKeys) { if (key == "") { throw K23SIClientException("Partition key field not set for write request"); @@ -187,12 +188,13 @@ std::unique_ptr K2TxnHandle::_makeWriteRequest(dto::SKVR _client->write_ops, key, record.storage.share(), - std::vector() + std::vector(), + isonehot ); } std::unique_ptr K2TxnHandle::_makePartialUpdateRequest(dto::SKVRecord& record, - std::vector fieldsForPartialUpdate, dto::Key&& key) { + std::vector fieldsForPartialUpdate, dto::Key&& key, bool isonehot=false) { bool isTRH = !_trh_key.has_value(); if (isTRH) { _trh_key = key; @@ -211,7 +213,8 @@ std::unique_ptr K2TxnHandle::_makePartialUpdateRequest(d _client->write_ops, std::move(key), record.storage.share(), - fieldsForPartialUpdate + fieldsForPartialUpdate, + isonehot }); } @@ -556,6 +559,7 @@ seastar::future K2TxnHandle::query(Query& query) { _client->query_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest diff --git a/src/k2/module/k23si/client/k23si_client.h b/src/k2/module/k23si/client/k23si_client.h index 43617a32..9a0e3ca6 100644 --- a/src/k2/module/k23si/client/k23si_client.h +++ b/src/k2/module/k23si/client/k23si_client.h @@ -180,7 +180,7 @@ class K2TxnHandle { std::unique_ptr _makeReadRequest(const dto::Key& key, const String& collectionName) const; std::unique_ptr _makeWriteRequest(dto::SKVRecord& record, bool erase, - dto::ExistencePrecondition precondition); + dto::ExistencePrecondition precondition, bool isonehot); template std::unique_ptr _makeReadRequest(const T& user_record) const { @@ -191,7 +191,7 @@ class K2TxnHandle { } std::unique_ptr _makePartialUpdateRequest(dto::SKVRecord& record, - std::vector fieldsForPartialUpdate, dto::Key&& key); + std::vector fieldsForPartialUpdate, dto::Key&& key, bool isonehot); void _prepareQueryRequest(Query& query); @@ -247,6 +247,7 @@ class K2TxnHandle { _client->read_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest @@ -269,30 +270,39 @@ class K2TxnHandle { template seastar::future write(T& record, bool erase=false, - ExistencePrecondition precondition=ExistencePrecondition::None) { + ExistencePrecondition precondition=ExistencePrecondition::None, bool isonehot=false) { if (!_valid) { return seastar::make_exception_future(K23SIClientException("Invalid use of K2TxnHandle")); } if (_failed) { return seastar::make_ready_future(WriteResult(_failed_status, dto::K23SIWriteResponse())); } + if (isonehot) { + // this must be the only operation in the transaction + if (_total_ops>0) { + return seastar::make_exception_future(K23SIClientException("Invalid use of one hot transaction")); + } + // User is not allowed to call anything else on this TxnHandle after this operation since one hot transactions call end() on server side + _valid = false; + } std::unique_ptr request; if constexpr (std::is_same()) { - request = _makeWriteRequest(record, erase, precondition); + request = _makeWriteRequest(record, erase, precondition, isonehot); } else { SKVRecord skv_record(record.collectionName, record.schema); record.__writeFields(skv_record); - request = _makeWriteRequest(skv_record, erase, precondition); + request = _makeWriteRequest(skv_record, erase, precondition, isonehot); } _client->write_ops++; _ongoing_ops++; + _total_ops++; return _cpo_client->partitionRequest (_options.deadline, *request). - then([this, request=std::move(request)] (auto&& response) { + then([this, request=std::move(request), isonehot=std::move(isonehot)] (auto&& response) { auto& [status, k2response] = response; _registerRangeForWrite(status, *request); @@ -300,21 +310,38 @@ class K2TxnHandle { _checkResponseStatus(status); _ongoing_ops--; - if (status.is2xxOK() && !_heartbeat_timer.isArmed()) { + k2::CachedSteadyClock::duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn + + if (!isonehot && status.is2xxOK() && !_heartbeat_timer.isArmed()) { K2ASSERT(log::skvclient, _cpo_client->collections.find(_trh_collection) != _cpo_client->collections.end(), "collection not present after successful write"); K2LOG_D(log::skvclient, "Starting hb, mtr={}", _mtr); _heartbeat_interval = _cpo_client->collections[_trh_collection]->collection.metadata.heartbeatDeadline / 2; _makeHeartbeatTimer(); _heartbeat_timer.armPeriodic(_heartbeat_interval); } + else if(isonehot) { //handle one hot transactions separately and similar to what is done in end() + if (status.is2xxOK()) { + _client->successful_txns++; + } + //note we need not end heartbeat timer because we never started it for one hot transactions. + auto time_spent = Clock::now() - _start_time; + if (time_spent < 50us) { + sleep = 50us - time_spent; + } + } - return seastar::make_ready_future(WriteResult(std::move(status), std::move(k2response))); + return seastar::do_with( (k2::CachedSteadyClock::duration)sleep, (dto::K23SIWriteResponse)std::move(k2response), [this,sleep=std::move(sleep),status=std::move(status)] (k2::CachedSteadyClock::duration& dur, dto::K23SIWriteResponse& k2response) { + return seastar::sleep(std::move(dur)) + .then([status=std::move(status),&k2response] () -> seastar::future { + return seastar::make_ready_future(WriteResult(std::move(status), std::move(k2response))); + }); + }); }); } template seastar::future partialUpdate(T& record, std::vector fieldsName, - dto::Key key=dto::Key()) { + dto::Key key=dto::Key(), bool isonehot=false) { std::vector fieldsForPartialUpdate; bool find = false; for (std::size_t i = 0; i < fieldsName.size(); ++i) { @@ -330,21 +357,31 @@ class K2TxnHandle { PartialUpdateResult(dto::K23SIStatus::BadParameter("error parameter: fieldsForPartialUpdate")) ); } - return partialUpdate(record, std::move(fieldsForPartialUpdate), std::move(key)); + return partialUpdate(record, std::move(fieldsForPartialUpdate), std::move(key), isonehot); } template seastar::future partialUpdate(T& record, std::vector fieldsForPartialUpdate, - dto::Key key=dto::Key()) { + dto::Key key=dto::Key(), bool isonehot=false) { if (!_valid) { return seastar::make_exception_future(K23SIClientException("Invalid use of K2TxnHandle")); } if (_failed) { return seastar::make_ready_future(PartialUpdateResult(_failed_status)); } + if (isonehot) { + // this must be the only operation in the transaction + if (_total_ops>0) { + return seastar::make_exception_future(K23SIClientException("Invalid use of one hot transaction")); + } + // User is not allowed to call anything else on this TxnHandle after this operation since one hot transactions call end() on server side + _valid = false; + } + _client->write_ops++; _ongoing_ops++; + _total_ops++; std::unique_ptr request; if constexpr (std::is_same()) { @@ -352,7 +389,7 @@ class K2TxnHandle { key = record.getKey(); } - request = _makePartialUpdateRequest(record, fieldsForPartialUpdate, std::move(key)); + request = _makePartialUpdateRequest(record, fieldsForPartialUpdate, std::move(key), isonehot); } else { SKVRecord skv_record(record.collectionName, record.schema); record.__writeFields(skv_record); @@ -360,7 +397,7 @@ class K2TxnHandle { key = skv_record.getKey(); } - request = _makePartialUpdateRequest(skv_record, fieldsForPartialUpdate, std::move(key)); + request = _makePartialUpdateRequest(skv_record, fieldsForPartialUpdate, std::move(key), isonehot); } if (!request) { return seastar::make_ready_future ( @@ -370,7 +407,7 @@ class K2TxnHandle { return _cpo_client->partitionRequest (_options.deadline, *request). - then([this, request=std::move(request)] (auto&& response) { + then([this, request=std::move(request), isonehot=std::move(isonehot)] (auto&& response) { auto& [status, k2response] = response; _registerRangeForWrite(status, *request); @@ -378,15 +415,32 @@ class K2TxnHandle { _checkResponseStatus(status); _ongoing_ops--; - if (status.is2xxOK() && !_heartbeat_timer.isArmed()) { + k2::CachedSteadyClock::duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn + + if (!isonehot && status.is2xxOK() && !_heartbeat_timer.isArmed()) { K2ASSERT(log::skvclient, _cpo_client->collections.find(_trh_collection) != _cpo_client->collections.end(), "collection not present after successful partial update"); K2LOG_D(log::skvclient, "Starting hb, mtr={}", _mtr) _heartbeat_interval = _cpo_client->collections[_trh_collection]->collection.metadata.heartbeatDeadline / 2; _makeHeartbeatTimer(); _heartbeat_timer.armPeriodic(_heartbeat_interval); } + else if(isonehot) { //handle one hot transactions separately and similar to what is done in end() + if (status.is2xxOK()) { + _client->successful_txns++; + } + //note we need not end heartbeat timer because we never started it for one hot transactions. + auto time_spent = Clock::now() - _start_time; + if (time_spent < 50us) { + sleep = 50us - time_spent; + } + } - return seastar::make_ready_future(PartialUpdateResult(std::move(status))); + return seastar::do_with( (k2::CachedSteadyClock::duration)sleep, [sleep=std::move(sleep),status=std::move(status)] (k2::CachedSteadyClock::duration& dur) { + return seastar::sleep(std::move(dur)) + .then([status=std::move(status)] () -> seastar::future { + return seastar::make_ready_future(PartialUpdateResult(std::move(status))); + }); + }); }); } @@ -414,7 +468,7 @@ class K2TxnHandle { Duration _txn_end_deadline; TimePoint _start_time; uint64_t _ongoing_ops = 0; // Used to track if there are operations in flight when end() is called - + uint64_t _total_ops = 0; // Used to track number of operations in the txn and is used to determine if transaction is one hot Duration _heartbeat_interval; PeriodicTimer _heartbeat_timer; From a65d104377a30c33ed6acf02ae209c30a46b8cd3 Mon Sep 17 00:00:00 2001 From: ammuv Date: Wed, 11 Aug 2021 22:11:41 +0000 Subject: [PATCH 4/5] made changes to YCSB load and error codes and added one hot transactions to YCSB --- src/k2/cmd/ycsb/data.h | 10 +++++----- src/k2/cmd/ycsb/transactions.h | 20 +++++++++++++++----- src/k2/cmd/ycsb/ycsb_client.cpp | 3 ++- src/k2/module/k23si/client/k23si_client.cpp | 4 ++-- src/k2/module/k23si/client/k23si_client.h | 8 ++++---- 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/k2/cmd/ycsb/data.h b/src/k2/cmd/ycsb/data.h index 35678516..afeb8a50 100644 --- a/src/k2/cmd/ycsb/data.h +++ b/src/k2/cmd/ycsb/data.h @@ -145,7 +145,7 @@ class YCSBData{ }; // function to write the given YCSB Data row -seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool erase = false, ExistencePrecondition precondition = ExistencePrecondition::None) +seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool erase = false, ExistencePrecondition precondition = ExistencePrecondition::None, bool isonehot=false) { dto::SKVRecord skv_record(YCSBData::collectionName, YCSBData::schema); // create SKV record @@ -153,7 +153,7 @@ seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool eras skv_record.serializeNext(field); // add fields to SKV record } - return txn.write(skv_record, erase, precondition).then([] (WriteResult&& result) { + return txn.write(skv_record, erase, precondition, isonehot).then([] (WriteResult&& result) { if (!result.status.is2xxOK() && result.status.code!=412 && result.status.code!=404) { // k2::Statuses::S412_Precondition_Failed for write with erase=false and key already exists and k2::Statuses::S404_Not_Found for write with erase=true and key does not exist K2LOG_D(log::ycsb, "writeRow failed and is retryable: {}", result.status); return seastar::make_exception_future(std::runtime_error("writeRow failed!")); @@ -165,7 +165,7 @@ seastar::future writeRow(YCSBData& row, K2TxnHandle& txn, bool eras // function to update the partial fields for a YCSB Data row seastar::future -partialUpdateRow(uint32_t keyid, std::vector fieldValues, std::vector fieldsToUpdate, K2TxnHandle& txn) { +partialUpdateRow(uint32_t keyid, std::vector fieldValues, std::vector fieldsToUpdate, K2TxnHandle& txn, bool isonehot=false) { dto::SKVRecord skv_record(YCSBData::collectionName, YCSBData::schema); // create SKV record @@ -182,8 +182,8 @@ partialUpdateRow(uint32_t keyid, std::vector fieldValues, std::vector(skv_record, std::move(fieldsToUpdate)).then([] (PartialUpdateResult&& result) { - if (!result.status.is2xxOK() && result.status.code!=412 && result.status.code!=404) { // 412 for precondition of exists not satisfied and 404 for key not found + return txn.partialUpdate(skv_record, std::move(fieldsToUpdate), dto::Key(), isonehot).then([] (PartialUpdateResult&& result) { + if (!result.status.is2xxOK() && result.status.code!=412 && result.status.code!=404) { // 412 for precondition of Exists not satisfied and 404 for key not found K2LOG_D(log::ycsb, "partialUpdateRow failed: {}", result.status); return seastar::make_exception_future(std::runtime_error("partialUpdateRow failed!")); } diff --git a/src/k2/cmd/ycsb/transactions.h b/src/k2/cmd/ycsb/transactions.h index 7dec3439..3fd408c5 100644 --- a/src/k2/cmd/ycsb/transactions.h +++ b/src/k2/cmd/ycsb/transactions.h @@ -90,7 +90,7 @@ class YCSBTxn { YCSBTxn(RandomContext& random, K23SIClient& client, std::shared_ptr requestDist, std::shared_ptr scanLengthDist) : - _random(random), _client(client), _failed(false), _requestDist(requestDist), _scanLengthDist(scanLengthDist) {} + _random(random), _client(client), _failed(false), _requestDist(requestDist), _scanLengthDist(scanLengthDist), _onehot(false) {} seastar::future attempt() { K2TxnOptions options{}; @@ -164,6 +164,9 @@ class YCSBTxn { fut.ignore_ready_future(); K2LOG_D(log::ycsb, "Txn finished"); + if(_onehot) // one hot transaction and succeeded + return seastar::make_ready_future(EndResult(Statuses::S200_OK("one hot transaction ended successfully"))); + return _txn.end(true); }).then_wrapped([this] (auto&& fut) { if (fut.failed()) { @@ -216,7 +219,8 @@ class YCSBTxn { cur++; } - return partialUpdateRow(_keyid,std::move(fieldValues),std::move(fieldsToUpdate),_txn).discard_result(); + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return partialUpdateRow(_keyid,std::move(fieldValues),std::move(fieldsToUpdate),_txn, _onehot).discard_result(); } seastar::future<> scanOperation(){ @@ -269,7 +273,8 @@ class YCSBTxn { if(_requestDistName()!="latest") { K2LOG_D(log::ycsb, "Insert operation started for keyid {}", _keyid); YCSBData row(_keyid, _random); // generate row - return writeRow(row, _txn).discard_result(); + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return writeRow(row, _txn, false, ExistencePrecondition::None, _onehot).discard_result(); } _keyid = _requestDist->getMaxValue()+1; @@ -277,7 +282,9 @@ class YCSBTxn { //handle latest distribution separately to identify insert fails and increment latest known record max key value K2LOG_D(log::ycsb, "Insert operation started for keyid {}", _keyid); - return writeRow(row, _txn, false, ExistencePrecondition::NotExists) // if record with given key already exists must return error so we set precondition to Exists + + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return writeRow(row, _txn, false, ExistencePrecondition::NotExists, _onehot) // if record with given key already exists must return error so we set precondition to NotExists .then_wrapped([this] (auto&& fut) { if (fut.failed()) { fut.ignore_ready_future(); @@ -301,7 +308,8 @@ class YCSBTxn { seastar::future<> deleteOperation(){ K2LOG_D(log::ycsb, "Delete operation started for keyid {}", _keyid); YCSBData row(_keyid); // generate row - return writeRow(row, _txn, true).discard_result(); + _onehot = _ops_per_txn()==1?_isonehot():false; // flag for one-hot transactions + return writeRow(row, _txn, true, ExistencePrecondition::None, _onehot).discard_result(); } Query _query_scan; @@ -317,6 +325,7 @@ class YCSBTxn { std::shared_ptr _requestDist; // Request distribution for selecting keys std::shared_ptr _scanLengthDist; // Request distribution for selecting length of scan uint64_t _insertMissesLatest{0}; // number of inserts missed when request distribution is Latest distribution + bool _onehot; // set to true if one hot transaction private: ConfigVar _ops_per_txn{"ops_per_txn"}; @@ -324,4 +333,5 @@ class YCSBTxn { ConfigVar _num_fields{"num_fields"}; ConfigVar _max_fields_update{"max_fields_update"}; ConfigVar _requestDistName{"request_dist"}; + ConfigVar _isonehot{"isonehot"}; }; diff --git a/src/k2/cmd/ycsb/ycsb_client.cpp b/src/k2/cmd/ycsb/ycsb_client.cpp index dda72839..33e61c37 100644 --- a/src/k2/cmd/ycsb/ycsb_client.cpp +++ b/src/k2/cmd/ycsb/ycsb_client.cpp @@ -446,7 +446,8 @@ int main(int argc, char** argv) {; ("delete_proportion",bpo::value()->default_value(0), "Delete Proportion") ("max_scan_length",bpo::value()->default_value(10), "Maximum scan length") ("max_fields_update",bpo::value()->default_value(1), "Maximum number of fields to update") - ("ops_per_txn",bpo::value()->default_value(1), "The number of operations per transaction"); + ("ops_per_txn",bpo::value()->default_value(1), "The number of operations per transaction") + ("isonehot",bpo::value()->default_value(true),"Whether to use one hot transactions (for transactions with one write/update) or not"); app.addApplet(); app.addApplet(); diff --git a/src/k2/module/k23si/client/k23si_client.cpp b/src/k2/module/k23si/client/k23si_client.cpp index f00f675d..abafb1ae 100644 --- a/src/k2/module/k23si/client/k23si_client.cpp +++ b/src/k2/module/k23si/client/k23si_client.cpp @@ -280,8 +280,8 @@ seastar::future K2TxnHandle::end(bool shouldCommit) { return _heartbeat_timer.stop().then([this, s=std::move(status)] () { // TODO get min transaction time from TSO client auto time_spent = Clock::now() - _start_time; - if (time_spent < 50us) { - auto sleep = 50us - time_spent; + if (time_spent < 5us) { + auto sleep = 5us - time_spent; return seastar::sleep(sleep).then([s=std::move(s)] () { return seastar::make_ready_future(EndResult(std::move(s))); }); diff --git a/src/k2/module/k23si/client/k23si_client.h b/src/k2/module/k23si/client/k23si_client.h index 9a0e3ca6..873a1280 100644 --- a/src/k2/module/k23si/client/k23si_client.h +++ b/src/k2/module/k23si/client/k23si_client.h @@ -325,8 +325,8 @@ class K2TxnHandle { } //note we need not end heartbeat timer because we never started it for one hot transactions. auto time_spent = Clock::now() - _start_time; - if (time_spent < 50us) { - sleep = 50us - time_spent; + if (time_spent < 5us) { + sleep = 5us - time_spent; } } @@ -430,8 +430,8 @@ class K2TxnHandle { } //note we need not end heartbeat timer because we never started it for one hot transactions. auto time_spent = Clock::now() - _start_time; - if (time_spent < 50us) { - sleep = 50us - time_spent; + if (time_spent < 5us) { + sleep = 5us - time_spent; } } From 4d7f75e4c5a5cc50c699bafee09b8634060f1d99 Mon Sep 17 00:00:00 2001 From: ammuv Date: Mon, 23 Aug 2021 17:58:20 +0000 Subject: [PATCH 5/5] PR changes --- src/k2/module/k23si/Module.cpp | 39 ++++++++++------------- src/k2/module/k23si/client/k23si_client.h | 22 +++++++------ 2 files changed, 29 insertions(+), 32 deletions(-) diff --git a/src/k2/module/k23si/Module.cpp b/src/k2/module/k23si/Module.cpp index 64c97ab6..d6cd99b2 100644 --- a/src/k2/module/k23si/Module.cpp +++ b/src/k2/module/k23si/Module.cpp @@ -1108,40 +1108,35 @@ K23SIPartitionModule::_processWrite(dto::K23SIWriteRequest&& request, FastDeadli } } - // build write_ranges for Transaction end request - std::unordered_map> write_ranges; // will be filled in later if WI is created successfully - //endRequest created for calling EndTransaction for one hot transactions. dto::K23SITxnEndRequest endRequest{request.pvid, request.trhCollection, request.trh, request.mtr, dto::EndAction::Commit, - std::move(write_ranges) + {} }; - return seastar::do_with((dto::K23SITxnEndRequest)std::move(endRequest), (bool)request.isonehot, - [this, &request] (dto::K23SITxnEndRequest& endRequest, bool& isonehot) { - // all checks passed - we're ready to place this WI as the latest version - auto& vset = _indexer[request.key]; - auto status = _createWI(std::move(request), vset); - K2LOG_D(log::skvsvr, "WI creation with status {}", status); + bool isonehot = request.isonehot; + auto collection = request.collection; - // not one hot transaction or WI creation failed - if(!isonehot || !status.is2xxOK()) { - return RPCResponse(std::move(status), dto::K23SIWriteResponse{}); - } + // all checks passed - we're ready to place this WI as the latest version + auto status = _createWI(std::move(request), vset); + K2LOG_D(log::skvsvr, "WI creation with status {}", status); - endRequest.writeRanges[request.collectionName].insert(_partition().keyRangeV); // add keyRangeV of current partition (TRH) + // not one hot transaction or WI creation failed + if(!isonehot || !status.is2xxOK()) { + return RPCResponse(std::move(status), dto::K23SIWriteResponse{}); + } - // End one hot transaction - return handleTxnEnd(std::move(endRequest)) - .then( [this] (auto&& response) { - auto& [endstatus, k2response] = response; - return RPCResponse(std::move(endstatus), dto::K23SIWriteResponse{}); // Return status after ending one hot transaction - }); + endRequest.writeRanges[collection].insert(_partition().keyRangeV); // add keyRangeV of current partition (TRH) - }); + // End one hot transaction + return handleTxnEnd(std::move(endRequest)) + .then( [this] (auto&& response) { + auto& [endstatus, k2response] = response; + return RPCResponse(std::move(endstatus), dto::K23SIWriteResponse{}); // Return status after ending one hot transaction + }); } Status diff --git a/src/k2/module/k23si/client/k23si_client.h b/src/k2/module/k23si/client/k23si_client.h index 873a1280..cdbf2b82 100644 --- a/src/k2/module/k23si/client/k23si_client.h +++ b/src/k2/module/k23si/client/k23si_client.h @@ -310,7 +310,7 @@ class K2TxnHandle { _checkResponseStatus(status); _ongoing_ops--; - k2::CachedSteadyClock::duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn + k2::Duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn if (!isonehot && status.is2xxOK() && !_heartbeat_timer.isArmed()) { K2ASSERT(log::skvclient, _cpo_client->collections.find(_trh_collection) != _cpo_client->collections.end(), "collection not present after successful write"); @@ -330,12 +330,13 @@ class K2TxnHandle { } } - return seastar::do_with( (k2::CachedSteadyClock::duration)sleep, (dto::K23SIWriteResponse)std::move(k2response), [this,sleep=std::move(sleep),status=std::move(status)] (k2::CachedSteadyClock::duration& dur, dto::K23SIWriteResponse& k2response) { - return seastar::sleep(std::move(dur)) - .then([status=std::move(status),&k2response] () -> seastar::future { + if(sleep==0us) { + return seastar::make_ready_future(WriteResult(std::move(status), std::move(k2response))); + } + return seastar::sleep(std::move(sleep)) + .then([status=std::move(status),k2response=std::move(k2response)] () mutable { return seastar::make_ready_future(WriteResult(std::move(status), std::move(k2response))); }); - }); }); } @@ -415,7 +416,7 @@ class K2TxnHandle { _checkResponseStatus(status); _ongoing_ops--; - k2::CachedSteadyClock::duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn + k2::Duration sleep= 0us; // time to sleep before we return to satisfy min time_spent in txn if (!isonehot && status.is2xxOK() && !_heartbeat_timer.isArmed()) { K2ASSERT(log::skvclient, _cpo_client->collections.find(_trh_collection) != _cpo_client->collections.end(), "collection not present after successful partial update"); @@ -435,12 +436,13 @@ class K2TxnHandle { } } - return seastar::do_with( (k2::CachedSteadyClock::duration)sleep, [sleep=std::move(sleep),status=std::move(status)] (k2::CachedSteadyClock::duration& dur) { - return seastar::sleep(std::move(dur)) - .then([status=std::move(status)] () -> seastar::future { + if(sleep==0us) { + return seastar::make_ready_future(PartialUpdateResult(std::move(status))); + } + return seastar::sleep(std::move(sleep)) + .then([status=std::move(status)] () { return seastar::make_ready_future(PartialUpdateResult(std::move(status))); }); - }); }); }