Last active
March 27, 2023 08:38
-
-
Save rmosolgo/ba31acf93f07f8007d99ba365a662d8f to your computer and use it in GitHub Desktop.
GraphQL Ruby Subscriptions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Rack entry point.
# Load the app relative to this file rather than the process's working
# directory, so the server can be started from any directory.
require_relative "app"

# Serve the classic-style (top-level DSL) Sinatra application.
run Sinatra::Application
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Example end-to-end web app with subscriptions, demonstrating APIs. | |
| # See https://github.com/rmosolgo/graphql-ruby/issues/613 | |
| # | |
| # @example Running this app | |
| # ruby subscriptions.rb | |
| # # visit localhost:4567 | |
| # Dependencies: | |
require "securerandom"
require "bundler/inline"
# Inline Gemfile: the gems are resolved (and installed when missing) at boot.
gemfile(true) do
  source("https://rubygems.org")
  gem("graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions", require: true)
  gem("sinatra", require: true)
  gem("thin", require: true)
end

# Sinatra settings.
set(:server, "thin")
# Per the original author, these two are needed when loading via bundler/inline:
# start the built-in server, and read templates from below `__END__`.
set(:run, true)
enable(:inline_templates)
| # Here's the application logic: | |
| # - A set of stateful counters, identified by `#id`. They can be incremented. | |
| # - An in-memory subscription database | |
| # - A transport for sending payloads over open sockets | |
| # - A queuing system for isolating subscription execution & delivery | |
module App
  # An incrementing counter, identified by ID.
  Counter = Struct.new(:id, :value) do
    # Add `by` to the counter's value.
    #
    # @param by [Integer] amount to add (defaults to 1)
    # @return [Integer] the new value
    def increment(by = 1)
      self.value += by
    end

    # Get or create a counter by id. Counters are memoized per id and start at 0.
    #
    # @param id [Object] counter identifier (used as-is as the lookup key)
    # @return [Counter]
    def self.find(id)
      @counters ||= Hash.new { |h, counter_id| h[counter_id] = Counter.new(counter_id, 0) }
      @counters[id]
    end
  end

  # A "database" of:
  #
  # - { channel => subscription }
  # - { event => [subscription, subscription ...] }
  #
  # Since we're storing data in memory, the `GraphQL::Query` itself is used as the subscription data.
  # IRL, you'd write the query data to disk, then reload it with `each_channel` + `get`.
  module SubscriptionDatabase
    class << self
      # Part of the subscription API: put these subscriptions in the database.
      #
      # @param query [#context, #query_string, #provided_variables, #operation_name]
      #   the subscription query; `query.context[:channel]` is its channel id
      # @param events [Enumerable<#key>] events the query subscribes to
      def set(query, events)
        puts "Registering #{query.context[:channel]}"
        queries[query.context[:channel]] = query
        events.each do |ev|
          subscriptions[ev.key] << query
        end
      end

      # Part of the subscription API: load the query data for this channel.
      #
      # @param channel [String] a channel id previously registered with `set`
      # @return [Hash] query data with a fresh, empty context and the "stream" transport
      # @raise [NoMethodError] if nothing is registered for `channel`
      def get(channel)
        query = queries[channel]
        {
          query_string: query.query_string,
          variables: query.provided_variables,
          context: {},
          operation_name: query.operation_name,
          transport: "stream",
        }
      end

      # Part of the subscription API: yield the channel of each subscription to `event_key`.
      #
      # The channel list is snapshotted before yielding, so the block may call
      # `delete` without causing later subscribers to be skipped. `fetch` is used
      # so that looking up an event nobody subscribed to does not create an entry.
      #
      # @param event_key [Object] key of the triggered event
      # @yieldparam channel [String]
      def each_channel(event_key)
        channels = subscriptions.fetch(event_key, []).map { |query| query.context[:channel] }
        channels.each { |channel| yield(channel) }
      end

      # Not used by GraphQL, but the Application needs some way to unsubscribe
      # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)`
      #
      # Removes the channel's query, drops it from every event (discarding events
      # left with no subscribers), then closes the channel's stream.
      #
      # @param channel [String]
      def delete(channel)
        queries.delete(channel)
        subscriptions.each_value do |subscribed|
          subscribed.reject! { |q| q.context[:channel] == channel }
        end
        subscriptions.delete_if { |_event_key, subscribed| subscribed.empty? }
        StreamTransport.close(channel)
      end

      private

      # { event key => [query, ...] }; a missing key gets a new list on `[]` access.
      def subscriptions
        @subscriptions ||= Hash.new { |h, event_id| h[event_id] = [] }
      end

      # { channel => query }
      def queries
        @queries ||= {}
      end
    end
  end

  module Queue
    # Optional subscription API -- could use ActiveJob etc here.
    # Processes the event for one channel on a new thread.
    #
    # @return [Thread] the thread running `schema.subscriber.process`
    def self.enqueue(schema, channel, event_key, object)
      Thread.new do
        schema.subscriber.process(channel, event_key, object)
      end
    end
  end

  module StreamTransport
    class << self
      # Part of the subscription API: send `result` over `channel`.
      # Writes one "update" event with `result` as JSON to the channel's stream.
      # `ctx` is accepted but not used here.
      def deliver(channel, result, ctx)
        puts "Delivering to #{channel}: #{result}"
        stream = streams[channel]
        # The client _may_ have opened this channel:
        if stream
          stream << "event: update\n"
          stream << "data: #{JSON.dump(result)}\n\n"
        else
          # Nobody is listening, so drop the subscription.
          API::Schema.subscriber.delete(channel)
        end
      end

      # Used by the transport layer: remember the open stream for `channel`.
      def open(channel, stream)
        streams[channel] = stream
      end

      # Not used by GraphQL, but needed by the App to unsubscribe
      def close(channel)
        streams.delete(channel)
      end

      private

      # { channel => stream }
      def streams
        @streams ||= {}
      end
    end
  end
