// based on http://www.research.ibm.com/people/m/michael/podc-2002.pdf
// also resembles RCU.

#ifndef SMRP_HPP
#define SMRP_HPP

#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <thread>
#include <vector>

namespace smrp
{

// Single Writer Multiple Readers pointer storage:
// both writers and readers are lock-free,
// the writer waits for all readers to finish
// with the previous value so it can be deleted.
// Thus, write throughput is limited by the slowest
// reader, and writes are meant to happen very rarely.
template <typename T, size_t READER_THREAD_NUM>
class swmr_pointer_store_t
{
    static const size_t CACHE_LINE_SIZE = 64;

    typedef std::atomic<T *> aptr_t;

    // no-false-sharing pointer: pads the atomic pointer to a full
    // cache line so the per-reader hazard slots don't share lines.
    class nfs_ptr_t
    {
        aptr_t aptr_;
        char pad_[CACHE_LINE_SIZE - sizeof (aptr_t)];

        nfs_ptr_t (const nfs_ptr_t & other);
        nfs_ptr_t & operator= (const nfs_ptr_t & other);

    public:
        nfs_ptr_t () : aptr_ (0) {}

        const aptr_t * operator-> () const { return &aptr_; }
        aptr_t * operator-> () { return &aptr_; }
    };

    // value owned by the writer
    nfs_ptr_t main_ptr_;
    // values owned by the readers (one hazard slot per reader thread)
    mutable nfs_ptr_t hazard_ptrs_[READER_THREAD_NUM];

    // Implements the wait strategy for the writer in case
    // the old value is still in use by readers.
    void wait ()
    {
        // DEBUG: don't wait while testing, just spin.
    }

    // non-copyable
    swmr_pointer_store_t (const swmr_pointer_store_t & other);
    swmr_pointer_store_t & operator= (const swmr_pointer_store_t & other);

public:
    swmr_pointer_store_t () {}

    // Must only be called by the writer thread;
    // wait-free, 1 atomic load.
    T * get () const
    {
        return main_ptr_->load (std::memory_order_relaxed);
    }

    // The writer thread calls this to replace the previous value;
    // the previous value is returned as soon as no readers are using it.
    // That is, this works only as fast as the slowest reader
    // (and no faster than 'wait' above).
    T * set (T * ptr)
    {
        // replace the old value with the new one
        T * const old = main_ptr_->exchange (ptr, std::memory_order_release);

        // do we actually need to check readers?
        if (old != 0 && ptr != old)
        {
            // now make sure the old value is not in use
            bool locked = false;
            do
            {
                // scan all hazard slots for the old value
                T * v = 0;
                for (size_t i = 0; v != old && i < READER_THREAD_NUM; ++i)
                    v = hazard_ptrs_[i]->load (std::memory_order_acquire);

                locked = v == old;

                // some reader is using our value?
                // ok, let's wait until it releases it.
                if (locked)
                    wait ();
            }
            while (locked);
        }

        // ok, no readers use the old value now
        return old;
    }

    // Readers use 'rlock' to acquire the current value of the pointer
    // and notify the writer that the value is in use until 'runlock'.
    // This call might loop while in contention with the writer,
    // but since writes are meant to happen rarely, looping is
    // very unlikely.
    T * rlock (const size_t thread_index) const
    {
        assert (thread_index < READER_THREAD_NUM);

        T * v = 0;
        T * v1 = 0;
        do
        {
            // load the pointer
            v = main_ptr_->load (std::memory_order_acquire);

            // save it to our thread's hazard pointer slot
            // FIXME: SC is required, but makes it ~10x slower;
            // w/o SC the test constantly fails. Why?
            hazard_ptrs_[thread_index]->store (v, std::memory_order_release);

            // now check the main pointer again, to make sure the writer
            // hasn't managed to perform 'set' between the previous 2 operations
            v1 = main_ptr_->load (std::memory_order_acquire);

            // if the writer didn't change anything,
            // return our 'locked' value
        }
        while (v1 != v);

        // we don't wait on retry, as the writer
        // has already completed (or we wouldn't need to retry)
        return v;
    }

    // Readers use 'runlock' to notify the writer that the last
    // read value is no longer in use. It is wait-free:
    // 1 atomic store.
    void runlock (const size_t thread_index) const
    {
        assert (thread_index < READER_THREAD_NUM);

        // notify the writer that we're done
        hazard_ptrs_[thread_index]->store (0, std::memory_order_release);
    }
};

inline int test (int ac, char ** av)
{
    assert (ac > 2);

    static const size_t MAX_READER_THREADS = 8;
    const size_t READERS = atoi (av[2]);
    assert (READERS <= MAX_READER_THREADS);

    swmr_pointer_store_t<int, MAX_READER_THREADS> pointer;

    auto reader = [&] (const size_t reader_index)
    {
        int last_value = 0;
        const size_t N = 150000000; // takes ~1 sec on my laptop
        for (size_t i = 0; i < N; ++i)
        {
            int * v = pointer.rlock (reader_index);
            if (v)
            {
                // values must only ever grow
                assert (*v >= last_value);
                last_value = *v;
            }
            pointer.runlock (reader_index);
        }
    };

    auto writer = [&] ()
    {
        const size_t N = 10000000; // ~1 sec on my laptop
        int last = -1;
        for (size_t i = 0; i < N; ++i)
        {
            int * v = new int (i);
            assert (*v > last);
            int * old = pointer.set (v);
            if (old)
                assert (last == *old);
            delete old;
            last = *v;
        }
        delete pointer.set (0);
    };

    std::thread w (writer);

    const auto start = std::chrono::high_resolution_clock::now ();

    std::vector<std::thread> readers;
    for (size_t i = 0; i < READERS; ++i)
        readers.push_back (std::thread (reader, i));

    for (size_t i = 0; i < READERS; ++i)
        readers[i].join ();

    const auto end = std::chrono::high_resolution_clock::now ();
    const auto passed = std::chrono::duration_cast<std::chrono::milliseconds> (end - start);

    printf ("readers done in %lld ms\n", (long long) passed.count ());

    w.join ();

    return 0;
}

} /* namespace smrp */

#endif /* SMRP_HPP */
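// A minimal driver sketch (not part of the original header): a hypothetical
// main.cpp that includes this header and runs the built-in test. test()
// asserts ac > 2 and reads the reader thread count from av[2], so the
// command line below is only an assumed invocation, e.g. "./smrp_test x 4":
//
//   #include "smrp.hpp"
//
//   int main (int argc, char ** argv)
//   {
//       return smrp::test (argc, argv);
//   }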