Last active
November 20, 2024 18:49
-
-
Save embano1/e0bf49d24f1cdd07cffad93097c04f0a to your computer and use it in GitHub Desktop.
Revisions
-
embano1 revised this gist
Feb 14, 2020 . 2 changed files with 37 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -11,6 +11,7 @@ import ( "syscall" "time" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -25,10 +26,9 @@ func main() { } defer conn.Close() cli := greeter.NewGreeterClient(conn) stream, err := cli.SayHelloStream(ctx, &greeter.HelloRequest{}) if err != nil { log.Fatalf("could not create streaming client: %v", err) } sigCh := make(chan os.Signal, 1) @@ -46,14 +46,29 @@ func main() { } }() wg.Add(1) go func() { defer wg.Done() time.Sleep(5 * time.Second) resp, err := cli.SayHello(ctx, &empty.Empty{}) if err != nil { if status.Code(err) == codes.Canceled { log.Println("context cancelled") return } log.Fatalf("could not perform regular rpc request: %v", err) } log.Printf("received SayHello response: %+v", resp) }() wg.Add(1) go func() { defer wg.Done() for { r, err := stream.Recv() if err != nil { if err == io.EOF || status.Code(err) == codes.Canceled { log.Println("stream closed (context cancelled)") cancel() return } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -3,6 +3,7 @@ package main import ( "context" "fmt" "grpc-tutorial/greeter" pb "grpc-tutorial/greeter" "log" "math/rand" @@ -14,7 +15,10 @@ import ( "syscall" "time" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type greeterServer struct { @@ -40,19 +44,32 @@ func generate(ctx context.Context) <-chan int { return ch } func (g *greeterServer) SayHelloStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloStreamServer) error { for n := range g.intCh { resp := pb.HelloReply{ Message: strconv.Itoa(n), } if err := stream.Send(&resp); err != nil { if status.Code(err) == codes.Canceled { log.Println("stream closed (context cancelled)") return nil } log.Printf("could not send over stream: %v", err) return err } log.Printf("sent %d", n) } return nil } func (g *greeterServer) SayHello(context.Context, *empty.Empty) (*greeter.HelloReply, error) { resp := pb.HelloReply{ Message: "this WORKED!", } return &resp, nil } func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() -
embano1 created this gist
Feb 12, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,66 @@ package main import ( "context" "grpc-tutorial/greeter" "io" "log" "os" "os/signal" "sync" "syscall" "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() conn, err := grpc.DialContext(ctx, "localhost:8080", grpc.WithBlock(), grpc.WithTimeout(3*time.Second), grpc.WithInsecure()) if err != nil { log.Fatalf("could not connect to server: %v", err) } defer conn.Close() cli := greeter.NewGreeterClient(conn) req := greeter.HelloRequest{} stream, err := cli.SayHello(ctx, &req) if err != nil { log.Fatalf("could not establish stream: %v", err) } sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() select { case <-ctx.Done(): return case s := <-sigCh: log.Printf("got signal %v, attempting graceful shutdown", s) cancel() } }() wg.Add(1) go func() { defer wg.Done() for { r, err := stream.Recv() if err != nil { if err == io.EOF || status.Code(err) == codes.Canceled { log.Println("stream closed") cancel() return } log.Fatalf("error while receiving stream response: %v", err) } log.Printf("received value: %+v", r) } }() wg.Wait() } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,92 @@ package main import ( "context" "fmt" pb "grpc-tutorial/greeter" "log" "math/rand" "net" "os" "os/signal" "strconv" "sync" "syscall" "time" "google.golang.org/grpc" ) type greeterServer struct { intCh <-chan int } func generate(ctx context.Context) <-chan int { ch := make(chan int) go func() { for { select { case <-time.Tick(time.Second): rand.Seed(time.Now().UnixNano()) n := rand.Int() // log.Printf("generated %d", n) ch <- n case <-ctx.Done(): close(ch) return } } }() return ch } func (g *greeterServer) SayHello(req *pb.HelloRequest, stream pb.Greeter_SayHelloServer) error { for n := range g.intCh { resp := pb.HelloReply{ Message: strconv.Itoa(n), } if err := stream.Send(&resp); err != nil { return err } log.Printf("sent %d", n) } return nil } func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() ch := generate(ctx) gSrv := greeterServer{ intCh: ch, } lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 8080)) if err != nil { log.Fatalf("failed to listen: %v", err) } grpc := grpc.NewServer() pb.RegisterGreeterServer(grpc, &gSrv) sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) wg := sync.WaitGroup{} wg.Add(1) go func() { s := <-sigCh log.Printf("got signal %v, attempting graceful shutdown", s) cancel() grpc.GracefulStop() // grpc.Stop() // leads to error while receiving stream response: rpc error: code = Unavailable desc = transport is closing wg.Done() }() log.Println("starting grpc server") err = grpc.Serve(lis) if err != nil { log.Fatalf("could not serve: %v", err) } wg.Wait() log.Println("clean shutdown") }