Skip to content

Instantly share code, notes, and snippets.

@claudemirogb
Last active September 22, 2016 20:58
Show Gist options
  • Select an option

  • Save claudemirogb/7a1f084ae0da3d6c800bab6e8b6c724b to your computer and use it in GitHub Desktop.

Select an option

Save claudemirogb/7a1f084ae0da3d6c800bab6e8b6c724b to your computer and use it in GitHub Desktop.

Revisions

  1. claudemirogb revised this gist Sep 22, 2016. 1 changed file with 73 additions and 19 deletions.
    92 changes: 73 additions & 19 deletions fbp.go
    Original file line number Diff line number Diff line change
    @@ -5,11 +5,13 @@ import (
    "errors"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "reflect"
    "strings"
    "time"

    "github.com/gorilla/websocket"
    "github.com/rs/xlog"
    )

    @@ -137,10 +139,10 @@ func (r *readComponent) Run(ctx context.Context) {
    log := xlog.FromContext(message.ctx)

    log.Info("Reading", xlog.F{
    "message": message.in,
    "message": message.str,
    })

    content, err := ioutil.ReadFile(message.in)
    content, err := ioutil.ReadFile(message.str)

    if err != nil {
    log.Error("Error", xlog.F{
    @@ -158,16 +160,16 @@ func (r *readComponent) Run(ctx context.Context) {
    }

    type sendRequestComponent struct {
    URL chan string `port:"url"`
    Message chan string `port:"message"`
    Err chan string `port:"err"`
    URL chan StringWithContext `port:"url"`
    Message chan StringWithContext `port:"message"`
    Err chan string `port:"err"`
    }

    func (s *sendRequestComponent) Run(ctx context.Context) {
    for {
    select {
    case url := <-s.URL:
    response, err := http.Get(url)
    response, err := http.Get(url.str)

    if err != nil {
    s.Err <- err.Error()
    @@ -181,7 +183,7 @@ func (s *sendRequestComponent) Run(ctx context.Context) {
    continue
    }

    s.Message <- string(data)
    s.Message <- NewStringWithContext(url.ctx, string(data))

    response.Body.Close()
    case <-ctx.Done():
    @@ -190,6 +192,22 @@ func (s *sendRequestComponent) Run(ctx context.Context) {
    }
    }

    type Notifier struct {
    In chan StringWithContext
    }

    func (n *Notifier) Run(ctx context.Context) {
    for {
    select {
    case content := <-n.In:
    ws := content.ctx.Value("ws").(*websocket.Conn)
    ws.WriteJSON(content.str)
    case <-ctx.Done():
    return
    }
    }
    }

    type Writer struct {
    In chan string
    }
    @@ -206,15 +224,15 @@ func (w *Writer) Run(ctx context.Context) {
    }

    type LowerCaseString struct {
    In chan string
    Out chan string
    In chan StringWithContext
    Out chan StringWithContext
    }

    func (l *LowerCaseString) Run(ctx context.Context) {
    for {
    select {
    case data := <-l.In:
    l.Out <- strings.ToLower(data)
    l.Out <- NewStringWithContext(data.ctx, strings.ToLower(data.str))
    case <-ctx.Done():
    return
    }
    @@ -223,35 +241,71 @@ func (l *LowerCaseString) Run(ctx context.Context) {

    type StringWithContext struct {
    ctx context.Context
    in string
    str string
    }

    func NewStringWithContext(ctx context.Context, str string) StringWithContext {
    return StringWithContext{
    ctx: ctx,
    in: str,
    str: str,
    }
    }

    func main() {
    initial := make(chan string)
    initial := make(chan StringWithContext)

    graph := &Graph{
    bufferCapacity: 10,
    }

    graph.AddNode("LowerCaseString", new(LowerCaseString))
    graph.AddNode("Writer", new(Writer))
    graph.AddNode("SendRequest", new(sendRequestComponent))
    graph.AddNode("Notifier", new(Notifier))
    // graph.AddNode("SendRequest", new(sendRequestComponent))

    //graph.AddEdge("LowerCaseString.Out -> SendRequest.URL", make(chan string))
    graph.AddEdge("LowerCaseString.Out -> SendRequest.URL")
    graph.AddEdge("SendRequest.Message -> Writer.In")
    graph.AddEdge("LowerCaseString.Out -> Notifier.In")
    // graph.AddEdge("SendRequest.Message -> Notifier.In")
    graph.AddInitial(initial, "LowerCaseString.In")

    graph.Run(context.Background())
    defer graph.Shutdown()

    initial <- "HTTPS://HTTPBIN.ORG/IP"
    time.Sleep(1 * time.Second)
    var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
    return true
    },
    }

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    ws, err := upgrader.Upgrade(w, r, nil)

    if err != nil {
    log.Println("upgrade:", err)
    return
    }

    defer ws.Close()

    ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute)
    defer cancel()

    go func() {
    for {
    var message string

    err := ws.ReadJSON(&message)

    if err != nil {
    log.Printf("readjson: %s", err)
    return
    }

    another := context.WithValue(ctx, "ws", ws)
    initial <- NewStringWithContext(another, message)
    }
    }()

    <-ctx.Done()
    })
    http.ListenAndServe(":8080", nil)
    }
  2. claudemirogb revised this gist Sep 18, 2016. 1 changed file with 123 additions and 90 deletions.
    213 changes: 123 additions & 90 deletions fbp.go
    Original file line number Diff line number Diff line change
    @@ -2,9 +2,12 @@ package main

    import (
    "context"
    "errors"
    "fmt"
    "io/ioutil"
    "net/http"
    "reflect"
    "strings"
    "time"

    "github.com/rs/xlog"
    @@ -15,23 +18,19 @@ type Component interface {
    Run(ctx context.Context)
    }

    // ComponentFunc is a wrapper for a Component
    type ComponentFunc func(ctx context.Context)

    // Run starts the component
    func (c ComponentFunc) Run(ctx context.Context) { c(ctx) }

    // Graph is a graph network
    // has a map ot registered elements
    type Graph struct {
    ctx context.Context
    cancel context.CancelFunc
    components map[string]Component
    ctx context.Context
    cancel context.CancelFunc
    components map[string]Component
    channels []reflect.Value
    bufferCapacity int
    }

    // Register is used to give a name to the Component, so it can be used in the Connect function
    // ex: RegisterComponent("read", c)
    func (n *Graph) Register(name string, c Component) error {
    // AddNode is used to give a name to the Component, so it can be used in the AddEdge function
    // ex: AddNodeComponent("read", c)
    func (n *Graph) AddNode(name string, c Component) error {
    if n.components == nil {
    n.components = make(map[string]Component)
    }
    @@ -41,11 +40,68 @@ func (n *Graph) Register(name string, c Component) error {
    return nil
    }

    // Connect connect the input and output of the given components
    func (n *Graph) AddInitial(in interface{}, definition string) error {
    left := strings.Split(definition, ".")
    c := strings.TrimSpace(left[0])
    out := strings.TrimSpace(left[1])

    inType := reflect.TypeOf(in)
    outType, _ := reflect.TypeOf(n.components[c]).Elem().FieldByName(out)

    if inType.Kind() != reflect.Chan {
    return errors.New("in must be a channel")
    }

    if outType.Type.Kind() != reflect.Chan {
    return errors.New("out must be a channel")
    }

    if inType != outType.Type {
    return errors.New("in and out must be the same channel types")
    }

    reflect.ValueOf(n.components[c]).Elem().FieldByName(out).Set(reflect.ValueOf(in))

    return nil
    }

    // AddEdge connect the input and output of the given components
    // in -> out
    // Ex: Connect("read.out -> write.in")
    // Panic if element is not registered???
    func (n *Graph) Connect(definition string) error {
    func (n *Graph) AddEdge(definition string) error {

    pieces := strings.Split(definition, "->")

    left := strings.Split(pieces[0], ".")
    a := strings.TrimSpace(left[0])
    in := strings.TrimSpace(left[1])

    right := strings.Split(pieces[1], ".")
    b := strings.TrimSpace(right[0])
    out := strings.TrimSpace(right[1])

    inType, _ := reflect.TypeOf(n.components[a]).Elem().FieldByName(in)
    outType, _ := reflect.TypeOf(n.components[b]).Elem().FieldByName(out)

    if inType.Type.Kind() != reflect.Chan {
    return errors.New("in must be a channel")
    }

    if outType.Type.Kind() != reflect.Chan {
    return errors.New("out must be a channel")
    }

    if inType.Type != outType.Type {
    return errors.New("in and out must be the same channel types")
    }

    edge := reflect.MakeChan(inType.Type, n.bufferCapacity)

    n.channels = append(n.channels, edge)

    reflect.ValueOf(n.components[a]).Elem().FieldByName(in).Set(edge)
    reflect.ValueOf(n.components[b]).Elem().FieldByName(out).Set(edge)

    return nil
    }

    @@ -62,6 +118,10 @@ func (n *Graph) Run(ctx context.Context) {
    // also should closes all the created channels.
    func (n *Graph) Shutdown() {
    n.cancel()

    for _, c := range n.channels {
    c.Close()
    }
    }

    type readComponent struct {
    @@ -98,30 +158,30 @@ func (r *readComponent) Run(ctx context.Context) {
    }

    type sendRequestComponent struct {
    url chan StringWithContext `port:"url"`
    message chan string `port:"message"`
    err chan string `port:"err"`
    URL chan string `port:"url"`
    Message chan string `port:"message"`
    Err chan string `port:"err"`
    }

    func (s *sendRequestComponent) Run(ctx context.Context) {
    for {
    select {
    case url := <-s.url:
    response, err := http.Get(url.in)
    case url := <-s.URL:
    response, err := http.Get(url)

    if err != nil {
    s.err <- err.Error()
    s.Err <- err.Error()
    continue
    }

    data, err := ioutil.ReadAll(response.Body)

    if err != nil {
    s.err <- err.Error()
    s.Err <- err.Error()
    continue
    }

    s.message <- string(data)
    s.Message <- string(data)

    response.Body.Close()
    case <-ctx.Done():
    @@ -130,17 +190,35 @@ func (s *sendRequestComponent) Run(ctx context.Context) {
    }
    }

    func writeComponent(in chan string) ComponentFunc {
    return ComponentFunc(func(ctx context.Context) {
    for {
    select {
    case content := <-in:
    fmt.Println(content)
    case <-ctx.Done():
    return
    }
    type Writer struct {
    In chan string
    }

    func (w *Writer) Run(ctx context.Context) {
    for {
    select {
    case content := <-w.In:
    fmt.Println(content)
    case <-ctx.Done():
    return
    }
    }
    }

    type LowerCaseString struct {
    In chan string
    Out chan string
    }

    func (l *LowerCaseString) Run(ctx context.Context) {
    for {
    select {
    case data := <-l.In:
    l.Out <- strings.ToLower(data)
    case <-ctx.Done():
    return
    }
    })
    }
    }

    type StringWithContext struct {
    @@ -156,69 +234,24 @@ func NewStringWithContext(ctx context.Context, str string) StringWithContext {
    }

    func main() {
    conf := xlog.Config{
    Level: xlog.LevelInfo,
    Fields: xlog.F{
    "role": "FBP",
    },
    Output: xlog.NewOutputChannel(xlog.NewConsoleOutput()),
    }
    initial := make(chan string)

    logger := xlog.New(conf)

    capacity := 10 // Opcional. Pode ser 0.

    urlPort := make(chan StringWithContext, capacity)
    readInPort := make(chan StringWithContext, capacity)
    readErrPort := make(chan string, capacity)
    writeInPort := make(chan string, capacity)

    s := &sendRequestComponent{
    url: urlPort,
    message: writeInPort,
    graph := &Graph{
    bufferCapacity: 10,
    }

    read := &readComponent{
    in: readInPort,
    out: writeInPort,
    err: readErrPort,
    }
    graph.AddNode("LowerCaseString", new(LowerCaseString))
    graph.AddNode("Writer", new(Writer))
    graph.AddNode("SendRequest", new(sendRequestComponent))

    // Component as a function
    write := writeComponent(writeInPort)
    errorWriter := writeComponent(readErrPort)

    graph := &Graph{}
    graph.Register("SendRequest", s)
    graph.Register("Read", read)
    graph.Register("Write", write)
    graph.Register("ErrorWrite", errorWriter)

    // Ainda não tenho certeza da forma como pode ser feita essa conexão :(
    // Isto não faz nada por enquanto
    // graph.Connect(s, "message", write, "in")
    // graph.Connect(theChannel, s, "message", write, "in")
    graph.Connect("SendRequest.message -> Write.in")
    graph.Connect("Read.out -> Write.in")
    graph.Connect("Read.err -> ErrorWrite.in")
    //graph.AddEdge("LowerCaseString.Out -> SendRequest.URL", make(chan string))
    graph.AddEdge("LowerCaseString.Out -> SendRequest.URL")
    graph.AddEdge("SendRequest.Message -> Writer.In")
    graph.AddInitial(initial, "LowerCaseString.In")

    graph.Run(context.Background())
    defer graph.Shutdown()

    ctx := xlog.NewContext(context.Background(), logger)
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    files := []string{
    "/home/claudemiro/Downloads/b9c4d81d9c8a4eef2cbfe1228c90eeea/fbp.go",
    "Hello World",
    }

    for i := 0; i < 2; i++ {
    readInPort <- NewStringWithContext(ctx, files[i])
    time.Sleep(500 * time.Millisecond)
    }

    urlPort <- NewStringWithContext(ctx, "https://httpbin.org/ip")
    time.Sleep(3 * time.Second)
    initial <- "HTTPS://HTTPBIN.ORG/IP"
    time.Sleep(1 * time.Second)
    }
  3. claudemirogb created this gist Sep 15, 2016.
    224 changes: 224 additions & 0 deletions fbp.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,224 @@
    package main

    import (
    "context"
    "fmt"
    "io/ioutil"
    "net/http"
    "time"

    "github.com/rs/xlog"
    )

    // Component represents a runnable component
    type Component interface {
    Run(ctx context.Context)
    }

    // ComponentFunc is a wrapper for a Component
    type ComponentFunc func(ctx context.Context)

    // Run starts the component
    func (c ComponentFunc) Run(ctx context.Context) { c(ctx) }

    // Graph is a graph network
    // has a map ot registered elements
    type Graph struct {
    ctx context.Context
    cancel context.CancelFunc
    components map[string]Component
    }

    // Register is used to give a name to the Component, so it can be used in the Connect function
    // ex: RegisterComponent("read", c)
    func (n *Graph) Register(name string, c Component) error {
    if n.components == nil {
    n.components = make(map[string]Component)
    }

    n.components[name] = c

    return nil
    }

    // Connect connect the input and output of the given components
    // in -> out
    // Ex: Connect("read.out -> write.in")
    // Panic if element is not registered???
    func (n *Graph) Connect(definition string) error {
    return nil
    }

    // Run start all components
    func (n *Graph) Run(ctx context.Context) {
    n.ctx, n.cancel = context.WithCancel(ctx)

    for _, c := range n.components {
    go c.Run(n.ctx)
    }
    }

    // Shutdown sends a cancel to all the components of the graph
    // also should closes all the created channels.
    func (n *Graph) Shutdown() {
    n.cancel()
    }

    type readComponent struct {
    in chan StringWithContext `port:"in"`
    out chan string `port:"out"`
    err chan string `port:"error"`
    }

    func (r *readComponent) Run(ctx context.Context) {
    for {
    select {
    case message := <-r.in:
    log := xlog.FromContext(message.ctx)

    log.Info("Reading", xlog.F{
    "message": message.in,
    })

    content, err := ioutil.ReadFile(message.in)

    if err != nil {
    log.Error("Error", xlog.F{
    "error": err,
    })
    r.err <- err.Error()
    continue
    }

    r.out <- string(content)
    case <-ctx.Done():
    return
    }
    }
    }

    type sendRequestComponent struct {
    url chan StringWithContext `port:"url"`
    message chan string `port:"message"`
    err chan string `port:"err"`
    }

    func (s *sendRequestComponent) Run(ctx context.Context) {
    for {
    select {
    case url := <-s.url:
    response, err := http.Get(url.in)

    if err != nil {
    s.err <- err.Error()
    continue
    }

    data, err := ioutil.ReadAll(response.Body)

    if err != nil {
    s.err <- err.Error()
    continue
    }

    s.message <- string(data)

    response.Body.Close()
    case <-ctx.Done():
    return
    }
    }
    }

    func writeComponent(in chan string) ComponentFunc {
    return ComponentFunc(func(ctx context.Context) {
    for {
    select {
    case content := <-in:
    fmt.Println(content)
    case <-ctx.Done():
    return
    }
    }
    })
    }

    type StringWithContext struct {
    ctx context.Context
    in string
    }

    func NewStringWithContext(ctx context.Context, str string) StringWithContext {
    return StringWithContext{
    ctx: ctx,
    in: str,
    }
    }

    func main() {
    conf := xlog.Config{
    Level: xlog.LevelInfo,
    Fields: xlog.F{
    "role": "FBP",
    },
    Output: xlog.NewOutputChannel(xlog.NewConsoleOutput()),
    }

    logger := xlog.New(conf)

    capacity := 10 // Opcional. Pode ser 0.

    urlPort := make(chan StringWithContext, capacity)
    readInPort := make(chan StringWithContext, capacity)
    readErrPort := make(chan string, capacity)
    writeInPort := make(chan string, capacity)

    s := &sendRequestComponent{
    url: urlPort,
    message: writeInPort,
    }

    read := &readComponent{
    in: readInPort,
    out: writeInPort,
    err: readErrPort,
    }

    // Component as a function
    write := writeComponent(writeInPort)
    errorWriter := writeComponent(readErrPort)

    graph := &Graph{}
    graph.Register("SendRequest", s)
    graph.Register("Read", read)
    graph.Register("Write", write)
    graph.Register("ErrorWrite", errorWriter)

    // Ainda não tenho certeza da forma como pode ser feita essa conexão :(
    // Isto não faz nada por enquanto
    // graph.Connect(s, "message", write, "in")
    // graph.Connect(theChannel, s, "message", write, "in")
    graph.Connect("SendRequest.message -> Write.in")
    graph.Connect("Read.out -> Write.in")
    graph.Connect("Read.err -> ErrorWrite.in")

    graph.Run(context.Background())
    defer graph.Shutdown()

    ctx := xlog.NewContext(context.Background(), logger)
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    files := []string{
    "/home/claudemiro/Downloads/b9c4d81d9c8a4eef2cbfe1228c90eeea/fbp.go",
    "Hello World",
    }

    for i := 0; i < 2; i++ {
    readInPort <- NewStringWithContext(ctx, files[i])
    time.Sleep(500 * time.Millisecond)
    }

    urlPort <- NewStringWithContext(ctx, "https://httpbin.org/ip")
    time.Sleep(3 * time.Second)
    }