package main import ( "errors" "fmt" "math/rand" "net/http" "sync" "time" "github.com/gorilla/websocket" ) // Request represents a request from client type Request struct { ID uint64 `json:"id"` Message interface{} `json:"message"` } // Response is the reply message from the server type Response struct { ID uint64 `json:"id"` Message interface{} `json:"message"` } // Call represents an active request type Call struct { Req Request Res Response Done chan bool Error error } func NewCall(req Request) *Call { done := make(chan bool) return &Call{ Req: req, Done: done, } } type WSClient struct { mutex sync.Mutex conn *websocket.Conn pending map[uint64]*Call counter uint64 } func New() *WSClient { return &WSClient{ pending: make(map[uint64]*Call, 1), counter: 1, } } func (c *WSClient) read() { var err error for err == nil { var res Response err = c.conn.ReadJSON(&res) if err != nil { err = fmt.Errorf("error reading message: %q", err) continue } // fmt.Printf("received message: %+v\n", res) c.mutex.Lock() call := c.pending[res.ID] delete(c.pending, res.ID) c.mutex.Unlock() if call == nil { err = errors.New("no pending request found") continue } call.Res = res call.Done <- true } c.mutex.Lock() for _, call := range c.pending { call.Error = err call.Done <- true } c.mutex.Unlock() } func (c *WSClient) Connect(url string) error { conn, _, err := websocket.DefaultDialer.Dial(url, http.Header{ "User-Agent": []string{"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36"}, }) if err != nil { return err } c.conn = conn go c.read() return nil } func (c *WSClient) Request(payload interface{}) (interface{}, error) { c.mutex.Lock() id := c.counter c.counter++ req := Request{ID: id, Message: payload} call := NewCall(req) c.pending[id] = call err := c.conn.WriteJSON(&req) if err != nil { delete(c.pending, id) c.mutex.Unlock() return nil, err } c.mutex.Unlock() select { case <-call.Done: case <-time.After(2 * time.Second): call.Error = errors.New("request timeout") } if call.Error != nil { return nil, call.Error } return call.Res.Message, nil } func (c *WSClient) Close() error { return c.conn.Close() } func main() { client := New() err := client.Connect("ws://echo.websocket.org") if err != nil { panic(err) } var wg sync.WaitGroup wg.Add(20) for i := 1; i <= 20; i++ { go func() { want := rand.Intn(100) res, err := client.Request(want) if err != nil { fmt.Println("error transaction: %d", err) wg.Done() return } got := int(res.(float64)) if got != want { panic(fmt.Errorf("got: %d\nwant: %d\n", got, want)) } fmt.Printf("transaction %d : %d\n", want, got) wg.Done() }() } wg.Wait() defer func() { err = client.Close() if err != nil { panic(err) } }() }