Skip to content

Instantly share code, notes, and snippets.

@nabeken
Last active October 29, 2015 12:08
Show Gist options
  • Select an option

  • Save nabeken/7f85026ccdb828f7bca7 to your computer and use it in GitHub Desktop.

Select an option

Save nabeken/7f85026ccdb828f7bca7 to your computer and use it in GitHub Desktop.

Revisions

  1. nabeken revised this gist Oct 29, 2015. 1 changed file with 6 additions and 15 deletions.
    21 changes: 6 additions & 15 deletions readseekcloser.go
    Original file line number Diff line number Diff line change
    @@ -6,6 +6,7 @@ import (
    "log"
    "os"
    "sync"
    "sync/atomic"
    )

    type ReadSeekCloser interface {
    @@ -37,7 +38,6 @@ type ReadSeeker struct {

    once sync.Once

    mupos sync.Mutex
    // position for rw
    wpos int64
    // position for rs
    @@ -50,10 +50,8 @@ func (rs *ReadSeeker) Read(p []byte) (int, error) {
    rs.mu.Lock()
    defer rs.mu.Unlock()

    rs.mupos.Lock()
    rpos := rs.rpos
    wpos := rs.wpos
    rs.mupos.Unlock()
    rpos := atomic.LoadInt64(&rs.rpos)
    wpos := atomic.LoadInt64(&rs.wpos)

    if !rs.closed && wpos == rpos {
    // we are waiting for data to be available
    @@ -63,9 +61,7 @@ func (rs *ReadSeeker) Read(p []byte) (int, error) {
    }

    n, err := rs.rs.Read(p)
    rs.mupos.Lock()
    rs.rpos += int64(n)
    rs.mupos.Unlock()
    atomic.AddInt64(&rs.rpos, int64(n))

    if rs.closed {
    log.Printf("read: copying is over: err:%s rs.err:%s rpos:%d wpos: %d", err, rs.err, rs.rpos, rs.wpos)
    @@ -91,10 +87,8 @@ func (rs *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
    rs.once.Do(func() { go rs.copy() })

    n, err := rs.rs.Seek(offset, whence)
    rs.mupos.Lock()
    rs.rpos = n
    atomic.StoreInt64(&rs.rpos, n)
    log.Printf("seek: seek pos:%d offset:%d whence:%d err:%s rpos:%d wpos: %d", n, offset, whence, err, rs.rpos, rs.wpos)
    rs.mupos.Unlock()

    return n, err
    }
    @@ -125,11 +119,8 @@ func (rs *ReadSeeker) copy() {
    break
    }

    // count how much data is availavle for read
    rs.mupos.Lock()
    rs.wpos += int64(nw)
    atomic.AddInt64(&rs.wpos, int64(nw))
    log.Printf("copy: read: n:%d rpos:%d wpos:%d", nw, rs.rpos, rs.wpos)
    rs.mupos.Unlock()

    // signal that data is available for read
    rs.copycond.Signal()
  2. nabeken created this gist Oct 29, 2015.
    191 changes: 191 additions & 0 deletions readseekcloser.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,191 @@
    package ioutils

    import (
    "io"
    "io/ioutil"
    "log"
    "os"
    "sync"
    )

    type ReadSeekCloser interface {
    io.Reader
    io.Seeker
    io.Closer
    }

    type ReadSeeker struct {
    // reader for source
    sr io.Reader
    // reader for tempfile-based buffer
    rs *os.File
    // writer for tempfile-based buffer
    rw *os.File

    mu sync.Mutex
    // err from sr and rw
    err error
    // whether rw is closed
    closed bool

    // buffer between sr and rw to write data to rw
    copybuf []byte
    // closed when copying is done
    copych chan struct{}
    // signaled when copy() writes data to rw
    copycond *sync.Cond

    once sync.Once

    mupos sync.Mutex
    // position for rw
    wpos int64
    // position for rs
    rpos int64
    }

    func (rs *ReadSeeker) Read(p []byte) (int, error) {
    rs.once.Do(func() { go rs.copy() })

    rs.mu.Lock()
    defer rs.mu.Unlock()

    rs.mupos.Lock()
    rpos := rs.rpos
    wpos := rs.wpos
    rs.mupos.Unlock()

    if !rs.closed && wpos == rpos {
    // we are waiting for data to be available
    log.Printf("read: waiting for data to be available: rpos:%d wpos: %d", rs.rpos, rs.wpos)
    rs.copycond.Wait()
    log.Printf("read: done waiting for data to be available: rpos:%d wpos: %d", rs.rpos, rs.wpos)
    }

    n, err := rs.rs.Read(p)
    rs.mupos.Lock()
    rs.rpos += int64(n)
    rs.mupos.Unlock()

    if rs.closed {
    log.Printf("read: copying is over: err:%s rs.err:%s rpos:%d wpos: %d", err, rs.err, rs.rpos, rs.wpos)
    // copying is over
    if rs.err != nil {
    err = rs.err
    }
    // we can return io.EOF after copying is over
    return n, err
    }

    log.Printf("read: copying is still ongoing: err:%s rpos:%d wpos: %d", err, rs.rpos, rs.wpos)

    // copying is still ongoing so even if we got EOF from rs
    if err == io.EOF {
    err = nil
    }

    return n, err
    }

    func (rs *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
    rs.once.Do(func() { go rs.copy() })

    n, err := rs.rs.Seek(offset, whence)
    rs.mupos.Lock()
    rs.rpos = n
    log.Printf("seek: seek pos:%d offset:%d whence:%d err:%s rpos:%d wpos: %d", n, offset, whence, err, rs.rpos, rs.wpos)
    rs.mupos.Unlock()

    return n, err
    }

    func (rs *ReadSeeker) Close() error {
    // Closing reader
    rs.rs.Close()

    // Closing rw but we should wait for copy() to be finishd
    // since copying is happened in another goroutine
    <-rs.copych

    return os.Remove(rs.rw.Name())
    }

    func (rs *ReadSeeker) copy() {
    for {
    nr, err := rs.sr.Read(rs.copybuf)
    if nr > 0 {
    nw, ew := rs.rw.Write(rs.copybuf[0:nr])
    if ew != nil {
    rs.err = ew
    break
    }

    if nr != nw {
    rs.err = io.ErrShortWrite
    break
    }

    // count how much data is availavle for read
    rs.mupos.Lock()
    rs.wpos += int64(nw)
    log.Printf("copy: read: n:%d rpos:%d wpos:%d", nw, rs.rpos, rs.wpos)
    rs.mupos.Unlock()

    // signal that data is available for read
    rs.copycond.Signal()
    }

    if err == io.EOF {
    break
    }

    if err != nil {
    rs.err = err
    break
    }
    }

    // we're done copying so closing...
    // reader must be blocked while we're closing
    rs.mu.Lock()
    defer rs.mu.Unlock()

    if err := rs.rw.Close(); err != nil {
    rs.err = err
    }
    rs.closed = true
    close(rs.copych)

    log.Printf("copy: done: rpos:%d wpos:%d", rs.rpos, rs.wpos)

    // signal that data is available for last read
    rs.copycond.Signal()
    }

    func NewReadSeeker(r_ io.Reader) (ReadSeekCloser, error) {
    rw, err := ioutil.TempFile("", "hoge")
    if err != nil {
    return nil, err
    }

    rs, err := os.Open(rw.Name())
    if err != nil {
    return nil, err
    }

    rsc := &ReadSeeker{
    sr: r_,
    rs: rs,
    rw: rw,

    // https://golang.org/src/io/io.go?s=12227:12287#L378
    copybuf: make([]byte, 32*1024),

    // this should be buffered since copy will send a message
    // when no one wait on Close()
    copych: make(chan struct{}, 1),
    }
    rsc.copycond = sync.NewCond(&rsc.mu)

    return rsc, nil
    }