end
| # Here's the GraphQL API for this application: | |
module API
  # The type system, written in the GraphQL schema definition language.
  Definition = <<-GRAPHQL
  type Subscription {
    counterIncremented(id: ID!): Counter
  }
  type Counter {
    id: ID!
    value: Int!
  }
  type Query {
    counter(id: ID!): Counter
  }
  type Mutation {
    incrementCounter(id: ID!): Counter
  }
  GRAPHQL

  # Resolve functions, keyed by type name and then field name.
  # Each one receives (object, arguments, context).
  Resolvers = {
    "Counter" => {
      "id" => lambda { |obj, _args, _ctx| obj.id },
      "value" => lambda { |obj, _args, _ctx| obj.value },
    },
    "Query" => {
      "counter" => lambda { |_obj, args, _ctx| App::Counter.find(args["id"]) },
    },
    "Mutation" => {
      # Bump the counter, notify subscribers of `counterIncremented`, return the counter.
      "incrementCounter" => lambda { |_obj, args, _ctx|
        incremented = App::Counter.find(args["id"])
        incremented.increment
        API::Schema.subscriber.trigger("counterIncremented", args, incremented)
        incremented
      },
    },
  }

  # Build the schema from the definition, then attach the subscription plumbing:
  # the stream transport, the thread-based queue and the in-memory store.
  base_schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers)
  Schema = base_schema.redefine do
    use(
      GraphQL::Subscriptions,
      transports: { "stream" => App::StreamTransport },
      queue: App::Queue,
      store: App::SubscriptionDatabase
    )
  end
end
# Render the subscription dashboard from the inline `index` template.
get("/") { erb(:index) }
# Send queries here, it will provide a Channel ID which the client _may_ open.
# The ID is returned in the `x-channel-id` response header.
post "/graphql" do
  content_type "application/json"
  # Use a large, unpredictable ID: with only 10,000 possible values, IDs collide
  # quickly and a collision overwrites another subscription's stored query.
  # It stays a plain integer below 2**53 because the dashboard splices it
  # unquoted into inline `onclick` handlers, where it is read as a JS number.
  channel = SecureRandom.random_number(10**15).to_s
  res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel })
  [200, { "x-channel-id" => channel }, JSON.dump(res)]
end
# Server-sent-events endpoint: a client opens its channel here to receive updates.
get "/channels/:channel" do
  content_type "text/event-stream"
  stream(:keep_open) do |out|
    channel_id = params[:channel]
    puts "Stream for #{channel_id}"
    # Register the open connection so the transport can write to it.
    App::StreamTransport.open(channel_id, out)
    out.callback do
      puts "Unsubscribing #{channel_id}"
      # This is forwarded to the `store`
      API::Schema.subscriber.delete(channel_id)
    end
  end
