- 
      
- 
        Save goodjob1114/9f590e482a8cbffd7d9bb817269793d5 to your computer and use it in GitHub Desktop. 
Revisions
- 
        rmosolgo revised this gist Aug 10, 2017 . 2 changed files with 85 additions and 100 deletions.There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,27 +1,29 @@ GIT remote: git://github.com/rmosolgo/graphql-ruby.git revision: 2c011f6fbc50132706d14a52027f4eea9df2e103 branch: subscriptions specs: graphql (1.6.6) GEM remote: https://rubygems.org/ specs: daemons (1.2.4) eventmachine (1.2.5) mustermann (1.0.0) rack (2.0.3) rack-protection (2.0.0) rack sinatra (2.0.0) mustermann (~> 1.0) rack (~> 2.0) rack-protection (= 2.0.0) tilt (~> 2.0) thin (1.7.2) daemons (~> 1.0, >= 1.0.9) eventmachine (~> 1.0, >= 1.0.4) rack (>= 1, < 3) tilt (2.0.8) PLATFORMS ruby @@ -32,4 +34,4 @@ DEPENDENCIES thin BUNDLED WITH 1.15.3 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -21,104 +21,90 @@ def self.find(id) end end class Subscriptions < GraphQL::Subscriptions::Implementation def initialize(schema:) super # Here's the "database": # # For each counter, who is subscribed to it? # @return Hash<String => Array<GraphQL::Query>> @subscriptions = Hash.new { |h, event_id| h[event_id] = [] } # # Given a Channel, who does it belong to? # @return Hash<String => GraphQL::Query> @queries = {} # # Given a Channel, return the IO to write to # @return Hash<String => IO> @streams = {} end # Part of the subscription API: put these subscriptions in the database def subscribed(query, events) puts "Registering #{query.context[:channel]}" @queries[query.context[:channel]] = query events.each do |ev| @subscriptions[ev.key] << query end end # Part of the subscription API: load the query data for this channel def get_subscription(channel) query = @queries[channel] { query_string: query.query_string, variables: query.provided_variables, context: {}, operation_name: query.operation_name, } end # Part of the subscription API: fetch subscriptions from the DB and yield them one-by-one def each_channel(event_key) @subscriptions[event_key].each do |query| yield(query.context[:channel]) end end # Not used by GraphQL, but the Application needs some way to unsubscribe # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)` def delete(channel) @queries.delete(channel) @subscriptions.each do |event_key, queries| queries.reject! { |q| q.context[:channel] == channel } end close(channel) end # Optional subscription API -- could use ActiveJob etc here: def enqueue(channel, event_key, object) Thread.new { execute(channel, event_key, object) } end # Part of the subscription API: send `result` over `channel`. def deliver(channel, result, ctx) puts "Delivering to #{channel}: #{result}" stream = @streams[channel] # The client _may_ have opened this channel: if stream stream << "event: update\n" stream << "data: #{JSON.dump(result)}\n\n" else # Stream was closed or never opened delete(channel) end end # Used by the transport layer: def open(channel, stream) @streams[channel] = stream end # Not used by GraphQL, but needed by the App to unsubscribe def close(channel) @streams.delete(channel) end end end @@ -166,10 +152,7 @@ module API # Schema, defined from the definition then updated with subscription info Schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers).redefine do use GraphQL::Subscriptions, implementation: App::Subscriptions end end @@ -181,7 +164,7 @@ module API # Send queries here, it will provide a Channel ID which the client _may_ open post "/graphql" do content_type "application/json" channel = "socket-#{rand(10000)}" res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel }) [200, {"x-channel-id" => channel}, JSON.dump(res)] end @@ -192,11 +175,11 @@ module API stream(:keep_open) do |out| channel = params[:channel] puts "Stream for #{channel}" API::Schema.subscriber.implementation.open(channel, out) out.callback { puts "Unsubscribing #{channel}" # This is forwarded to the `store` API::Schema.subscriber.implementation.delete(channel) } end end 
- 
        rmosolgo revised this gist Apr 21, 2017 . 1 changed file with 8 additions and 1 deletion.There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -312,12 +312,19 @@ module API $("#channel-" + channelId).remove() } var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>...</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>" var channelRowTemplate = "<tr id='channel-%channelid%'><td>%channelid%</td><td><button onclick='closeChannel(%channelid%)'>unsubscribe</button></td></tr>" var counterIds = [1, 2, 3] counterIds.forEach(function(counterId) { $("#counters").append(counterRowTemplate.replace(/%counterid%/g, counterId)) }) var initialLoadQuery = "{ c1: counter(id: 1) { value } c2: counter(id: 2) { value } c3: counter(id: 3) { value } }" $.post("/graphql", {query: initialLoadQuery}, function(data) { $("#counter-value-1").text(data.data.c1.value) $("#counter-value-2").text(data.data.c2.value) $("#counter-value-3").text(data.data.c3.value) }) </script> </body> </html> 
