Skip to content

Instantly share code, notes, and snippets.

@goodjob1114
Forked from rmosolgo/Gemfile
Created May 30, 2019 09:58
Show Gist options
  • Save goodjob1114/9f590e482a8cbffd7d9bb817269793d5 to your computer and use it in GitHub Desktop.
Save goodjob1114/9f590e482a8cbffd7d9bb817269793d5 to your computer and use it in GitHub Desktop.

Revisions

  1. @rmosolgo rmosolgo revised this gist Aug 10, 2017. 2 changed files with 85 additions and 100 deletions.
    26 changes: 14 additions & 12 deletions Gemfile.lock
    Original file line number Diff line number Diff line change
    @@ -1,27 +1,29 @@
    GIT
    remote: git://github.com/rmosolgo/graphql-ruby.git
    revision: f4500fd7619afa9fc904e0ae03beb6cdb1929f55
    revision: 2c011f6fbc50132706d14a52027f4eea9df2e103
    branch: subscriptions
    specs:
    graphql (1.5.7)
    graphql (1.6.6)

    GEM
    remote: https://rubygems.org/
    specs:
    daemons (1.2.4)
    eventmachine (1.2.3)
    rack (1.6.5)
    rack-protection (1.5.3)
    eventmachine (1.2.5)
    mustermann (1.0.0)
    rack (2.0.3)
    rack-protection (2.0.0)
    rack
    sinatra (1.4.8)
    rack (~> 1.5)
    rack-protection (~> 1.4)
    tilt (>= 1.3, < 3)
    thin (1.7.0)
    sinatra (2.0.0)
    mustermann (~> 1.0)
    rack (~> 2.0)
    rack-protection (= 2.0.0)
    tilt (~> 2.0)
    thin (1.7.2)
    daemons (~> 1.0, >= 1.0.9)
    eventmachine (~> 1.0, >= 1.0.4)
    rack (>= 1, < 3)
    tilt (2.0.7)
    tilt (2.0.8)

    PLATFORMS
    ruby
    @@ -32,4 +34,4 @@ DEPENDENCIES
    thin

    BUNDLED WITH
    1.13.6
    1.15.3
    159 changes: 71 additions & 88 deletions app.rb
    Original file line number Diff line number Diff line change
    @@ -21,104 +21,90 @@ def self.find(id)
    end
    end

    # A "database" of:
    #
    # - { channel => subscription }
    # - { event => [subscription, subcription ...] }
    #
    # Since we're storing data in memory, the `GraphQL::Query` itself is used as the subscription data.
    # IRL, you'd write the query data to disk, then reload it with `each_channel` + `get`.
    module SubscriptionDatabase
    class << self
    # Part of the subscription API: put these subscriptions in the database
    def set(query, events)
    puts "Registering #{query.context[:channel]}"
    queries[query.context[:channel]] = query
    events.each do |ev|
    subscriptions[ev.key] << query
    end
    end

    # Part of the subscription API: load the query data for this channel
    def get(channel)
    query = queries[channel]
    {
    query_string: query.query_string,
    variables: query.provided_variables,
    context: {},
    operation_name: query.operation_name,
    transport: "stream",
    }
    end

    # Part of the subscription API: fetch subscriptions from the DB and yield them one-by-one
    def each_channel(event_key)
    subscriptions[event_key].each do |query|
    yield(query.context[:channel])
    end
    end
    class Subscriptions < GraphQL::Subscriptions::Implementation
    def initialize(schema:)
    super
    # Here's the "database":
    #
    # For each counter, who is subscribed to it?
    # @return Hash<String => Array<GraphQL::Query>>
    @subscriptions = Hash.new { |h, event_id| h[event_id] = [] }
    #
    # Given a Channel, who does it belong to?
    # @return Hash<String => GraphQL::Query>
    @queries = {}
    #
    # Given a Channel, return the IO to write to
    # @return Hash<String => IO>
    @streams = {}
    end

    # Not used by GraphQL, but the Application needs some way to unsubscribe
    # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)`
    def delete(channel)
    queries.delete(channel)
    subscriptions.each do |event_key, queries|
    queries.reject! { |q| q.context[:channel] == channel }
    end
    StreamTransport.close(channel)
    # Part of the subscription API: put these subscriptions in the database
    def subscribed(query, events)
    puts "Registering #{query.context[:channel]}"
    @queries[query.context[:channel]] = query
    events.each do |ev|
    @subscriptions[ev.key] << query
    end
    end

    private
    # Part of the subscription API: load the query data for this channel
    def get_subscription(channel)
    query = @queries[channel]
    {
    query_string: query.query_string,
    variables: query.provided_variables,
    context: {},
    operation_name: query.operation_name,
    }
    end

    def subscriptions
    @subscriptions ||= Hash.new { |h, event_id| h[event_id] = [] }
    # Part of the subscription API: fetch subscriptions from the DB and yield them one-by-one
    def each_channel(event_key)
    @subscriptions[event_key].each do |query|
    yield(query.context[:channel])
    end
    end

    def queries
    @queries ||= {}
    # Not used by GraphQL, but the Application needs some way to unsubscribe
    # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)`
    def delete(channel)
    @queries.delete(channel)
    @subscriptions.each do |event_key, queries|
    queries.reject! { |q| q.context[:channel] == channel }
    end
    close(channel)
    end
    end

    module Queue
    # Optional subscription API -- could use ActiveJob etc here:
    def self.enqueue(schema, channel, event_key, object)
    def enqueue(channel, event_key, object)
    Thread.new {
    schema.subscriber.process(channel, event_key, object)
    execute(channel, event_key, object)
    }
    end
    end

    module StreamTransport
    class << self
    # Part of the subscription API: send `result` over `channel`.
    def deliver(channel, result, ctx)
    puts "Delivering to #{channel}: #{result}"
    stream = streams[channel]
    # The client _may_ have openned this channel:
    if stream
    stream << "event: update\n"
    stream << "data: #{JSON.dump(result)}\n\n"
    else
    API::Schema.subscriber.delete(channel)
    end
    end

    # Used by the transport layer:
    def open(channel, stream)
    streams[channel] = stream
    end

    # Not used by GraphQL, but needed by the App to unsubscribe
    def close(channel)
    streams.delete(channel)
    # Part of the subscription API: send `result` over `channel`.
    def deliver(channel, result, ctx)
    puts "Delivering to #{channel}: #{result}"
    stream = @streams[channel]
    # The client _may_ have opened this channel:
    if stream
    stream << "event: update\n"
    stream << "data: #{JSON.dump(result)}\n\n"
    else
    # Stream was closed or never opened
    delete(channel)
    end
    end

    private
    # Used by the transport layer:
    def open(channel, stream)
    @streams[channel] = stream
    end

    def streams
    @streams ||= {}
    end
    # Not used by GraphQL, but needed by the App to unsubscribe
    def close(channel)
    @streams.delete(channel)
    end
    end
    end
    @@ -166,10 +152,7 @@ module API

    # Schema, defined from the definition then updated with subscription info
    Schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers).redefine do
    use GraphQL::Subscriptions,
    transports: { "stream" => App::StreamTransport },
    queue: App::Queue,
    store: App::SubscriptionDatabase
    use GraphQL::Subscriptions, implementation: App::Subscriptions
    end
    end

    @@ -181,7 +164,7 @@ module API
    # Send queries here, it will provide a Channel ID which the client _may_ open
    post "/graphql" do
    content_type "application/json"
    channel = rand(10000).to_s
    channel = "socket-#{rand(10000)}"
    res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel })
    [200, {"x-channel-id" => channel}, JSON.dump(res)]
    end
    @@ -192,11 +175,11 @@ module API
    stream(:keep_open) do |out|
    channel = params[:channel]
    puts "Stream for #{channel}"
    App::StreamTransport.open(channel, out)
    API::Schema.subscriber.implementation.open(channel, out)
    out.callback {
    puts "Unsubscribing #{channel}"
    # This is forwarded to the `store`
    API::Schema.subscriber.delete(channel)
    API::Schema.subscriber.implementation.delete(channel)
    }
    end
    end
  2. @rmosolgo rmosolgo revised this gist Apr 21, 2017. 1 changed file with 8 additions and 1 deletion.
    9 changes: 8 additions & 1 deletion app.rb
    Original file line number Diff line number Diff line change
    @@ -312,12 +312,19 @@ module API
    $("#channel-" + channelId).remove()
    }

    var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>0</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>"
    var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>...</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>"
    var channelRowTemplate = "<tr id='channel-%channelid%'><td>%channelid%</td><td><button onclick='closeChannel(%channelid%)'>unsubscribe</button></td></tr>"
    var counterIds = [1, 2, 3]
    counterIds.forEach(function(counterId) {
    $("#counters").append(counterRowTemplate.replace(/%counterid%/g, counterId))
    })

    var initialLoadQuery = "{ c1: counter(id: 1) { value } c2: counter(id: 2) { value } c3: counter(id: 3) { value } }"
    $.post("/graphql", {query: initialLoadQuery}, function(data) {
    $("#counter-value-1").text(data.data.c1.value)
    $("#counter-value-2").text(data.data.c2.value)
    $("#counter-value-3").text(data.data.c3.value)
    })
    </script>
    </body>
    </html>
  3. @rmosolgo rmosolgo revised this gist Apr 21, 2017. 5 changed files with 56 additions and 21 deletions.
    4 changes: 4 additions & 0 deletions Gemfile
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,4 @@
    source 'https://rubygems.org'
    gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions"
    gem "sinatra"
    gem "thin"
    35 changes: 35 additions & 0 deletions Gemfile.lock
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,35 @@
    GIT
    remote: git://github.com/rmosolgo/graphql-ruby.git
    revision: f4500fd7619afa9fc904e0ae03beb6cdb1929f55
    branch: subscriptions
    specs:
    graphql (1.5.7)

    GEM
    remote: https://rubygems.org/
    specs:
    daemons (1.2.4)
    eventmachine (1.2.3)
    rack (1.6.5)
    rack-protection (1.5.3)
    rack
    sinatra (1.4.8)
    rack (~> 1.5)
    rack-protection (~> 1.4)
    tilt (>= 1.3, < 3)
    thin (1.7.0)
    daemons (~> 1.0, >= 1.0.9)
    eventmachine (~> 1.0, >= 1.0.4)
    rack (>= 1, < 3)
    tilt (2.0.7)

    PLATFORMS
    ruby

    DEPENDENCIES
    graphql!
    sinatra
    thin

    BUNDLED WITH
    1.13.6
    23 changes: 3 additions & 20 deletions subscriptions.rb → app.rb
    Original file line number Diff line number Diff line change
    @@ -1,23 +1,6 @@
    # Example end-to-end web app with subscriptions, demonstrating APIs.
    # See https://github.com/rmosolgo/graphql-ruby/issues/613
    #
    # @example Running this app
    # ruby subscriptions.rb
    # # visit localhost:4567

    # Dependencies:
    require "bundler/inline"
    gemfile(true) do
    source 'https://rubygems.org'
    gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions", require: true
    gem "sinatra", require: true
    gem "thin", require: true
    end

    set :server, "thin"
    # Apparently these are required when using bundler/inline:
    set :run, true
    enable :inline_templates
    require "graphql"
    require "sinatra"
    require "thin"

    # Here's the application logic:
    # - A set of stateful counters, identified by `#id`. They can be incremented.
    2 changes: 1 addition & 1 deletion config.ru
    Original file line number Diff line number Diff line change
    @@ -1,2 +1,2 @@
    require './app'
    require "./app"
    run Sinatra::Application
    13 changes: 13 additions & 0 deletions readme.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,13 @@
    Example end-to-end web app with subscriptions, demonstrating APIs.

    See https://github.com/rmosolgo/graphql-ruby/issues/613.

    To run:

    ```
    bundle install
    bundle exec rackup
    open localhost:4567
    ```

    ![subscription-demo](https://cloud.githubusercontent.com/assets/2231765/25258475/a93a97fe-260d-11e7-9906-077d0c084851.gif)
  4. @rmosolgo rmosolgo revised this gist Apr 21, 2017. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions config.ru
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,2 @@
    require './app'
    run Sinatra::Application
  5. @rmosolgo rmosolgo created this gist Apr 21, 2017.
    340 changes: 340 additions & 0 deletions subscriptions.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,340 @@
    # Example end-to-end web app with subscriptions, demonstrating APIs.
    # See https://github.com/rmosolgo/graphql-ruby/issues/613
    #
    # @example Running this app
    # ruby subscriptions.rb
    # # visit localhost:4567

    # Dependencies:
    require "bundler/inline"
    gemfile(true) do
    source 'https://rubygems.org'
    gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions", require: true
    gem "sinatra", require: true
    gem "thin", require: true
    end

    set :server, "thin"
    # Apparently these are required when using bundler/inline:
    set :run, true
    enable :inline_templates

    # Here's the application logic:
    # - A set of stateful counters, identified by `#id`. They can be incremented.
    # - An in-memory subscription database
    # - A transport for sending payloads over open sockets
    # - A queuing system for isolating subscription execution & delivery
    module App
    # An incrementing counter, identified by ID
    Counter = Struct.new(:id, :value) do
    def increment
    self.value += 1
    end

    # Get or create a counter by id
    def self.find(id)
    @counters ||= Hash.new { |h, counter_id| h[counter_id] = Counter.new(counter_id, 0) }
    @counters[id]
    end
    end

    # A "database" of:
    #
    # - { channel => subscription }
    # - { event => [subscription, subcription ...] }
    #
    # Since we're storing data in memory, the `GraphQL::Query` itself is used as the subscription data.
    # IRL, you'd write the query data to disk, then reload it with `each_channel` + `get`.
    module SubscriptionDatabase
    class << self
    # Part of the subscription API: put these subscriptions in the database
    def set(query, events)
    puts "Registering #{query.context[:channel]}"
    queries[query.context[:channel]] = query
    events.each do |ev|
    subscriptions[ev.key] << query
    end
    end

    # Part of the subscription API: load the query data for this channel
    def get(channel)
    query = queries[channel]
    {
    query_string: query.query_string,
    variables: query.provided_variables,
    context: {},
    operation_name: query.operation_name,
    transport: "stream",
    }
    end

    # Part of the subscription API: fetch subscriptions from the DB and yield them one-by-one
    def each_channel(event_key)
    subscriptions[event_key].each do |query|
    yield(query.context[:channel])
    end
    end

    # Not used by GraphQL, but the Application needs some way to unsubscribe
    # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)`
    def delete(channel)
    queries.delete(channel)
    subscriptions.each do |event_key, queries|
    queries.reject! { |q| q.context[:channel] == channel }
    end
    StreamTransport.close(channel)
    end

    private

    def subscriptions
    @subscriptions ||= Hash.new { |h, event_id| h[event_id] = [] }
    end

    def queries
    @queries ||= {}
    end
    end
    end

    module Queue
    # Optional subscription API -- could use ActiveJob etc here:
    def self.enqueue(schema, channel, event_key, object)
    Thread.new {
    schema.subscriber.process(channel, event_key, object)
    }
    end
    end

    module StreamTransport
    class << self
    # Part of the subscription API: send `result` over `channel`.
    def deliver(channel, result, ctx)
    puts "Delivering to #{channel}: #{result}"
    stream = streams[channel]
    # The client _may_ have openned this channel:
    if stream
    stream << "event: update\n"
    stream << "data: #{JSON.dump(result)}\n\n"
    else
    API::Schema.subscriber.delete(channel)
    end
    end

    # Used by the transport layer:
    def open(channel, stream)
    streams[channel] = stream
    end

    # Not used by GraphQL, but needed by the App to unsubscribe
    def close(channel)
    streams.delete(channel)
    end

    private

    def streams
    @streams ||= {}
    end
    end
    end
    end

    # Here's the GraphQL API for this application:
    module API
    # Type system:
    Definition = <<-GRAPHQL
    type Subscription {
    counterIncremented(id: ID!): Counter
    }
    type Counter {
    id: ID!
    value: Int!
    }
    type Query {
    counter(id: ID!): Counter
    }
    type Mutation {
    incrementCounter(id: ID!): Counter
    }
    GRAPHQL

    # Resolve functions:
    Resolvers = {
    "Mutation" => {
    "incrementCounter" => ->(o, a, c) {
    counter = App::Counter.find(a["id"])
    counter.increment
    API::Schema.subscriber.trigger("counterIncremented", a, counter)
    counter
    }
    },
    "Query" => {
    "counter" => ->(o, a, c) { App::Counter.find(a["id"]) }
    },
    "Counter" => {
    "value" => ->(o, a, c) { o.value },
    "id" => ->(o, a, c) { o.id },
    }
    }

    # Schema, defined from the definition then updated with subscription info
    Schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers).redefine do
    use GraphQL::Subscriptions,
    transports: { "stream" => App::StreamTransport },
    queue: App::Queue,
    store: App::SubscriptionDatabase
    end
    end

    # Serve the HTML subscription dashboard
    get "/" do
    erb :index
    end

    # Send queries here, it will provide a Channel ID which the client _may_ open
    post "/graphql" do
    content_type "application/json"
    channel = rand(10000).to_s
    res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel })
    [200, {"x-channel-id" => channel}, JSON.dump(res)]
    end

    # Clients may open their channels here to receive updates
    get '/channels/:channel' do
    content_type 'text/event-stream'
    stream(:keep_open) do |out|
    channel = params[:channel]
    puts "Stream for #{channel}"
    App::StreamTransport.open(channel, out)
    out.callback {
    puts "Unsubscribing #{channel}"
    # This is forwarded to the `store`
    API::Schema.subscriber.delete(channel)
    }
    end
    end

    __END__
    @@ index
    <html>
    <head>
    <title>GraphQL Subscriptions Example</title>
    <script src="https://code.jquery.com/jquery-3.2.1.js" integrity="sha256-DZAnKJ/6XZ9si04Hgrsxu/8s717jcIzLy3oi35EouyE=" crossorigin="anonymous"></script>
    <style>
    .dashboard {
    display: flex;
    }
    table {
    margin: 20px;
    }
    </style>
    </head>
    <body>
    <h1>Subscriptions</h1>
    <div class="dashboard">
    <table>
    <thead>
    <tr>
    <th colspan="4">Counters</th>
    <tr>
    <tr>
    <th>Id</th>
    <th>Value</td>
    <th colspan="2">Actions</th>
    </tr>
    </thead>
    <tbody id="counters">
    </tbody>
    </table>

    <table>
    <thead>
    <tr>
    <th colspan="3">Updates</th>
    </tr>
    <tr>
    <th>Channel</th>
    <th>Counter</th>
    <th>Value</th>
    </tr>
    </thead>
    <tbody id="updates">
    </tbody>
    </table>

    <table>
    <thead>
    <tr>
    <th colspan="2">Channels</th>
    </tr>
    <tr>
    <th>Id</th>
    <th>Actions</th>
    <tr>
    </thead>
    <tbody id="channels">
    </tbody>
    </table>
    </div>


    <script>
    var eventSources = {}
    var subscriptionString = "subscription WatchCounter($id: ID!) { counterIncremented(id: $id) { id value } }"
    var mutationString = "mutation IncrementCounter($id: ID!) { incrementCounter(id: $id) { value } }"

    function incrementCounter(id) {
    $.post("/graphql", {query: mutationString, variables: { id: id } }, function(response) {
    $("#counter-value-" + id).text(response.data.incrementCounter.value)
    })
    }

    function subscribeToCounter(id) {
    $.ajax({
    type: "POST",
    url: "/graphql",
    data: {
    query: subscriptionString,
    variables: {
    id: id,
    },
    },
    success: function(data, status, jqXHR) {
    var channelId = jqXHR.getResponseHeader("x-channel-id")
    openChannel(channelId)
    }
    })
    }

    function openChannel(channelId) {
    var eventSource = new EventSource("/channels/" + channelId)
    eventSources[channelId] = eventSource
    eventSource.addEventListener("update", function(e) {
    console.log("message", e)
    var result = JSON.parse(e.data)
    var counter = result.data.counterIncremented
    $("#counter-value-" + counter.id).text(counter.value)
    $("#updates").append("<tr><td>" + channelId + "</td><td>" + counter.id + "</td><td>" + counter.value + "</td></tr>")
    })
    $("#channels").append(channelRowTemplate.replace(/%channelid%/g, channelId))
    }

    function closeChannel(channelId) {
    eventSources[channelId].close()
    delete eventSources[channelId]
    $("#channel-" + channelId).remove()
    }

    var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>0</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>"
    var channelRowTemplate = "<tr id='channel-%channelid%'><td>%channelid%</td><td><button onclick='closeChannel(%channelid%)'>unsubscribe</button></td></tr>"
    var counterIds = [1, 2, 3]
    counterIds.forEach(function(counterId) {
    $("#counters").append(counterRowTemplate.replace(/%counterid%/g, counterId))
    })
    </script>
    </body>
    </html>