Example end-to-end web app with subscriptions, demonstrating APIs.
See rmosolgo/graphql-ruby#613.
To run:
bundle install
bundle exec rackup
open localhost:4567
Example end-to-end web app with subscriptions, demonstrating APIs.
See rmosolgo/graphql-ruby#613.
To run:
bundle install
bundle exec rackup
open localhost:4567
| require "graphql" | |
| require "sinatra" | |
| require "thin" | |
| # Here's the application logic: | |
| # - A set of stateful counters, identified by `#id`. They can be incremented. | |
| # - An in-memory subscription database | |
| # - A transport for sending payloads over open sockets | |
| # - A queuing system for isolating subscription execution & delivery | |
| module App | |
| # An incrementing counter, identified by ID | |
| Counter = Struct.new(:id, :value) do | |
| def increment | |
| self.value += 1 | |
| end | |
| # Get or create a counter by id | |
| def self.find(id) | |
| @counters ||= Hash.new { |h, counter_id| h[counter_id] = Counter.new(counter_id, 0) } | |
| @counters[id] | |
| end | |
| end | |
| class Subscriptions < GraphQL::Subscriptions::Implementation | |
| def initialize(schema:) | |
| super | |
| # Here's the "database": | |
| # | |
| # For each counter, who is subscribed to it? | |
| # @return Hash<String => Array<GraphQL::Query>> | |
| @subscriptions = Hash.new { |h, event_id| h[event_id] = [] } | |
| # | |
| # Given a Channel, who does it belong to? | |
| # @return Hash<String => GraphQL::Query> | |
| @queries = {} | |
| # | |
| # Given a Channel, return the IO to write to | |
| # @return Hash<String => IO> | |
| @streams = {} | |
| end | |
| # Part of the subscription API: put these subscriptions in the database | |
| def subscribed(query, events) | |
| puts "Registering #{query.context[:channel]}" | |
| @queries[query.context[:channel]] = query | |
| events.each do |ev| | |
| @subscriptions[ev.key] << query | |
| end | |
| end | |
| # Part of the subscription API: load the query data for this channel | |
| def get_subscription(channel) | |
| query = @queries[channel] | |
| { | |
| query_string: query.query_string, | |
| variables: query.provided_variables, | |
| context: {}, | |
| operation_name: query.operation_name, | |
| } | |
| end | |
| # Part of the subscription API: fetch subscriptions from the DB and yield them one-by-one | |
| def each_channel(event_key) | |
| @subscriptions[event_key].each do |query| | |
| yield(query.context[:channel]) | |
| end | |
| end | |
| # Not used by GraphQL, but the Application needs some way to unsubscribe | |
| # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)` | |
| def delete(channel) | |
| @queries.delete(channel) | |
| @subscriptions.each do |event_key, queries| | |
| queries.reject! { |q| q.context[:channel] == channel } | |
| end | |
| close(channel) | |
| end | |
| # Optional subscription API -- could use ActiveJob etc here: | |
| def enqueue(channel, event_key, object) | |
| Thread.new { | |
| execute(channel, event_key, object) | |
| } | |
| end | |
| # Part of the subscription API: send `result` over `channel`. | |
| def deliver(channel, result, ctx) | |
| puts "Delivering to #{channel}: #{result}" | |
| stream = @streams[channel] | |
| # The client _may_ have opened this channel: | |
| if stream | |
| stream << "event: update\n" | |
| stream << "data: #{JSON.dump(result)}\n\n" | |
| else | |
| # Stream was closed or never opened | |
| delete(channel) | |
| end | |
| end | |
| # Used by the transport layer: | |
| def open(channel, stream) | |
| @streams[channel] = stream | |
| end | |
| # Not used by GraphQL, but needed by the App to unsubscribe | |
| def close(channel) | |
| @streams.delete(channel) | |
| end | |
| end | |
| end | |
| # Here's the GraphQL API for this application: | |
| module API | |
| # Type system: | |
| Definition = <<-GRAPHQL | |
| type Subscription { | |
| counterIncremented(id: ID!): Counter | |
| } | |
| type Counter { | |
| id: ID! | |
| value: Int! | |
| } | |
| type Query { | |
| counter(id: ID!): Counter | |
| } | |
| type Mutation { | |
| incrementCounter(id: ID!): Counter | |
| } | |
| GRAPHQL | |
| # Resolve functions: | |
| Resolvers = { | |
| "Mutation" => { | |
| "incrementCounter" => ->(o, a, c) { | |
| counter = App::Counter.find(a["id"]) | |
| counter.increment | |
| API::Schema.subscriber.trigger("counterIncremented", a, counter) | |
| counter | |
| } | |
| }, | |
| "Query" => { | |
| "counter" => ->(o, a, c) { App::Counter.find(a["id"]) } | |
| }, | |
| "Counter" => { | |
| "value" => ->(o, a, c) { o.value }, | |
| "id" => ->(o, a, c) { o.id }, | |
| } | |
| } | |
| # Schema, defined from the definition then updated with subscription info | |
| Schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers).redefine do | |
| use GraphQL::Subscriptions, implementation: App::Subscriptions | |
| end | |
| end | |
| # Serve the HTML subscription dashboard | |
| get "/" do | |
| erb :index | |
| end | |
| # Send queries here, it will provide a Channel ID which the client _may_ open | |
| post "/graphql" do | |
| content_type "application/json" | |
| channel = "socket-#{rand(10000)}" | |
| res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel }) | |
| [200, {"x-channel-id" => channel}, JSON.dump(res)] | |
| end | |
| # Clients may open their channels here to receive updates | |
| get '/channels/:channel' do | |
| content_type 'text/event-stream' | |
| stream(:keep_open) do |out| | |
| channel = params[:channel] | |
| puts "Stream for #{channel}" | |
| API::Schema.subscriber.implementation.open(channel, out) | |
| out.callback { | |
| puts "Unsubscribing #{channel}" | |
| # This is forwarded to the `store` | |
| API::Schema.subscriber.implementation.delete(channel) | |
| } | |
| end | |
| end | |
| __END__ | |
| @@ index | |
| <html> | |
| <head> | |
| <title>GraphQL Subscriptions Example</title> | |
| <script src="https://code.jquery.com/jquery-3.2.1.js" integrity="sha256-DZAnKJ/6XZ9si04Hgrsxu/8s717jcIzLy3oi35EouyE=" crossorigin="anonymous"></script> | |
| <style> | |
| .dashboard { | |
| display: flex; | |
| } | |
| table { | |
| margin: 20px; | |
| } | |
| </style> | |
| </head> | |
| <body> | |
| <h1>Subscriptions</h1> | |
| <div class="dashboard"> | |
| <table> | |
| <thead> | |
| <tr> | |
| <th colspan="4">Counters</th> | |
| <tr> | |
| <tr> | |
| <th>Id</th> | |
| <th>Value</td> | |
| <th colspan="2">Actions</th> | |
| </tr> | |
| </thead> | |
| <tbody id="counters"> | |
| </tbody> | |
| </table> | |
| <table> | |
| <thead> | |
| <tr> | |
| <th colspan="3">Updates</th> | |
| </tr> | |
| <tr> | |
| <th>Channel</th> | |
| <th>Counter</th> | |
| <th>Value</th> | |
| </tr> | |
| </thead> | |
| <tbody id="updates"> | |
| </tbody> | |
| </table> | |
| <table> | |
| <thead> | |
| <tr> | |
| <th colspan="2">Channels</th> | |
| </tr> | |
| <tr> | |
| <th>Id</th> | |
| <th>Actions</th> | |
| <tr> | |
| </thead> | |
| <tbody id="channels"> | |
| </tbody> | |
| </table> | |
| </div> | |
| <script> | |
| var eventSources = {} | |
| var subscriptionString = "subscription WatchCounter($id: ID!) { counterIncremented(id: $id) { id value } }" | |
| var mutationString = "mutation IncrementCounter($id: ID!) { incrementCounter(id: $id) { value } }" | |
| function incrementCounter(id) { | |
| $.post("/graphql", {query: mutationString, variables: { id: id } }, function(response) { | |
| $("#counter-value-" + id).text(response.data.incrementCounter.value) | |
| }) | |
| } | |
| function subscribeToCounter(id) { | |
| $.ajax({ | |
| type: "POST", | |
| url: "/graphql", | |
| data: { | |
| query: subscriptionString, | |
| variables: { | |
| id: id, | |
| }, | |
| }, | |
| success: function(data, status, jqXHR) { | |
| var channelId = jqXHR.getResponseHeader("x-channel-id") | |
| openChannel(channelId) | |
| } | |
| }) | |
| } | |
| function openChannel(channelId) { | |
| var eventSource = new EventSource("/channels/" + channelId) | |
| eventSources[channelId] = eventSource | |
| eventSource.addEventListener("update", function(e) { | |
| console.log("message", e) | |
| var result = JSON.parse(e.data) | |
| var counter = result.data.counterIncremented | |
| $("#counter-value-" + counter.id).text(counter.value) | |
| $("#updates").append("<tr><td>" + channelId + "</td><td>" + counter.id + "</td><td>" + counter.value + "</td></tr>") | |
| }) | |
| $("#channels").append(channelRowTemplate.replace(/%channelid%/g, channelId)) | |
| } | |
| function closeChannel(channelId) { | |
| eventSources[channelId].close() | |
| delete eventSources[channelId] | |
| $("#channel-" + channelId).remove() | |
| } | |
| var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>...</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>" | |
| var channelRowTemplate = "<tr id='channel-%channelid%'><td>%channelid%</td><td><button onclick='closeChannel(%channelid%)'>unsubscribe</button></td></tr>" | |
| var counterIds = [1, 2, 3] | |
| counterIds.forEach(function(counterId) { | |
| $("#counters").append(counterRowTemplate.replace(/%counterid%/g, counterId)) | |
| }) | |
| var initialLoadQuery = "{ c1: counter(id: 1) { value } c2: counter(id: 2) { value } c3: counter(id: 3) { value } }" | |
| $.post("/graphql", {query: initialLoadQuery}, function(data) { | |
| $("#counter-value-1").text(data.data.c1.value) | |
| $("#counter-value-2").text(data.data.c2.value) | |
| $("#counter-value-3").text(data.data.c3.value) | |
| }) | |
| </script> | |
| </body> | |
| </html> |
| require "./app" | |
| run Sinatra::Application |
| source 'https://rubygems.org' | |
| gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions" | |
| gem "sinatra" | |
| gem "thin" |
| GIT | |
| remote: git://github.com/rmosolgo/graphql-ruby.git | |
| revision: 2c011f6fbc50132706d14a52027f4eea9df2e103 | |
| branch: subscriptions | |
| specs: | |
| graphql (1.6.6) | |
| GEM | |
| remote: https://rubygems.org/ | |
| specs: | |
| daemons (1.2.4) | |
| eventmachine (1.2.5) | |
| mustermann (1.0.0) | |
| rack (2.0.3) | |
| rack-protection (2.0.0) | |
| rack | |
| sinatra (2.0.0) | |
| mustermann (~> 1.0) | |
| rack (~> 2.0) | |
| rack-protection (= 2.0.0) | |
| tilt (~> 2.0) | |
| thin (1.7.2) | |
| daemons (~> 1.0, >= 1.0.9) | |
| eventmachine (~> 1.0, >= 1.0.4) | |
| rack (>= 1, < 3) | |
| tilt (2.0.8) | |
| PLATFORMS | |
| ruby | |
| DEPENDENCIES | |
| graphql! | |
| sinatra | |
| thin | |
| BUNDLED WITH | |
| 1.15.3 |