- 
        rmosolgo revised this gist Apr 21, 2017 . 5 changed files with 56 additions and 21 deletions.There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,4 @@ source 'https://rubygems.org' gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions" gem "sinatra" gem "thin" This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,35 @@ GIT remote: git://github.com/rmosolgo/graphql-ruby.git revision: f4500fd7619afa9fc904e0ae03beb6cdb1929f55 branch: subscriptions specs: graphql (1.5.7) GEM remote: https://rubygems.org/ specs: daemons (1.2.4) eventmachine (1.2.3) rack (1.6.5) rack-protection (1.5.3) rack sinatra (1.4.8) rack (~> 1.5) rack-protection (~> 1.4) tilt (>= 1.3, < 3) thin (1.7.0) daemons (~> 1.0, >= 1.0.9) eventmachine (~> 1.0, >= 1.0.4) rack (>= 1, < 3) tilt (2.0.7) PLATFORMS ruby DEPENDENCIES graphql! sinatra thin BUNDLED WITH 1.13.6 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,23 +1,6 @@ require "graphql" require "sinatra" require "thin" # Here's the application logic: # - A set of stateful counters, identified by `#id`. They can be incremented. This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,2 +1,2 @@ require "./app" run Sinatra::Application This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,13 @@ Example end-to-end web app with subscriptions, demonstrating APIs. See https://github.com/rmosolgo/graphql-ruby/issues/613. To run: ``` bundle install bundle exec rackup open localhost:4567 ```  
- 
        rmosolgo revised this gist Apr 21, 2017 . 1 changed file with 2 additions and 0 deletions.There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,2 @@ require './app' run Sinatra::Application 
