Skip to content

Instantly share code, notes, and snippets.

@embano1
Last active November 20, 2024 18:49
Show Gist options
  • Select an option

  • Save embano1/e0bf49d24f1cdd07cffad93097c04f0a to your computer and use it in GitHub Desktop.

Select an option

Save embano1/e0bf49d24f1cdd07cffad93097c04f0a to your computer and use it in GitHub Desktop.

Revisions

  1. embano1 revised this gist Feb 14, 2020. 2 changed files with 37 additions and 5 deletions.
    23 changes: 19 additions & 4 deletions client.go
    Original file line number Diff line number Diff line change
    @@ -11,6 +11,7 @@ import (
    "syscall"
    "time"

    "github.com/golang/protobuf/ptypes/empty"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    @@ -25,10 +26,9 @@ func main() {
    }
    defer conn.Close()
    cli := greeter.NewGreeterClient(conn)
    req := greeter.HelloRequest{}
    stream, err := cli.SayHello(ctx, &req)
    stream, err := cli.SayHelloStream(ctx, &greeter.HelloRequest{})
    if err != nil {
    log.Fatalf("could not establish stream: %v", err)
    log.Fatalf("could not create streaming client: %v", err)
    }

    sigCh := make(chan os.Signal, 1)
    @@ -46,14 +46,29 @@ func main() {
    }
    }()

    wg.Add(1)
    go func() {
    defer wg.Done()
    time.Sleep(5 * time.Second)
    resp, err := cli.SayHello(ctx, &empty.Empty{})
    if err != nil {
    if status.Code(err) == codes.Canceled {
    log.Println("context cancelled")
    return
    }
    log.Fatalf("could not perform regular rpc request: %v", err)
    }
    log.Printf("received SayHello response: %+v", resp)
    }()

    wg.Add(1)
    go func() {
    defer wg.Done()
    for {
    r, err := stream.Recv()
    if err != nil {
    if err == io.EOF || status.Code(err) == codes.Canceled {
    log.Println("stream closed")
    log.Println("stream closed (context cancelled)")
    cancel()
    return
    }
    19 changes: 18 additions & 1 deletion server.go
    Original file line number Diff line number Diff line change
    @@ -3,6 +3,7 @@ package main
    import (
    "context"
    "fmt"
    "grpc-tutorial/greeter"
    pb "grpc-tutorial/greeter"
    "log"
    "math/rand"
    @@ -14,7 +15,10 @@ import (
    "syscall"
    "time"

    "github.com/golang/protobuf/ptypes/empty"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    )

    type greeterServer struct {
    @@ -40,19 +44,32 @@ func generate(ctx context.Context) <-chan int {
    return ch
    }

    func (g *greeterServer) SayHello(req *pb.HelloRequest, stream pb.Greeter_SayHelloServer) error {
    func (g *greeterServer) SayHelloStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloStreamServer) error {
    for n := range g.intCh {
    resp := pb.HelloReply{
    Message: strconv.Itoa(n),
    }

    if err := stream.Send(&resp); err != nil {
    if status.Code(err) == codes.Canceled {
    log.Println("stream closed (context cancelled)")
    return nil
    }
    log.Printf("could not send over stream: %v", err)
    return err
    }
    log.Printf("sent %d", n)
    }
    return nil
    }

    func (g *greeterServer) SayHello(context.Context, *empty.Empty) (*greeter.HelloReply, error) {
    resp := pb.HelloReply{
    Message: "this WORKED!",
    }
    return &resp, nil
    }

    func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
  2. embano1 created this gist Feb 12, 2020.
    66 changes: 66 additions & 0 deletions client.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,66 @@
    package main

    import (
    "context"
    "grpc-tutorial/greeter"
    "io"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    )

    func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    conn, err := grpc.DialContext(ctx, "localhost:8080", grpc.WithBlock(), grpc.WithTimeout(3*time.Second), grpc.WithInsecure())
    if err != nil {
    log.Fatalf("could not connect to server: %v", err)
    }
    defer conn.Close()
    cli := greeter.NewGreeterClient(conn)
    req := greeter.HelloRequest{}
    stream, err := cli.SayHello(ctx, &req)
    if err != nil {
    log.Fatalf("could not establish stream: %v", err)
    }

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
    defer wg.Done()
    select {
    case <-ctx.Done():
    return
    case s := <-sigCh:
    log.Printf("got signal %v, attempting graceful shutdown", s)
    cancel()
    }
    }()

    wg.Add(1)
    go func() {
    defer wg.Done()
    for {
    r, err := stream.Recv()
    if err != nil {
    if err == io.EOF || status.Code(err) == codes.Canceled {
    log.Println("stream closed")
    cancel()
    return
    }
    log.Fatalf("error while receiving stream response: %v", err)
    }
    log.Printf("received value: %+v", r)
    }
    }()
    wg.Wait()
    }
    92 changes: 92 additions & 0 deletions server.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,92 @@
    package main

    import (
    "context"
    "fmt"
    pb "grpc-tutorial/greeter"
    "log"
    "math/rand"
    "net"
    "os"
    "os/signal"
    "strconv"
    "sync"
    "syscall"
    "time"

    "google.golang.org/grpc"
    )

    type greeterServer struct {
    intCh <-chan int
    }

    func generate(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
    for {
    select {
    case <-time.Tick(time.Second):
    rand.Seed(time.Now().UnixNano())
    n := rand.Int()
    // log.Printf("generated %d", n)
    ch <- n
    case <-ctx.Done():
    close(ch)
    return
    }
    }
    }()
    return ch
    }

    func (g *greeterServer) SayHello(req *pb.HelloRequest, stream pb.Greeter_SayHelloServer) error {
    for n := range g.intCh {
    resp := pb.HelloReply{
    Message: strconv.Itoa(n),
    }
    if err := stream.Send(&resp); err != nil {
    return err
    }
    log.Printf("sent %d", n)
    }
    return nil
    }

    func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    ch := generate(ctx)
    gSrv := greeterServer{
    intCh: ch,
    }

    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 8080))
    if err != nil {
    log.Fatalf("failed to listen: %v", err)
    }
    grpc := grpc.NewServer()
    pb.RegisterGreeterServer(grpc, &gSrv)

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
    s := <-sigCh
    log.Printf("got signal %v, attempting graceful shutdown", s)
    cancel()
    grpc.GracefulStop()
    // grpc.Stop() // leads to error while receiving stream response: rpc error: code = Unavailable desc = transport is closing
    wg.Done()
    }()

    log.Println("starting grpc server")
    err = grpc.Serve(lis)
    if err != nil {
    log.Fatalf("could not serve: %v", err)
    }
    wg.Wait()
    log.Println("clean shutdown")
    }