Skip to content

Instantly share code, notes, and snippets.

@cyriltovena
Last active September 15, 2020 13:29
Show Gist options
  • Save cyriltovena/78cddf6ce7be11f62876b8655d2681a6 to your computer and use it in GitHub Desktop.
Save cyriltovena/78cddf6ce7be11f62876b8655d2681a6 to your computer and use it in GitHub Desktop.

Revisions

  1. cyriltovena revised this gist Sep 15, 2020. 1 changed file with 22 additions and 27 deletions.
    49 changes: 22 additions & 27 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -14,9 +14,7 @@ import (
    "strings"
    "time"

    "github.com/cortexproject/cortex/pkg/chunk/encoding"
    "github.com/cortexproject/cortex/pkg/ingester/client"
    "github.com/davecgh/go-spew/spew"
    "github.com/prometheus/common/model"
    "github.com/weaveworks/common/user"
    "google.golang.org/grpc"
    @@ -96,7 +94,6 @@ func main() {
    {Type: client.EQUAL, Name: "__name__", Value: "node_cpu_seconds_total"},
    {Type: client.EQUAL, Name: "job", Value: "node"},
    {Type: client.EQUAL, Name: "mode", Value: "idle"},
    {Type: client.EQUAL, Name: "instance", Value: "p1-ukp-slb007"},
    },
    })
    if err != nil {
    @@ -114,32 +111,30 @@ func main() {
    }
    // log.Println("msg rcv")
    for _, s := range msg.Chunkseries {
    if client.FromLabelAdaptersToLabels(s.Labels).Get("instance") == "p1-ukp-slb007" &&
    client.FromLabelAdaptersToLabels(s.Labels).Get("cpu") == "11" {

    log.Printf("labels: %s =>", client.FromLabelAdaptersToLabels(s.Labels).String())
    log.Printf("labels: %s =>", client.FromLabelAdaptersToLabels(s.Labels).String())

    for _, c := range s.Chunks {
    ch, err := encoding.NewForEncoding(encoding.Encoding(byte(c.Encoding)))
    if err != nil {
    log.Println(err.Error())
    continue
    }
    err = ch.UnmarshalFromBuf(c.Data)
    if err != nil {
    log.Println(err.Error())
    continue
    }
    it := ch.NewIterator(nil)
    for it.Scan() {
    if it.Err() != nil {
    log.Println(it.Err())
    continue
    }
    spew.Dump(it.Value())
    }
    }
    }
    // for _, c := range s.Chunks {
    // ch, err := encoding.NewForEncoding(encoding.Encoding(byte(c.Encoding)))
    // if err != nil {
    // log.Println(err.Error())
    // continue
    // }
    // err = ch.UnmarshalFromBuf(c.Data)
    // if err != nil {
    // log.Println(err.Error())
    // continue
    // }
    // it := ch.NewIterator(nil)
    // for it.Scan() {
    // if it.Err() != nil {
    // log.Println(it.Err())
    // continue
    // }
    // spew.Dump(it.Value())
    // }

    // }
    }
    }
    }
  2. cyriltovena created this gist Sep 15, 2020.
    146 changes: 146 additions & 0 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,146 @@
    package main

    import (
    "bytes"
    "context"
    "flag"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "path/filepath"
    "strings"
    "time"

    "github.com/cortexproject/cortex/pkg/chunk/encoding"
    "github.com/cortexproject/cortex/pkg/ingester/client"
    "github.com/davecgh/go-spew/spew"
    "github.com/prometheus/common/model"
    "github.com/weaveworks/common/user"
    "google.golang.org/grpc"
    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/portforward"
    "k8s.io/client-go/transport/spdy"
    "k8s.io/client-go/util/homedir"
    )

    func main() {
    log.SetOutput(os.Stdout)
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
    kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
    kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
    panic(err)
    }
    roundTripper, upgrader, err := spdy.RoundTripperFor(config)
    if err != nil {
    panic(err)
    }
    for i := 0; i < 33; i++ {
    log.Printf("ingester-%d\n", i)
    path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", "thetradedesk", fmt.Sprintf("ingester-%d", i))
    hostIP := strings.TrimLeft(config.Host, "htps:/")
    serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}

    dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL)
    stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
    out, errOut := new(bytes.Buffer), new(bytes.Buffer)
    ports := []string{fmt.Sprintf("500%d:9095", i)}

    forwarder, err := portforward.New(dialer, ports, stopChan, readyChan, out, errOut)
    if err != nil {
    panic(err)
    }

    go func() {
    for range readyChan { // Kubernetes will close this channel when it has something to tell us.
    }
    if len(errOut.String()) != 0 {
    panic(errOut.String())
    } else if len(out.String()) != 0 {
    fmt.Println(out.String())
    }
    }()
    go func() {
    if err = forwarder.ForwardPorts(); err != nil { // Locks until stopChan is closed.
    panic(err)
    }
    }()
    <-readyChan
    conn, err := grpc.Dial(fmt.Sprintf("localhost:500%d", i), grpc.WithInsecure())
    if err != nil {
    panic(err)
    }
    defer conn.Close()
    c := client.NewIngesterClient(conn)
    ctx := user.InjectOrgID(context.Background(), "327")
    ctx1, err := user.InjectIntoGRPCRequest(ctx)
    if err != nil {
    panic(err)
    }
    end := model.Now()
    start := end.Add(-30 * time.Minute)
    stream, err := c.QueryStream(ctx1, &client.QueryRequest{
    EndTimestampMs: int64(end),
    StartTimestampMs: int64(start),
    Matchers: []*client.LabelMatcher{
    {Type: client.EQUAL, Name: "__name__", Value: "node_cpu_seconds_total"},
    {Type: client.EQUAL, Name: "job", Value: "node"},
    {Type: client.EQUAL, Name: "mode", Value: "idle"},
    {Type: client.EQUAL, Name: "instance", Value: "p1-ukp-slb007"},
    },
    })
    if err != nil {
    panic(err)
    }
    for {
    msg, err := stream.Recv()
    if err != nil {
    if err == io.EOF {
    // log.Println("DONE")
    stopChan <- struct{}{}
    break
    }
    panic(err)
    }
    // log.Println("msg rcv")
    for _, s := range msg.Chunkseries {
    if client.FromLabelAdaptersToLabels(s.Labels).Get("instance") == "p1-ukp-slb007" &&
    client.FromLabelAdaptersToLabels(s.Labels).Get("cpu") == "11" {

    log.Printf("labels: %s =>", client.FromLabelAdaptersToLabels(s.Labels).String())

    for _, c := range s.Chunks {
    ch, err := encoding.NewForEncoding(encoding.Encoding(byte(c.Encoding)))
    if err != nil {
    log.Println(err.Error())
    continue
    }
    err = ch.UnmarshalFromBuf(c.Data)
    if err != nil {
    log.Println(err.Error())
    continue
    }
    it := ch.NewIterator(nil)
    for it.Scan() {
    if it.Err() != nil {
    log.Println(it.Err())
    continue
    }
    spew.Dump(it.Value())
    }
    }
    }
    }
    }
    }
    }