Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/rmq/rmqio/rmqio_asioconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,6 @@ AsioConnection<SocketType>::prepareBuffer()
template <typename SocketType>
bool AsioConnection<SocketType>::doRead(bsl::size_t bytes_transferred)
{

bool success = true;

// d_inbound is setup with a buffer size of maxFrameSize in ::prepareBuffer
Expand All @@ -444,20 +443,21 @@ bool AsioConnection<SocketType>::doRead(bsl::size_t bytes_transferred)
bsl::size_t bytes_decoded = 0;
boost::asio::streambuf::const_buffers_type bufs = d_inbound->data();
bsl::vector<rmqamqpt::Frame> readFrames;
for (boost::asio::streambuf::const_buffers_type::const_iterator i =
bufs.begin();
i != bufs.end();
++i) {
boost::asio::const_buffer buf(*i);
Decoder::ReturnCode rcode =
d_frameDecoder->appendBytes(&readFrames, buf.data(), buf.size());

for (const boost::asio::const_buffer* it =
boost::asio::buffer_sequence_begin(bufs);
it != boost::asio::buffer_sequence_end(bufs);
++it) {
const boost::asio::const_buffer& buffer = *it;
Decoder::ReturnCode rcode = d_frameDecoder->appendBytes(
&readFrames, buffer.data(), buffer.size());
if (rcode != Decoder::OK) {
BALL_LOG_WARN << "Bad rcode from decoder: " << rcode;
// Fail but we still want to process frames we were able to decode
success = false;
break;
};
bytes_decoded += buf.size();
break; // Exit the loop on error
}
bytes_decoded += buffer.size();
}

if (bytes_decoded != bytes_transferred) {
Expand Down
11 changes: 9 additions & 2 deletions src/rmq/rmqio/rmqio_asioeventloop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,15 @@ void AsioEventLoop::onThreadStarted()
d_condition.broadcast();
}

void AsioEventLoop::postImpl(const Item& item) { d_context.post(item); }
void AsioEventLoop::dispatchImpl(const Item& item) { d_context.dispatch(item); }
void AsioEventLoop::postImpl(const Item& item)
{
boost::asio::post(d_context, item);
}

void AsioEventLoop::dispatchImpl(const Item& item)
{
boost::asio::dispatch(d_context, item);
}

bsl::shared_ptr<rmqio::Resolver>
AsioEventLoop::resolver(bool shuffleConnectionEndpoints)
Expand Down
8 changes: 3 additions & 5 deletions src/tests/rmqa/rmqa_connectionstring.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

#include <rmqa_connectionstring.h>

#include <rmqtestutil_testsuite.t.h>

#include <bsl_string.h>

#include <gmock/gmock.h>
Expand Down Expand Up @@ -98,10 +100,6 @@ const ConnectionStringTestCase k_CONN_STRING_TESTS[] = {
{"amqp://rabbit1.:", false, "", "", "", "", "", ""},
};

// We need to stick to INSTANTIATE_TEST_CASE_P for a while longer
// But we do want to build with -Werror in our CI
#pragma GCC diagnostic warning "-Wdeprecated-declarations"

INSTANTIATE_TEST_CASE_P(ConnectionStringTests,
RMQTESTUTIL_TESTSUITE_P(ConnectionStringTests,
ConnectionStringPTests,
testing::ValuesIn(k_CONN_STRING_TESTS));
7 changes: 2 additions & 5 deletions src/tests/rmqa/rmqa_consumerimpl.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rmqtestutil_mockchannel.t.h>
#include <rmqtestutil_mockeventloop.t.h>
#include <rmqtestutil_savethreadid.h>
#include <rmqtestutil_testsuite.t.h>

#include <rmqt_consumerackbatch.h>
#include <rmqt_envelope.h>
Expand Down Expand Up @@ -449,11 +450,7 @@ TEST_P(ConsumerImplTests, UpdateCallbackFromTwoThreadsAtOnce)
EXPECT_TRUE(future2.blockResult());
}

// We need to stick to INSTANTIATE_TEST_CASE_P for a while longer
// But we do want to build with -Werror in our CI
#pragma GCC diagnostic warning "-Wdeprecated-declarations"

INSTANTIATE_TEST_CASE_P(AllMembers,
RMQTESTUTIL_TESTSUITE_P(AllMembers,
ConsumerImplTests,
Values(CONSUMER, TRACING_CONSUMER),
ConsumerImplTests::PrintParamName());
17 changes: 7 additions & 10 deletions src/tests/rmqa/rmqa_producerimpl.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <rmqtestutil_mockchannel.t.h>
#include <rmqtestutil_mockeventloop.t.h>
#include <rmqtestutil_savethreadid.h>
#include <rmqtestutil_testsuite.t.h>

#include <rmqp_producer.h>
#include <rmqt_confirmresponse.h>
Expand Down Expand Up @@ -815,32 +816,28 @@ TEST_P(TracingProducerImplTests, SendConfirmCallsTracing)
d_threadPool.drain();
}

// We need to stick to INSTANTIATE_TEST_CASE_P for a while longer
// But we do want to build with -Werror in our CI
#pragma GCC diagnostic warning "-Wdeprecated-declarations"

INSTANTIATE_TEST_CASE_P(AllMembers,
RMQTESTUTIL_TESTSUITE_P(AllMembers,
ProducerImplTests,
Values(PRODUCER, TRACING_PRODUCER),
ProducerImplTests::PrintParamName());
INSTANTIATE_TEST_CASE_P(AllMembers,
RMQTESTUTIL_TESTSUITE_P(AllMembers,
ProducerImplConfirmTypeTests,
Values(PRODUCER, TRACING_PRODUCER),
ProducerImplTests::PrintParamName());
INSTANTIATE_TEST_CASE_P(AllMembers,
RMQTESTUTIL_TESTSUITE_P(AllMembers,
ProducerImplCallbackLifetimeTests,
Values(PRODUCER, TRACING_PRODUCER),
ProducerImplTests::PrintParamName());
INSTANTIATE_TEST_CASE_P(AllMembers,
RMQTESTUTIL_TESTSUITE_P(AllMembers,
ProducerImplMaxOutstandingTests,
Values(PRODUCER, TRACING_PRODUCER),
ProducerImplTests::PrintParamName());
INSTANTIATE_TEST_CASE_P(AllMembers,
RMQTESTUTIL_TESTSUITE_P(AllMembers,
ProducerImplUpdateTopology,
Values(PRODUCER, TRACING_PRODUCER),
ProducerImplTests::PrintParamName());

INSTANTIATE_TEST_CASE_P(AllMembers,
RMQTESTUTIL_TESTSUITE_P(AllMembers,
TracingProducerImplTests,
Values(TRACING_PRODUCER),
ProducerImplTests::PrintParamName());
10 changes: 6 additions & 4 deletions src/tests/rmqamqp/rmqamqp_connection.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ class MockConnection : public rmqio::Connection {

BSLS_ASSERT_OPT(rc == Frame::OK);

d_eventLoop.post(
boost::asio::post(
d_eventLoop,
bdlf::BindUtil::bind(d_connectionCallbacks.onRead, decoded));
}
}
Expand All @@ -156,7 +157,8 @@ class MockConnection : public rmqio::Connection {
{
BALL_LOG_TRACE << "MockConnection close";

d_eventLoop.post(bdlf::BindUtil::bind(cb, GRACEFUL_DISCONNECT));
boost::asio::post(d_eventLoop,
bdlf::BindUtil::bind(cb, GRACEFUL_DISCONNECT));
}

void asyncWriteImpl(
Expand All @@ -175,7 +177,7 @@ class MockConnection : public rmqio::Connection {
rmqamqpt::Method(
rmqamqpt::ConnectionMethod(rmqamqpt::ConnectionCloseOk())));

d_eventLoop.post(callback);
boost::asio::post(d_eventLoop, callback);

if (!closeOk) {
feedNextFrame();
Expand Down Expand Up @@ -301,7 +303,7 @@ ACTION_P3(ConnectMockConnection, mockConnectPtrPtr, replayFrame, eventLoop)

ON_CALL(**mockConnectPtrPtr, isConnected()).WillByDefault(Return(true));

eventLoop.get().post(arg4);
boost::asio::post(eventLoop.get(), arg4);

return *mockConnectPtrPtr;
}
Expand Down
1 change: 1 addition & 0 deletions src/tests/rmqamqpt/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ add_executable(rmqamqpt_tests
)

target_link_libraries(rmqamqpt_tests PUBLIC
rmqtestutil
rmqamqpt
rmqamqp
$<TARGET_PROPERTY:rmqamqp,LINK_LIBRARIES>
Expand Down
9 changes: 3 additions & 6 deletions src/tests/rmqamqpt/rmqamqpt_fieldvalue.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
// limitations under the License.

#include <rmqamqpt_fieldvalue.h>
#include <rmqamqpt_types.h>
#include <rmqamqpt_writer.h>

#include <rmqamqpt_types.h>
#include <rmqtestutil_testsuite.t.h>

#include <gmock/gmock.h>
#include <gtest/gtest.h>
Expand Down Expand Up @@ -238,10 +239,6 @@ const FieldValueTestCase k_FV_SIZE_TEST_CASES[] = {
{rmqt::FieldValue(bsl::make_shared<rmqt::FieldArray>()), 5}};
} // namespace

// We need to stick to INSTANTIATE_TEST_CASE_P for a while longer
// But we do want to build with -Werror in our CI
#pragma GCC diagnostic warning "-Wdeprecated-declarations"

INSTANTIATE_TEST_CASE_P(FieldValueTests,
RMQTESTUTIL_TESTSUITE_P(FieldValueTests,
FieldValuePTests,
testing::ValuesIn(k_FV_SIZE_TEST_CASES));
63 changes: 23 additions & 40 deletions src/tests/rmqamqpt/rmqamqpt_types.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ const uint8_t duplicateKeyFieldTable[] = {
0x74, 0x61, 0x6d, 0x70, 0x5f, 0x69, 0x6e, 0x5f, 0x6d, 0x73, 0x6c, 0x00,
0x00, 0x01, 0x7a, 0x8c, 0x5f, 0x30, 0x6c};

typedef bsl::basic_stringstream<uint8_t> StringStream;
typedef bsl::basic_string<uint8_t> StringStreamBufferType;
typedef bsl::vector<uint8_t> BufferType;

} // namespace
Expand Down Expand Up @@ -280,9 +278,7 @@ TEST(TypesEncoding, ShouldRoundTripShortStringCorrectly)
// WHEN
rmqamqpt::Types::encodeShortString(writer, shortString);

StringStreamBufferType data(storage.data(), storage.size());

rmqamqpt::Buffer buffer(data.begin(), data.size());
rmqamqpt::Buffer buffer(storage.data(), storage.size());
bsl::string resultString;
bool result = rmqamqpt::Types::decodeShortString(&resultString, &buffer);

Expand Down Expand Up @@ -337,8 +333,7 @@ TEST(TypesEncoding, ShouldRoundTripLongStringCorrectly)
// WHEN
rmqamqpt::Types::encodeLongString(writer, longString);

StringStreamBufferType data(storage.data(), storage.size());
rmqamqpt::Buffer buffer(data.begin(), data.size());
rmqamqpt::Buffer buffer(storage.data(), storage.size());
bsl::string resultString;
bool result = rmqamqpt::Types::decodeLongString(&resultString, &buffer);

Expand All @@ -359,8 +354,7 @@ TEST(TypesEncoding, ShouldRoundTripByteVectorCorrectly)
// WHEN
rmqamqpt::Types::encodeByteVector(writer, byteVector);

StringStreamBufferType data(storage.data(), storage.size());
rmqamqpt::Buffer buffer(data.begin(), data.size());
rmqamqpt::Buffer buffer(storage.begin(), storage.size());
BufferType resultVector;
bool result = rmqamqpt::Types::decodeByteVector(
&resultVector, &buffer, byteVector.size());
Expand All @@ -381,8 +375,7 @@ TEST(TypesEncoding, ShouldRoundTripFieldValueBoolCorrectly)
// WHEN
rmqamqpt::Types::encodeFieldValue(writer, fieldValue);

StringStreamBufferType data(storage.data(), storage.size());
rmqamqpt::Buffer buffer(data.begin(), data.size());
rmqamqpt::Buffer buffer(storage.data(), storage.size());
rmqt::FieldValue resultFieldValue(false);
const bool result =
rmqamqpt::Types::decodeFieldValue(&resultFieldValue, &buffer);
Expand All @@ -409,16 +402,15 @@ TEST(TypesEncoding, ShouldRoundTripFieldValueByteArrayCorrectly)
// WHEN
rmqamqpt::Types::encodeFieldValue(writer, fieldValue);

StringStreamBufferType data(storage.data(), storage.size());
const bsl::vector<bsl::uint8_t> actualData(data.begin(), data.end());
const bsl::vector<bsl::uint8_t> actualData(storage.begin(), storage.end());

// THEN
const bsl::vector<bsl::uint8_t> expectedData(bsl::begin(expectedBytes),
bsl::end(expectedBytes));
ASSERT_EQ(actualData, expectedData);

// WHEN
rmqamqpt::Buffer buffer(data.begin(), data.size());
rmqamqpt::Buffer buffer(storage.data(), storage.size());
rmqt::FieldValue resultFieldValue;
const bool result =
rmqamqpt::Types::decodeFieldValue(&resultFieldValue, &buffer);
Expand Down Expand Up @@ -455,15 +447,13 @@ TEST(TypesEncoding, ShouldRoundTripFieldValueArrayCorrectly)
// WHEN
rmqamqpt::Types::encodeFieldValue(writer, fieldValue);

StringStreamBufferType data(storage.data(), storage.size());

EXPECT_EQ(sizeof(expectedBytes), data.size());
EXPECT_EQ(sizeof(expectedBytes), storage.size());

for (bsl::size_t i = 0; i < data.size(); i++) {
EXPECT_EQ(expectedBytes[i], data[i]);
for (bsl::size_t i = 0; i < storage.size(); i++) {
EXPECT_EQ(expectedBytes[i], storage[i]);
}

rmqamqpt::Buffer buffer(data.begin(), data.size());
rmqamqpt::Buffer buffer(storage.data(), storage.size());
rmqt::FieldValue resultFieldValue;
const bool result =
rmqamqpt::Types::decodeFieldValue(&resultFieldValue, &buffer);
Expand Down Expand Up @@ -533,14 +523,12 @@ TEST(TypesEncoding, ShouldRoundTripFieldValueArrayWithFloatCorrectly)
// WHEN
rmqamqpt::Types::encodeFieldValue(writer, fieldValue);

StringStreamBufferType data(storage.data(), storage.size());

const BufferType expectedBytesArray(bsl::begin(expectedBytes),
bsl::end(expectedBytes));
const BufferType actualBytesArray(bsl::begin(data), bsl::end(data));
const BufferType actualBytesArray(bsl::begin(storage), bsl::end(storage));
EXPECT_EQ(expectedBytesArray, actualBytesArray);

rmqamqpt::Buffer buffer(data.begin(), data.size());
rmqamqpt::Buffer buffer(storage.data(), storage.size());
rmqt::FieldValue resultFieldValue;
const bool result =
rmqamqpt::Types::decodeFieldValue(&resultFieldValue, &buffer);
Expand Down Expand Up @@ -576,8 +564,7 @@ TEST(TypesEncoding, ShouldRoundTripFieldArrayStringsAndBytesCorrectly)
// WHEN
rmqamqpt::Types::encodeFieldArray(writer, fieldArray);

StringStreamBufferType data(storage.data(), storage.size());
rmqamqpt::Buffer buffer(data.begin(), data.size());
rmqamqpt::Buffer buffer(storage.data(), storage.size());
rmqt::FieldArray resultFieldArray;
bool result = rmqamqpt::Types::decodeFieldArray(&resultFieldArray, &buffer);

Expand All @@ -597,12 +584,11 @@ TEST(TypesEncoding, EmptyFieldTable)

rmqamqpt::Types::encodeFieldTable(writer, empty);

StringStreamBufferType str(storage.data(), storage.size());

EXPECT_EQ(str[0], 0);
EXPECT_EQ(str[1], 0);
EXPECT_EQ(str[2], 0);
EXPECT_EQ(str[3], 0);
EXPECT_EQ(storage.size(), 4);
EXPECT_EQ(storage[0], 0);
EXPECT_EQ(storage[1], 0);
EXPECT_EQ(storage[2], 0);
EXPECT_EQ(storage[3], 0);
}

TEST(TypesEncoding, Timestamp)
Expand All @@ -614,18 +600,16 @@ TEST(TypesEncoding, Timestamp)

rmqamqpt::Types::encodeTimestamp(writer, millennium);

StringStreamBufferType str(storage.data(), storage.size());

bdlb::BigEndianInt64 expected = bdlb::BigEndianInt64::make(946684800);
bsl::uint8_t* inspect = reinterpret_cast<bsl::uint8_t*>(&expected);

EXPECT_EQ(str.size(), 8);
for (bsl::size_t i = 0; i < str.size(); ++i) {
EXPECT_THAT(str[i], Eq(inspect[i]));
EXPECT_EQ(storage.size(), 8);
for (bsl::size_t i = 0; i < storage.size(); ++i) {
EXPECT_THAT(storage[i], Eq(inspect[i]));
}

bdlt::Datetime decoded;
rmqamqpt::Buffer buf(str.begin(), str.size());
rmqamqpt::Buffer buf(storage.data(), storage.size());
rmqamqpt::Types::decodeTimestamp(&decoded, &buf), Eq(true);

EXPECT_THAT(decoded, Eq(millennium));
Expand All @@ -639,8 +623,7 @@ rmqt::FieldValue roundTripFieldValue(bool* decodeResult,

rmqamqpt::Types::encodeFieldValue(writer, fv);

StringStreamBufferType data(storage.data(), storage.size());
rmqamqpt::Buffer buffer(data.begin(), data.size());
rmqamqpt::Buffer buffer(storage.data(), storage.size());
rmqt::FieldValue resultFieldValue(false);
*decodeResult =
rmqamqpt::Types::decodeFieldValue(&resultFieldValue, &buffer);
Expand Down
Loading