require "json"
require "securerandom"

require "graphql"
require "sinatra"
require "thin"

# Here's the application logic:
# - A set of stateful counters, identified by `#id`. They can be incremented.
# - An in-memory subscription database
# - A transport for sending payloads over open sockets
# - A queuing system for isolating subscription execution & delivery
module App
  # An incrementing counter, identified by ID
  Counter = Struct.new(:id, :value) do
    # Add one to the counter's value.
    # @return [Integer] the new value
    def increment
      self.value += 1
    end

    # Get or create a counter by id. New counters start at 0 and are kept
    # in a process-wide, in-memory Hash.
    # @param id [Object] the counter's identifier (used as the Hash key)
    # @return [Counter]
    def self.find(id)
      @counters ||= Hash.new { |h, counter_id| h[counter_id] = Counter.new(counter_id, 0) }
      @counters[id]
    end
  end

  # In-memory subscription store plus a server-sent-events transport.
  class Subscriptions < GraphQL::Subscriptions::Implementation
    def initialize(schema:)
      super
      # Here's the "database":
      #
      # For each event key, which queries are subscribed to it?
      # (event key => Array of queries)
      @subscriptions = Hash.new { |h, event_id| h[event_id] = [] }
      #
      # Given a channel, which query does it belong to?
      # (channel => query)
      @queries = {}
      #
      # Given a channel, which stream should payloads be written to?
      # (channel => stream passed to #open)
      @streams = {}
    end

    # Part of the subscription API: put these subscriptions in the database
    # @param query [#context] the subscribing query; its channel is read from `context[:channel]`
    # @param events [Enumerable<#key>] the events the query subscribed to
    def subscribed(query, events)
      channel = query.context[:channel]
      puts "Registering #{channel}"
      @queries[channel] = query
      events.each do |ev|
        @subscriptions[ev.key] << query
      end
    end

    # Part of the subscription API: load the query data for this channel
    # NOTE(review): raises NoMethodError when `channel` is unknown (e.g. already
    # deleted) -- confirm the caller never asks for a removed channel.
    # @param channel [String]
    # @return [Hash] query string, variables, (empty) context and operation name
    def get_subscription(channel)
      query = @queries[channel]
      {
        query_string: query.query_string,
        variables: query.provided_variables,
        context: {},
        operation_name: query.operation_name,
      }
    end

    # Part of the subscription API: fetch subscriptions from the DB and yield them one-by-one
    # @param event_key [Object] the key that was triggered
    # @yieldparam channel [String] the channel of each subscribed query
    def each_channel(event_key)
      # `fetch` avoids creating (and keeping forever) an empty list for keys
      # nobody subscribed to. The snapshot keeps iteration stable even if
      # `#delete` removes entries from the live list while we are yielding.
      @subscriptions.fetch(event_key, []).dup.each do |query|
        yield(query.context[:channel])
      end
    end

    # Not used by GraphQL, but the Application needs some way to unsubscribe
    # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)`
    # Forgets the channel's query, its event registrations and its stream.
    # @param channel [String]
    def delete(channel)
      @queries.delete(channel)
      @subscriptions.each_value do |queries|
        queries.reject! { |q| q.context[:channel] == channel }
      end
      close(channel)
    end

    # Optional subscription API -- could use ActiveJob etc here:
    # Runs `execute` for one channel on a fresh thread.
    def enqueue(channel, event_key, object)
      Thread.new {
        execute(channel, event_key, object)
      }
    end

    # Part of the subscription API: send `result` over `channel`.
    # The payload is written as a server-sent `update` event whose data is
    # the JSON-encoded result.
    def deliver(channel, result, ctx)
      puts "Delivering to #{channel}: #{result}"
      stream = @streams[channel]
      # The client _may_ have opened this channel:
      if stream
        stream << "event: update\n"
        stream << "data: #{JSON.dump(result)}\n\n"
      else
        # Stream was closed or never opened
        delete(channel)
      end
    end

    # Used by the transport layer: remember the stream to write to for `channel`
    def open(channel, stream)
      @streams[channel] = stream
    end

    # Not used by GraphQL, but needed by the App to unsubscribe
    def close(channel)
      @streams.delete(channel)
    end
  end
end

# Here's the GraphQL API for this application:
module API
  # Type system:
  Definition = <<-GRAPHQL
    type Subscription {
      counterIncremented(id: ID!): Counter
    }

    type Counter {
      id: ID!
      value: Int!
    }

    type Query {
      counter(id: ID!): Counter
    }

    type Mutation {
      incrementCounter(id: ID!): Counter
    }
  GRAPHQL

  # Resolve functions:
  Resolvers = {
    "Mutation" => {
      "incrementCounter" => ->(o, a, c) {
        counter = App::Counter.find(a["id"])
        counter.increment
        # Notify subscribers of this counter about the new value
        API::Schema.subscriber.trigger("counterIncremented", a, counter)
        counter
      }
    },
    "Query" => {
      "counter" => ->(o, a, c) { App::Counter.find(a["id"]) }
    },
    "Counter" => {
      "value" => ->(o, a, c) { o.value },
      "id" => ->(o, a, c) { o.id },
    }
  }

  # Schema, defined from the definition then updated with subscription info
  Schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers).redefine do
    use GraphQL::Subscriptions, implementation: App::Subscriptions
  end
end

# Serve the HTML subscription dashboard
get "/" do
  erb :index
end

# Send queries here, it will provide a Channel ID which the client _may_ open
post "/graphql" do
  content_type "application/json"
  # Channel ids identify a client's event stream, so they must be unique and
  # hard to guess; a small random integer would collide and let one client
  # overwrite (or open) another client's channel.
  channel = "socket-#{SecureRandom.hex(16)}"
  res = API::Schema.execute(params[:query], variables: params[:variables], context: { channel: channel })
  [200, { "x-channel-id" => channel }, JSON.dump(res)]
end

# Clients may open their channels here to receive updates
get '/channels/:channel' do
  content_type 'text/event-stream'
  stream(:keep_open) do |out|
    channel = params[:channel]
    puts "Stream for #{channel}"
    API::Schema.subscriber.implementation.open(channel, out)
    out.callback {
      puts "Unsubscribing #{channel}"
      # This is forwarded to the `store`
      API::Schema.subscriber.implementation.delete(channel)
    }
  end
end

__END__

@@ index
GraphQL Subscriptions Example

Subscriptions

Counters
Id Value Actions
Updates
Channel Counter Value
Channels
Id Actions