diff --git a/.github/actions/install-essential-dependencies/action.yml b/.github/actions/install-essential-dependencies/action.yml index 3411b7f7c1..d6c5da96c1 100644 --- a/.github/actions/install-essential-dependencies/action.yml +++ b/.github/actions/install-essential-dependencies/action.yml @@ -3,5 +3,5 @@ runs: steps: - run: ulimit -c unlimited -S && sudo bash -c "echo 'core.%e.%p' > /proc/sys/kernel/core_pattern" shell: bash - - run: sudo apt-get install -y git g++ make libssl-dev libgflags-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev + - run: sudo apt-get update && sudo apt-get install -y git g++ make libssl-dev libgflags-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev shell: bash diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp index e635668373..c9b9d6baa0 100644 --- a/src/bthread/rwlock.cpp +++ b/src/bthread/rwlock.cpp @@ -17,273 +17,166 @@ #include "bvar/collector.h" #include "bthread/rwlock.h" +#include "bthread/mutex.h" #include "bthread/butex.h" namespace bthread { -// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock, -// which is a bthread implementation of golang RWMutex. -// The lock can be held by an arbitrary number of readers or a single writer. -// For details, see https://github.com/golang/go/blob/master/src/sync/rwmutex.go - -// Define in bthread/mutex.cpp -class ContentionProfiler; -extern ContentionProfiler* g_cp; -extern bvar::CollectorSpeedLimit g_cp_sl; -extern bool is_contention_site_valid(const bthread_contention_site_t& cs); -extern void make_contention_site_invalid(bthread_contention_site_t* cs); -extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns); - -// It is enough for readers. If the reader exceeds this value, -// need to use `int64_t' instead of `int'. -const int RWLockMaxReaders = 1 << 30; - -// For reading. 
-static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock, - const struct timespec* __restrict abstime) { - int reader_count = ((butil::atomic*)&rwlock->reader_count) - ->fetch_add(1, butil::memory_order_acquire) + 1; - // Fast path. - if (reader_count >= 0) { - CHECK_LT(reader_count, RWLockMaxReaders); - return 0; - } - // Slow path. +int rwlock_rdlock(bthread_rwlock_t* rwlock, bool try_lock, + const struct timespec* abstime) { + auto whole = (butil::atomic*)rwlock->state; + auto w_wait_count = (butil::atomic*)rwlock->w_wait_count; + while (true) { + // Write priority. + unsigned w = w_wait_count->load(butil::memory_order_acquire); + if (w > 0) { + if (try_lock) { + return EBUSY; + } else if (butex_wait(w_wait_count, w, abstime) < 0 && + errno != EWOULDBLOCK && errno != EINTR) { + return errno; + } + continue; + } - // Don't sample when contention profiler is off. - if (NULL == bthread::g_cp) { - return bthread_sem_timedwait(&rwlock->reader_sema, abstime); - } - // Ask Collector if this (contended) locking should be sampled. - const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl); - if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample. - return bthread_sem_timedwait(&rwlock->reader_sema, abstime); + // 2^31 should be enough. + unsigned r = whole->load(butil::memory_order_relaxed); + if ((r >> 31) == 0) { + if (whole->compare_exchange_weak(r, r + 1, + butil::memory_order_acquire, + butil::memory_order_relaxed)) { + return 0; + } + } else if (try_lock) { + // Write exists. + return EBUSY; + } } - - // Sample. - const int64_t start_ns = butil::cpuwide_time_ns(); - int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime); - const int64_t end_ns = butil::cpuwide_time_ns(); - const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; - // Submit `csite' for each reader immediately after - // owning rdlock to avoid the contention of `csite'. 
- bthread::submit_contention(csite, end_ns); - - return rc; } -static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) { - return rwlock_rdlock_impl(rwlock, NULL); -} +int rwlock_unrdlock(bthread_rwlock_t* rwlock) { + auto whole = (butil::atomic*)rwlock->state; + auto w_wait_count = (butil::atomic*)rwlock->w_wait_count; -static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock, - const struct timespec* __restrict abstime) { - return rwlock_rdlock_impl(rwlock, abstime); -} - -// Returns 0 if the lock was acquired, otherwise errno. -static inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) { while (true) { - int reader_count = ((butil::atomic*)&rwlock->reader_count) - ->load(butil::memory_order_relaxed); - if (reader_count < 0) { - // Failed to acquire the read lock because there is a writer. - return EBUSY; + unsigned r = whole->load(butil::memory_order_relaxed); + if (r == 0 || (r >> 31) != 0) { + LOG(ERROR) << "Invalid unrlock!"; + return -1; } - if (((butil::atomic*)&rwlock->reader_count) - ->compare_exchange_weak(reader_count, reader_count + 1, - butil::memory_order_acquire, - butil::memory_order_relaxed)) { - return 0; + if(!(whole->compare_exchange_weak(r, r - 1, + butil::memory_order_release, + butil::memory_order_relaxed))) { + continue; + } + // `r' is the pre-decrement state: if this thread was the last reader, + // wake a writer that may be blocked on `state' waiting for it to reach 0. + if (r == 1) { + butex_wake(whole); } - } -} - -static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) { - int reader_count = ((butil::atomic*)&rwlock->reader_count) - ->fetch_add(-1, butil::memory_order_relaxed) - 1; - // Fast path. - if (reader_count >= 0) { - return 0; - } - // Slow path. - - if (BAIDU_UNLIKELY(reader_count + 1 == 0 || reader_count + 1 == -RWLockMaxReaders)) { - CHECK(false) << "rwlock_unrdlock of unlocked rwlock"; - return EINVAL; - } - - // A writer is pending. 
- int reader_wait = ((butil::atomic*)&rwlock->reader_wait) - ->fetch_add(-1, butil::memory_order_relaxed) - 1; - if (reader_wait != 0) { return 0; } - // The last reader unblocks the writer. +} - if (NULL == bthread::g_cp) { - bthread_sem_post(&rwlock->writer_sema); - return 0; +static BUTIL_FORCE_INLINE void rwlock_wrlock_cleanup(bthread_rwlock_t* rwlock, bool write_queue_locked) { + if (write_queue_locked) { + bthread_mutex_unlock(&rwlock->write_queue_mutex); } - // Ask Collector if this (contended) locking should be sampled. - const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl); - if (!sampling_range) { // Don't sample - bthread_sem_post(&rwlock->writer_sema); - return 0; + auto w_wait_count = (butil::atomic*)rwlock->w_wait_count; + // Fail to acquire the wrlock. + auto w = w_wait_count->fetch_sub(1, butil::memory_order_relaxed); + // If this was the last waiting writer (`w' is pre-decrement), wake all readers blocked on `w_wait_count'. + if (w == 1) { + butex_wake_all(w_wait_count); } - - // Sampling. - const int64_t start_ns = butil::cpuwide_time_ns(); - bthread_sem_post(&rwlock->writer_sema); - const int64_t end_ns = butil::cpuwide_time_ns(); - const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; - // Submit `csite' for each reader immediately after - // releasing rdlock to avoid the contention of `csite'. - bthread::submit_contention(csite, end_ns); - return 0; } -#define DO_CSITE_IF_NEED \ - do { \ - /* Don't sample when contention profiler is off. */ \ - if (NULL != bthread::g_cp) { \ - /* Ask Collector if this (contended) locking should be sampled. */ \ - sampling_range = bvar::is_collectable(&bthread::g_cp_sl); \ - start_ns = bvar::is_sampling_range_valid(sampling_range) ? \ - butil::cpuwide_time_ns() : -1; \ - } else { \ - start_ns = -1; \ - } \ - } while (0) - -#define SUBMIT_CSITE_IF_NEED \ - do { \ - if (ETIMEDOUT == rc && start_ns > 0) { \ - /* Failed to lock due to ETIMEDOUT, submit the elapse directly. 
*/ \ - const int64_t end_ns = butil::cpuwide_time_ns(); \ - const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; \ - bthread::submit_contention(csite, end_ns); \ - } \ - } while (0) - -// For writing. -static inline int rwlock_wrlock_impl(bthread_rwlock_t* __restrict rwlock, - const struct timespec* __restrict abstime) { +int rwlock_wrlock(bthread_rwlock_t* rwlock, bool try_lock, + const struct timespec* abstime) { + auto w_wait_count = (butil::atomic*)rwlock->w_wait_count; + // 2^31 should be enough. + w_wait_count->fetch_add(1, butil::memory_order_relaxed); + // First, resolve competition with other writers. int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex); - size_t sampling_range = bvar::INVALID_SAMPLING_RANGE; - // -1: don't sample. - // 0: default value. - // > 0: Start time of sampling. - int64_t start_ns = 0; if (0 != rc) { - DO_CSITE_IF_NEED; - + if (try_lock) { + // Fail to acquire the wrlock. + rwlock_wrlock_cleanup(rwlock, false); + return rc; + } rc = bthread_mutex_timedlock(&rwlock->write_queue_mutex, abstime); if (0 != rc) { - SUBMIT_CSITE_IF_NEED; + // Fail to acquire the wrlock. + rwlock_wrlock_cleanup(rwlock, false); return rc; } } - // Announce to readers there is a pending writer. - int reader_count = ((butil::atomic*)&rwlock->reader_count) - ->fetch_add(-RWLockMaxReaders, butil::memory_order_release); - // Wait for active readers. 
- if (reader_count != 0 && - ((butil::atomic*)&rwlock->reader_wait) - ->fetch_add(reader_count) + reader_count != 0) { - rc = bthread_sem_trywait(&rwlock->writer_sema); - if (0 != rc) { - if (0 == start_ns) { - DO_CSITE_IF_NEED; + auto whole = (butil::atomic*)rwlock->state; + while (true) { + unsigned r = whole->load(butil::memory_order_relaxed); + if (r != 0) { + if (try_lock) { + errno = EBUSY; + break; } - - rc = bthread_sem_timedwait(&rwlock->writer_sema, abstime); - if (0 != rc) { - SUBMIT_CSITE_IF_NEED; - bthread_mutex_unlock(&rwlock->write_queue_mutex); - return rc; + if (butex_wait(whole, r, abstime) < 0 && + errno != EWOULDBLOCK && errno != EINTR) { + break; } + continue; + } + if (whole->compare_exchange_weak(r, (unsigned)(1 << 31), + butil::memory_order_acquire, + butil::memory_order_relaxed)) { + return 0; } } - if (start_ns > 0) { - rwlock->writer_csite.duration_ns = butil::cpuwide_time_ns() - start_ns; - rwlock->writer_csite.sampling_range = sampling_range; - } - rwlock->wlock_flag = true; - return 0; -} -#undef DO_CSITE_IF_NEED -#undef SUBMIT_CSITE_IF_NEED -static inline int rwlock_wrlock(bthread_rwlock_t* rwlock) { - return rwlock_wrlock_impl(rwlock, NULL); + rwlock_wrlock_cleanup(rwlock, true); + return errno; } -static inline int rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock, - const struct timespec* __restrict abstime) { - return rwlock_wrlock_impl(rwlock, abstime); -} +int rwlock_unwrlock(bthread_rwlock_t* rwlock) { + auto whole = (butil::atomic*)rwlock->state; + auto w_wait_count = (butil::atomic*)rwlock->w_wait_count; -static inline int rwlock_trywrlock(bthread_rwlock_t* rwlock) { - int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex); - if (0 != rc) { - return rc; - } + while (true) { + unsigned r = whole->load(butil::memory_order_relaxed); + if (r != (unsigned)(1 << 31) ) { + LOG(ERROR) << "Invalid unwrlock!"; + return EINVAL; + } + if (!whole->compare_exchange_weak(r, 0, + butil::memory_order_release, + 
butil::memory_order_relaxed)) { + continue; + } - int expected = 0; - if (!((butil::atomic*)&rwlock->reader_count) - ->compare_exchange_strong(expected, -RWLockMaxReaders, - butil::memory_order_acquire, - butil::memory_order_relaxed)) { - // Failed to acquire the write lock because there are active readers. + // Wake up one write waiter first if there are more write waiters. + // w > 1 means there are other write waiters (w was the count before decrement). + // if (w > 1) { + // butex_wake(whole); + // } + // Unlock `write_queue_mutex' to wake up other write waiters (if any). bthread_mutex_unlock(&rwlock->write_queue_mutex); - return EBUSY; - } - rwlock->wlock_flag = true; - - return 0; -} - -static inline void rwlock_unwrlock_slow(bthread_rwlock_t* rwlock, int reader_count) { - bthread_sem_post_n(&rwlock->reader_sema, reader_count); - // Allow other writers to proceed. - bthread_mutex_unlock(&rwlock->write_queue_mutex); -} -static inline int rwlock_unwrlock(bthread_rwlock_t* rwlock) { - rwlock->wlock_flag = false; - - // Announce to readers there is no active writer. - int reader_count = ((butil::atomic*)&rwlock->reader_count)->fetch_add( - RWLockMaxReaders, butil::memory_order_release) + RWLockMaxReaders; - if (BAIDU_UNLIKELY(reader_count >= RWLockMaxReaders)) { - CHECK(false) << "rwlock_unwlock of unlocked rwlock"; - return EINVAL; - } - - bool is_valid = bthread::is_contention_site_valid(rwlock->writer_csite); - if (BAIDU_UNLIKELY(is_valid)) { - bthread_contention_site_t saved_csite = rwlock->writer_csite; - bthread::make_contention_site_invalid(&rwlock->writer_csite); - - const int64_t unlock_start_ns = butil::cpuwide_time_ns(); - rwlock_unwrlock_slow(rwlock, reader_count); - const int64_t unlock_end_ns = butil::cpuwide_time_ns(); - saved_csite.duration_ns += unlock_end_ns - unlock_start_ns; - bthread::submit_contention(saved_csite, unlock_end_ns); - } else { - rwlock_unwrlock_slow(rwlock, reader_count); + // Decrement write waiter count. 
+ unsigned w = w_wait_count->fetch_sub(1, butil::memory_order_relaxed); + // Wake up all read waiters if no more write waiters. + if (w == 1) { + butex_wake_all(w_wait_count); + } + return 0; } - - return 0; } -static inline int rwlock_unlock(bthread_rwlock_t* rwlock) { - if (rwlock->wlock_flag) { +int rwlock_unlock(bthread_rwlock_t* rwlock) { + auto whole = (butil::atomic*)rwlock->state; + if ((whole->load(butil::memory_order_relaxed) >> 31) != 0) { return rwlock_unwrlock(rwlock); } else { return rwlock_unrdlock(rwlock); @@ -296,69 +189,60 @@ __BEGIN_DECLS int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock, const bthread_rwlockattr_t* __restrict) { - int rc = bthread_sem_init(&rwlock->reader_sema, 0); - if (BAIDU_UNLIKELY(0 != rc)) { - return rc; + rwlock->w_wait_count = bthread::butex_create_checked(); + rwlock->state = bthread::butex_create_checked(); + if (NULL == rwlock->w_wait_count || NULL == rwlock->state) { + LOG(ERROR) << "Out of memory"; + return ENOMEM; } - bthread_sem_disable_csite(&rwlock->reader_sema); - rc = bthread_sem_init(&rwlock->writer_sema, 0); - if (BAIDU_UNLIKELY(0 != rc)) { - bthread_sem_destroy(&rwlock->reader_sema); - return rc; - } - bthread_sem_disable_csite(&rwlock->writer_sema); - - rwlock->reader_count = 0; - rwlock->reader_wait = 0; - rwlock->wlock_flag = false; + *rwlock->w_wait_count = 0; + *rwlock->state = 0; bthread_mutexattr_t attr; bthread_mutexattr_init(&attr); bthread_mutexattr_disable_csite(&attr); - rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr); - if (BAIDU_UNLIKELY(0 != rc)) { - bthread_sem_destroy(&rwlock->reader_sema); - bthread_sem_destroy(&rwlock->writer_sema); - return rc; - } - bthread_mutexattr_destroy(&attr); - - bthread::make_contention_site_invalid(&rwlock->writer_csite); - + bthread_mutex_init(&rwlock->write_queue_mutex, &attr); return 0; } int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) { - bthread_sem_destroy(&rwlock->reader_sema); - bthread_sem_destroy(&rwlock->writer_sema); - 
bthread_mutex_destroy(&rwlock->write_queue_mutex); + bthread::butex_destroy(rwlock->w_wait_count); + bthread::butex_destroy(rwlock->state); return 0; } int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) { - return bthread::rwlock_rdlock(rwlock); + return bthread::rwlock_rdlock(rwlock, false, NULL); } int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) { - return bthread::rwlock_tryrdlock(rwlock); + return bthread::rwlock_rdlock(rwlock, true, NULL); } int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock, const struct timespec* __restrict abstime) { - return bthread::rwlock_timedrdlock(rwlock, abstime); + return bthread::rwlock_rdlock(rwlock, false, abstime); } int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) { - return bthread::rwlock_wrlock(rwlock); + return bthread::rwlock_wrlock(rwlock, false, NULL); } int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock) { - return bthread::rwlock_trywrlock(rwlock); + return bthread::rwlock_wrlock(rwlock, true, NULL); } int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock, const struct timespec* __restrict abstime) { - return bthread::rwlock_timedwrlock(rwlock, abstime); + return bthread::rwlock_wrlock(rwlock, false, abstime); +} + +int bthread_rwlock_unrlock(bthread_rwlock_t* rwlock) { + return bthread::rwlock_unrdlock(rwlock); +} + +int bthread_rwlock_unwlock(bthread_rwlock_t* rwlock) { + return bthread::rwlock_unwrlock(rwlock); } int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) { diff --git a/src/bthread/types.h b/src/bthread/types.h index 86148c938b..4df0cc8a63 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -225,15 +225,17 @@ typedef struct bthread_sem_t { typedef struct bthread_rwlock_t { #if defined(__cplusplus) bthread_rwlock_t() - : reader_count(0), reader_wait(0), wlock_flag(false), writer_csite{} {} + : w_wait_count(0), state(NULL), writer_csite{} {} DISALLOW_COPY_AND_ASSIGN(bthread_rwlock_t); #endif - bthread_sem_t reader_sema; // Semaphore for readers to wait for 
completing writers. - bthread_sem_t writer_sema; // Semaphore for writers to wait for completing readers. - int reader_count; // Number of pending readers. - int reader_wait; // Number of departing readers. - bool wlock_flag; // Flag used to indicate that a write lock has been held. - bthread_mutex_t write_queue_mutex; // Held if there are pending writers. + // Number of writers waiting to acquire the write lock. + unsigned* w_wait_count; + // Held if there are pending writers. + bthread_mutex_t write_queue_mutex; + // Highest bit set when write-locked; + // low 31 bits count the readers holding the lock; + // 0 means unlocked. + unsigned* state; bthread_contention_site_t writer_csite; } bthread_rwlock_t; diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp index 2da226cb2f..e428d1a308 100644 --- a/test/bthread_rwlock_unittest.cpp +++ b/test/bthread_rwlock_unittest.cpp @@ -386,13 +386,14 @@ void PerfTest(uint32_t writer_ratio, ThreadId* /*dummy*/, int thread_num, << " writer_ratio=" << writer_ratio << " reader_num=" << reader_num << " read_count=" << read_count - << " read_average_time=" << (read_count == 0 ? 0 : read_wait_time / (double)read_count) + << " read_average_time=" << (read_count == 0 ? 0 : read_wait_time / (double)read_count) << "ns" << " writer_num=" << writer_num << " write_count=" << write_count - << " write_average_time=" << (write_count == 0 ? 0 : write_wait_time / (double)write_count); + << " write_average_time=" << (write_count == 0 ? 0 : write_wait_time / (double)write_count) << "ns"; } TEST(RWLockTest, performance) { + bthread_setconcurrency(16); const int thread_num = 12; PerfTest(0, (pthread_t*)NULL, thread_num, pthread_create, pthread_join); PerfTest(0, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join);