Last active
September 15, 2020 13:29
-
-
Save cyriltovena/78cddf6ce7be11f62876b8655d2681a6 to your computer and use it in GitHub Desktop.
Revisions
-
cyriltovena revised this gist
Sep 15, 2020 . 1 changed file with 22 additions and 27 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -14,9 +14,7 @@ import ( "strings" "time" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/prometheus/common/model" "github.com/weaveworks/common/user" "google.golang.org/grpc" @@ -96,7 +94,6 @@ func main() { {Type: client.EQUAL, Name: "__name__", Value: "node_cpu_seconds_total"}, {Type: client.EQUAL, Name: "job", Value: "node"}, {Type: client.EQUAL, Name: "mode", Value: "idle"}, }, }) if err != nil { @@ -114,32 +111,30 @@ func main() { } // log.Println("msg rcv") for _, s := range msg.Chunkseries { log.Printf("labels: %s =>", client.FromLabelAdaptersToLabels(s.Labels).String()) // for _, c := range s.Chunks { // ch, err := encoding.NewForEncoding(encoding.Encoding(byte(c.Encoding))) // if err != nil { // log.Println(err.Error()) // continue // } // err = ch.UnmarshalFromBuf(c.Data) // if err != nil { // log.Println(err.Error()) // continue // } // it := ch.NewIterator(nil) // for it.Scan() { // if it.Err() != nil { // log.Println(it.Err()) // continue // } // spew.Dump(it.Value()) // } // } } } } -
cyriltovena created this gist
Sep 15, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,146 @@ package main import ( "bytes" "context" "flag" "fmt" "io" "log" "net/http" "net/url" "os" "path/filepath" "strings" "time" "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/davecgh/go-spew/spew" "github.com/prometheus/common/model" "github.com/weaveworks/common/user" "google.golang.org/grpc" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" "k8s.io/client-go/util/homedir" ) func main() { log.SetOutput(os.Stdout) var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) } roundTripper, upgrader, err := spdy.RoundTripperFor(config) if err != nil { panic(err) } for i := 0; i < 33; i++ { log.Printf("ingester-%d\n", i) path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", "thetradedesk", fmt.Sprintf("ingester-%d", i)) hostIP := strings.TrimLeft(config.Host, "htps:/") serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP} dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL) stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) out, errOut := new(bytes.Buffer), new(bytes.Buffer) ports := []string{fmt.Sprintf("500%d:9095", i)} forwarder, err := portforward.New(dialer, ports, stopChan, readyChan, out, errOut) if err != nil { panic(err) } go func() { for range readyChan { // Kubernetes will close this channel when it has something to tell us. } if len(errOut.String()) != 0 { panic(errOut.String()) } else if len(out.String()) != 0 { fmt.Println(out.String()) } }() go func() { if err = forwarder.ForwardPorts(); err != nil { // Locks until stopChan is closed. panic(err) } }() <-readyChan conn, err := grpc.Dial(fmt.Sprintf("localhost:500%d", i), grpc.WithInsecure()) if err != nil { panic(err) } defer conn.Close() c := client.NewIngesterClient(conn) ctx := user.InjectOrgID(context.Background(), "327") ctx1, err := user.InjectIntoGRPCRequest(ctx) if err != nil { panic(err) } end := model.Now() start := end.Add(-30 * time.Minute) stream, err := c.QueryStream(ctx1, &client.QueryRequest{ EndTimestampMs: int64(end), StartTimestampMs: int64(start), Matchers: []*client.LabelMatcher{ {Type: client.EQUAL, Name: "__name__", Value: "node_cpu_seconds_total"}, {Type: client.EQUAL, Name: "job", Value: "node"}, {Type: client.EQUAL, Name: "mode", Value: "idle"}, {Type: client.EQUAL, Name: "instance", Value: "p1-ukp-slb007"}, }, }) if err != nil { panic(err) } for { msg, err := stream.Recv() if err != nil { if err == io.EOF { // log.Println("DONE") stopChan <- struct{}{} break } panic(err) } // log.Println("msg rcv") for _, s := range msg.Chunkseries { if client.FromLabelAdaptersToLabels(s.Labels).Get("instance") == "p1-ukp-slb007" && client.FromLabelAdaptersToLabels(s.Labels).Get("cpu") == "11" { log.Printf("labels: %s =>", client.FromLabelAdaptersToLabels(s.Labels).String()) for _, c := range s.Chunks { ch, err := encoding.NewForEncoding(encoding.Encoding(byte(c.Encoding))) if err != nil { log.Println(err.Error()) continue } err = ch.UnmarshalFromBuf(c.Data) if err != nil { log.Println(err.Error()) continue } it := ch.NewIterator(nil) for it.Scan() { if it.Err() != nil { log.Println(it.Err()) continue } spew.Dump(it.Value()) } } } } } } }