path: root/actioncable
diff options
Diffstat (limited to 'actioncable')
86 files changed, 5002 insertions, 0 deletions
diff --git a/actioncable/.gitignore b/actioncable/.gitignore
new file mode 100644
index 0000000000..0a04b29786
--- /dev/null
+++ b/actioncable/.gitignore
@@ -0,0 +1,2 @@
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md
new file mode 100644
index 0000000000..e671a07563
--- /dev/null
+++ b/actioncable/CHANGELOG.md
@@ -0,0 +1,28 @@
+## Rails 5.0.0.beta2 (February 01, 2016) ##
+* Support PostgreSQL pubsub adapter.
+ *Jon Moss*
+* Remove EventMachine dependency.
+ *Matthew Draper*
+* Remove Celluloid dependency.
+ *Mike Perham*
+* Create notion of an `ActionCable::SubscriptionAdapter`.
+ Separate out Redis functionality into
+ `ActionCable::SubscriptionAdapter::Redis`, and add a
+ PostgreSQL adapter as well. Configuration file for
+ ActionCable was changed from`config/redis/cable.yml` to
+ `config/cable.yml`.
+ *Jon Moss*
+## Rails 5.0.0.beta1 (December 18, 2015) ##
+* Added to Rails!
+ *DHH*
diff --git a/actioncable/MIT-LICENSE b/actioncable/MIT-LICENSE
new file mode 100644
index 0000000000..27a17cf41b
--- /dev/null
+++ b/actioncable/MIT-LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2015-2016 Basecamp, LLC
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
diff --git a/actioncable/README.md b/actioncable/README.md
new file mode 100644
index 0000000000..ac57532b62
--- /dev/null
+++ b/actioncable/README.md
@@ -0,0 +1,473 @@
+# Action Cable – Integrated WebSockets for Rails
+Action Cable seamlessly integrates WebSockets with the rest of your Rails application.
+It allows for real-time features to be written in Ruby in the same style
+and form as the rest of your Rails application, while still being performant
+and scalable. It's a full-stack offering that provides both a client-side
+JavaScript framework and a server-side Ruby framework. You have access to your full
+domain model written with Active Record or your ORM of choice.
+## Terminology
+A single Action Cable server can handle multiple connection instances. It has one
+connection instance per WebSocket connection. A single user may have multiple
+WebSockets open to your application if they use multiple browser tabs or devices.
+The client of a WebSocket connection is called the consumer.
+Each consumer can in turn subscribe to multiple cable channels. Each channel encapsulates
+a logical unit of work, similar to what a controller does in a regular MVC setup. For example,
+you could have a `ChatChannel` and a `AppearancesChannel`, and a consumer could be subscribed to either
+or to both of these channels. At the very least, a consumer should be subscribed to one channel.
+When the consumer is subscribed to a channel, they act as a subscriber. The connection between
+the subscriber and the channel is, surprise-surprise, called a subscription. A consumer
+can act as a subscriber to a given channel any number of times. For example, a consumer
+could subscribe to multiple chat rooms at the same time. (And remember that a physical user may
+have multiple consumers, one per tab/device open to your connection).
+Each channel can then again be streaming zero or more broadcastings. A broadcasting is a
+pubsub link where anything transmitted by the broadcaster is sent directly to the channel
+subscribers who are streaming that named broadcasting.
+As you can see, this is a fairly deep architectural stack. There's a lot of new terminology
+to identify the new pieces, and on top of that, you're dealing with both client and server side
+reflections of each unit.
+## Examples
+### A full-stack example
+The first thing you must do is define your `ApplicationCable::Connection` class in Ruby. This
+is the place where you authorize the incoming connection, and proceed to establish it
+if all is well. Here's the simplest example starting with the server-side connection class:
+# app/channels/application_cable/connection.rb
+module ApplicationCable
+ class Connection < ActionCable::Connection::Base
+ identified_by :current_user
+ def connect
+ self.current_user = find_verified_user
+ end
+ protected
+ def find_verified_user
+ if current_user = User.find_by(id: cookies.signed[:user_id])
+ current_user
+ else
+ reject_unauthorized_connection
+ end
+ end
+ end
+Here `identified_by` is a connection identifier that can be used to find the specific connection again or later.
+Note that anything marked as an identifier will automatically create a delegate by the same name on any channel instances created off the connection.
+This relies on the fact that you will already have handled authentication of the user, and
+that a successful authentication sets a signed cookie with the `user_id`. This cookie is then
+automatically sent to the connection instance when a new connection is attempted, and you
+use that to set the `current_user`. By identifying the connection by this same current_user,
+you're also ensuring that you can later retrieve all open connections by a given user (and
+potentially disconnect them all if the user is deleted or deauthorized).
+Then you should define your `ApplicationCable::Channel` class in Ruby. This is the place where you put
+shared logic between your channels.
+# app/channels/application_cable/channel.rb
+module ApplicationCable
+ class Channel < ActionCable::Channel::Base
+ end
+The client-side needs to setup a consumer instance of this connection. That's done like so:
+# app/assets/javascripts/cable.coffee
+#= require action_cable
+@App = {}
+App.cable = ActionCable.createConsumer("ws://cable.example.com")
+The ws://cable.example.com address must point to your set of Action Cable servers, and it
+must share a cookie namespace with the rest of the application (which may live under http://example.com).
+This ensures that the signed cookie will be correctly sent.
+That's all you need to establish the connection! But of course, this isn't very useful in
+itself. This just gives you the plumbing. To make stuff happen, you need content. That content
+is defined by declaring channels on the server and allowing the consumer to subscribe to them.
+### Channel example 1: User appearances
+Here's a simple example of a channel that tracks whether a user is online or not and what page they're on.
+(This is useful for creating presence features like showing a green dot next to a user name if they're online).
+First you declare the server-side channel:
+# app/channels/appearance_channel.rb
+class AppearanceChannel < ApplicationCable::Channel
+ def subscribed
+ current_user.appear
+ end
+ def unsubscribed
+ current_user.disappear
+ end
+ def appear(data)
+ current_user.appear on: data['appearing_on']
+ end
+ def away
+ current_user.away
+ end
+The `#subscribed` callback is invoked when, as we'll show below, a client-side subscription is initiated. In this case,
+we take that opportunity to say "the current user has indeed appeared". That appear/disappear API could be backed by
+Redis or a database or whatever else. Here's what the client-side of that looks like:
+# app/assets/javascripts/cable/subscriptions/appearance.coffee
+App.cable.subscriptions.create "AppearanceChannel",
+ # Called when the subscription is ready for use on the server
+ connected: ->
+ @install()
+ @appear()
+ # Called when the WebSocket connection is closed
+ disconnected: ->
+ @uninstall()
+ # Called when the subscription is rejected by the server
+ rejected: ->
+ @uninstall()
+ appear: ->
+ # Calls `AppearanceChannel#appear(data)` on the server
+ @perform("appear", appearing_on: $("main").data("appearing-on"))
+ away: ->
+ # Calls `AppearanceChannel#away` on the server
+ @perform("away")
+ buttonSelector = "[data-behavior~=appear_away]"
+ install: ->
+ $(document).on "page:change.appearance", =>
+ @appear()
+ $(document).on "click.appearance", buttonSelector, =>
+ @away()
+ false
+ $(buttonSelector).show()
+ uninstall: ->
+ $(document).off(".appearance")
+ $(buttonSelector).hide()
+Simply calling `App.cable.subscriptions.create` will setup the subscription, which will call `AppearanceChannel#subscribed`,
+which in turn is linked to original `App.cable` -> `ApplicationCable::Connection` instances.
+We then link the client-side `appear` method to `AppearanceChannel#appear(data)`. This is possible because the server-side
+channel instance will automatically expose the public methods declared on the class (minus the callbacks), so that these
+can be reached as remote procedure calls via a subscription's `perform` method.
+### Channel example 2: Receiving new web notifications
+The appearance example was all about exposing server functionality to client-side invocation over the WebSocket connection.
+But the great thing about WebSockets is that it's a two-way street. So now let's show an example where the server invokes
+action on the client.
+This is a web notification channel that allows you to trigger client-side web notifications when you broadcast to the right
+# app/channels/web_notifications_channel.rb
+class WebNotificationsChannel < ApplicationCable::Channel
+ def subscribed
+ stream_from "web_notifications_#{current_user.id}"
+ end
+# Client-side, which assumes you've already requested the right to send web notifications
+App.cable.subscriptions.create "WebNotificationsChannel",
+ received: (data) ->
+ new Notification data["title"], body: data["body"]
+# Somewhere in your app this is called, perhaps from a NewCommentJob
+ActionCable.server.broadcast \
+ "web_notifications_#{current_user.id}", { title: 'New things!', body: 'All the news that is fit to print' }
+The `ActionCable.server.broadcast` call places a message in the Redis' pubsub queue under a separate broadcasting name for each user. For a user with an ID of 1, the broadcasting name would be `web_notifications_1`.
+The channel has been instructed to stream everything that arrives at `web_notifications_1` directly to the client by invoking the
+`#received(data)` callback. The data is the hash sent as the second parameter to the server-side broadcast call, JSON encoded for the trip
+across the wire, and unpacked for the data argument arriving to `#received`.
+### Passing Parameters to Channel
+You can pass parameters from the client side to the server side when creating a subscription. For example:
+# app/channels/chat_channel.rb
+class ChatChannel < ApplicationCable::Channel
+ def subscribed
+ stream_from "chat_#{params[:room]}"
+ end
+Pass an object as the first argument to `subscriptions.create`, and that object will become your params hash in your cable channel. The keyword `channel` is required.
+# Client-side, which assumes you've already requested the right to send web notifications
+App.cable.subscriptions.create { channel: "ChatChannel", room: "Best Room" },
+ received: (data) ->
+ @appendLine(data)
+ appendLine: (data) ->
+ html = @createLine(data)
+ $("[data-chat-room='Best Room']").append(html)
+ createLine: (data) ->
+ """
+ <article class="chat-line">
+ <span class="speaker">#{data["sent_by"]}</span>
+ <span class="body">#{data["body"]}</span>
+ </article>
+ """
+# Somewhere in your app this is called, perhaps from a NewCommentJob
+ActionCable.server.broadcast \
+ "chat_#{room}", { sent_by: 'Paul', body: 'This is a cool chat app.' }
+### Rebroadcasting message
+A common use case is to rebroadcast a message sent by one client to any other connected clients.
+# app/channels/chat_channel.rb
+class ChatChannel < ApplicationCable::Channel
+ def subscribed
+ stream_from "chat_#{params[:room]}"
+ end
+ def receive(data)
+ ActionCable.server.broadcast "chat_#{params[:room]}", data
+ end
+# Client-side, which assumes you've already requested the right to send web notifications
+App.chatChannel = App.cable.subscriptions.create { channel: "ChatChannel", room: "Best Room" },
+ received: (data) ->
+ # data => { sent_by: "Paul", body: "This is a cool chat app." }
+App.chatChannel.send({ sent_by: "Paul", body: "This is a cool chat app." })
+The rebroadcast will be received by all connected clients, _including_ the client that sent the message. Note that params are the same as they were when you subscribed to the channel.
+### More complete examples
+See the [rails/actioncable-examples](http://github.com/rails/actioncable-examples) repository for a full example of how to setup Action Cable in a Rails app and adding channels.
+## Configuration
+Action Cable has three required configurations: the Redis connection, allowed request origins, and the cable server url (which can optionally be set on the client side).
+### Redis
+By default, `ActionCable::Server::Base` will look for a configuration file in `Rails.root.join('config/cable.yml')`.
+This file must specify a Redis url for each Rails environment. It may use the following format:
+production: &production
+ url: redis://
+development: &development
+ url: redis://localhost:6379
+test: *development
+You can also change the location of the Redis config file in a Rails initializer with something like:
+Rails.application.paths.add "config/cable", with: "somewhere/else/cable.yml"
+### Allowed Request Origins
+Action Cable will only accept requests from specified origins, which are passed to the server config as an array. The origins can be instances of strings or regular expressions, against which a check for match will be performed.
+ActionCable.server.config.allowed_request_origins = ['http://rubyonrails.com', /http:\/\/ruby.*/]
+When running in the development environment, this defaults to "http://localhost:3000".
+To disable and allow requests from any origin:
+ActionCable.server.config.disable_request_forgery_protection = true
+### Consumer Configuration
+Once you have decided how to run your cable server (see below), you must provide the server url (or path) to your client-side setup.
+There are two ways you can do this.
+The first is to simply pass it in when creating your consumer. For a standalone server,
+this would be something like: `App.cable = ActionCable.createConsumer("ws://example.com:28080")`, and for an in-app server,
+something like: `App.cable = ActionCable.createConsumer("/cable")`.
+The second option is to pass the server url through the `action_cable_meta_tag` in your layout.
+This uses a url or path typically set via `config.action_cable.url` in the environment configuration files, or defaults to "/cable".
+This method is especially useful if your websocket url might change between environments. If you host your production server via https, you will need to use the wss scheme
+for your ActionCable server, but development might remain http and use the ws scheme. You might use localhost in development and your
+domain in production.
+In any case, to vary the websocket url between environments, add the following configuration to each environment:
+config.action_cable.url = "ws://example.com:28080"
+Then add the following line to your layout before your JavaScript tag:
+<%= action_cable_meta_tag %>
+And finally, create your consumer like so:
+App.cable = ActionCable.createConsumer()
+### Other Configurations
+The other common option to configure is the log tags applied to the per-connection logger. Here's close to what we're using in Basecamp:
+ActionCable.server.config.log_tags = [
+ -> request { request.env['bc.account_id'] || "no-account" },
+ :action_cable,
+ -> request { request.uuid }
+For a full list of all configuration options, see the `ActionCable::Server::Configuration` class.
+Also note that your server must provide at least the same number of database connections as you have workers. The default worker pool is set to 100, so that means you have to make at least that available. You can change that in `config/database.yml` through the `pool` attribute.
+## Running the cable server
+### Standalone
+The cable server(s) is separated from your normal application server. It's still a rack application, but it is its own rack
+application. The recommended basic setup is as follows:
+# cable/config.ru
+require ::File.expand_path('../../config/environment', __FILE__)
+run ActionCable.server
+Then you start the server using a binstub in bin/cable ala:
+bundle exec puma -p 28080 cable/config.ru
+The above will start a cable server on port 28080.
+### In app
+If you are using a threaded server like Puma or Thin, the current implementation of ActionCable can run side-along with your Rails application. For example, to listen for WebSocket requests on `/cable`, mount the server at that path:
+# config/routes.rb
+Example::Application.routes.draw do
+ mount ActionCable.server => '/cable'
+For every instance of your server you create and for every worker your server spawns, you will also have a new instance of ActionCable, but the use of Redis keeps messages synced across connections.
+### Notes
+Beware that currently the cable server will _not_ auto-reload any changes in the framework. As we've discussed, long-running cable connections mean long-running objects. We don't yet have a way of reloading the classes of those objects in a safe manner. So when you change your channels, or the model your channels use, you must restart the cable server.
+We'll get all this abstracted properly when the framework is integrated into Rails.
+The WebSocket server doesn't have access to the session, but it has access to the cookies. This can be used when you need to handle authentication. You can see one way of doing that with Devise in this [article](http://www.rubytutorial.io/actioncable-devise-authentication).
+## Dependencies
+Action Cable is currently tied to Redis through its use of the pubsub feature to route
+messages back and forth over the WebSocket cable connection. This dependency may well
+be alleviated in the future, but for the moment that's what it is. So be sure to have
+Redis installed and running.
+The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby).
+## Deployment
+Action Cable is powered by a combination of websockets and threads. All of the
+connection management is handled internally by utilizing Ruby’s native thread
+support, which means you can use all your regular Rails models with no problems
+as long as you haven’t committed any thread-safety sins.
+But this also means that Action Cable needs to run in its own server process.
+So you'll have one set of server processes for your normal web work, and another
+set of server processes for the Action Cable. The former can be single-threaded,
+like Unicorn, but the latter must be multi-threaded, like Puma.
+## License
+Action Cable is released under the MIT license:
+* http://www.opensource.org/licenses/MIT
+## Support
+API documentation is at:
+* http://api.rubyonrails.org
+Bug reports can be filed for the Ruby on Rails project here:
+* https://github.com/rails/rails/issues
+Feature requests should be discussed on the rails-core mailing list here:
+* https://groups.google.com/forum/?fromgroups#!forum/rubyonrails-core
diff --git a/actioncable/Rakefile b/actioncable/Rakefile
new file mode 100644
index 0000000000..1d77fc7067
--- /dev/null
+++ b/actioncable/Rakefile
@@ -0,0 +1,57 @@
+require 'rake/testtask'
+require 'pathname'
+require 'sprockets'
+require 'coffee-script'
+require 'action_cable'
+dir = File.dirname(__FILE__)
+task :default => :test
+task :package => "assets:compile"
+task "package:clean" => "assets:clean"
+Rake::TestTask.new do |t|
+ t.libs << "test"
+ t.test_files = Dir.glob("#{dir}/test/**/*_test.rb")
+ t.warning = true
+ t.verbose = true
+ t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION)
+namespace :assets do
+ root_path = Pathname.new(dir)
+ destination_path = root_path.join("lib/assets/compiled")
+ desc "Compile dist/action_cable.js"
+ task :compile do
+ puts 'Compiling Action Cable assets...'
+ precompile_list = %w(action_cable.js)
+ environment = Sprockets::Environment.new
+ environment.gzip = false
+ Pathname.glob(root_path.join("app/assets/*/")) do |subdir|
+ environment.append_path subdir
+ end
+ compile_path = root_path.join("tmp/sprockets")
+ compile_path.rmtree if compile_path.exist?
+ compile_path.mkpath
+ manifest = Sprockets::Manifest.new(environment.index, compile_path)
+ manifest.compile(precompile_list)
+ destination_path.rmtree if destination_path.exist?
+ manifest.assets.each do |path, fingerprint_path|
+ destination_path.join(path).dirname.mkpath
+ FileUtils.cp(compile_path.join(fingerprint_path), destination_path.join(path))
+ end
+ puts 'Done'
+ end
+ task :clean do
+ destination_path.rmtree if destination_path.exist?
+ end
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
new file mode 100644
index 0000000000..c65ff7871f
--- /dev/null
+++ b/actioncable/actioncable.gemspec
@@ -0,0 +1,25 @@
+version = File.read(File.expand_path('../../RAILS_VERSION', __FILE__)).strip
+Gem::Specification.new do |s|
+ s.platform = Gem::Platform::RUBY
+ s.name = 'actioncable'
+ s.version = version
+ s.summary = 'WebSocket framework for Rails.'
+ s.description = 'Structure many real-time application concerns into channels over a single WebSocket connection.'
+ s.required_ruby_version = '>= 2.2.2'
+ s.license = 'MIT'
+ s.author = ['Pratik Naik', 'David Heinemeier Hansson']
+ s.email = ['pratiknaik@gmail.com', 'david@loudthinking.com']
+ s.homepage = 'http://rubyonrails.org'
+ s.files = Dir['CHANGELOG.md', 'MIT-LICENSE', 'README.md', 'lib/**/*']
+ s.require_path = 'lib'
+ s.add_dependency 'actionpack', version
+ s.add_dependency 'nio4r', '~> 1.2'
+ s.add_dependency 'websocket-driver', '~> 0.6.1'
diff --git a/actioncable/app/assets/javascripts/action_cable.coffee.erb b/actioncable/app/assets/javascripts/action_cable.coffee.erb
new file mode 100644
index 0000000000..18a48c0610
--- /dev/null
+++ b/actioncable/app/assets/javascripts/action_cable.coffee.erb
@@ -0,0 +1,23 @@
+#= require_self
+#= require ./action_cable/consumer
+@ActionCable =
+ INTERNAL: <%= ActionCable::INTERNAL.to_json %>
+ createConsumer: (url = @getConfig("url")) ->
+ new ActionCable.Consumer @createWebSocketURL(url)
+ getConfig: (name) ->
+ element = document.head.querySelector("meta[name='action-cable-#{name}']")
+ element?.getAttribute("content")
+ createWebSocketURL: (url) ->
+ if url and not /^wss?:/i.test(url)
+ a = document.createElement("a")
+ a.href = url
+ # Fix populating Location properties in IE. Otherwise, protocol will be blank.
+ a.href = a.href
+ a.protocol = a.protocol.replace("http", "ws")
+ a.href
+ else
+ url
diff --git a/actioncable/app/assets/javascripts/action_cable/connection.coffee b/actioncable/app/assets/javascripts/action_cable/connection.coffee
new file mode 100644
index 0000000000..fbd7dbd35b
--- /dev/null
+++ b/actioncable/app/assets/javascripts/action_cable/connection.coffee
@@ -0,0 +1,81 @@
+# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation.
+{message_types} = ActionCable.INTERNAL
+class ActionCable.Connection
+ @reopenDelay: 500
+ constructor: (@consumer) ->
+ @open()
+ send: (data) ->
+ if @isOpen()
+ @webSocket.send(JSON.stringify(data))
+ true
+ else
+ false
+ open: =>
+ if @webSocket and not @isState("closed")
+ throw new Error("Existing connection must be closed before opening")
+ else
+ @webSocket = new WebSocket(@consumer.url)
+ @installEventHandlers()
+ true
+ close: ->
+ @webSocket?.close()
+ reopen: ->
+ if @isState("closed")
+ @open()
+ else
+ try
+ @close()
+ finally
+ setTimeout(@open, @constructor.reopenDelay)
+ isOpen: ->
+ @isState("open")
+ # Private
+ isState: (states...) ->
+ @getState() in states
+ getState: ->
+ return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState
+ null
+ installEventHandlers: ->
+ for eventName of @events
+ handler = @events[eventName].bind(this)
+ @webSocket["on#{eventName}"] = handler
+ return
+ events:
+ message: (event) ->
+ {identifier, message, type} = JSON.parse(event.data)
+ switch type
+ when message_types.confirmation
+ @consumer.subscriptions.notify(identifier, "connected")
+ when message_types.rejection
+ @consumer.subscriptions.reject(identifier)
+ else
+ @consumer.subscriptions.notify(identifier, "received", message)
+ open: ->
+ @disconnected = false
+ @consumer.subscriptions.reload()
+ close: ->
+ @disconnect()
+ error: ->
+ @disconnect()
+ disconnect: ->
+ return if @disconnected
+ @disconnected = true
+ @consumer.subscriptions.notifyAll("disconnected")
diff --git a/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee
new file mode 100644
index 0000000000..99b9a1c6d5
--- /dev/null
+++ b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee
@@ -0,0 +1,79 @@
+# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting
+# revival reconnections if things go astray. Internal class, not intended for direct user manipulation.
+class ActionCable.ConnectionMonitor
+ @pollInterval:
+ min: 3
+ max: 30
+ @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings)
+ identifier: ActionCable.INTERNAL.identifiers.ping
+ constructor: (@consumer) ->
+ @consumer.subscriptions.add(this)
+ @start()
+ connected: ->
+ @reset()
+ @pingedAt = now()
+ delete @disconnectedAt
+ disconnected: ->
+ @disconnectedAt = now()
+ received: ->
+ @pingedAt = now()
+ reset: ->
+ @reconnectAttempts = 0
+ start: ->
+ @reset()
+ delete @stoppedAt
+ @startedAt = now()
+ @poll()
+ document.addEventListener("visibilitychange", @visibilityDidChange)
+ stop: ->
+ @stoppedAt = now()
+ document.removeEventListener("visibilitychange", @visibilityDidChange)
+ poll: ->
+ setTimeout =>
+ unless @stoppedAt
+ @reconnectIfStale()
+ @poll()
+ , @getInterval()
+ getInterval: ->
+ {min, max} = @constructor.pollInterval
+ interval = 5 * Math.log(@reconnectAttempts + 1)
+ clamp(interval, min, max) * 1000
+ reconnectIfStale: ->
+ if @connectionIsStale()
+ @reconnectAttempts++
+ unless @disconnectedRecently()
+ @consumer.connection.reopen()
+ connectionIsStale: ->
+ secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold
+ disconnectedRecently: ->
+ @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold
+ visibilityDidChange: =>
+ if document.visibilityState is "visible"
+ setTimeout =>
+ if @connectionIsStale() or not @consumer.connection.isOpen()
+ @consumer.connection.reopen()
+ , 200
+ now = ->
+ new Date().getTime()
+ secondsSince = (time) ->
+ (now() - time) / 1000
+ clamp = (number, min, max) ->
+ Math.max(min, Math.min(max, number))
diff --git a/actioncable/app/assets/javascripts/action_cable/consumer.coffee b/actioncable/app/assets/javascripts/action_cable/consumer.coffee
new file mode 100644
index 0000000000..717c0641a9
--- /dev/null
+++ b/actioncable/app/assets/javascripts/action_cable/consumer.coffee
@@ -0,0 +1,25 @@
+#= require ./connection
+#= require ./connection_monitor
+#= require ./subscriptions
+#= require ./subscription
+# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established,
+# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates.
+# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription
+# method.
+# The following example shows how this can be setup:
+# @App = {}
+# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1"
+# App.appearance = App.cable.subscriptions.create "AppearanceChannel"
+# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription.
+class ActionCable.Consumer
+ constructor: (@url) ->
+ @subscriptions = new ActionCable.Subscriptions this
+ @connection = new ActionCable.Connection this
+ @connectionMonitor = new ActionCable.ConnectionMonitor this
+ send: (data) ->
+ @connection.send(data)
diff --git a/actioncable/app/assets/javascripts/action_cable/subscription.coffee b/actioncable/app/assets/javascripts/action_cable/subscription.coffee
new file mode 100644
index 0000000000..339d676933
--- /dev/null
+++ b/actioncable/app/assets/javascripts/action_cable/subscription.coffee
@@ -0,0 +1,68 @@
+# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer.
+# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding
+# Channel instance on the server side.
+# An example demonstrates the basic functionality:
+# App.appearance = App.cable.subscriptions.create "AppearanceChannel",
+# connected: ->
+# # Called once the subscription has been successfully completed
+# appear: ->
+# @perform 'appear', appearing_on: @appearingOn()
+# away: ->
+# @perform 'away'
+# appearingOn: ->
+# $('main').data 'appearing-on'
+# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server
+# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away).
+# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter.
+# This is how the server component would look:
+# class AppearanceChannel < ApplicationActionCable::Channel
+# def subscribed
+# current_user.appear
+# end
+# def unsubscribed
+# current_user.disappear
+# end
+# def appear(data)
+# current_user.appear on: data['appearing_on']
+# end
+# def away
+# current_user.away
+# end
+# end
+# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name.
+# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method.
+class ActionCable.Subscription
+ constructor: (@subscriptions, params = {}, mixin) ->
+ @identifier = JSON.stringify(params)
+ extend(this, mixin)
+ @subscriptions.add(this)
+ @consumer = @subscriptions.consumer
+ # Perform a channel action with the optional data passed as an attribute
+ perform: (action, data = {}) ->
+ data.action = action
+ @send(data)
+ send: (data) ->
+ @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data))
+ unsubscribe: ->
+ @subscriptions.remove(this)
+ extend = (object, properties) ->
+ if properties?
+ for key, value of properties
+ object[key] = value
+ object
diff --git a/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee
new file mode 100644
index 0000000000..ae041ffa2b
--- /dev/null
+++ b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee
@@ -0,0 +1,64 @@
+# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user
+# us ActionCable.Subscriptions#create, and it should be called through the consumer like so:
+# @App = {}
+# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1"
+# App.appearance = App.cable.subscriptions.create "AppearanceChannel"
+# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription.
+class ActionCable.Subscriptions
+ constructor: (@consumer) ->
+ @subscriptions = []
+ create: (channelName, mixin) ->
+ channel = channelName
+ params = if typeof channel is "object" then channel else {channel}
+ new ActionCable.Subscription this, params, mixin
+ # Private
+ add: (subscription) ->
+ @subscriptions.push(subscription)
+ @notify(subscription, "initialized")
+ @sendCommand(subscription, "subscribe")
+ remove: (subscription) ->
+ @forget(subscription)
+ unless @findAll(subscription.identifier).length
+ @sendCommand(subscription, "unsubscribe")
+ reject: (identifier) ->
+ for subscription in @findAll(identifier)
+ @forget(subscription)
+ @notify(subscription, "rejected")
+ forget: (subscription) ->
+ @subscriptions = (s for s in @subscriptions when s isnt subscription)
+ findAll: (identifier) ->
+ s for s in @subscriptions when s.identifier is identifier
+ reload: ->
+ for subscription in @subscriptions
+ @sendCommand(subscription, "subscribe")
+ notifyAll: (callbackName, args...) ->
+ for subscription in @subscriptions
+ @notify(subscription, callbackName, args...)
+ notify: (subscription, callbackName, args...) ->
+ if typeof subscription is "string"
+ subscriptions = @findAll(subscription)
+ else
+ subscriptions = [subscription]
+ for subscription in subscriptions
+ subscription[callbackName]?(args...)
+ sendCommand: (subscription, command) ->
+ {identifier} = subscription
+ if identifier is ActionCable.INTERNAL.identifiers.ping
+ @consumer.connection.isOpen()
+ else
+ @consumer.send({command, identifier})
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb
new file mode 100644
index 0000000000..1dc66ef3ad
--- /dev/null
+++ b/actioncable/lib/action_cable.rb
@@ -0,0 +1,51 @@
+# Copyright (c) 2015-2016 Basecamp, LLC
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+require 'active_support'
+require 'active_support/rails'
+require 'action_cable/version'
+module ActionCable
+ extend ActiveSupport::Autoload
+ identifiers: {
+ ping: '_ping'.freeze
+ },
+ message_types: {
+ confirmation: 'confirm_subscription'.freeze,
+ rejection: 'reject_subscription'.freeze
+ }
+ }
+ # Singleton instance of the server
+ module_function def server
+ @server ||= ActionCable::Server::Base.new
+ end
+ autoload :Server
+ autoload :Connection
+ autoload :Channel
+ autoload :RemoteConnections
+ autoload :SubscriptionAdapter
diff --git a/actioncable/lib/action_cable/channel.rb b/actioncable/lib/action_cable/channel.rb
new file mode 100644
index 0000000000..7ae262ce5f
--- /dev/null
+++ b/actioncable/lib/action_cable/channel.rb
@@ -0,0 +1,14 @@
+module ActionCable
+ module Channel
+ extend ActiveSupport::Autoload
+ eager_autoload do
+ autoload :Base
+ autoload :Broadcasting
+ autoload :Callbacks
+ autoload :Naming
+ autoload :PeriodicTimers
+ autoload :Streams
+ end
+ end
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
new file mode 100644
index 0000000000..874ebe2e71
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -0,0 +1,286 @@
+require 'set'
+module ActionCable
+ module Channel
+ # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection.
+ # You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply
+ # responding to the subscriber's direct requests.
+ #
+ # Channel instances are long-lived. A channel object will be instantiated when the cable consumer becomes a subscriber, and then
+ # lives until the consumer disconnects. This may be seconds, minutes, hours, or even days. That means you have to take special care
+ # not to do anything silly in a channel that would balloon its memory footprint or whatever. The references are forever, so they won't be released
+ # as is normally the case with a controller instance that gets thrown away after every request.
+ #
+ # Long-lived channels (and connections) also mean you're responsible for ensuring that the data is fresh. If you hold a reference to a user
+ # record, but the name is changed while that reference is held, you may be sending stale data if you don't take precautions to avoid it.
+ #
+ # The upside of long-lived channel instances is that you can use instance variables to keep reference to objects that future subscriber requests
+ # can interact with. Here's a quick example:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ # end
+ #
+ # def speak(data)
+ # @room.speak data, user: current_user
+ # end
+ # end
+ #
+ # The #speak action simply uses the Chat::Room object that was created when the channel was first subscribed to by the consumer when that
+ # subscriber wants to say something in the room.
+ #
+ # == Action processing
+ #
+ # Unlike subclasses of ActionController::Base, channels do not follow a REST
+ # constraint form for their actions. Instead, ActionCable operates through a
+ # remote-procedure call model. You can declare any public method on the
+ # channel (optionally taking a <tt>data</tt> argument), and this method is
+ # automatically exposed as callable to the client.
+ #
+ # Example:
+ #
+ # class AppearanceChannel < ApplicationCable::Channel
+ # def subscribed
+ # @connection_token = generate_connection_token
+ # end
+ #
+ # def unsubscribed
+ # current_user.disappear @connection_token
+ # end
+ #
+ # def appear(data)
+ # current_user.appear @connection_token, on: data['appearing_on']
+ # end
+ #
+ # def away
+ # current_user.away @connection_token
+ # end
+ #
+ # private
+ # def generate_connection_token
+ # SecureRandom.hex(36)
+ # end
+ # end
+ #
+ # In this example, subscribed/unsubscribed are not callable methods, as they
+ # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt>
+ # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not
+ # callable as it's a private method. You'll see that appear accepts a data
+ # parameter, which it then uses as part of its model call. <tt>#away</tt>
+ # does not, since it's simply a trigger action.
+ #
+ # Also note that in this example, <tt>current_user</tt> is available because
+ # it was marked as an identifying attribute on the connection. All such
+ # identifiers will automatically create a delegation method of the same name
+ # on the channel instance.
+ #
+ # == Rejecting subscription requests
+ #
+ # A channel can reject a subscription request in the #subscribed callback by
+ # invoking the #reject method:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ # reject unless current_user.can_access?(@room)
+ # end
+ # end
+ #
+ # In this example, the subscription will be rejected if the
+ # <tt>current_user</tt> does not have access to the chat room. On the
+ # client-side, the <tt>Channel#rejected</tt> callback will get invoked when
+ # the server rejects the subscription request.
+ class Base
+ include Callbacks
+ include PeriodicTimers
+ include Streams
+ include Naming
+ include Broadcasting
+ attr_reader :params, :connection, :identifier
+ delegate :logger, to: :connection
+ class << self
+ # A list of method names that should be considered actions. This
+ # includes all public instance methods on a channel, less
+ # any internal methods (defined on Base), adding back in
+ # any methods that are internal, but still exist on the class
+ # itself.
+ #
+ # ==== Returns
+ # * <tt>Set</tt> - A set of all methods that should be considered actions.
+ def action_methods
+ @action_methods ||= begin
+ # All public instance methods of this class, including ancestors
+ methods = (public_instance_methods(true) -
+ # Except for public instance methods of Base and its ancestors
+ ActionCable::Channel::Base.public_instance_methods(true) +
+ # Be sure to include shadowed public instance methods of this class
+ public_instance_methods(false)).uniq.map(&:to_s)
+ methods.to_set
+ end
+ end
+ protected
+ # action_methods are cached and there is sometimes need to refresh
+ # them. ::clear_action_methods! allows you to do that, so next time
+ # you run action_methods, they will be recalculated
+ def clear_action_methods!
+ @action_methods = nil
+ end
+ # Refresh the cached action_methods when a new action_method is added.
+ def method_added(name)
+ super
+ clear_action_methods!
+ end
+ end
+ def initialize(connection, identifier, params = {})
+ @connection = connection
+ @identifier = identifier
+ @params = params
+ # When a channel is streaming via pubsub, we want to delay the confirmation
+ # transmission until pubsub subscription is confirmed.
+ @defer_subscription_confirmation = false
+ @reject_subscription = nil
+ @subscription_confirmation_sent = nil
+ delegate_connection_identifiers
+ subscribe_to_channel
+ end
+ # Extract the action name from the passed data and process it via the channel. The process will ensure
+ # that the action requested is a public method on the channel declared by the user (so not one of the callbacks
+ # like #subscribed).
+ def perform_action(data)
+ action = extract_action(data)
+ if processable_action?(action)
+ dispatch_action(action, data)
+ else
+ logger.error "Unable to process #{action_signature(action, data)}"
+ end
+ end
+ # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks.
+ # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
+ def unsubscribe_from_channel
+ run_callbacks :unsubscribe do
+ unsubscribed
+ end
+ end
+ protected
+ # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams
+ # you want this channel to be sending to the subscriber.
+ def subscribed
+ # Override in subclasses
+ end
+ # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking
+ # people as offline or the like.
+ def unsubscribed
+ # Override in subclasses
+ end
+ # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with
+ # the proper channel identifier marked as the recipient.
+ def transmit(data, via: nil)
+ logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via }
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
+ end
+ def defer_subscription_confirmation!
+ @defer_subscription_confirmation = true
+ end
+ def defer_subscription_confirmation?
+ @defer_subscription_confirmation
+ end
+ def subscription_confirmation_sent?
+ @subscription_confirmation_sent
+ end
+ def reject
+ @reject_subscription = true
+ end
+ def subscription_rejected?
+ @reject_subscription
+ end
+ private
+ def delegate_connection_identifiers
+ connection.identifiers.each do |identifier|
+ define_singleton_method(identifier) do
+ connection.send(identifier)
+ end
+ end
+ end
+ def subscribe_to_channel
+ run_callbacks :subscribe do
+ subscribed
+ end
+ if subscription_rejected?
+ reject_subscription
+ else
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
+ end
+ def extract_action(data)
+ (data['action'].presence || :receive).to_sym
+ end
+ def processable_action?(action)
+ self.class.action_methods.include?(action.to_s)
+ end
+ def dispatch_action(action, data)
+ logger.info action_signature(action, data)
+ if method(action).arity == 1
+ public_send action, data
+ else
+ public_send action
+ end
+ end
+ def action_signature(action, data)
+ "#{self.class.name}##{action}".tap do |signature|
+ if (arguments = data.except('action')).any?
+ signature << "(#{arguments.inspect})"
+ end
+ end
+ end
+ def transmit_subscription_confirmation
+ unless subscription_confirmation_sent?
+ logger.info "#{self.class.name} is transmitting the subscription confirmation"
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
+ @subscription_confirmation_sent = true
+ end
+ end
+ def reject_subscription
+ connection.subscriptions.remove_subscription self
+ transmit_subscription_rejection
+ end
+ def transmit_subscription_rejection
+ logger.info "#{self.class.name} is transmitting the subscription rejection"
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/channel/broadcasting.rb b/actioncable/lib/action_cable/channel/broadcasting.rb
new file mode 100644
index 0000000000..afc23d7d1a
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/broadcasting.rb
@@ -0,0 +1,29 @@
+require 'active_support/core_ext/object/to_param'
+module ActionCable
+ module Channel
+ module Broadcasting
+ extend ActiveSupport::Concern
+ delegate :broadcasting_for, to: :class
+ class_methods do
+ # Broadcast a hash to a unique broadcasting for this <tt>model</tt> in this channel.
+ def broadcast_to(model, message)
+ ActionCable.server.broadcast(broadcasting_for([ channel_name, model ]), message)
+ end
+ def broadcasting_for(model) #:nodoc:
+ case
+ when model.is_a?(Array)
+ model.map { |m| broadcasting_for(m) }.join(':')
+ when model.respond_to?(:to_gid_param)
+ model.to_gid_param
+ else
+ model.to_param
+ end
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/channel/callbacks.rb b/actioncable/lib/action_cable/channel/callbacks.rb
new file mode 100644
index 0000000000..295d750e86
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/callbacks.rb
@@ -0,0 +1,35 @@
+require 'active_support/callbacks'
+module ActionCable
+ module Channel
+ module Callbacks
+ extend ActiveSupport::Concern
+ include ActiveSupport::Callbacks
+ included do
+ define_callbacks :subscribe
+ define_callbacks :unsubscribe
+ end
+ class_methods do
+ def before_subscribe(*methods, &block)
+ set_callback(:subscribe, :before, *methods, &block)
+ end
+ def after_subscribe(*methods, &block)
+ set_callback(:subscribe, :after, *methods, &block)
+ end
+ alias_method :on_subscribe, :after_subscribe
+ def before_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :before, *methods, &block)
+ end
+ def after_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :after, *methods, &block)
+ end
+ alias_method :on_unsubscribe, :after_unsubscribe
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb
new file mode 100644
index 0000000000..4c9d53b15a
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/naming.rb
@@ -0,0 +1,22 @@
+module ActionCable
+ module Channel
+ module Naming
+ extend ActiveSupport::Concern
+ class_methods do
+ # Returns the name of the channel, underscored, without the <tt>Channel</tt> ending.
+ # If the channel is in a namespace, then the namespaces are represented by single
+ # colon separators in the channel name.
+ #
+ # ChatChannel.channel_name # => 'chat'
+ # Chats::AppearancesChannel.channel_name # => 'chats:appearances'
+ def channel_name
+ @channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore
+ end
+ end
+ # Delegates to the class' <tt>channel_name</tt>
+ delegate :channel_name, to: :class
+ end
+ end
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
new file mode 100644
index 0000000000..56597d02d7
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -0,0 +1,41 @@
+module ActionCable
+ module Channel
+ module PeriodicTimers
+ extend ActiveSupport::Concern
+ included do
+ class_attribute :periodic_timers, instance_reader: false
+ self.periodic_timers = []
+ after_subscribe :start_periodic_timers
+ after_unsubscribe :stop_periodic_timers
+ end
+ module ClassMethods
+ # Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful
+ # for sending a steady flow of updates to a client based off an object that was configured on subscription.
+ # It's an alternative to using streams if the channel is able to do the work internally.
+ def periodically(callback, every:)
+ self.periodic_timers += [ [ callback, every: every ] ]
+ end
+ end
+ private
+ def active_periodic_timers
+ @active_periodic_timers ||= []
+ end
+ def start_periodic_timers
+ self.class.periodic_timers.each do |callback, options|
+ active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
+ connection.worker_pool.async_run_periodic_timer(self, callback)
+ end
+ end
+ end
+ def stop_periodic_timers
+ active_periodic_timers.each { |timer| timer.shutdown }
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
new file mode 100644
index 0000000000..3158f30814
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -0,0 +1,116 @@
+module ActionCable
+ module Channel
+ # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data
+ # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
+ # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later.
+ #
+ # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
+ # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
+ # comments on a given page:
+ #
+ # class CommentsChannel < ApplicationCable::Channel
+ # def follow(data)
+ # stream_from "comments_for_#{data['recording_id']}"
+ # end
+ #
+ # def unfollow
+ # stop_all_streams
+ # end
+ # end
+ #
+ # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there.
+ # That looks like so from that side of things:
+ #
+ # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell'
+ #
+ # If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel.
+ # The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE`
+ #
+ # class CommentsChannel < ApplicationCable::Channel
+ # def subscribed
+ # post = Post.find(params[:id])
+ # stream_for post
+ # end
+ # end
+ #
+ # You can then broadcast to this channel using:
+ #
+ # CommentsChannel.broadcast_to(@post, @comment)
+ #
+ # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out.
+ # Example below shows how you can use this to provide performance introspection in the process:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ #
+ # stream_for @room, -> (encoded_message) do
+ # message = ActiveSupport::JSON.decode(encoded_message)
+ #
+ # if message['originated_at'].present?
+ # elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
+ #
+ # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
+ # logger.info "Message took #{elapsed_time}s to arrive"
+ # end
+ #
+ # transmit message
+ # end
+ # end
+ # end
+ #
+ # You can stop streaming from all broadcasts by calling #stop_all_streams.
+ module Streams
+ extend ActiveSupport::Concern
+ included do
+ on_unsubscribe :stop_all_streams
+ end
+ # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
+ # instead of the default of just transmitting the updates straight to the subscriber.
+ def stream_from(broadcasting, callback = nil)
+ # Hold off the confirmation until pubsub#subscribe is successful
+ defer_subscription_confirmation!
+ callback ||= default_stream_callback(broadcasting)
+ streams << [ broadcasting, callback ]
+ Concurrent.global_io_executor.post do
+ pubsub.subscribe(broadcasting, callback, lambda do
+ transmit_subscription_confirmation
+ logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ end)
+ end
+ end
+ # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
+ # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
+ # to the subscriber.
+ def stream_for(model, callback = nil)
+ stream_from(broadcasting_for([ channel_name, model ]), callback)
+ end
+ # Unsubscribes all streams associated with this channel from the pubsub queue.
+ def stop_all_streams
+ streams.each do |broadcasting, callback|
+ pubsub.unsubscribe broadcasting, callback
+ logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
+ end.clear
+ end
+ private
+ delegate :pubsub, to: :connection
+ def streams
+ @_streams ||= []
+ end
+ def default_stream_callback(broadcasting)
+ -> (message) do
+ transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
new file mode 100644
index 0000000000..902efb07e2
--- /dev/null
+++ b/actioncable/lib/action_cable/connection.rb
@@ -0,0 +1,19 @@
+module ActionCable
+ module Connection
+ extend ActiveSupport::Autoload
+ eager_autoload do
+ autoload :Authorization
+ autoload :Base
+ autoload :ClientSocket
+ autoload :Identification
+ autoload :InternalChannel
+ autoload :MessageBuffer
+ autoload :Stream
+ autoload :StreamEventLoop
+ autoload :Subscriptions
+ autoload :TaggedLoggerProxy
+ autoload :WebSocket
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/authorization.rb b/actioncable/lib/action_cable/connection/authorization.rb
new file mode 100644
index 0000000000..070a70e4e2
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/authorization.rb
@@ -0,0 +1,13 @@
+module ActionCable
+ module Connection
+ module Authorization
+ class UnauthorizedError < StandardError; end
+ private
+ def reject_unauthorized_connection
+ logger.error "An unauthorized connection attempt was rejected"
+ raise UnauthorizedError
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
new file mode 100644
index 0000000000..b5f898436a
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -0,0 +1,222 @@
+require 'action_dispatch'
+module ActionCable
+ module Connection
+ # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
+ # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
+ # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
+ # authentication and authorization.
+ #
+ # Here's a basic example:
+ #
+ # module ApplicationCable
+ # class Connection < ActionCable::Connection::Base
+ # identified_by :current_user
+ #
+ # def connect
+ # self.current_user = find_verified_user
+ # logger.add_tags current_user.name
+ # end
+ #
+ # def disconnect
+ # # Any cleanup work needed when the cable connection is cut.
+ # end
+ #
+ # protected
+ # def find_verified_user
+ # if current_user = User.find_by_identity cookies.signed[:identity_id]
+ # current_user
+ # else
+ # reject_unauthorized_connection
+ # end
+ # end
+ # end
+ # end
+ #
+ # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections
+ # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
+ # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
+ #
+ # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
+ # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
+ #
+ # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
+ #
+ # Pretty simple, eh?
+ class Base
+ include Identification
+ include InternalChannel
+ include Authorization
+ attr_reader :server, :env, :subscriptions, :logger
+ delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
+ def initialize(server, env)
+ @server, @env = server, env
+ @logger = new_tagged_logger
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
+ @subscriptions = ActionCable::Connection::Subscriptions.new(self)
+ @message_buffer = ActionCable::Connection::MessageBuffer.new(self)
+ @_internal_subscriptions = nil
+ @started_at = Time.now
+ end
+ # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
+ # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
+ def process
+ logger.info started_request_message
+ if websocket.possible? && allow_request_origin?
+ respond_to_successful_request
+ else
+ respond_to_invalid_request
+ end
+ end
+ # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded.
+ # The data is routed to the proper channel that the connection has subscribed to.
+ def receive(data_in_json)
+ if websocket.alive?
+ subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
+ else
+ logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
+ end
+ end
+ # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
+ # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
+ def transmit(data)
+ websocket.transmit data
+ end
+ # Close the WebSocket connection.
+ def close
+ websocket.close
+ end
+ # Invoke a method on the connection asynchronously through the pool of thread workers.
+ def send_async(method, *arguments)
+ worker_pool.async_invoke(self, method, *arguments)
+ end
+ # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
+ # This can be returned by a health check against the connection.
+ def statistics
+ {
+ identifier: connection_identifier,
+ started_at: @started_at,
+ subscriptions: subscriptions.identifiers,
+ request_id: @env['action_dispatch.request_id']
+ }
+ end
+ def beat
+ transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
+ end
+ def on_open # :nodoc:
+ send_async :handle_open
+ end
+ def on_message(message) # :nodoc:
+ message_buffer.append message
+ end
+ def on_error(message) # :nodoc:
+ # ignore
+ end
+ def on_close(reason, code) # :nodoc:
+ send_async :handle_close
+ end
+ protected
+ # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
+ def request
+ @request ||= begin
+ environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
+ ActionDispatch::Request.new(environment || env)
+ end
+ end
+ # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
+ def cookies
+ request.cookie_jar
+ end
+ attr_reader :websocket
+ attr_reader :message_buffer
+ private
+ def handle_open
+ connect if respond_to?(:connect)
+ subscribe_to_internal_channel
+ beat
+ message_buffer.process!
+ server.add_connection(self)
+ rescue ActionCable::Connection::Authorization::UnauthorizedError
+ respond_to_invalid_request
+ end
+ def handle_close
+ logger.info finished_request_message
+ server.remove_connection(self)
+ subscriptions.unsubscribe_from_all
+ unsubscribe_from_internal_channel
+ disconnect if respond_to?(:disconnect)
+ end
+ def allow_request_origin?
+ return true if server.config.disable_request_forgery_protection
+ if Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env['HTTP_ORIGIN'] }
+ true
+ else
+ logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
+ false
+ end
+ end
+ def respond_to_successful_request
+ websocket.rack_response
+ end
+ def respond_to_invalid_request
+ close if websocket.alive?
+ logger.info finished_request_message
+ [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
+ end
+ # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
+ def new_tagged_logger
+ TaggedLoggerProxy.new server.logger,
+ tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
+ end
+ def started_request_message
+ 'Started %s "%s"%s for %s at %s' % [
+ request.request_method,
+ request.filtered_path,
+ websocket.possible? ? ' [WebSocket]' : '',
+ request.ip,
+ Time.now.to_s ]
+ end
+ def finished_request_message
+ 'Finished "%s"%s for %s at %s' % [
+ request.filtered_path,
+ websocket.possible? ? ' [WebSocket]' : '',
+ request.ip,
+ Time.now.to_s ]
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
new file mode 100644
index 0000000000..ef937d7c16
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -0,0 +1,150 @@
+require 'websocket/driver'
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class ClientSocket # :nodoc:
+ def self.determine_url(env)
+ scheme = secure_request?(env) ? 'wss:' : 'ws:'
+ "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
+ end
+ def self.secure_request?(env)
+ return true if env['HTTPS'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SSL'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https'
+ return true if env['HTTP_X_FORWARDED_PROTO'] == 'https'
+ return true if env['rack.url_scheme'] == 'https'
+ return false
+ end
+ OPEN = 1
+ CLOSED = 3
+ attr_reader :env, :url
+ def initialize(env, event_target, stream_event_loop)
+ @env = env
+ @event_target = event_target
+ @stream_event_loop = stream_event_loop
+ @url = ClientSocket.determine_url(@env)
+ @driver = @driver_started = nil
+ @close_params = ['', 1006]
+ @ready_state = CONNECTING
+ # The driver calls +env+, +url+, and +write+
+ @driver = ::WebSocket::Driver.rack(self)
+ @driver.on(:open) { |e| open }
+ @driver.on(:message) { |e| receive_message(e.data) }
+ @driver.on(:close) { |e| begin_close(e.reason, e.code) }
+ @driver.on(:error) { |e| emit_error(e.message) }
+ @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+ if callback = @env['async.callback']
+ callback.call([101, {}, @stream])
+ end
+ end
+ def start_driver
+ return if @driver.nil? || @driver_started
+ @driver_started = true
+ @driver.start
+ end
+ def rack_response
+ start_driver
+ [ -1, {}, [] ]
+ end
+ def write(data)
+ @stream.write(data)
+ end
+ def transmit(message)
+ return false if @ready_state > OPEN
+ case message
+ when Numeric then @driver.text(message.to_s)
+ when String then @driver.text(message)
+ when Array then @driver.binary(message)
+ else false
+ end
+ end
+ def close(code = nil, reason = nil)
+ code ||= 1000
+ reason ||= ''
+ unless code == 1000 or (code >= 3000 and code <= 4999)
+ raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
+ "The code must be either 1000, or between 3000 and 4999. " +
+ "#{code} is neither."
+ end
+ @ready_state = CLOSING unless @ready_state == CLOSED
+ @driver.close(reason, code)
+ end
+ def parse(data)
+ @driver.parse(data)
+ end
+ def client_gone
+ finalize_close
+ end
+ def alive?
+ @ready_state == OPEN
+ end
+ private
+ def open
+ return unless @ready_state == CONNECTING
+ @ready_state = OPEN
+ @event_target.on_open
+ end
+ def receive_message(data)
+ return unless @ready_state == OPEN
+ @event_target.on_message(data)
+ end
+ def emit_error(message)
+ return if @ready_state >= CLOSING
+ @event_target.on_error(message)
+ end
+ def begin_close(reason, code)
+ return if @ready_state == CLOSED
+ @ready_state = CLOSING
+ @close_params = [reason, code]
+ if @stream
+ @stream.shutdown
+ else
+ finalize_close
+ end
+ end
+ def finalize_close
+ return if @ready_state == CLOSED
+ @ready_state = CLOSED
+ @event_target.on_close(*@close_params)
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb
new file mode 100644
index 0000000000..885ff3f102
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/identification.rb
@@ -0,0 +1,46 @@
+require 'set'
+module ActionCable
+ module Connection
+ module Identification
+ extend ActiveSupport::Concern
+ included do
+ class_attribute :identifiers
+ self.identifiers = Set.new
+ end
+ class_methods do
+ # Mark a key as being a connection identifier index that can then be used to find the specific connection again later.
+ # Common identifiers are current_user and current_account, but could be anything really.
+ #
+ # Note that anything marked as an identifier will automatically create a delegate by the same name on any
+ # channel instances created off the connection.
+ def identified_by(*identifiers)
+ Array(identifiers).each { |identifier| attr_accessor identifier }
+ self.identifiers += identifiers
+ end
+ end
+ # Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
+ def connection_identifier
+ unless defined? @connection_identifier
+ @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ end
+ @connection_identifier
+ end
+ private
+ def connection_gid(ids)
+ ids.map do |o|
+ if o.respond_to? :to_gid_param
+ o.to_gid_param
+ else
+ o.to_s
+ end
+ end.sort.join(":")
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
new file mode 100644
index 0000000000..27826792b3
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -0,0 +1,45 @@
+module ActionCable
+ module Connection
+ # Makes it possible for the RemoteConnection to disconnect a specific connection.
+ module InternalChannel
+ extend ActiveSupport::Concern
+ private
+ def internal_channel
+ "action_cable/#{connection_identifier}"
+ end
+ def subscribe_to_internal_channel
+ if connection_identifier.present?
+ callback = -> (message) { process_internal_message(message) }
+ @_internal_subscriptions ||= []
+ @_internal_subscriptions << [ internal_channel, callback ]
+ Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
+ logger.info "Registered connection (#{connection_identifier})"
+ end
+ end
+ def unsubscribe_from_internal_channel
+ if @_internal_subscriptions.present?
+ @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
+ end
+ end
+ def process_internal_message(message)
+ message = ActiveSupport::JSON.decode(message)
+ case message['type']
+ when 'disconnect'
+ logger.info "Removing connection (#{connection_identifier})"
+ websocket.close
+ end
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
+ close
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb
new file mode 100644
index 0000000000..2f65a1e84a
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/message_buffer.rb
@@ -0,0 +1,54 @@
+module ActionCable
+ module Connection
+ # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
+ # Entirely internal operation and should not be used directly by the user.
+ class MessageBuffer
+ def initialize(connection)
+ @connection = connection
+ @buffered_messages = []
+ end
+ def append(message)
+ if valid? message
+ if processing?
+ receive message
+ else
+ buffer message
+ end
+ else
+ connection.logger.error "Couldn't handle non-string message: #{message.class}"
+ end
+ end
+ def processing?
+ @processing
+ end
+ def process!
+ @processing = true
+ receive_buffered_messages
+ end
+ protected
+ attr_reader :connection
+ attr_accessor :buffered_messages
+ private
+ def valid?(message)
+ message.is_a?(String)
+ end
+ def receive(message)
+ connection.send_async :receive, message
+ end
+ def buffer(message)
+ buffered_messages << message
+ end
+ def receive_buffered_messages
+ receive buffered_messages.shift until buffered_messages.empty?
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
new file mode 100644
index 0000000000..ace250cd16
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -0,0 +1,59 @@
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class Stream
+ def initialize(event_loop, socket)
+ @event_loop = event_loop
+ @socket_object = socket
+ @stream_send = socket.env['stream.send']
+ @rack_hijack_io = nil
+ hijack_rack_socket
+ end
+ def each(&callback)
+ @stream_send ||= callback
+ end
+ def close
+ shutdown
+ @socket_object.client_gone
+ end
+ def shutdown
+ clean_rack_hijack
+ end
+ def write(data)
+ return @rack_hijack_io.write(data) if @rack_hijack_io
+ return @stream_send.call(data) if @stream_send
+ rescue EOFError
+ @socket_object.client_gone
+ end
+ def receive(data)
+ @socket_object.parse(data)
+ end
+ private
+ def hijack_rack_socket
+ return unless @socket_object.env['rack.hijack']
+ @socket_object.env['rack.hijack'].call
+ @rack_hijack_io = @socket_object.env['rack.hijack_io']
+ @event_loop.attach(@rack_hijack_io, self)
+ end
+ def clean_rack_hijack
+ return unless @rack_hijack_io
+ @event_loop.detach(@rack_hijack_io, self)
+ @rack_hijack_io = nil
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
new file mode 100644
index 0000000000..e6335082d2
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -0,0 +1,94 @@
+require 'nio'
+require 'thread'
+module ActionCable
+ module Connection
+ class StreamEventLoop
+ def initialize
+ @nio = @thread = nil
+ @map = {}
+ @stopping = false
+ @todo = Queue.new
+ @spawn_mutex = Mutex.new
+ spawn
+ end
+ def attach(io, stream)
+ @todo << lambda do
+ @map[io] = stream
+ @nio.register(io, :r)
+ end
+ wakeup
+ end
+ def detach(io, stream)
+ @todo << lambda do
+ @nio.deregister io
+ @map.delete io
+ end
+ wakeup
+ end
+ def stop
+ @stopping = true
+ wakeup if @nio
+ end
+ private
+ def spawn
+ return if @thread && @thread.status
+ @spawn_mutex.synchronize do
+ return if @thread && @thread.status
+ @nio ||= NIO::Selector.new
+ @thread = Thread.new { run }
+ return true
+ end
+ end
+ def wakeup
+ spawn || @nio.wakeup
+ end
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+ next unless monitors = @nio.select
+ monitors.each do |monitor|
+ io = monitor.io
+ stream = @map[io]
+ begin
+ stream.receive io.read_nonblock(4096)
+ rescue IO::WaitReadable
+ next
+ rescue
+ # We expect one of EOFError or Errno::ECONNRESET in
+ # normal operation (when the client goes away). But if
+ # anything else goes wrong, this is still the best way
+ # to handle it.
+ begin
+ stream.close
+ rescue
+ @nio.deregister io
+ @map.delete io
+ end
+ end
+ end
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
new file mode 100644
index 0000000000..d7f95e6a62
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -0,0 +1,75 @@
+require 'active_support/core_ext/hash/indifferent_access'
+module ActionCable
+ module Connection
+ # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
+ # the connection to the proper channel. Should not be used directly by the user.
+ class Subscriptions
+ def initialize(connection)
+ @connection = connection
+ @subscriptions = {}
+ end
+ def execute_command(data)
+ case data['command']
+ when 'subscribe' then add data
+ when 'unsubscribe' then remove data
+ when 'message' then perform_action data
+ else
+ logger.error "Received unrecognized command in #{data.inspect}"
+ end
+ rescue Exception => e
+ logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
+ end
+ def add(data)
+ id_key = data['identifier']
+ id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+ subscription_klass = connection.server.channel_classes[id_options[:channel]]
+ if subscription_klass
+ subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
+ else
+ logger.error "Subscription class not found (#{data.inspect})"
+ end
+ end
+ def remove(data)
+ logger.info "Unsubscribing from channel: #{data['identifier']}"
+ remove_subscription subscriptions[data['identifier']]
+ end
+ def remove_subscription(subscription)
+ subscription.unsubscribe_from_channel
+ subscriptions.delete(subscription.identifier)
+ end
+ def perform_action(data)
+ find(data).perform_action ActiveSupport::JSON.decode(data['data'])
+ end
+ def identifiers
+ subscriptions.keys
+ end
+ def unsubscribe_from_all
+ subscriptions.each { |id, channel| channel.unsubscribe_from_channel }
+ end
+ protected
+ attr_reader :connection, :subscriptions
+ private
+ delegate :logger, to: :connection
+ def find(data)
+ if subscription = subscriptions[data['identifier']]
+ subscription
+ else
+ raise "Unable to find subscription with identifier: #{data['identifier']}"
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb
new file mode 100644
index 0000000000..41afa9680a
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb
@@ -0,0 +1,40 @@
+module ActionCable
+ module Connection
+ # Allows the use of per-connection tags against the server logger. This wouldn't work using the traditional
+ # <tt>ActiveSupport::TaggedLogging</tt> enhanced Rails.logger, as that logger will reset the tags between requests.
+ # The connection is long-lived, so it needs its own set of tags for its independent duration.
+ class TaggedLoggerProxy
+ attr_reader :tags
+ def initialize(logger, tags:)
+ @logger = logger
+ @tags = tags.flatten
+ end
+ def add_tags(*tags)
+ @tags += tags.flatten
+ @tags = @tags.uniq
+ end
+ def tag(logger)
+ if logger.respond_to?(:tagged)
+ current_tags = tags - logger.formatter.current_tags
+ logger.tagged(*current_tags) { yield }
+ else
+ yield
+ end
+ end
+ %i( debug info warn error fatal unknown ).each do |severity|
+ define_method(severity) do |message|
+ log severity, message
+ end
+ end
+ protected
+ def log(type, message)
+ tag(@logger) { @logger.send type, message }
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
new file mode 100644
index 0000000000..5e89fb9b72
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -0,0 +1,35 @@
+require 'websocket/driver'
+module ActionCable
+ module Connection
+ # Wrap the real socket to minimize the externally-presented API
+ class WebSocket
+ def initialize(env, event_target, stream_event_loop)
+ @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
+ end
+ def possible?
+ websocket
+ end
+ def alive?
+ websocket && websocket.alive?
+ end
+ def transmit(data)
+ websocket.transmit data
+ end
+ def close
+ websocket.close
+ end
+ def rack_response
+ websocket.rack_response
+ end
+ protected
+ attr_reader :websocket
+ end
+ end
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
new file mode 100644
index 0000000000..f5e233e091
--- /dev/null
+++ b/actioncable/lib/action_cable/engine.rb
@@ -0,0 +1,38 @@
+require "rails"
+require "action_cable"
+require "action_cable/helpers/action_cable_helper"
+require "active_support/core_ext/hash/indifferent_access"
+module ActionCable
+ class Railtie < Rails::Engine # :nodoc:
+ config.action_cable = ActiveSupport::OrderedOptions.new
+ config.action_cable.url = '/cable'
+ config.eager_load_namespaces << ActionCable
+ initializer "action_cable.helpers" do
+ ActiveSupport.on_load(:action_view) do
+ include ActionCable::Helpers::ActionCableHelper
+ end
+ end
+ initializer "action_cable.logger" do
+ ActiveSupport.on_load(:action_cable) { self.logger ||= ::Rails.logger }
+ end
+ initializer "action_cable.set_configs" do |app|
+ options = app.config.action_cable
+ options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?
+ app.paths.add "config/cable", with: "config/cable.yml"
+ ActiveSupport.on_load(:action_cable) do
+ if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist?
+ self.cable = Rails.application.config_for(config_path).with_indifferent_access
+ end
+ options.each { |k,v| send("#{k}=", v) }
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb
new file mode 100644
index 0000000000..a71603e61a
--- /dev/null
+++ b/actioncable/lib/action_cable/gem_version.rb
@@ -0,0 +1,15 @@
+module ActionCable
+ # Returns the version of the currently loaded Action Cable as a <tt>Gem::Version</tt>.
+ def self.gem_version
+ Gem::Version.new VERSION::STRING
+ end
+ module VERSION
+ MAJOR = 5
+ MINOR = 0
+ TINY = 0
+ PRE = "beta2"
+ STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
+ end
diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb
new file mode 100644
index 0000000000..b82751468a
--- /dev/null
+++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb
@@ -0,0 +1,29 @@
+module ActionCable
+ module Helpers
+ module ActionCableHelper
+ # Returns an "action-cable-url" meta tag with the value of the url specified in your
+ # configuration. Ensure this is above your javascript tag:
+ #
+ # <head>
+ # <%= action_cable_meta_tag %>
+ # <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %>
+ # </head>
+ #
+ # This is then used by ActionCable to determine the url of your websocket server.
+ # Your CoffeeScript can then connect to the server without needing to specify the
+ # url directly:
+ #
+ # #= require cable
+ # @App = {}
+ # App.cable = Cable.createConsumer()
+ #
+ # Make sure to specify the correct server location in each of your environments
+ # config file:
+ #
+ # config.action_cable.url = "ws://example.com:28080"
+ def action_cable_meta_tag
+ tag "meta", name: "action-cable-url", content: Rails.application.config.action_cable.url
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb
new file mode 100644
index 0000000000..7ec121308a
--- /dev/null
+++ b/actioncable/lib/action_cable/remote_connections.rb
@@ -0,0 +1,66 @@
+module ActionCable
+ # If you need to disconnect a given connection, you can go through the
+ # RemoteConnections. You can find the connections you're looking for by
+ # searching for the identifier declared on the connection. For example:
+ #
+ # module ApplicationCable
+ # class Connection < ActionCable::Connection::Base
+ # identified_by :current_user
+ # ....
+ # end
+ # end
+ #
+ # ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect
+ #
+ # This will disconnect all the connections established for
+ # <tt>User.find(1)</tt> across all servers running on all machines, because
+ # it uses the internal channel that all these servers are subscribed to.
+ class RemoteConnections
+ attr_reader :server
+ def initialize(server)
+ @server = server
+ end
+ def where(identifier)
+ RemoteConnection.new(server, identifier)
+ end
+ private
+ # Represents a single remote connection found via <tt>ActionCable.server.remote_connections.where(*)</tt>.
+ # Exists for the solely for the purpose of calling #disconnect on that connection.
+ class RemoteConnection
+ class InvalidIdentifiersError < StandardError; end
+ include Connection::Identification, Connection::InternalChannel
+ def initialize(server, ids)
+ @server = server
+ set_identifier_instance_vars(ids)
+ end
+ # Uses the internal channel to disconnect the connection.
+ def disconnect
+ server.broadcast internal_channel, type: 'disconnect'
+ end
+ # Returns all the identifiers that were applied to this connection.
+ def identifiers
+ server.connection_identifiers
+ end
+ private
+ attr_reader :server
+ def set_identifier_instance_vars(ids)
+ raise InvalidIdentifiersError unless valid_identifiers?(ids)
+ ids.each { |k,v| instance_variable_set("@#{k}", v) }
+ end
+ def valid_identifiers?(ids)
+ keys = ids.keys
+ identifiers.all? { |id| keys.include?(id) }
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb
new file mode 100644
index 0000000000..bd6a3826a3
--- /dev/null
+++ b/actioncable/lib/action_cable/server.rb
@@ -0,0 +1,15 @@
+module ActionCable
+ module Server
+ extend ActiveSupport::Autoload
+ eager_autoload do
+ autoload :Base
+ autoload :Broadcasting
+ autoload :Connections
+ autoload :Configuration
+ autoload :Worker
+ autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management'
+ end
+ end
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
new file mode 100644
index 0000000000..fe48c112df
--- /dev/null
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -0,0 +1,74 @@
+require 'thread'
+module ActionCable
+ module Server
+ # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
+ # also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers.
+ #
+ # Also, this is the server instance used for broadcasting. See Broadcasting for details.
+ class Base
+ include ActionCable::Server::Broadcasting
+ include ActionCable::Server::Connections
+ cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new }
+ def self.logger; config.logger; end
+ delegate :logger, to: :config
+ attr_reader :mutex
+ def initialize
+ @mutex = Mutex.new
+ @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
+ end
+ # Called by rack to setup the server.
+ def call(env)
+ setup_heartbeat_timer
+ config.connection_class.new(self, env).process
+ end
+ # Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections.
+ def disconnect(identifiers)
+ remote_connections.where(identifiers).disconnect
+ end
+ # Gateway to RemoteConnections. See that class for details.
+ def remote_connections
+ @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
+ end
+ def stream_event_loop
+ @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
+ end
+ # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
+ def worker_pool
+ @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
+ end
+ # Requires and returns a hash of all the channel class constants keyed by name.
+ def channel_classes
+ @channel_classes || @mutex.synchronize do
+ @channel_classes ||= begin
+ config.channel_paths.each { |channel_path| require channel_path }
+ config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ end
+ end
+ end
+ # Adapter used for all streams/broadcasting.
+ def pubsub
+ @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
+ end
+ # All the identifiers applied to the connection class associated with this server.
+ def connection_identifiers
+ config.connection_class.identifiers
+ end
+ end
+ ActiveSupport.run_load_hooks(:action_cable, Base.config)
+ end
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
new file mode 100644
index 0000000000..7e8aef45f4
--- /dev/null
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -0,0 +1,47 @@
+module ActionCable
+ module Server
+ # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
+ # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example:
+ #
+ # class WebNotificationsChannel < ApplicationCable::Channel
+ # def subscribed
+ # stream_from "web_notifications_#{current_user.id}"
+ # end
+ # end
+ #
+ # # Somewhere in your app this is called, perhaps from a NewCommentJob
+ # ActionCable.server.broadcast \
+ # "web_notifications_1", { title: "New things!", body: "All that's fit for print" }
+ #
+ # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications
+ # App.cable.subscriptions.create "WebNotificationsChannel",
+ # received: (data) ->
+ # new Notification data['title'], body: data['body']
+ module Broadcasting
+ # Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded.
+ def broadcast(broadcasting, message)
+ broadcaster_for(broadcasting).broadcast(message)
+ end
+ # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have a object that
+ # may need multiple spots to transmit to a specific broadcasting over and over.
+ def broadcaster_for(broadcasting)
+ Broadcaster.new(self, broadcasting)
+ end
+ private
+ class Broadcaster
+ attr_reader :server, :broadcasting
+ def initialize(server, broadcasting)
+ @server, @broadcasting = server, broadcasting
+ end
+ def broadcast(message)
+ server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
+ server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message)
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
new file mode 100644
index 0000000000..9a248933c4
--- /dev/null
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -0,0 +1,55 @@
+module ActionCable
+ module Server
+ # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points
+ # in a Rails config initializer.
+ class Configuration
+ attr_accessor :logger, :log_tags
+ attr_accessor :connection_class, :worker_pool_size
+ attr_accessor :channel_load_paths
+ attr_accessor :disable_request_forgery_protection, :allowed_request_origins
+ attr_accessor :cable, :url
+ def initialize
+ @log_tags = []
+ @connection_class = ApplicationCable::Connection
+ @worker_pool_size = 100
+ @channel_load_paths = [Rails.root.join('app/channels')]
+ @disable_request_forgery_protection = false
+ end
+ def channel_paths
+ @channel_paths ||= channel_load_paths.flat_map do |path|
+ Dir["#{path}/**/*_channel.rb"]
+ end
+ end
+ def channel_class_names
+ @channel_class_names ||= channel_paths.collect do |channel_path|
+ Pathname.new(channel_path).basename.to_s.split('.').first.camelize
+ end
+ end
+ # Returns constant of subscription adapter specified in config/cable.yml.
+ # If the adapter cannot be found, this will default to the Redis adapter.
+ # Also makes sure proper dependencies are required.
+ def pubsub_adapter
+ adapter = (cable.fetch('adapter') { 'redis' })
+ path_to_adapter = "action_cable/subscription_adapter/#{adapter}"
+ begin
+ require path_to_adapter
+ rescue Gem::LoadError => e
+ raise Gem::LoadError, "Specified '#{adapter}' for Action Cable pubsub adapter, but the gem is not loaded. Add `gem '#{e.name}'` to your Gemfile (and ensure its version is at the minimum required by Action Cable)."
+ rescue LoadError => e
+ raise LoadError, "Could not load '#{path_to_adapter}'. Make sure that the adapter in config/cable.yml is valid. If you use an adapter other than 'postgresql' or 'redis' add the necessary adapter gem to the Gemfile.", e.backtrace
+ end
+ adapter = adapter.camelize
+ adapter = 'PostgreSQL' if adapter == 'Postgresql'
+ "ActionCable::SubscriptionAdapter::#{adapter}".constantize
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
new file mode 100644
index 0000000000..8671dd5ebd
--- /dev/null
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -0,0 +1,35 @@
+module ActionCable
+ module Server
+ # Collection class for all the connections that's been established on this specific server. Remember, usually you'll run many cable servers, so
+ # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that.
+ # As such, this is primarily for internal use.
+ module Connections
+ def connections
+ @connections ||= []
+ end
+ def add_connection(connection)
+ connections << connection
+ end
+ def remove_connection(connection)
+ connections.delete connection
+ end
+ # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
+ # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
+ # disconnect.
+ def setup_heartbeat_timer
+ @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
+ Concurrent.global_io_executor.post { connections.map(&:beat) }
+ end.tap(&:execute)
+ end
+ def open_connections_statistics
+ connections.map(&:statistics)
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
new file mode 100644
index 0000000000..3b6c6d44a1
--- /dev/null
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -0,0 +1,71 @@
+require 'active_support/callbacks'
+require 'active_support/core_ext/module/attribute_accessors_per_thread'
+require 'concurrent'
+module ActionCable
+ module Server
+ # Worker used by Server.send_async to do connection work in threads. Only for internal use.
+ class Worker
+ include ActiveSupport::Callbacks
+ thread_mattr_accessor :connection
+ define_callbacks :work
+ include ActiveRecordConnectionManagement
+ def initialize(max_size: 5)
+ @pool = Concurrent::ThreadPoolExecutor.new(
+ min_threads: 1,
+ max_threads: max_size,
+ max_queue: 0,
+ )
+ end
+ def async_invoke(receiver, method, *args)
+ @pool.post do
+ invoke(receiver, method, *args)
+ end
+ end
+ def invoke(receiver, method, *args)
+ begin
+ self.connection = receiver
+ run_callbacks :work do
+ receiver.send method, *args
+ end
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
+ receiver.handle_exception if receiver.respond_to?(:handle_exception)
+ ensure
+ self.connection = nil
+ end
+ end
+ def async_run_periodic_timer(channel, callback)
+ @pool.post do
+ run_periodic_timer(channel, callback)
+ end
+ end
+ def run_periodic_timer(channel, callback)
+ begin
+ self.connection = channel.connection
+ run_callbacks :work do
+ callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
+ end
+ ensure
+ self.connection = nil
+ end
+ end
+ private
+ def logger
+ ActionCable.server.logger
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
new file mode 100644
index 0000000000..ecece4e270
--- /dev/null
+++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
@@ -0,0 +1,22 @@
+module ActionCable
+ module Server
+ class Worker
+ # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections.
+ module ActiveRecordConnectionManagement
+ extend ActiveSupport::Concern
+ included do
+ if defined?(ActiveRecord::Base)
+ set_callback :work, :around, :with_database_connections
+ end
+ end
+ def with_database_connections
+ connection.logger.tag(ActiveRecord::Base.logger) { yield }
+ ensure
+ ActiveRecord::Base.clear_active_connections!
+ end
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb
new file mode 100644
index 0000000000..72e62f3daf
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter.rb
@@ -0,0 +1,8 @@
+module ActionCable
+ module SubscriptionAdapter
+ extend ActiveSupport::Autoload
+ autoload :Base
+ autoload :SubscriberMap
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
new file mode 100644
index 0000000000..cca6894289
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -0,0 +1,22 @@
+require 'action_cable/subscription_adapter/inline'
+module ActionCable
+ module SubscriptionAdapter
+ class Async < Inline # :nodoc:
+ private
+ def new_subscriber_map
+ AsyncSubscriberMap.new
+ end
+ class AsyncSubscriberMap < SubscriberMap
+ def add_subscriber(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
new file mode 100644
index 0000000000..796db5ffa3
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -0,0 +1,28 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Base
+ attr_reader :logger, :server
+ def initialize(server)
+ @server = server
+ @logger = @server.logger
+ end
+ def broadcast(channel, payload)
+ raise NotImplementedError
+ end
+ def subscribe(channel, message_callback, success_callback = nil)
+ raise NotImplementedError
+ end
+ def unsubscribe(channel, message_callback)
+ raise NotImplementedError
+ end
+ def shutdown
+ raise NotImplementedError
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
new file mode 100644
index 0000000000..d697548cbd
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -0,0 +1,67 @@
+require 'thread'
+gem 'em-hiredis', '~> 0.3.0'
+gem 'redis', '~> 3.0'
+require 'em-hiredis'
+require 'redis'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+module ActionCable
+ module SubscriptionAdapter
+ class EventedRedis < Base # :nodoc:
+ @@mutex = Mutex.new
+ def initialize(*)
+ super
+ @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
+ end
+ def broadcast(channel, payload)
+ redis_connection_for_broadcasts.publish(channel, payload)
+ end
+ def subscribe(channel, message_callback, success_callback = nil)
+ redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback { |reply| success_callback.call } if success_callback
+ end
+ end
+ def unsubscribe(channel, message_callback)
+ redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+ private
+ def redis_connection_for_subscriptions
+ ensure_reactor_running
+ @redis_connection_for_subscriptions || @server.mutex.synchronize do
+ @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
+ end
+ end
+ end
+ def redis_connection_for_broadcasts
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ end
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
new file mode 100644
index 0000000000..81357faead
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -0,0 +1,35 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Inline < Base # :nodoc:
+ def initialize(*)
+ super
+ @subscriber_map = nil
+ end
+ def broadcast(channel, payload)
+ subscriber_map.broadcast(channel, payload)
+ end
+ def subscribe(channel, callback, success_callback = nil)
+ subscriber_map.add_subscriber(channel, callback, success_callback)
+ end
+ def unsubscribe(channel, callback)
+ subscriber_map.remove_subscriber(channel, callback)
+ end
+ def shutdown
+ # nothing to do
+ end
+ private
+ def subscriber_map
+ @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map }
+ end
+ def new_subscriber_map
+ SubscriberMap.new
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
new file mode 100644
index 0000000000..abaeb92e54
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -0,0 +1,106 @@
+gem 'pg', '~> 0.18'
+require 'pg'
+require 'thread'
+module ActionCable
+ module SubscriptionAdapter
+ class PostgreSQL < Base # :nodoc:
+ def initialize(*)
+ super
+ @listener = nil
+ end
+ def broadcast(channel, payload)
+ with_connection do |pg_conn|
+ pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
+ end
+ end
+ def subscribe(channel, callback, success_callback = nil)
+ listener.add_subscriber(channel, callback, success_callback)
+ end
+ def unsubscribe(channel, callback)
+ listener.remove_subscriber(channel, callback)
+ end
+ def shutdown
+ listener.shutdown
+ end
+ def with_connection(&block) # :nodoc:
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
+ yield pg_conn
+ end
+ end
+ private
+ def listener
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ end
+ class Listener < SubscriberMap
+ def initialize(adapter)
+ super()
+ @adapter = adapter
+ @queue = Queue.new
+ @thread = Thread.new do
+ Thread.current.abort_on_exception = true
+ listen
+ end
+ end
+ def listen
+ @adapter.with_connection do |pg_conn|
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ Concurrent.global_io_executor << callback if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
+ end
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
+ end
+ end
+ end
+ end
+ end
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
+ def add_channel(channel, on_success)
+ @queue.push([:listen, channel, on_success])
+ end
+ def remove_channel(channel)
+ @queue.push([:unlisten, channel])
+ end
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
new file mode 100644
index 0000000000..7076383efe
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -0,0 +1,163 @@
+require 'thread'
+gem 'redis', '~> 3.0'
+require 'redis'
+module ActionCable
+ module SubscriptionAdapter
+ class Redis < Base # :nodoc:
+ def initialize(*)
+ super
+ @listener = nil
+ @redis_connection_for_broadcasts = nil
+ end
+ def broadcast(channel, payload)
+ redis_connection_for_broadcasts.publish(channel, payload)
+ end
+ def subscribe(channel, callback, success_callback = nil)
+ listener.add_subscriber(channel, callback, success_callback)
+ end
+ def unsubscribe(channel, callback)
+ listener.remove_subscriber(channel, callback)
+ end
+ def shutdown
+ @listener.shutdown if @listener
+ end
+ def redis_connection_for_subscriptions
+ ::Redis.new(@server.config.cable)
+ end
+ private
+ def listener
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ end
+ def redis_connection_for_broadcasts
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ end
+ class Listener < SubscriberMap
+ def initialize(adapter)
+ super()
+ @adapter = adapter
+ @subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
+ @subscription_lock = Mutex.new
+ @raw_client = nil
+ @when_connected = []
+ @thread = nil
+ end
+ def listen(conn)
+ conn.without_reconnect do
+ original_client = conn.client
+ conn.subscribe('_action_cable_internal') do |on|
+ on.subscribe do |chan, count|
+ @subscription_lock.synchronize do
+ if count == 1
+ @raw_client = original_client
+ until @when_connected.empty?
+ @when_connected.shift.call
+ end
+ end
+ if callbacks = @subscribe_callbacks[chan]
+ next_callback = callbacks.shift
+ Concurrent.global_io_executor << next_callback if next_callback
+ @subscribe_callbacks.delete(chan) if callbacks.empty?
+ end
+ end
+ end
+ on.message do |chan, message|
+ broadcast(chan, message)
+ end
+ on.unsubscribe do |chan, count|
+ if count == 0
+ @subscription_lock.synchronize do
+ @raw_client = nil
+ end
+ end
+ end
+ end
+ end
+ end
+ def shutdown
+ @subscription_lock.synchronize do
+ return if @thread.nil?
+ when_connected do
+ send_command('unsubscribe')
+ @raw_client = nil
+ end
+ end
+ Thread.pass while @thread.alive?
+ end
+ def add_channel(channel, on_success)
+ @subscription_lock.synchronize do
+ ensure_listener_running
+ @subscribe_callbacks[channel] << on_success
+ when_connected { send_command('subscribe', channel) }
+ end
+ end
+ def remove_channel(channel)
+ @subscription_lock.synchronize do
+ when_connected { send_command('unsubscribe', channel) }
+ end
+ end
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ private
+ def ensure_listener_running
+ @thread ||= Thread.new do
+ Thread.current.abort_on_exception = true
+ conn = @adapter.redis_connection_for_subscriptions
+ listen conn
+ end
+ end
+ def when_connected(&block)
+ if @raw_client
+ block.call
+ else
+ @when_connected << block
+ end
+ end
+ def send_command(*command)
+ @raw_client.write(command)
+ very_raw_connection =
+ @raw_client.connection.instance_variable_defined?(:@connection) &&
+ @raw_client.connection.instance_variable_get(:@connection)
+ if very_raw_connection && very_raw_connection.respond_to?(:flush)
+ very_raw_connection.flush
+ end
+ end
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
new file mode 100644
index 0000000000..37eed09793
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
@@ -0,0 +1,53 @@
+module ActionCable
+ module SubscriptionAdapter
+ class SubscriberMap
+ def initialize
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ end
+ def add_subscriber(channel, subscriber, on_success)
+ @sync.synchronize do
+ new_channel = !@subscribers.key?(channel)
+ @subscribers[channel] << subscriber
+ if new_channel
+ add_channel channel, on_success
+ elsif on_success
+ on_success.call
+ end
+ end
+ end
+ def remove_subscriber(channel, subscriber)
+ @sync.synchronize do
+ @subscribers[channel].delete(subscriber)
+ if @subscribers[channel].empty?
+ @subscribers.delete channel
+ remove_channel channel
+ end
+ end
+ end
+ def broadcast(channel, message)
+ list = @sync.synchronize { @subscribers[channel].dup }
+ list.each do |subscriber|
+ invoke_callback(subscriber, message)
+ end
+ end
+ def add_channel(channel, on_success)
+ on_success.call if on_success
+ end
+ def remove_channel(channel)
+ end
+ def invoke_callback(callback, message)
+ callback.call message
+ end
+ end
+ end
diff --git a/actioncable/lib/action_cable/version.rb b/actioncable/lib/action_cable/version.rb
new file mode 100644
index 0000000000..e17877202b
--- /dev/null
+++ b/actioncable/lib/action_cable/version.rb
@@ -0,0 +1,8 @@
+require_relative 'gem_version'
+module ActionCable
+ # Returns the version of the currently loaded Action Cable as a <tt>Gem::Version</tt>
+ def self.version
+ gem_version
+ end
diff --git a/actioncable/lib/rails/generators/channel/USAGE b/actioncable/lib/rails/generators/channel/USAGE
new file mode 100644
index 0000000000..27a934c689
--- /dev/null
+++ b/actioncable/lib/rails/generators/channel/USAGE
@@ -0,0 +1,14 @@
+ Stubs out a new cable channel for the server (in Ruby) and client (in CoffeeScript).
+ Pass the channel name, either CamelCased or under_scored, and an optional list of channel actions as arguments.
+ Note: Turn on the cable connection in app/assets/javascript/cable.coffee after generating any channels.
+ rails generate channel Chat speak
+ creates a Chat channel class and CoffeeScript asset:
+ Channel: app/channels/chat_channel.rb
+ Assets: app/assets/javascript/channels/chat.coffee
diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb
new file mode 100644
index 0000000000..c5d398810a
--- /dev/null
+++ b/actioncable/lib/rails/generators/channel/channel_generator.rb
@@ -0,0 +1,26 @@
+module Rails
+ module Generators
+ class ChannelGenerator < NamedBase
+ source_root File.expand_path("../templates", __FILE__)
+ argument :actions, type: :array, default: [], banner: "method method"
+ class_option :assets, type: :boolean
+ check_class_collision suffix: "Channel"
+ def create_channel_file
+ template "channel.rb", File.join('app/channels', class_path, "#{file_name}_channel.rb")
+ if options[:assets]
+ template "assets/channel.coffee", File.join('app/assets/javascripts/channels', class_path, "#{file_name}.coffee")
+ end
+ end
+ protected
+ def file_name
+ @_file_name ||= super.gsub(/\_channel/i, '')
+ end
+ end
+ end
diff --git a/actioncable/lib/rails/generators/channel/templates/assets/channel.coffee b/actioncable/lib/rails/generators/channel/templates/assets/channel.coffee
new file mode 100644
index 0000000000..5467811aba
--- /dev/null
+++ b/actioncable/lib/rails/generators/channel/templates/assets/channel.coffee
@@ -0,0 +1,14 @@
+App.<%= class_name.underscore %> = App.cable.subscriptions.create "<%= class_name %>Channel",
+ connected: ->
+ # Called when the subscription is ready for use on the server
+ disconnected: ->
+ # Called when the subscription has been terminated by the server
+ received: (data) ->
+ # Called when there's incoming data on the websocket for this channel
+<% actions.each do |action| -%>
+ <%= action %>: ->
+ @perform '<%= action %>'
+<% end -%>
diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb
new file mode 100644
index 0000000000..6cf04ee61f
--- /dev/null
+++ b/actioncable/lib/rails/generators/channel/templates/channel.rb
@@ -0,0 +1,17 @@
+# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading.
+<% module_namespacing do -%>
+class <%= class_name %>Channel < ApplicationCable::Channel
+ def subscribed
+ # stream_from "some_channel"
+ end
+ def unsubscribed
+ # Any cleanup needed when channel is unsubscribed
+ end
+<% actions.each do |action| -%>
+ def <%= action %>
+ end
+<% end -%>
+<% end -%>
diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb
new file mode 100644
index 0000000000..d41bf3064b
--- /dev/null
+++ b/actioncable/test/channel/base_test.rb
@@ -0,0 +1,184 @@
+require 'test_helper'
+require 'stubs/test_connection'
+require 'stubs/room'
+class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
+ class ActionCable::Channel::Base
+ def kick
+ @last_action = [ :kick ]
+ end
+ def topic
+ end
+ end
+ class BasicChannel < ActionCable::Channel::Base
+ def chatters
+ @last_action = [ :chatters ]
+ end
+ end
+ class ChatChannel < BasicChannel
+ attr_reader :room, :last_action
+ after_subscribe :toggle_subscribed
+ after_unsubscribe :toggle_subscribed
+ def initialize(*)
+ @subscribed = false
+ super
+ end
+ def subscribed
+ @room = Room.new params[:id]
+ @actions = []
+ end
+ def unsubscribed
+ @room = nil
+ end
+ def toggle_subscribed
+ @subscribed = !@subscribed
+ end
+ def leave
+ @last_action = [ :leave ]
+ end
+ def speak(data)
+ @last_action = [ :speak, data ]
+ end
+ def topic(data)
+ @last_action = [ :topic, data ]
+ end
+ def subscribed?
+ @subscribed
+ end
+ def get_latest
+ transmit data: 'latest'
+ end
+ def receive
+ @last_action = [ :receive ]
+ end
+ private
+ def rm_rf
+ @last_action = [ :rm_rf ]
+ end
+ end
+ setup do
+ @user = User.new "lifo"
+ @connection = TestConnection.new(@user)
+ @channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
+ end
+ test "should subscribe to a channel on initialize" do
+ assert_equal 1, @channel.room.id
+ end
+ test "on subscribe callbacks" do
+ assert @channel.subscribed
+ end
+ test "channel params" do
+ assert_equal({ id: 1 }, @channel.params)
+ end
+ test "unsubscribing from a channel" do
+ assert @channel.room
+ assert @channel.subscribed?
+ @channel.unsubscribe_from_channel
+ assert ! @channel.room
+ assert ! @channel.subscribed?
+ end
+ test "connection identifiers" do
+ assert_equal @user.name, @channel.current_user.name
+ end
+ test "callable action without any argument" do
+ @channel.perform_action 'action' => :leave
+ assert_equal [ :leave ], @channel.last_action
+ end
+ test "callable action with arguments" do
+ data = { 'action' => :speak, 'content' => "Hello World" }
+ @channel.perform_action data
+ assert_equal [ :speak, data ], @channel.last_action
+ end
+ test "should not dispatch a private method" do
+ @channel.perform_action 'action' => :rm_rf
+ assert_nil @channel.last_action
+ end
+ test "should not dispatch a public method defined on Base" do
+ @channel.perform_action 'action' => :kick
+ assert_nil @channel.last_action
+ end
+ test "should dispatch a public method defined on Base and redefined on channel" do
+ data = { 'action' => :topic, 'content' => "This is Sparta!" }
+ @channel.perform_action data
+ assert_equal [ :topic, data ], @channel.last_action
+ end
+ test "should dispatch calling a public method defined in an ancestor" do
+ @channel.perform_action 'action' => :chatters
+ assert_equal [ :chatters ], @channel.last_action
+ end
+ test "should dispatch receive action when perform_action is called with empty action" do
+ data = { 'content' => 'hello' }
+ @channel.perform_action data
+ assert_equal [ :receive ], @channel.last_action
+ end
+ test "transmitting data" do
+ @channel.perform_action 'action' => :get_latest
+ expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" }
+ assert_equal expected, @connection.last_transmission
+ end
+ test "subscription confirmation" do
+ expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
+ assert_equal expected, @connection.last_transmission
+ end
+ test "actions available on Channel" do
+ available_actions = %w(room last_action subscribed unsubscribed toggle_subscribed leave speak subscribed? get_latest receive chatters topic).to_set
+ assert_equal available_actions, ChatChannel.action_methods
+ end
+ test "invalid action on Channel" do
+ assert_logged("Unable to process ActionCable::Channel::BaseTest::ChatChannel#invalid_action") do
+ @channel.perform_action 'action' => :invalid_action
+ end
+ end
+ private
+ def assert_logged(message)
+ old_logger = @connection.logger
+ log = StringIO.new
+ @connection.instance_variable_set(:@logger, Logger.new(log))
+ begin
+ yield
+ log.rewind
+ assert_match message, log.read
+ ensure
+ @connection.instance_variable_set(:@logger, old_logger)
+ end
+ end
diff --git a/actioncable/test/channel/broadcasting_test.rb b/actioncable/test/channel/broadcasting_test.rb
new file mode 100644
index 0000000000..1de04243e5
--- /dev/null
+++ b/actioncable/test/channel/broadcasting_test.rb
@@ -0,0 +1,29 @@
+require 'test_helper'
+require 'stubs/test_connection'
+require 'stubs/room'
+class ActionCable::Channel::BroadcastingTest < ActiveSupport::TestCase
+ class ChatChannel < ActionCable::Channel::Base
+ end
+ setup do
+ @connection = TestConnection.new
+ end
+ test "broadcasts_to" do
+ ActionCable.stubs(:server).returns mock().tap { |m| m.expects(:broadcast).with('action_cable:channel:broadcasting_test:chat:Room#1-Campfire', "Hello World") }
+ ChatChannel.broadcast_to(Room.new(1), "Hello World")
+ end
+ test "broadcasting_for with an object" do
+ assert_equal "Room#1-Campfire", ChatChannel.broadcasting_for(Room.new(1))
+ end
+ test "broadcasting_for with an array" do
+ assert_equal "Room#1-Campfire:Room#2-Campfire", ChatChannel.broadcasting_for([ Room.new(1), Room.new(2) ])
+ end
+ test "broadcasting_for with a string" do
+ assert_equal "hello", ChatChannel.broadcasting_for("hello")
+ end
diff --git a/actioncable/test/channel/naming_test.rb b/actioncable/test/channel/naming_test.rb
new file mode 100644
index 0000000000..89ef6ad8b0
--- /dev/null
+++ b/actioncable/test/channel/naming_test.rb
@@ -0,0 +1,10 @@
+require 'test_helper'
+class ActionCable::Channel::NamingTest < ActiveSupport::TestCase
+ class ChatChannel < ActionCable::Channel::Base
+ end
+ test "channel_name" do
+ assert_equal "action_cable:channel:naming_test:chat", ChatChannel.channel_name
+ end
diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb
new file mode 100644
index 0000000000..64f0247cd6
--- /dev/null
+++ b/actioncable/test/channel/periodic_timers_test.rb
@@ -0,0 +1,40 @@
+require 'test_helper'
+require 'stubs/test_connection'
+require 'stubs/room'
+class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
+ class ChatChannel < ActionCable::Channel::Base
+ periodically -> { ping }, every: 5
+ periodically :send_updates, every: 1
+ private
+ def ping
+ end
+ end
+ setup do
+ @connection = TestConnection.new
+ end
+ test "periodic timers definition" do
+ timers = ChatChannel.periodic_timers
+ assert_equal 2, timers.size
+ first_timer = timers[0]
+ assert_kind_of Proc, first_timer[0]
+ assert_equal 5, first_timer[1][:every]
+ second_timer = timers[1]
+ assert_equal :send_updates, second_timer[0]
+ assert_equal 1, second_timer[1][:every]
+ end
+ test "timer start and stop" do
+ Concurrent::TimerTask.expects(:new).times(2).returns(true)
+ channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
+ channel.expects(:stop_periodic_timers).once
+ channel.unsubscribe_from_channel
+ end
diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb
new file mode 100644
index 0000000000..aa93396d44
--- /dev/null
+++ b/actioncable/test/channel/rejection_test.rb
@@ -0,0 +1,25 @@
+require 'test_helper'
+require 'stubs/test_connection'
+require 'stubs/room'
+class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
+ class SecretChannel < ActionCable::Channel::Base
+ def subscribed
+ reject if params[:id] > 0
+ end
+ end
+ setup do
+ @user = User.new "lifo"
+ @connection = TestConnection.new(@user)
+ end
+ test "subscription rejection" do
+ @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
+ @channel = SecretChannel.new @connection, "{id: 1}", { id: 1 }
+ expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "reject_subscription"
+ assert_equal expected, @connection.last_transmission
+ end
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
new file mode 100644
index 0000000000..947efd96d4
--- /dev/null
+++ b/actioncable/test/channel/stream_test.rb
@@ -0,0 +1,74 @@
+require 'test_helper'
+require 'stubs/test_connection'
+require 'stubs/room'
+class ActionCable::Channel::StreamTest < ActionCable::TestCase
+ class ChatChannel < ActionCable::Channel::Base
+ def subscribed
+ if params[:id]
+ @room = Room.new params[:id]
+ stream_from "test_room_#{@room.id}"
+ end
+ end
+ def send_confirmation
+ transmit_subscription_confirmation
+ end
+ end
+ test "streaming start and stop" do
+ run_in_eventmachine do
+ connection = TestConnection.new
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
+ channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
+ channel.unsubscribe_from_channel
+ end
+ end
+ test "stream_for" do
+ run_in_eventmachine do
+ connection = TestConnection.new
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
+ channel = ChatChannel.new connection, ""
+ channel.stream_for Room.new(1)
+ end
+ end
+ test "stream_from subscription confirmation" do
+ run_in_eventmachine do
+ connection = TestConnection.new
+ ChatChannel.new connection, "{id: 1}", { id: 1 }
+ assert_nil connection.last_transmission
+ wait_for_async
+ expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
+ connection.transmit(expected)
+ assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
+ end
+ end
+ test "subscription confirmation should only be sent out once" do
+ run_in_eventmachine do
+ connection = TestConnection.new
+ channel = ChatChannel.new connection, "test_channel"
+ channel.send_confirmation
+ channel.send_confirmation
+ wait_for_async
+ expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription"
+ assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation"
+ assert_equal 1, connection.transmissions.size
+ end
+ end
diff --git a/actioncable/test/client/echo_channel.rb b/actioncable/test/client/echo_channel.rb
new file mode 100644
index 0000000000..63e35f194a
--- /dev/null
+++ b/actioncable/test/client/echo_channel.rb
@@ -0,0 +1,18 @@
+class EchoChannel < ActionCable::Channel::Base
+ def subscribed
+ stream_from "global"
+ end
+ def ding(data)
+ transmit(dong: data['message'])
+ end
+ def delay(data)
+ sleep 1
+ transmit(dong: data['message'])
+ end
+ def bulk(data)
+ ActionCable.server.broadcast "global", wide: data['message']
+ end
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
new file mode 100644
index 0000000000..199d2b90a3
--- /dev/null
+++ b/actioncable/test/client_test.rb
@@ -0,0 +1,219 @@
+require 'test_helper'
+require 'concurrent'
+require 'active_support/core_ext/hash/indifferent_access'
+require 'pathname'
+require 'faye/websocket'
+require 'json'
+class ClientTest < ActionCable::TestCase
+ def setup
+ # TODO: ActionCable requires a *lot* of setup at the moment...
+ ::Object.const_set(:ApplicationCable, Module.new)
+ ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))
+ ::Object.const_set(:Rails, Module.new)
+ ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }
+ ActionCable.instance_variable_set(:@server, nil)
+ server = ActionCable.server
+ server.config = ActionCable::Server::Configuration.new
+ inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
+ server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])
+ server.config.cable = { adapter: 'async' }.with_indifferent_access
+ # and now the "real" setup for our test:
+ server.config.disable_request_forgery_protection = true
+ server.config.channel_load_paths = [File.expand_path('client', __dir__)]
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ # faye-websocket is warning-rich
+ @previous_verbose, $VERBOSE = $VERBOSE, nil
+ end
+ def teardown
+ $VERBOSE = @previous_verbose
+ begin
+ ::Object.send(:remove_const, :ApplicationCable)
+ rescue NameError
+ end
+ begin
+ ::Object.send(:remove_const, :Rails)
+ rescue NameError
+ end
+ end
+ def with_puma_server(rack_app = ActionCable.server, port = 3099)
+ server = ::Puma::Server.new(rack_app, ::Puma::Events.strings)
+ server.add_tcp_listener '', port
+ server.min_threads = 1
+ server.max_threads = 4
+ t = Thread.new { server.run.join }
+ yield port
+ ensure
+ server.stop(true) if server
+ t.join if t
+ end
+ class SyncClient
+ attr_reader :pings
+ def initialize(port)
+ @ws = Faye::WebSocket::Client.new("ws://{port}/")
+ @messages = Queue.new
+ @closed = Concurrent::Event.new
+ @has_messages = Concurrent::Event.new
+ @pings = 0
+ open = Concurrent::Event.new
+ error = nil
+ @ws.on(:error) do |event|
+ if open.set?
+ @messages << RuntimeError.new(event.message)
+ else
+ error = event.message
+ open.set
+ end
+ end
+ @ws.on(:open) do |event|
+ open.set
+ end
+ @ws.on(:message) do |event|
+ hash = JSON.parse(event.data)
+ if hash['identifier'] == '_ping'
+ @pings += 1
+ else
+ @messages << hash
+ @has_messages.set
+ end
+ end
+ @ws.on(:close) do |event|
+ @closed.set
+ end
+ raise error if error
+ end
+ def read_message
+ @has_messages.wait(WAIT_WHEN_EXPECTING_EVENT) if @messages.empty?
+ @has_messages.reset if @messages.size < 2
+ msg = @messages.pop(true)
+ raise msg if msg.is_a?(Exception)
+ msg
+ end
+ def read_messages(expected_size = 0)
+ list = []
+ loop do
+ @has_messages.wait(list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT)
+ if @has_messages.set?
+ list << read_message
+ else
+ break
+ end
+ end
+ list
+ end
+ def send_message(hash)
+ @ws.send(JSON.dump(hash))
+ end
+ def close
+ unless @messages.empty?
+ raise "#{@messages.size} messages unprocessed"
+ end
+ @ws.close
+ end
+ end
+ def faye_client(port)
+ SyncClient.new(port)
+ end
+ def test_single_client
+ with_puma_server do |port|
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.close
+ end
+ end
+ def test_interacting_clients
+ with_puma_server do |port|
+ clients = 10.times.map { faye_client(port) }
+ barrier_1 = Concurrent::CyclicBarrier.new(clients.size)
+ barrier_2 = Concurrent::CyclicBarrier.new(clients.size)
+ clients.map {|c| Concurrent::Future.execute {
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello')
+ assert_equal clients.size, c.read_messages(clients.size).size
+ } }.each(&:wait!)
+ clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ end
+ end
+ def test_many_clients
+ with_puma_server do |port|
+ clients = 200.times.map { faye_client(port) }
+ clients.map {|c| Concurrent::Future.execute {
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ } }.each(&:wait!)
+ clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ end
+ end
+ def test_disappearing_client
+ with_puma_server do |port|
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello')
+ c.close # disappear before write
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.close # disappear before read
+ end
+ end
diff --git a/actioncable/test/connection/authorization_test.rb b/actioncable/test/connection/authorization_test.rb
new file mode 100644
index 0000000000..87d0e79ef3
--- /dev/null
+++ b/actioncable/test/connection/authorization_test.rb
@@ -0,0 +1,31 @@
+require 'test_helper'
+require 'stubs/test_server'
+class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase
+ class Connection < ActionCable::Connection::Base
+ attr_reader :websocket
+ def connect
+ reject_unauthorized_connection
+ end
+ def send_async(method, *args)
+ send method, *args
+ end
+ end
+ test "unauthorized connection" do
+ run_in_eventmachine do
+ server = TestServer.new
+ server.config.allowed_request_origins = %w( http://rubyonrails.com )
+ env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
+ 'HTTP_ORIGIN' => 'http://rubyonrails.com'
+ connection = Connection.new(server, env)
+ connection.websocket.expects(:close)
+ connection.process
+ end
+ end
diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb
new file mode 100644
index 0000000000..e2b017a9a1
--- /dev/null
+++ b/actioncable/test/connection/base_test.rb
@@ -0,0 +1,118 @@
+require 'test_helper'
+require 'stubs/test_server'
+class ActionCable::Connection::BaseTest < ActionCable::TestCase
+ class Connection < ActionCable::Connection::Base
+ attr_reader :websocket, :subscriptions, :message_buffer, :connected
+ def connect
+ @connected = true
+ end
+ def disconnect
+ @connected = false
+ end
+ def send_async(method, *args)
+ send method, *args
+ end
+ end
+ setup do
+ @server = TestServer.new
+ @server.config.allowed_request_origins = %w( http://rubyonrails.com )
+ end
+ test "making a connection with invalid headers" do
+ run_in_eventmachine do
+ connection = ActionCable::Connection::Base.new(@server, Rack::MockRequest.env_for("/test"))
+ response = connection.process
+ assert_equal 404, response[0]
+ end
+ end
+ test "websocket connection" do
+ run_in_eventmachine do
+ connection = open_connection
+ connection.process
+ assert connection.websocket.possible?
+ wait_for_async
+ assert connection.websocket.alive?
+ end
+ end
+ test "rack response" do
+ run_in_eventmachine do
+ connection = open_connection
+ response = connection.process
+ assert_equal [ -1, {}, [] ], response
+ end
+ end
+ test "on connection open" do
+ run_in_eventmachine do
+ connection = open_connection
+ connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/))
+ connection.message_buffer.expects(:process!)
+ connection.process
+ wait_for_async
+ assert_equal [ connection ], @server.connections
+ assert connection.connected
+ end
+ end
+ test "on connection close" do
+ run_in_eventmachine do
+ connection = open_connection
+ connection.process
+ # Setup the connection
+ Concurrent::TimerTask.stubs(:new).returns(true)
+ connection.send :handle_open
+ assert connection.connected
+ connection.subscriptions.expects(:unsubscribe_from_all)
+ connection.send :handle_close
+ assert ! connection.connected
+ assert_equal [], @server.connections
+ end
+ end
+ test "connection statistics" do
+ run_in_eventmachine do
+ connection = open_connection
+ connection.process
+ statistics = connection.statistics
+ assert statistics[:identifier].blank?
+ assert_kind_of Time, statistics[:started_at]
+ assert_equal [], statistics[:subscriptions]
+ end
+ end
+ test "explicitly closing a connection" do
+ run_in_eventmachine do
+ connection = open_connection
+ connection.process
+ connection.websocket.expects(:close)
+ connection.close
+ end
+ end
+ private
+ def open_connection
+ env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
+ 'HTTP_ORIGIN' => 'http://rubyonrails.com'
+ Connection.new(@server, env)
+ end
diff --git a/actioncable/test/connection/cross_site_forgery_test.rb b/actioncable/test/connection/cross_site_forgery_test.rb
new file mode 100644
index 0000000000..a29f65fb97
--- /dev/null
+++ b/actioncable/test/connection/cross_site_forgery_test.rb
@@ -0,0 +1,81 @@
+require 'test_helper'
+require 'stubs/test_server'
+class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase
+ HOST = 'rubyonrails.com'
+ class Connection < ActionCable::Connection::Base
+ def send_async(method, *args)
+ send method, *args
+ end
+ end
+ setup do
+ @server = TestServer.new
+ @server.config.allowed_request_origins = %w( http://rubyonrails.com )
+ end
+ teardown do
+ @server.config.disable_request_forgery_protection = false
+ @server.config.allowed_request_origins = []
+ end
+ test "disable forgery protection" do
+ @server.config.disable_request_forgery_protection = true
+ assert_origin_allowed 'http://rubyonrails.com'
+ assert_origin_allowed 'http://hax.com'
+ end
+ test "explicitly specified a single allowed origin" do
+ @server.config.allowed_request_origins = 'http://hax.com'
+ assert_origin_not_allowed 'http://rubyonrails.com'
+ assert_origin_allowed 'http://hax.com'
+ end
+ test "explicitly specified multiple allowed origins" do
+ @server.config.allowed_request_origins = %w( http://rubyonrails.com http://www.rubyonrails.com )
+ assert_origin_allowed 'http://rubyonrails.com'
+ assert_origin_allowed 'http://www.rubyonrails.com'
+ assert_origin_not_allowed 'http://hax.com'
+ end
+ test "explicitly specified a single regexp allowed origin" do
+ @server.config.allowed_request_origins = /.*ha.*/
+ assert_origin_not_allowed 'http://rubyonrails.com'
+ assert_origin_allowed 'http://hax.com'
+ end
+ test "explicitly specified multiple regexp allowed origins" do
+ @server.config.allowed_request_origins = [/http:\/\/ruby.*/, /.*rai.s.*com/, 'string' ]
+ assert_origin_allowed 'http://rubyonrails.com'
+ assert_origin_allowed 'http://www.rubyonrails.com'
+ assert_origin_not_allowed 'http://hax.com'
+ assert_origin_not_allowed 'http://rails.co.uk'
+ end
+ private
+ def assert_origin_allowed(origin)
+ response = connect_with_origin origin
+ assert_equal(-1, response[0])
+ end
+ def assert_origin_not_allowed(origin)
+ response = connect_with_origin origin
+ assert_equal 404, response[0]
+ end
+ def connect_with_origin(origin)
+ response = nil
+ run_in_eventmachine do
+ response = Connection.new(@server, env_for_origin(origin)).process
+ end
+ response
+ end
+ def env_for_origin(origin)
+ Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'SERVER_NAME' => HOST,
+ 'HTTP_ORIGIN' => origin
+ end
diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb
new file mode 100644
index 0000000000..1019ad541e
--- /dev/null
+++ b/actioncable/test/connection/identifier_test.rb
@@ -0,0 +1,77 @@
+require 'test_helper'
+require 'stubs/test_server'
+require 'stubs/user'
+class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
+ class Connection < ActionCable::Connection::Base
+ identified_by :current_user
+ attr_reader :websocket
+ public :process_internal_message
+ def connect
+ self.current_user = User.new "lifo"
+ end
+ end
+ test "connection identifier" do
+ run_in_eventmachine do
+ open_connection_with_stubbed_pubsub
+ assert_equal "User#lifo", @connection.connection_identifier
+ end
+ end
+ test "should subscribe to internal channel on open and unsubscribe on close" do
+ run_in_eventmachine do
+ pubsub = mock('pubsub_adapter')
+ pubsub.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc))
+ pubsub.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc))
+ server = TestServer.new
+ server.stubs(:pubsub).returns(pubsub)
+ open_connection server: server
+ close_connection
+ end
+ end
+ test "processing disconnect message" do
+ run_in_eventmachine do
+ open_connection_with_stubbed_pubsub
+ @connection.websocket.expects(:close)
+ message = ActiveSupport::JSON.encode('type' => 'disconnect')
+ @connection.process_internal_message message
+ end
+ end
+ test "processing invalid message" do
+ run_in_eventmachine do
+ open_connection_with_stubbed_pubsub
+ @connection.websocket.expects(:close).never
+ message = ActiveSupport::JSON.encode('type' => 'unknown')
+ @connection.process_internal_message message
+ end
+ end
+ protected
+ def open_connection_with_stubbed_pubsub
+ server = TestServer.new
+ server.stubs(:adapter).returns(stub_everything('adapter'))
+ open_connection server: server
+ end
+ def open_connection(server:)
+ env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
+ @connection = Connection.new(server, env)
+ @connection.process
+ @connection.send :handle_open
+ end
+ def close_connection
+ @connection.send :handle_close
+ end
diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb
new file mode 100644
index 0000000000..e9bb4e6d7f
--- /dev/null
+++ b/actioncable/test/connection/multiple_identifiers_test.rb
@@ -0,0 +1,41 @@
+require 'test_helper'
+require 'stubs/test_server'
+require 'stubs/user'
+class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase
+ class Connection < ActionCable::Connection::Base
+ identified_by :current_user, :current_room
+ def connect
+ self.current_user = User.new "lifo"
+ self.current_room = Room.new "my", "room"
+ end
+ end
+ test "multiple connection identifiers" do
+ run_in_eventmachine do
+ open_connection_with_stubbed_pubsub
+ assert_equal "Room#my-room:User#lifo", @connection.connection_identifier
+ end
+ end
+ protected
+ def open_connection_with_stubbed_pubsub
+ server = TestServer.new
+ server.stubs(:pubsub).returns(stub_everything('pubsub'))
+ open_connection server: server
+ end
+ def open_connection(server:)
+ env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
+ @connection = Connection.new(server, env)
+ @connection.process
+ @connection.send :handle_open
+ end
+ def close_connection
+ @connection.send :handle_close
+ end
diff --git a/actioncable/test/connection/string_identifier_test.rb b/actioncable/test/connection/string_identifier_test.rb
new file mode 100644
index 0000000000..9d0bda83ef
--- /dev/null
+++ b/actioncable/test/connection/string_identifier_test.rb
@@ -0,0 +1,43 @@
+require 'test_helper'
+require 'stubs/test_server'
+class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase
+ class Connection < ActionCable::Connection::Base
+ identified_by :current_token
+ def connect
+ self.current_token = "random-string"
+ end
+ def send_async(method, *args)
+ send method, *args
+ end
+ end
+ test "connection identifier" do
+ run_in_eventmachine do
+ open_connection_with_stubbed_pubsub
+ assert_equal "random-string", @connection.connection_identifier
+ end
+ end
+ protected
+ def open_connection_with_stubbed_pubsub
+ @server = TestServer.new
+ @server.stubs(:pubsub).returns(stub_everything('pubsub'))
+ open_connection
+ end
+ def open_connection
+ env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
+ @connection = Connection.new(@server, env)
+ @connection.process
+ @connection.send :on_open
+ end
+ def close_connection
+ @connection.send :on_close
+ end
diff --git a/actioncable/test/connection/subscriptions_test.rb b/actioncable/test/connection/subscriptions_test.rb
new file mode 100644
index 0000000000..62e41484fe
--- /dev/null
+++ b/actioncable/test/connection/subscriptions_test.rb
@@ -0,0 +1,115 @@
+require 'test_helper'
+class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase
+ class Connection < ActionCable::Connection::Base
+ attr_reader :websocket
+ def send_async(method, *args)
+ send method, *args
+ end
+ end
+ class ChatChannel < ActionCable::Channel::Base
+ attr_reader :room, :lines
+ def subscribed
+ @room = Room.new params[:id]
+ @lines = []
+ end
+ def speak(data)
+ @lines << data
+ end
+ end
+ setup do
+ @server = TestServer.new
+ @server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel)
+ @chat_identifier = ActiveSupport::JSON.encode(id: 1, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel')
+ end
+ test "subscribe command" do
+ run_in_eventmachine do
+ setup_connection
+ channel = subscribe_to_chat_channel
+ assert_kind_of ChatChannel, channel
+ assert_equal 1, channel.room.id
+ end
+ end
+ test "subscribe command without an identifier" do
+ run_in_eventmachine do
+ setup_connection
+ @subscriptions.execute_command 'command' => 'subscribe'
+ assert @subscriptions.identifiers.empty?
+ end
+ end
+ test "unsubscribe command" do
+ run_in_eventmachine do
+ setup_connection
+ subscribe_to_chat_channel
+ channel = subscribe_to_chat_channel
+ channel.expects(:unsubscribe_from_channel)
+ @subscriptions.execute_command 'command' => 'unsubscribe', 'identifier' => @chat_identifier
+ assert @subscriptions.identifiers.empty?
+ end
+ end
+ test "unsubscribe command without an identifier" do
+ run_in_eventmachine do
+ setup_connection
+ @subscriptions.execute_command 'command' => 'unsubscribe'
+ assert @subscriptions.identifiers.empty?
+ end
+ end
+ test "message command" do
+ run_in_eventmachine do
+ setup_connection
+ channel = subscribe_to_chat_channel
+ data = { 'content' => 'Hello World!', 'action' => 'speak' }
+ @subscriptions.execute_command 'command' => 'message', 'identifier' => @chat_identifier, 'data' => ActiveSupport::JSON.encode(data)
+ assert_equal [ data ], channel.lines
+ end
+ end
+ test "unsubscrib from all" do
+ run_in_eventmachine do
+ setup_connection
+ channel1 = subscribe_to_chat_channel
+ channel2_id = ActiveSupport::JSON.encode(id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel')
+ channel2 = subscribe_to_chat_channel(channel2_id)
+ channel1.expects(:unsubscribe_from_channel)
+ channel2.expects(:unsubscribe_from_channel)
+ @subscriptions.unsubscribe_from_all
+ end
+ end
+ private
+ def subscribe_to_chat_channel(identifier = @chat_identifier)
+ @subscriptions.execute_command 'command' => 'subscribe', 'identifier' => identifier
+ assert_equal identifier, @subscriptions.identifiers.last
+ @subscriptions.send :find, 'identifier' => identifier
+ end
+ def setup_connection
+ env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
+ @connection = Connection.new(@server, env)
+ @subscriptions = ActionCable::Connection::Subscriptions.new(@connection)
+ end
diff --git a/actioncable/test/stubs/global_id.rb b/actioncable/test/stubs/global_id.rb
new file mode 100644
index 0000000000..334f0d03e8
--- /dev/null
+++ b/actioncable/test/stubs/global_id.rb
@@ -0,0 +1,8 @@
+class GlobalID
+ attr_reader :uri
+ delegate :to_param, :to_s, to: :uri
+ def initialize(gid, options = {})
+ @uri = gid
+ end
diff --git a/actioncable/test/stubs/room.rb b/actioncable/test/stubs/room.rb
new file mode 100644
index 0000000000..cd66a0b687
--- /dev/null
+++ b/actioncable/test/stubs/room.rb
@@ -0,0 +1,16 @@
+class Room
+ attr_reader :id, :name
+ def initialize(id, name='Campfire')
+ @id = id
+ @name = name
+ end
+ def to_global_id
+ GlobalID.new("Room##{id}-#{name}")
+ end
+ def to_gid_param
+ to_global_id.to_param
+ end
diff --git a/actioncable/test/stubs/test_adapter.rb b/actioncable/test/stubs/test_adapter.rb
new file mode 100644
index 0000000000..bbd142b287
--- /dev/null
+++ b/actioncable/test/stubs/test_adapter.rb
@@ -0,0 +1,10 @@
+class SuccessAdapter < ActionCable::SubscriptionAdapter::Base
+ def broadcast(channel, payload)
+ end
+ def subscribe(channel, callback, success_callback = nil)
+ end
+ def unsubscribe(channel, callback)
+ end
diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb
new file mode 100644
index 0000000000..da98201900
--- /dev/null
+++ b/actioncable/test/stubs/test_connection.rb
@@ -0,0 +1,25 @@
+require 'stubs/user'
+class TestConnection
+ attr_reader :identifiers, :logger, :current_user, :transmissions
+ def initialize(user = User.new("lifo"))
+ @identifiers = [ :current_user ]
+ @current_user = user
+ @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
+ @transmissions = []
+ end
+ def pubsub
+ SuccessAdapter.new(TestServer.new)
+ end
+ def transmit(data)
+ @transmissions << data
+ end
+ def last_transmission
+ @transmissions.last
+ end
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
new file mode 100644
index 0000000000..56d132b30a
--- /dev/null
+++ b/actioncable/test/stubs/test_server.rb
@@ -0,0 +1,20 @@
+require 'ostruct'
+class TestServer
+ include ActionCable::Server::Connections
+ attr_reader :logger, :config
+ def initialize
+ @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
+ @config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter)
+ end
+ def pubsub
+ @config.subscription_adapter.new(self)
+ end
+ def stream_event_loop
+ @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
+ end
diff --git a/actioncable/test/stubs/user.rb b/actioncable/test/stubs/user.rb
new file mode 100644
index 0000000000..a66b4f87d5
--- /dev/null
+++ b/actioncable/test/stubs/user.rb
@@ -0,0 +1,15 @@
+class User
+ attr_reader :name
+ def initialize(name)
+ @name = name
+ end
+ def to_global_id
+ GlobalID.new("User##{name}")
+ end
+ def to_gid_param
+ to_global_id.to_param
+ end
diff --git a/actioncable/test/subscription_adapter/async_test.rb b/actioncable/test/subscription_adapter/async_test.rb
new file mode 100644
index 0000000000..8f413f14c2
--- /dev/null
+++ b/actioncable/test/subscription_adapter/async_test.rb
@@ -0,0 +1,17 @@
+require 'test_helper'
+require_relative './common'
+class AsyncAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+ def setup
+ super
+ @tx_adapter.shutdown
+ @tx_adapter = @rx_adapter
+ end
+ def cable_config
+ { adapter: 'async' }
+ end
diff --git a/actioncable/test/subscription_adapter/base_test.rb b/actioncable/test/subscription_adapter/base_test.rb
new file mode 100644
index 0000000000..7a7ae131e6
--- /dev/null
+++ b/actioncable/test/subscription_adapter/base_test.rb
@@ -0,0 +1,73 @@
+require 'test_helper'
+require 'stubs/test_server'
+class ActionCable::SubscriptionAdapter::BaseTest < ActionCable::TestCase
+ class BrokenAdapter < ActionCable::SubscriptionAdapter::Base
+ end
+ setup do
+ @server = TestServer.new
+ @server.config.subscription_adapter = BrokenAdapter
+ @server.config.allowed_request_origins = %w( http://rubyonrails.com )
+ end
+ test "#broadcast returns NotImplementedError by default" do
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).broadcast('channel', 'payload')
+ end
+ end
+ test "#subscribe returns NotImplementedError by default" do
+ callback = lambda { puts 'callback' }
+ success_callback = lambda { puts 'success' }
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).subscribe('channel', callback, success_callback)
+ end
+ end
+ test "#unsubscribe returns NotImplementedError by default" do
+ callback = lambda { puts 'callback' }
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).unsubscribe('channel', callback)
+ end
+ end
+ test "#broadcast is implemented" do
+ broadcast = SuccessAdapter.new(@server).broadcast('channel', 'payload')
+ assert_respond_to(SuccessAdapter.new(@server), :broadcast)
+ assert_nothing_raised NotImplementedError do
+ broadcast
+ end
+ end
+ test "#subscribe is implemented" do
+ callback = lambda { puts 'callback' }
+ success_callback = lambda { puts 'success' }
+ subscribe = SuccessAdapter.new(@server).subscribe('channel', callback, success_callback)
+ assert_respond_to(SuccessAdapter.new(@server), :subscribe)
+ assert_nothing_raised NotImplementedError do
+ subscribe
+ end
+ end
+ test "#unsubscribe is implemented" do
+ callback = lambda { puts 'callback' }
+ unsubscribe = SuccessAdapter.new(@server).unsubscribe('channel', callback)
+ assert_respond_to(SuccessAdapter.new(@server), :unsubscribe)
+ assert_nothing_raised NotImplementedError do
+ unsubscribe
+ end
+ end
diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb
new file mode 100644
index 0000000000..361858784e
--- /dev/null
+++ b/actioncable/test/subscription_adapter/common.rb
@@ -0,0 +1,139 @@
+require 'test_helper'
+require 'concurrent'
+require 'active_support/core_ext/hash/indifferent_access'
+require 'pathname'
+module CommonSubscriptionAdapterTest
+ def setup
+ # TODO: ActionCable requires a *lot* of setup at the moment...
+ ::Object.const_set(:ApplicationCable, Module.new)
+ ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))
+ ::Object.const_set(:Rails, Module.new)
+ ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }
+ server = ActionCable::Server::Base.new
+ server.config = ActionCable::Server::Configuration.new
+ inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
+ server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])
+ # and now the "real" setup for our test:
+ server.config.cable = cable_config.with_indifferent_access
+ adapter_klass = server.config.pubsub_adapter
+ @rx_adapter = adapter_klass.new(server)
+ @tx_adapter = adapter_klass.new(server)
+ end
+ def teardown
+ @tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter
+ @rx_adapter.shutdown if @rx_adapter
+ begin
+ ::Object.send(:remove_const, :ApplicationCable)
+ rescue NameError
+ end
+ begin
+ ::Object.send(:remove_const, :Rails)
+ rescue NameError
+ end
+ end
+ def subscribe_as_queue(channel, adapter = @rx_adapter)
+ queue = Queue.new
+ callback = -> data { queue << data }
+ subscribed = Concurrent::Event.new
+ adapter.subscribe(channel, callback, Proc.new { subscribed.set })
+ subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
+ assert subscribed.set?
+ yield queue
+ assert_empty queue
+ ensure
+ adapter.unsubscribe(channel, callback) if subscribed.set?
+ end
+ def test_subscribe_and_unsubscribe
+ subscribe_as_queue('channel') do |queue|
+ end
+ end
+ def test_basic_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('channel', 'hello world')
+ assert_equal 'hello world', queue.pop
+ end
+ end
+ def test_broadcast_after_unsubscribe
+ keep_queue = nil
+ subscribe_as_queue('channel') do |queue|
+ keep_queue = queue
+ @tx_adapter.broadcast('channel', 'hello world')
+ assert_equal 'hello world', queue.pop
+ end
+ @tx_adapter.broadcast('channel', 'hello void')
+ assert_empty keep_queue
+ end
+ def test_multiple_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('channel', 'bananas')
+ @tx_adapter.broadcast('channel', 'apples')
+ received = []
+ 2.times { received << queue.pop }
+ assert_equal ['apples', 'bananas'], received.sort
+ end
+ end
+ def test_identical_subscriptions
+ subscribe_as_queue('channel') do |queue|
+ subscribe_as_queue('channel') do |queue_2|
+ @tx_adapter.broadcast('channel', 'hello')
+ assert_equal 'hello', queue_2.pop
+ end
+ assert_equal 'hello', queue.pop
+ end
+ end
+ def test_simultaneous_subscriptions
+ subscribe_as_queue('channel') do |queue|
+ subscribe_as_queue('other channel') do |queue_2|
+ @tx_adapter.broadcast('channel', 'apples')
+ @tx_adapter.broadcast('other channel', 'oranges')
+ assert_equal 'apples', queue.pop
+ assert_equal 'oranges', queue_2.pop
+ end
+ end
+ end
+ def test_channel_filtered_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('other channel', 'one')
+ @tx_adapter.broadcast('channel', 'two')
+ assert_equal 'two', queue.pop
+ end
+ end
diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb
new file mode 100644
index 0000000000..70333e51bd
--- /dev/null
+++ b/actioncable/test/subscription_adapter/evented_redis_test.rb
@@ -0,0 +1,10 @@
+require 'test_helper'
+require_relative './common'
+class EventedRedisAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+ def cable_config
+ { adapter: 'evented_redis', url: 'redis://' }
+ end
diff --git a/actioncable/test/subscription_adapter/inline_test.rb b/actioncable/test/subscription_adapter/inline_test.rb
new file mode 100644
index 0000000000..75ea51e6b3
--- /dev/null
+++ b/actioncable/test/subscription_adapter/inline_test.rb
@@ -0,0 +1,17 @@
+require 'test_helper'
+require_relative './common'
+class InlineAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+ def setup
+ super
+ @tx_adapter.shutdown
+ @tx_adapter = @rx_adapter
+ end
+ def cable_config
+ { adapter: 'inline' }
+ end
diff --git a/actioncable/test/subscription_adapter/postgresql_test.rb b/actioncable/test/subscription_adapter/postgresql_test.rb
new file mode 100644
index 0000000000..64c632b0cd
--- /dev/null
+++ b/actioncable/test/subscription_adapter/postgresql_test.rb
@@ -0,0 +1,32 @@
+require 'test_helper'
+require_relative './common'
+require 'active_record'
+class PostgresqlAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+ def setup
+ database_config = { 'adapter' => 'postgresql', 'database' => 'activerecord_unittest' }
+ ar_tests = File.expand_path('../../../activerecord/test', __dir__)
+ if Dir.exist?(ar_tests)
+ require File.join(ar_tests, 'config')
+ require File.join(ar_tests, 'support/config')
+ local_config = ARTest.config['arunit']
+ database_config.update local_config if local_config
+ end
+ ActiveRecord::Base.establish_connection database_config
+ super
+ end
+ def teardown
+ super
+ ActiveRecord::Base.clear_all_connections!
+ end
+ def cable_config
+ { adapter: 'postgresql' }
+ end
diff --git a/actioncable/test/subscription_adapter/redis_test.rb b/actioncable/test/subscription_adapter/redis_test.rb
new file mode 100644
index 0000000000..4f34dd86c9
--- /dev/null
+++ b/actioncable/test/subscription_adapter/redis_test.rb
@@ -0,0 +1,16 @@
+require 'test_helper'
+require_relative './common'
+class RedisAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+ def cable_config
+ { adapter: 'redis', driver: 'ruby', url: 'redis://' }
+ end
+class RedisAdapterTest::Hiredis < RedisAdapterTest
+ def cable_config
+ super.merge(driver: 'hiredis')
+ end
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
new file mode 100644
index 0000000000..8ddbd4e764
--- /dev/null
+++ b/actioncable/test/test_helper.rb
@@ -0,0 +1,28 @@
+require File.expand_path('../../../load_paths', __FILE__)
+require 'action_cable'
+require 'active_support/testing/autorun'
+require 'puma'
+require 'mocha/setup'
+require 'rack/mock'
+# Require all the stubs and models
+Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
+class ActionCable::TestCase < ActiveSupport::TestCase
+ def wait_for_async
+ e = Concurrent.global_io_executor
+ until e.completed_task_count == e.scheduled_task_count
+ sleep 0.1
+ end
+ end
+ def run_in_eventmachine
+ yield
+ wait_for_async
+ end
diff --git a/actioncable/test/worker_test.rb b/actioncable/test/worker_test.rb
new file mode 100644
index 0000000000..4a699cde27
--- /dev/null
+++ b/actioncable/test/worker_test.rb
@@ -0,0 +1,52 @@
+require 'test_helper'
+class WorkerTest < ActiveSupport::TestCase
+ class Receiver
+ attr_accessor :last_action
+ def run
+ @last_action = :run
+ end
+ def process(message)
+ @last_action = [ :process, message ]
+ end
+ def connection
+ self
+ end
+ def logger
+ ActionCable.server.logger
+ end
+ end
+ setup do
+ @worker = ActionCable::Server::Worker.new
+ @receiver = Receiver.new
+ end
+ teardown do
+ @receiver.last_action = nil
+ end
+ test "invoke" do
+ @worker.invoke @receiver, :run
+ assert_equal :run, @receiver.last_action
+ end
+ test "invoke with arguments" do
+ @worker.invoke @receiver, :process, "Hello"
+ assert_equal [ :process, "Hello" ], @receiver.last_action
+ end
+ test "running periodic timers with a proc" do
+ @worker.run_periodic_timer @receiver, @receiver.method(:run)
+ assert_equal :run, @receiver.last_action
+ end
+ test "running periodic timers with a method" do
+ @worker.run_periodic_timer @receiver, :run
+ assert_equal :run, @receiver.last_action
+ end