Skip to content

Instantly share code, notes, and snippets.

@maple3142
Created October 8, 2024 03:37
Show Gist options
  • Select an option

  • Save maple3142/b74c721aed9a12aa769b8385a46f11d3 to your computer and use it in GitHub Desktop.

Select an option

Save maple3142/b74c721aed9a12aa769b8385a46f11d3 to your computer and use it in GitHub Desktop.

Revisions

  1. maple3142 created this gist Oct 8, 2024.
    120 changes: 120 additions & 0 deletions go_duplex_not_working.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,120 @@
    package main

    import (
    "fmt"
    "io"
    "net"
    "net/http"
    "time"
    )

    type HttpStream struct {
    r *http.Request
    w http.ResponseWriter
    rc *http.ResponseController
    closeChan chan struct{}
    }

    func NewHttpStream(w http.ResponseWriter, r *http.Request, rc *http.ResponseController, closeChan chan struct{}) *HttpStream {
    return &HttpStream{
    r: r,
    w: w,
    rc: rc,
    closeChan: closeChan,
    }
    }

    func (wh *HttpStream) Read(p []byte) (n int, err error) {
    n, err = wh.r.Body.Read(p)
    return
    }

    func (wh *HttpStream) Write(p []byte) (n int, err error) {
    n, err = wh.w.Write(p)
    wh.rc.Flush()
    return
    }

    func (wh *HttpStream) Close() error {
    wh.closeChan <- struct{}{}
    return nil
    }

    type Handler struct{}

    func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    rc := http.NewResponseController(w)
    err := rc.EnableFullDuplex()
    if err != nil {
    panic(err)
    }

    // buf := make([]byte, 1024)
    // for {
    // n, err := r.Body.Read(buf)
    // if err != nil {
    // break
    // }
    // w.Write(buf[:n])
    // rc.Flush()
    // }

    ch := make(chan struct{})
    stream := NewHttpStream(w, r, rc, ch)
    go func() {
    buf := make([]byte, 1024)
    for i := 0; i < 3; i++ {
    n, err := stream.Read(buf)
    if err != nil {
    break
    }
    stream.Write(buf[:n])
    }
    stream.Close()
    }()
    <-ch
    fmt.Println("close")
    }

    func main() {
    server := &http.Server{
    Handler: &Handler{},
    }
    listenSuccess := make(chan struct{})
    go func() {
    l, err := net.Listen("tcp", ":3000")
    if err != nil {
    panic(err)
    }
    listenSuccess <- struct{}{}
    server.Serve(l)
    }()
    <-listenSuccess
    fmt.Println("server started")

    reader, writer := io.Pipe()
    req, err := http.NewRequest("POST", "http://localhost:3000", reader)
    if err != nil {
    panic(err)
    }
    fmt.Println("req created")
    resp, err := http.DefaultClient.Do(req) // this blocks untill the request body is fully read...
    if err != nil {
    panic(err)
    }
    fmt.Println("got resp")
    go func() {
    for {
    writer.Write([]byte("hello"))
    time.Sleep(time.Second)
    }
    }()
    buf := make([]byte, 1024)
    for {
    n, err := resp.Body.Read(buf)
    if err != nil {
    break
    }
    fmt.Printf("received %s\n", string(buf[:n]))
    }
    }
    31 changes: 31 additions & 0 deletions http_duplex.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,31 @@
    const http = require('http')

    http.createServer((req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' })
    req.on('data', chunk => {
    console.log('server recv', chunk)
    res.write(chunk.map(v => v + 1))
    })
    req.on('end', () => {
    res.end('END')
    })
    }).listen(3000)

    const req = http.request('http://localhost:3000/lol', { method: 'POST' }, res => {
    res.on('data', chunk => {
    console.log('client recv', chunk)
    })
    res.on('end', () => {
    console.log('client end')
    })
    res.on('error', err => {
    console.error(err)
    })
    })
    const it = setInterval(() => {
    req.write('a'.repeat(10))
    }, 1000)
    setTimeout(() => {
    req.end()
    clearInterval(it)
    }, 5000)