- 
        rmosolgo created this gist Apr 21, 2017 .There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,340 @@ # Example end-to-end web app with subscriptions, demonstrating APIs. # See https://github.com/rmosolgo/graphql-ruby/issues/613 # # @example Running this app # ruby subscriptions.rb # # visit localhost:4567 # Dependencies: require "bundler/inline" gemfile(true) do source 'https://rubygems.org' gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions", require: true gem "sinatra", require: true gem "thin", require: true end set :server, "thin" # Apparently these are required when using bundler/inline: set :run, true enable :inline_templates # Here's the application logic: # - A set of stateful counters, identified by `#id`. They can be incremented. # - An in-memory subscription database # - A transport for sending payloads over open sockets # - A queuing system for isolating subscription execution & delivery module App # An incrementing counter, identified by ID Counter = Struct.new(:id, :value) do def increment self.value += 1 end # Get or create a counter by id def self.find(id) @counters ||= Hash.new { |h, counter_id| h[counter_id] = Counter.new(counter_id, 0) } @counters[id] end end # A "database" of: # # - { channel => subscription } # - { event => [subscription, subcription ...] } # # Since we're storing data in memory, the `GraphQL::Query` itself is used as the subscription data. # IRL, you'd write the query data to disk, then reload it with `each_channel` + `get`. module SubscriptionDatabase class << self # Part of the subscription API: put these subscriptions in the database def set(query, events) puts "Registering #{query.context[:channel]}" queries[query.context[:channel]] = query events.each do |ev| subscriptions[ev.key] << query end end # Part of the subscription API: load the query data for this channel def get(channel) query = queries[channel] { query_string: query.query_string, variables: query.provided_variables, context: {}, operation_name: query.operation_name, transport: "stream", } end # Part of the subscription API: fetch subscriptions from the DB and yield them one-by-one def each_channel(event_key) subscriptions[event_key].each do |query| yield(query.context[:channel]) end end # Not used by GraphQL, but the Application needs some way to unsubscribe # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)` def delete(channel) queries.delete(channel) subscriptions.each do |event_key, queries| queries.reject! { |q| q.context[:channel] == channel } end StreamTransport.close(channel) end private def subscriptions @subscriptions ||= Hash.new { |h, event_id| h[event_id] = [] } end def queries @queries ||= {} end end end module Queue # Optional subscription API -- could use ActiveJob etc here: def self.enqueue(schema, channel, event_key, object) Thread.new { schema.subscriber.process(channel, event_key, object) } end end module StreamTransport class << self # Part of the subscription API: send `result` over `channel`. def deliver(channel, result, ctx) puts "Delivering to #{channel}: #{result}" stream = streams[channel] # The client _may_ have openned this channel: if stream stream << "event: update\n" stream << "data: #{JSON.dump(result)}\n\n" else API::Schema.subscriber.delete(channel) end end # Used by the transport layer: def open(channel, stream) streams[channel] = stream end # Not used by GraphQL, but needed by the App to unsubscribe def close(channel) streams.delete(channel) end private def streams @streams ||= {} end end end end # Here's the GraphQL API for this application: module API # Type system: Definition = <<-GRAPHQL type Subscription { counterIncremented(id: ID!): Counter } type Counter { id: ID! value: Int! } type Query { counter(id: ID!): Counter } type Mutation { incrementCounter(id: ID!): Counter } GRAPHQL # Resolve functions: Resolvers = { "Mutation" => { "incrementCounter" => ->(o, a, c) { counter = App::Counter.find(a["id"]) counter.increment API::Schema.subscriber.trigger("counterIncremented", a, counter) counter } }, "Query" => { "counter" => ->(o, a, c) { App::Counter.find(a["id"]) } }, "Counter" => { "value" => ->(o, a, c) { o.value }, "id" => ->(o, a, c) { o.id }, } } # Schema, defined from the definition then updated with subscription info Schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers).redefine do use GraphQL::Subscriptions, transports: { "stream" => App::StreamTransport }, queue: App::Queue, store: App::SubscriptionDatabase end end # Serve the HTML subscription dashboard get "/" do erb :index end # Send queries here, it will provide a Channel ID which the client _may_ open post "/graphql" do content_type "application/json" channel = rand(10000).to_s res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel }) [200, {"x-channel-id" => channel}, JSON.dump(res)] end # Clients may open their channels here to receive updates get '/channels/:channel' do content_type 'text/event-stream' stream(:keep_open) do |out| channel = params[:channel] puts "Stream for #{channel}" App::StreamTransport.open(channel, out) out.callback { puts "Unsubscribing #{channel}" # This is forwarded to the `store` API::Schema.subscriber.delete(channel) } end end __END__ @@ index <html> <head> <title>GraphQL Subscriptions Example</title> <script src="https://code.jquery.com/jquery-3.2.1.js" integrity="sha256-DZAnKJ/6XZ9si04Hgrsxu/8s717jcIzLy3oi35EouyE=" crossorigin="anonymous"></script> <style> .dashboard { display: flex; } table { margin: 20px; } </style> </head> <body> <h1>Subscriptions</h1> <div class="dashboard"> <table> <thead> <tr> <th colspan="4">Counters</th> <tr> <tr> <th>Id</th> <th>Value</td> <th colspan="2">Actions</th> </tr> </thead> <tbody id="counters"> </tbody> </table> <table> <thead> <tr> <th colspan="3">Updates</th> </tr> <tr> <th>Channel</th> <th>Counter</th> <th>Value</th> </tr> </thead> <tbody id="updates"> </tbody> </table> <table> <thead> <tr> <th colspan="2">Channels</th> </tr> <tr> <th>Id</th> <th>Actions</th> <tr> </thead> <tbody id="channels"> </tbody> </table> </div> <script> var eventSources = {} var subscriptionString = "subscription WatchCounter($id: ID!) { counterIncremented(id: $id) { id value } }" var mutationString = "mutation IncrementCounter($id: ID!) { incrementCounter(id: $id) { value } }" function incrementCounter(id) { $.post("/graphql", {query: mutationString, variables: { id: id } }, function(response) { $("#counter-value-" + id).text(response.data.incrementCounter.value) }) } function subscribeToCounter(id) { $.ajax({ type: "POST", url: "/graphql", data: { query: subscriptionString, variables: { id: id, }, }, success: function(data, status, jqXHR) { var channelId = jqXHR.getResponseHeader("x-channel-id") openChannel(channelId) } }) } function openChannel(channelId) { var eventSource = new EventSource("/channels/" + channelId) eventSources[channelId] = eventSource eventSource.addEventListener("update", function(e) { console.log("message", e) var result = JSON.parse(e.data) var counter = result.data.counterIncremented $("#counter-value-" + counter.id).text(counter.value) $("#updates").append("<tr><td>" + channelId + "</td><td>" + counter.id + "</td><td>" + counter.value + "</td></tr>") }) $("#channels").append(channelRowTemplate.replace(/%channelid%/g, channelId)) } function closeChannel(channelId) { eventSources[channelId].close() delete eventSources[channelId] $("#channel-" + channelId).remove() } var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>0</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>" var channelRowTemplate = "<tr id='channel-%channelid%'><td>%channelid%</td><td><button onclick='closeChannel(%channelid%)'>unsubscribe</button></td></tr>" var counterIds = [1, 2, 3] counterIds.forEach(function(counterId) { $("#counters").append(counterRowTemplate.replace(/%counterid%/g, counterId)) }) </script> </body> </html>