Skip to content

Instantly share code, notes, and snippets.

@rmosolgo
Last active March 27, 2023 08:38
Show Gist options
  • Save rmosolgo/ba31acf93f07f8007d99ba365a662d8f to your computer and use it in GitHub Desktop.
Save rmosolgo/ba31acf93f07f8007d99ba365a662d8f to your computer and use it in GitHub Desktop.
GraphQL Ruby Subscriptions
require './app'
run Sinatra::Application
# Example end-to-end web app with subscriptions, demonstrating APIs.
# See https://github.com/rmosolgo/graphql-ruby/issues/613
#
# @example Running this app
# ruby subscriptions.rb
# # visit localhost:4567
# Dependencies:
require "bundler/inline"
gemfile(true) do
source 'https://rubygems.org'
gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions", require: true
gem "sinatra", require: true
gem "thin", require: true
end
set :server, "thin"
# Apparently these are required when using bundler/inline:
set :run, true
enable :inline_templates
# Here's the application logic:
# - A set of stateful counters, identified by `#id`. They can be incremented.
# - An in-memory subscription database
# - A transport for sending payloads over open sockets
# - A queuing system for isolating subscription execution & delivery
module App
# An incrementing counter, identified by ID
Counter = Struct.new(:id, :value) do
def increment
self.value += 1
end
# Get or create a counter by id
def self.find(id)
@counters ||= Hash.new { |h, counter_id| h[counter_id] = Counter.new(counter_id, 0) }
@counters[id]
end
end
# A "database" of:
#
# - { channel => subscription }
# - { event => [subscription, subcription ...] }
#
# Since we're storing data in memory, the `GraphQL::Query` itself is used as the subscription data.
# IRL, you'd write the query data to disk, then reload it with `each_channel` + `get`.
module SubscriptionDatabase
class << self
# Part of the subscription API: put these subscriptions in the database
def set(query, events)
puts "Registering #{query.context[:channel]}"
queries[query.context[:channel]] = query
events.each do |ev|
subscriptions[ev.key] << query
end
end
# Part of the subscription API: load the query data for this channel
def get(channel)
query = queries[channel]
{
query_string: query.query_string,
variables: query.provided_variables,
context: {},
operation_name: query.operation_name,
transport: "stream",
}
end
# Part of the subscription API: fetch subscriptions from the DB and yield them one-by-one
def each_channel(event_key)
subscriptions[event_key].each do |query|
yield(query.context[:channel])
end
end
# Not used by GraphQL, but the Application needs some way to unsubscribe
# `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)`
def delete(channel)
queries.delete(channel)
subscriptions.each do |event_key, queries|
queries.reject! { |q| q.context[:channel] == channel }
end
StreamTransport.close(channel)
end
private
def subscriptions
@subscriptions ||= Hash.new { |h, event_id| h[event_id] = [] }
end
def queries
@queries ||= {}
end
end
end
module Queue
# Optional subscription API -- could use ActiveJob etc here:
def self.enqueue(schema, channel, event_key, object)
Thread.new {
schema.subscriber.process(channel, event_key, object)
}
end
end
module StreamTransport
class << self
# Part of the subscription API: send `result` over `channel`.
def deliver(channel, result, ctx)
puts "Delivering to #{channel}: #{result}"
stream = streams[channel]
# The client _may_ have openned this channel:
if stream
stream << "event: update\n"
stream << "data: #{JSON.dump(result)}\n\n"
else
API::Schema.subscriber.delete(channel)
end
end
# Used by the transport layer:
def open(channel, stream)
streams[channel] = stream
end
# Not used by GraphQL, but needed by the App to unsubscribe
def close(channel)
streams.delete(channel)
end
private
def streams
@streams ||= {}
end
end
end
end
# Here's the GraphQL API for this application:
module API
# Type system:
Definition = <<-GRAPHQL
type Subscription {
counterIncremented(id: ID!): Counter
}
type Counter {
id: ID!
value: Int!
}
type Query {
counter(id: ID!): Counter
}
type Mutation {
incrementCounter(id: ID!): Counter
}
GRAPHQL
# Resolve functions:
Resolvers = {
"Mutation" => {
"incrementCounter" => ->(o, a, c) {
counter = App::Counter.find(a["id"])
counter.increment
API::Schema.subscriber.trigger("counterIncremented", a, counter)
counter
}
},
"Query" => {
"counter" => ->(o, a, c) { App::Counter.find(a["id"]) }
},
"Counter" => {
"value" => ->(o, a, c) { o.value },
"id" => ->(o, a, c) { o.id },
}
}
# Schema, defined from the definition then updated with subscription info
Schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers).redefine do
use GraphQL::Subscriptions,
transports: { "stream" => App::StreamTransport },
queue: App::Queue,
store: App::SubscriptionDatabase
end
end
# Serve the HTML subscription dashboard
get "/" do
erb :index
end
# Send queries here, it will provide a Channel ID which the client _may_ open
post "/graphql" do
content_type "application/json"
channel = rand(10000).to_s
res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel })
[200, {"x-channel-id" => channel}, JSON.dump(res)]
end
# Clients may open their channels here to receive updates
get '/channels/:channel' do
content_type 'text/event-stream'
stream(:keep_open) do |out|
channel = params[:channel]
puts "Stream for #{channel}"
App::StreamTransport.open(channel, out)
out.callback {
puts "Unsubscribing #{channel}"
# This is forwarded to the `store`
API::Schema.subscriber.delete(channel)
}
end
end
__END__
@@ index
<html>
<head>
<title>GraphQL Subscriptions Example</title>
<script src="https://code.jquery.com/jquery-3.2.1.js" integrity="sha256-DZAnKJ/6XZ9si04Hgrsxu/8s717jcIzLy3oi35EouyE=" crossorigin="anonymous"></script>
<style>
.dashboard {
display: flex;
}
table {
margin: 20px;
}
</style>
</head>
<body>
<h1>Subscriptions</h1>
<div class="dashboard">
<table>
<thead>
<tr>
<th colspan="4">Counters</th>
<tr>
<tr>
<th>Id</th>
<th>Value</td>
<th colspan="2">Actions</th>
</tr>
</thead>
<tbody id="counters">
</tbody>
</table>
<table>
<thead>
<tr>
<th colspan="3">Updates</th>
</tr>
<tr>
<th>Channel</th>
<th>Counter</th>
<th>Value</th>
</tr>
</thead>
<tbody id="updates">
</tbody>
</table>
<table>
<thead>
<tr>
<th colspan="2">Channels</th>
</tr>
<tr>
<th>Id</th>
<th>Actions</th>
<tr>
</thead>
<tbody id="channels">
</tbody>
</table>
</div>
<script>
var eventSources = {}
var subscriptionString = "subscription WatchCounter($id: ID!) { counterIncremented(id: $id) { id value } }"
var mutationString = "mutation IncrementCounter($id: ID!) { incrementCounter(id: $id) { value } }"
function incrementCounter(id) {
$.post("/graphql", {query: mutationString, variables: { id: id } }, function(response) {
$("#counter-value-" + id).text(response.data.incrementCounter.value)
})
}
function subscribeToCounter(id) {
$.ajax({
type: "POST",
url: "/graphql",
data: {
query: subscriptionString,
variables: {
id: id,
},
},
success: function(data, status, jqXHR) {
var channelId = jqXHR.getResponseHeader("x-channel-id")
openChannel(channelId)
}
})
}
function openChannel(channelId) {
var eventSource = new EventSource("/channels/" + channelId)
eventSources[channelId] = eventSource
eventSource.addEventListener("update", function(e) {
console.log("message", e)
var result = JSON.parse(e.data)
var counter = result.data.counterIncremented
$("#counter-value-" + counter.id).text(counter.value)
$("#updates").append("<tr><td>" + channelId + "</td><td>" + counter.id + "</td><td>" + counter.value + "</td></tr>")
})
$("#channels").append(channelRowTemplate.replace(/%channelid%/g, channelId))
}
function closeChannel(channelId) {
eventSources[channelId].close()
delete eventSources[channelId]
$("#channel-" + channelId).remove()
}
var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>0</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>"
var channelRowTemplate = "<tr id='channel-%channelid%'><td>%channelid%</td><td><button onclick='closeChannel(%channelid%)'>unsubscribe</button></td></tr>"
var counterIds = [1, 2, 3]
counterIds.forEach(function(counterId) {
$("#counters").append(counterRowTemplate.replace(/%counterid%/g, counterId))
})
</script>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment