package disruptor import ( "encoding/binary" "log" "runtime" "sync/atomic" "time" ) const ( RingBufferSize = 8 * 64 // 8 64-byte cache lines SizeOfInt = 8 ) type Offset = uint64 type ring struct { // Make sure 64-byte alignment buf [RingBufferSize]byte dirty Offset clean Offset datumSz Offset // max 264 bytes (4*64) retries int64 } func New(datumSize Offset) *ring { return &ring{datumSz: datumSize} } // commit should be inlined by the compiler func (r *ring) commit(oldDirty, newOffset Offset) bool { clean := atomic.LoadUint64(&r.clean) return atomic.CompareAndSwapUint64(&r.dirty, oldDirty, newOffset) && atomic.CompareAndSwapUint64(&r.clean, clean, newOffset) } func (r *ring) write(data []byte) int { sz := Offset(len(data)) if sz > r.datumSz { panic("unsupported data chunk size") } // Try until we succeed for { // Load stored area dirty := atomic.LoadUint64(&r.dirty) // Calculate new offset off := (dirty + sz) % r.datumSz // Copy data to the new slot _ = copy(r.buf[off:], data[:r.datumSz]) // Break if we are able to commit if r.commit(dirty, off) { break } atomic.AddInt64(&r.retries, 1) } return int(sz) } // Read returns data chunk available to read. It may return zeroed memory if there is no available // data in the buffer. func (r *ring) Read() []byte { return r.buf[atomic.LoadUint64(&r.clean):r.datumSz] } func TestDisruptor(samples int, wait *int64) *ring { data := make([]byte, SizeOfInt) r := New(SizeOfInt) // go func(t0 time.Time) { for i := 0; i < samples; i++ { binary.LittleEndian.PutUint64(data, uint64(i)) r.write(data) } elapsed := time.Since(t0) log.Printf("writer: finished writing %dM samples in %v [%.2f samples/s] (with %d contentions)", samples/1000000, elapsed, float64(samples)/elapsed.Seconds(), atomic.LoadInt64(&r.retries)) atomic.AddInt64(wait, -1) }(time.Now()) return r } func WriteN(n int, wait *int64) *ring { data := make([]byte, SizeOfInt) r := New(SizeOfInt) // go func() { for i := 10; i < 10+n; i++ { binary.LittleEndian.PutUint64(data, uint64(i)) r.write(data) runtime.Gosched() } atomic.AddInt64(wait, -1) }() return r }