diff --git a/src/metadata.cpp b/src/metadata.cpp index 867ae329a..d10b6847c 100644 --- a/src/metadata.cpp +++ b/src/metadata.cpp @@ -788,7 +788,7 @@ std::string Metadata::full_function_name(const std::string& name, const StringVe Metadata::SchemaSnapshot Metadata::schema_snapshot(int protocol_version, const VersionNumber& cassandra_version) const { ScopedMutex l(&mutex_); - return SchemaSnapshot(schema_snapshot_version_, + return SchemaSnapshot(schema_snapshot_version_.load(), protocol_version, cassandra_version, front_.keyspaces(), @@ -796,7 +796,7 @@ Metadata::SchemaSnapshot Metadata::schema_snapshot(int protocol_version, const V } void Metadata::update_keyspaces(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -807,7 +807,7 @@ void Metadata::update_keyspaces(int protocol_version, const VersionNumber& cassa } void Metadata::update_tables(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -818,7 +818,7 @@ void Metadata::update_tables(int protocol_version, const VersionNumber& cassandr } void Metadata::update_views(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -829,7 +829,7 @@ void Metadata::update_views(int protocol_version, const VersionNumber& cassandra } void Metadata::update_columns(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -846,7 +846,7 @@ void Metadata::update_columns(int protocol_version, const VersionNumber& cassand } void Metadata::update_indexes(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -857,7 +857,7 @@ void Metadata::update_indexes(int protocol_version, const VersionNumber& cassand } void Metadata::update_user_types(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -868,7 +868,7 @@ void Metadata::update_user_types(int protocol_version, const VersionNumber& cass } void Metadata::update_functions(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -879,7 +879,7 @@ void Metadata::update_functions(int protocol_version, const VersionNumber& cassa } void Metadata::update_aggregates(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -891,12 +891,12 @@ void Metadata::update_aggregates(int protocol_version, const VersionNumber& cass void Metadata::update_partitions(int protocol_version, const VersionNumber& cassandra_version, ResultResponse* result) { ScopedMutex l(&mutex_); - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); updating_->update_partitions(protocol_version, cassandra_version, cache_, result); } void Metadata::drop_keyspace(const std::string& keyspace_name) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -907,7 +907,7 @@ void Metadata::drop_keyspace(const std::string& keyspace_name) { } void Metadata::drop_table_or_view(const std::string& keyspace_name, const std::string& table_or_view_name) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -918,7 +918,7 @@ void Metadata::drop_table_or_view(const std::string& keyspace_name, const std::s } void Metadata::drop_user_type(const std::string& keyspace_name, const std::string& type_name) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -929,7 +929,7 @@ void Metadata::drop_user_type(const std::string& keyspace_name, const std::strin } void Metadata::drop_function(const std::string& keyspace_name, const std::string& full_function_name) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -940,7 +940,7 @@ void Metadata::drop_function(const std::string& keyspace_name, const std::string } void Metadata::drop_aggregate(const std::string& keyspace_name, const std::string& full_aggregate_name) { - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); if (is_front_buffer()) { ScopedMutex l(&mutex_); @@ -952,23 +952,27 @@ void Metadata::drop_aggregate(const std::string& keyspace_name, const std::strin void Metadata::clear_and_update_back(const VersionNumber& cassandra_version) { back_.clear(); + // ASAN revealed that clear_and_update_back may race with update_partitions. + // Obtain lock via mutex_ to guard updating_. + // There shouldn't be deadlock because the caller doesn't hold other lock. + ScopedMutex l(&mutex_); updating_ = &back_; } void Metadata::swap_to_back_and_update_front() { { ScopedMutex l(&mutex_); - schema_snapshot_version_++; + schema_snapshot_version_.fetch_add(1); front_.swap(back_); + updating_ = &front_; } back_.clear(); - updating_ = &front_; } void Metadata::clear() { { ScopedMutex l(&mutex_); - schema_snapshot_version_ = 0; + schema_snapshot_version_.store(0); front_.clear(); } back_.clear(); diff --git a/src/metadata.hpp b/src/metadata.hpp index c92dd20ac..4eb26e0ed 100644 --- a/src/metadata.hpp +++ b/src/metadata.hpp @@ -831,7 +831,7 @@ class Metadata { InternalData front_; InternalData back_; - uint32_t schema_snapshot_version_; + Atomic schema_snapshot_version_; // This lock prevents partial snapshots when updating metadata mutable uv_mutex_t mutex_;