# Example end-to-end web app with subscriptions, demonstrating APIs.
# See https://github.com/rmosolgo/graphql-ruby/issues/613
#
# @example Running this app
# ruby subscriptions.rb
# # visit localhost:4567
# Dependencies:
require "bundler/inline"
gemfile(true) do
source 'https://rubygems.org'
gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions", require: true
gem "sinatra", require: true
gem "thin", require: true
end
set :server, "thin"
# Apparently these are required when using bundler/inline:
set :run, true
enable :inline_templates
# Here's the application logic:
# - A set of stateful counters, identified by `#id`. They can be incremented.
# - An in-memory subscription database
# - A transport for sending payloads over open sockets
# - A queuing system for isolating subscription execution & delivery
module App
# An incrementing counter, identified by ID
Counter = Struct.new(:id, :value) do
def increment
self.value += 1
end
# Get or create a counter by id
def self.find(id)
@counters ||= Hash.new { |h, counter_id| h[counter_id] = Counter.new(counter_id, 0) }
@counters[id]
end
end
# A "database" of:
#
# - { channel => subscription }
# - { event => [subscription, subcription ...] }
#
# Since we're storing data in memory, the `GraphQL::Query` itself is used as the subscription data.
# IRL, you'd write the query data to disk, then reload it with `each_channel` + `get`.
module SubscriptionDatabase
class << self
# Part of the subscription API: put these subscriptions in the database
def set(query, events)
puts "Registering #{query.context[:channel]}"
queries[query.context[:channel]] = query
events.each do |ev|
subscriptions[ev.key] << query
end
end
# Part of the subscription API: load the query data for this channel
def get(channel)
query = queries[channel]
{
query_string: query.query_string,
variables: query.provided_variables,
context: {},
operation_name: query.operation_name,
transport: "stream",
}
end
# Part of the subscription API: fetch subscriptions from the DB and yield them one-by-one
def each_channel(event_key)
subscriptions[event_key].each do |query|
yield(query.context[:channel])
end
end
# Not used by GraphQL, but the Application needs some way to unsubscribe
# `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)`
def delete(channel)
queries.delete(channel)
subscriptions.each do |event_key, queries|
queries.reject! { |q| q.context[:channel] == channel }
end
StreamTransport.close(channel)
end
private
def subscriptions
@subscriptions ||= Hash.new { |h, event_id| h[event_id] = [] }
end
def queries
@queries ||= {}
end
end
end
module Queue
# Optional subscription API -- could use ActiveJob etc here:
def self.enqueue(schema, channel, event_key, object)
Thread.new {
schema.subscriber.process(channel, event_key, object)
}
end
end
module StreamTransport
class << self
# Part of the subscription API: send `result` over `channel`.
def deliver(channel, result, ctx)
puts "Delivering to #{channel}: #{result}"
stream = streams[channel]
# The client _may_ have openned this channel:
if stream
stream << "event: update\n"
stream << "data: #{JSON.dump(result)}\n\n"
else
API::Schema.subscriber.delete(channel)
end
end
# Used by the transport layer:
def open(channel, stream)
streams[channel] = stream
end
# Not used by GraphQL, but needed by the App to unsubscribe
def close(channel)
streams.delete(channel)
end
private
def streams
@streams ||= {}
end
end
end
end
# Here's the GraphQL API for this application:
module API
# Type system:
Definition = <<-GRAPHQL
type Subscription {
counterIncremented(id: ID!): Counter
}
type Counter {
id: ID!
value: Int!
}
type Query {
counter(id: ID!): Counter
}
type Mutation {
incrementCounter(id: ID!): Counter
}
GRAPHQL
# Resolve functions:
Resolvers = {
"Mutation" => {
"incrementCounter" => ->(o, a, c) {
counter = App::Counter.find(a["id"])
counter.increment
API::Schema.subscriber.trigger("counterIncremented", a, counter)
counter
}
},
"Query" => {
"counter" => ->(o, a, c) { App::Counter.find(a["id"]) }
},
"Counter" => {
"value" => ->(o, a, c) { o.value },
"id" => ->(o, a, c) { o.id },
}
}
# Schema, defined from the definition then updated with subscription info
Schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers).redefine do
use GraphQL::Subscriptions,
transports: { "stream" => App::StreamTransport },
queue: App::Queue,
store: App::SubscriptionDatabase
end
end
# Serve the HTML subscription dashboard
get "/" do
erb :index
end
# Send queries here, it will provide a Channel ID which the client _may_ open
post "/graphql" do
content_type "application/json"
channel = rand(10000).to_s
res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel })
[200, {"x-channel-id" => channel}, JSON.dump(res)]
end
# Clients may open their channels here to receive updates
get '/channels/:channel' do
content_type 'text/event-stream'
stream(:keep_open) do |out|
channel = params[:channel]
puts "Stream for #{channel}"
App::StreamTransport.open(channel, out)
out.callback {
puts "Unsubscribing #{channel}"
# This is forwarded to the `store`
API::Schema.subscriber.delete(channel)
}
end
end
__END__
@@ index
GraphQL Subscriptions Example
Subscriptions
| Counters |
| Id |
Value
| Actions |
| Updates |
| Channel |
Counter |
Value |