end
| __END__ | |
| @@ index | |
| <html> | |
| <head> | |
| <title>GraphQL Subscriptions Example</title> | |
| <script src="https://code.jquery.com/jquery-3.2.1.js" integrity="sha256-DZAnKJ/6XZ9si04Hgrsxu/8s717jcIzLy3oi35EouyE=" crossorigin="anonymous"></script> | |
| <style> | |
| .dashboard { | |
| display: flex; | |
| } | |
| table { | |
| margin: 20px; | |
| } | |
| </style> | |
| </head> | |
| <body> | |
| <h1>Subscriptions</h1> | |
| <div class="dashboard"> | |
| <table> | |
| <thead> | |
| <tr> | |
| <th colspan="4">Counters</th> | |
| </tr> | |
| <tr> | |
| <th>Id</th> | |
| <th>Value</th> | |
| <th colspan="2">Actions</th> | |
| </tr> | |
| </thead> | |
| <tbody id="counters"> | |
| </tbody> | |
| </table> | |
| <table> | |
| <thead> | |
| <tr> | |
| <th colspan="3">Updates</th> | |
| </tr> | |
| <tr> | |
| <th>Channel</th> | |
| <th>Counter</th> | |
| <th>Value</th> | |
| </tr> | |
| </thead> | |
| <tbody id="updates"> | |
| </tbody> | |
| </table> | |
| <table> | |
| <thead> | |
| <tr> | |
| <th colspan="2">Channels</th> | |
| </tr> | |
| <tr> | |
| <th>Id</th> | |
| <th>Actions</th> | |
| </tr> | |
| </thead> | |
| <tbody id="channels"> | |
| </tbody> | |
| </table> | |
| </div> | |
| <script> | |
// Page state: open EventSource objects keyed by channel id, plus the two GraphQL documents the page sends.
var eventSources = {},
    subscriptionString = "subscription WatchCounter($id: ID!) { counterIncremented(id: $id) { id value } }",
    mutationString = "mutation IncrementCounter($id: ID!) { incrementCounter(id: $id) { value } }"
// Run the increment mutation for one counter and show the value the server returns.
function incrementCounter(id) {
  var payload = { query: mutationString, variables: { id: id } }
  $.post("/graphql", payload, function(response) {
    var newValue = response.data.incrementCounter.value
    $("#counter-value-" + id).text(newValue)
  })
}
// Register a subscription for one counter, then open the channel whose id
// the server returns in the `x-channel-id` response header.
function subscribeToCounter(id) {
  var request = {
    type: "POST",
    url: "/graphql",
    data: {
      query: subscriptionString,
      variables: { id: id },
    },
    success: function(_data, _status, jqXHR) {
      openChannel(jqXHR.getResponseHeader("x-channel-id"))
    },
  }
  $.ajax(request)
}
// Open the event stream for a channel, record it in `eventSources`, and add a
// row for it to the Channels table. Each "update" event refreshes the counter's
// displayed value and appends a row to the Updates table.
function openChannel(channelId) {
  var eventSource = new EventSource("/channels/" + channelId)
  eventSources[channelId] = eventSource
  eventSource.addEventListener("update", function(e) {
    console.log("message", e)
    var result = JSON.parse(e.data)
    // The payload may carry no data (e.g. an error result); skip it instead of throwing.
    var counter = result.data && result.data.counterIncremented
    if (!counter) {
      return
    }
    $("#counter-value-" + counter.id).text(counter.value)
    // Build cells with .text() so server-supplied values are never parsed as HTML.
    var row = $("<tr>")
    $("<td>").text(channelId).appendTo(row)
    $("<td>").text(counter.id).appendTo(row)
    $("<td>").text(counter.value).appendTo(row)
    $("#updates").append(row)
  })
  $("#channels").append(channelRowTemplate.replace(/%channelid%/g, channelId))
}
// Close a channel's event stream (if one is registered) and remove its row
// from the Channels table.
function closeChannel(channelId) {
  var source = eventSources[channelId]
  // Guard against an unknown or already-closed channel, so the row is still removed.
  if (source) {
    source.close()
    delete eventSources[channelId]
  }
  $("#channel-" + channelId).remove()
}
// Row templates: every %counterid% / %channelid% placeholder is substituted before the row is inserted.
var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>0</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>"
var channelRowTemplate = "<tr id='channel-%channelid%'><td>%channelid%</td><td><button onclick='closeChannel(%channelid%)'>unsubscribe</button></td></tr>"

// Render one row per demo counter.
var counterIds = [1, 2, 3]
counterIds.forEach(function(counterId) {
  var rowHtml = counterRowTemplate.replace(/%counterid%/g, counterId)
  $("#counters").append(rowHtml)
})
| </script> | |
| </body> | |
| </html> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment