diff options
45 files changed, 814 insertions, 341 deletions
diff --git a/.gitignore b/.gitignore index 1918a1b0ee..cb2bc5e743 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -test/tests.log
\ No newline at end of file +/gemfiles/*.lock +/test/tests.log diff --git a/.travis.yml b/.travis.yml index 99a95ae240..5e156e2b77 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,19 @@ -sudo: false +language: ruby cache: bundler +sudo: false + rvm: - 2.2 - ruby-head + +gemfile: + - Gemfile + - gemfiles/rails_4-2-stable.gemfile + matrix: fast_finish: true + allow_failures: ruby-head + notifications: email: false irc: @@ -16,4 +25,4 @@ notifications: on_success: change on_failure: always rooms: - - secure: "EZmqsgjEQbWouCx6xL/30jslug7xcq+Dl09twDGjBs369GB5LiUm17/I7d6H1YQFY0Vu2LpiQ/zs+6ihlBjslRV/2RYM3AgAA9OOC3pn7uENFVTXaECi/io1wjvlbMNrf1YJSc3aUyiWKykRsdZnZSFszkDs4DMnZG1s/Oxf1JTYEGNWW3WcOFfYkzcS7NWlOW9OBf4RuzjtLYF05IO4t4FZI1aTWrNV3NNMZ+tqmiQHHNrQE/CzQE3ujqFiea2vVZ7PwvmjVWJgC29UZqS7HcNuq6cCMtMZZuubCZmyT85GjJ/SKTShxFqfV1oCpY3y6kyWcTAQsUoLtPEX0OxLeX+CgWNIJK0rY5+5/v5pZP1uwRsMfLerfp2a9g4fAnlcAKaZjalOc39rOkJl8FdvLQtqFIGWxpjWdJbMrCt3SrnnOccpDqDWpAL798LVBONcOuor71rEeNj1dZ6fCoHTKhLVy6UVm9eUI8zt1APM0xzHgTBI1KBVZi0ikqPcaW604rrNUSk8g/AFQk0pIKyDzV9qYMJD2wnr42cyPKg0gfk1tc9KRCNeH+My1HdZS6Zogpjkc3plAzJQ1DAPY0EBWUlEKghpkyCunjpxN3cw390iKgZUN52phtmGMRkyNnwI8+ELnT4I+Jata1mFyWiETM85q8Rqx+FeA0W/BBsEAp8="
\ No newline at end of file + - secure: "EZmqsgjEQbWouCx6xL/30jslug7xcq+Dl09twDGjBs369GB5LiUm17/I7d6H1YQFY0Vu2LpiQ/zs+6ihlBjslRV/2RYM3AgAA9OOC3pn7uENFVTXaECi/io1wjvlbMNrf1YJSc3aUyiWKykRsdZnZSFszkDs4DMnZG1s/Oxf1JTYEGNWW3WcOFfYkzcS7NWlOW9OBf4RuzjtLYF05IO4t4FZI1aTWrNV3NNMZ+tqmiQHHNrQE/CzQE3ujqFiea2vVZ7PwvmjVWJgC29UZqS7HcNuq6cCMtMZZuubCZmyT85GjJ/SKTShxFqfV1oCpY3y6kyWcTAQsUoLtPEX0OxLeX+CgWNIJK0rY5+5/v5pZP1uwRsMfLerfp2a9g4fAnlcAKaZjalOc39rOkJl8FdvLQtqFIGWxpjWdJbMrCt3SrnnOccpDqDWpAL798LVBONcOuor71rEeNj1dZ6fCoHTKhLVy6UVm9eUI8zt1APM0xzHgTBI1KBVZi0ikqPcaW604rrNUSk8g/AFQk0pIKyDzV9qYMJD2wnr42cyPKg0gfk1tc9KRCNeH+My1HdZS6Zogpjkc3plAzJQ1DAPY0EBWUlEKghpkyCunjpxN3cw390iKgZUN52phtmGMRkyNnwI8+ELnT4I+Jata1mFyWiETM85q8Rqx+FeA0W/BBsEAp8=" @@ -1,2 +1,8 @@ source 'https://rubygems.org' + +gem 'activesupport', github: 'rails/rails' +gem 'actionpack', github: 'rails/rails' +gem 'arel', github: 'rails/arel' +gem 'rack', github: 'rack/rack' + gemspec diff --git a/Gemfile.lock b/Gemfile.lock index 5548531abe..7f128bbdd1 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,3 +1,47 @@ +GIT + remote: git://github.com/rack/rack.git + revision: 6216a3f8a3560639ee1ddadc1e0d6bf9e5f31830 + specs: + rack (2.0.0.alpha) + json + +GIT + remote: git://github.com/rails/arel.git + revision: 3c429c5d86e9e2201c2a35d934ca6a8911c18e69 + specs: + arel (7.0.0.alpha) + +GIT + remote: git://github.com/rails/rails.git + revision: 960de47f0eef79d234eb3cfc47fabb470fef1529 + specs: + actionpack (5.0.0.alpha) + actionview (= 5.0.0.alpha) + activesupport (= 5.0.0.alpha) + rack (~> 2.x) + rack-test (~> 0.6.3) + rails-dom-testing (~> 1.0, >= 1.0.5) + rails-html-sanitizer (~> 1.0, >= 1.0.2) + actionview (5.0.0.alpha) + activesupport (= 5.0.0.alpha) + builder (~> 3.1) + erubis (~> 2.7.0) + rails-dom-testing (~> 1.0, >= 1.0.5) + rails-html-sanitizer (~> 1.0, >= 1.0.2) + activesupport (5.0.0.alpha) + concurrent-ruby (~> 1.0.0.pre3, < 2.0.0) + i18n (~> 0.7) + json (~> 1.7, >= 1.7.7) + method_source + minitest (~> 5.1) + tzinfo (~> 1.1) + railties (5.0.0.alpha) + actionpack (= 5.0.0.alpha) + activesupport (= 5.0.0.alpha) + method_source + rake (>= 0.8.7) + thor (>= 0.18.1, < 2.0) + PATH remote: . specs: @@ -14,27 +58,8 @@ PATH GEM remote: https://rubygems.org/ specs: - actionpack (4.2.3) - actionview (= 4.2.3) - activesupport (= 4.2.3) - rack (~> 1.6) - rack-test (~> 0.6.2) - rails-dom-testing (~> 1.0, >= 1.0.5) - rails-html-sanitizer (~> 1.0, >= 1.0.2) - actionview (4.2.3) - activesupport (= 4.2.3) - builder (~> 3.1) - erubis (~> 2.7.0) - rails-dom-testing (~> 1.0, >= 1.0.5) - rails-html-sanitizer (~> 1.0, >= 1.0.2) - activesupport (4.2.3) - i18n (~> 0.7) - json (~> 1.7, >= 1.7.7) - minitest (~> 5.1) - thread_safe (~> 0.3, >= 0.3.4) - tzinfo (~> 1.1) builder (3.2.2) - celluloid (0.16.0) + celluloid (0.16.1) timers (~> 4.0.0) coffee-rails (4.1.0) coffee-script (>= 2.2.0) @@ -43,50 +68,46 @@ GEM coffee-script-source execjs coffee-script-source (1.9.1.1) + concurrent-ruby (1.0.0.pre4) em-hiredis (0.3.0) eventmachine (~> 1.0) hiredis (~> 0.5.0) erubis (2.7.0) - eventmachine (1.0.7) - execjs (2.5.2) + eventmachine (1.0.8) + execjs (2.6.0) faye-websocket (0.10.0) eventmachine (>= 0.12.0) websocket-driver (>= 0.5.1) hiredis (0.5.2) - hitimes (1.2.2) + hitimes (1.2.3) i18n (0.7.0) json (1.8.3) - loofah (2.0.2) + loofah (2.0.3) nokogiri (>= 1.5.9) metaclass (0.0.4) + method_source (0.8.2) mini_portile (0.6.2) - minitest (5.7.0) + minitest (5.8.1) mocha (1.1.0) metaclass (~> 0.0.1) nokogiri (1.6.6.2) mini_portile (~> 0.6.0) puma (2.12.2) - rack (1.6.4) rack-test (0.6.3) rack (>= 1.0) rails-deprecated_sanitizer (1.0.3) activesupport (>= 4.2.0.alpha) - rails-dom-testing (1.0.6) + rails-dom-testing (1.0.7) activesupport (>= 4.2.0.beta, < 5.0) nokogiri (~> 1.6.0) rails-deprecated_sanitizer (>= 1.0.1) rails-html-sanitizer (1.0.2) loofah (~> 2.0) - railties (4.2.3) - actionpack (= 4.2.3) - activesupport (= 4.2.3) - rake (>= 0.8.7) - thor (>= 0.18.1, < 2.0) rake (10.4.2) redis (3.2.1) thor (0.19.1) thread_safe (0.3.5) - timers (4.0.1) + timers (4.0.4) hitimes tzinfo (1.2.2) thread_safe (~> 0.1) @@ -99,9 +120,13 @@ PLATFORMS DEPENDENCIES actioncable! + actionpack! + activesupport! + arel! mocha puma + rack! rake BUNDLED WITH - 1.10.5 + 1.10.6 @@ -1,19 +1,20 @@ -# Action Cable – Integrated websockets for Rails +# Action Cable – Integrated WebSockets for Rails [](https://travis-ci.org/rails/actioncable) -Action Cable seamlessly integrates websockets with the rest of your Rails application. +Action Cable seamlessly integrates WebSockets with the rest of your Rails application. It allows for real-time features to be written in Ruby in the same style and form as the rest of your Rails application, while still being performant and scalable. It's a full-stack offering that provides both a client-side JavaScript framework and a server-side Ruby framework. You have access to your full domain model written with ActiveRecord or your ORM of choice. + ## Terminology A single Action Cable server can handle multiple connection instances. It has one -connection instance per websocket connection. A single user may have multiple -websockets open to your application if they use multiple browser tabs or devices. -The client of a websocket connection is called the consumer. +connection instance per WebSocket connection. A single user may have multiple +WebSockets open to your application if they use multiple browser tabs or devices. +The client of a WebSocket connection is called the consumer. Each consumer can in turn subscribe to multiple cable channels. Each channel encapsulates a logical unit of work, similar to what a controller does in a regular MVC setup. For example, you could have a `ChatChannel` and a `AppearancesChannel`, and a consumer could be subscribed to either @@ -33,8 +34,9 @@ As you can see, this is a fairly deep architectural stack. There's a lot of new to identify the new pieces, and on top of that, you're dealing with both client and server side reflections of each unit. +## Examples -## A full-stack example +### A full-stack example The first thing you must do is define your `ApplicationCable::Connection` class in Ruby. This is the place where you authorize the incoming connection, and proceed to establish it @@ -61,6 +63,8 @@ module ApplicationCable end end ``` +Here `identified_by` is a connection identifier that can be used to find the specific connection again or later. +Note that anything marked as an identifier will automatically create a delegate by the same name on any channel instances created off the connection. Then you should define your `ApplicationCable::Channel` class in Ruby. This is the place where you put shared logic between your channels. @@ -99,7 +103,7 @@ itself. This just gives you the plumbing. To make stuff happen, you need content is defined by declaring channels on the server and allowing the consumer to subscribe to them. -## Channel example 1: User appearances +### Channel example 1: User appearances Here's a simple example of a channel that tracks whether a user is online or not and what page they're on. (This is useful for creating presence features like showing a green dot next to a user name if they're online). @@ -165,10 +169,10 @@ Finally, we expose `App.appearance` to the machinations of the application itsel Turbolinks `page:change` callback and allowing the user to click a data-behavior link that triggers the `#away` call. -## Channel example 2: Receiving new web notifications +### Channel example 2: Receiving new web notifications -The appearance example was all about exposing server functionality to client-side invocation over the websocket connection. -But the great thing about websockets is that it's a two-way street. So now let's show an example where the server invokes +The appearance example was all about exposing server functionality to client-side invocation over the WebSocket connection. +But the great thing about WebSockets is that it's a two-way street. So now let's show an example where the server invokes action on the client. This is a web notification channel that allows you to trigger client-side web notifications when you broadcast to the right @@ -177,32 +181,94 @@ streams: ```ruby # app/channels/web_notifications_channel.rb class WebNotificationsChannel < ApplicationCable::Channel - def subscribed - stream_from "web_notifications_#{current_user.id}" - end + def subscribed + stream_from "web_notifications_#{current_user.id}" + end end ``` ```coffeescript -# Somewhere in your app this is called, perhaps from a NewCommentJob -ActionCable.server.broadcast \ - "web_notifications_#{current_user.id}", { title: 'New things!', body: 'All the news that is fit to print' } - # Client-side which assumes you've already requested the right to send web notifications App.cable.subscriptions.create "WebNotificationsChannel", received: (data) -> new Notification data['title'], body: data['body'] ``` +```ruby +# Somewhere in your app this is called, perhaps from a NewCommentJob +ActionCable.server.broadcast \ + "web_notifications_#{current_user.id}", { title: 'New things!', body: 'All the news that is fit to print' } +``` + The `ActionCable.server.broadcast` call places a message in the Redis' pubsub queue under a separate broadcasting name for each user. For a user with an ID of 1, the broadcasting name would be `web_notifications_1`. The channel has been instructed to stream everything that arrives at `web_notifications_1` directly to the client by invoking the `#received(data)` callback. The data is the hash sent as the second parameter to the server-side broadcast call, JSON encoded for the trip across the wire, and unpacked for the data argument arriving to `#received`. -## More complete examples + +### Passing Parameters to Channel + +You can pass parameters from the client side to the server side when creating a subscription. For example: + +```ruby +# app/channels/chat_channel.rb +class ChatChannel < ApplicationCable::Channel + def subscribed + stream_from "chat_#{params[:room]}" + end + end +``` + +Pass an object as the first argument to `subscriptions.create`, and that object will become your params hash in your cable channel. The keyword `channel` is required. + +```coffeescript +# Client-side which assumes you've already requested the right to send web notifications +App.cable.subscriptions.create {channel: "ChatChannel", room: "Best Room"}, + received: (data) -> + new Message data['sent_by'], body: data['body'] +``` + +```ruby +# Somewhere in your app this is called, perhaps from a NewCommentJob +ActionCable.server.broadcast \ + "chat_#{room}", { sent_by: 'Paul', body: 'This is a cool chat app.' } +``` + + +### Rebroadcasting message + +A common use case is to rebroadcast a message sent by one client to any other connected clients. + +```ruby +# app/channels/chat_channel.rb +class ChatChannel < ApplicationCable::Channel + def subscribed + stream_from "chat_#{params[:room]}" + end + + def receive(data) + ActionCable.server.broadcast "chat_#{params[:room]}", data + end +end +``` + +```coffeescript +# Client-side which assumes you've already requested the right to send web notifications +sub = App.cable.subscriptions.create {channel: "ChatChannel", room: "Best Room"}, + received: (data) -> + new Message data['sent_by'], body: data['body'] + +sub.send {sent_by: 'Peter', body: 'Hello Paul, thanks for the compliment.'} +``` + +The rebroadcast will be received by all connected clients, _including_ the client that sent the message. Note that params are the same as they were when you subscribed to the channel. + + +### More complete examples See the [rails/actioncable-examples](http://github.com/rails/actioncable-examples) repository for a full example of how to setup Action Cable in a Rails app and adding channels. + ## Configuration The only must-configure part of Action Cable is the Redis connection. By default, `ActionCable::Server::Base` will look for a configuration @@ -244,9 +310,11 @@ For a full list of all configuration options, see the `ActionCable::Server::Conf Also note that your server must provide at least the same number of database connections as you have workers. The default worker pool is set to 100, so that means you have to make at least that available. You can change that in `config/database.yml` through the `pool` attribute. -## Starting the cable server -As mentioned, the cable server(s) is separated from your normal application server. It's still a rack application, but it is its own rack +## Running the cable server + +### Standalone +The cable server(s) is separated from your normal application server. It's still a rack application, but it is its own rack application. The recommended basic setup is as follows: ```ruby @@ -268,22 +336,38 @@ bundle exec puma -p 28080 cable/config.ru The above will start a cable server on port 28080. Remember to point your client-side setup against that using something like: `App.cable.createConsumer('ws://basecamp.dev:28080')`. +### In app + +If you are using a threaded server like Puma or Thin, the current implementation of ActionCable can run side-along with your Rails application. For example, to listen for WebSocket requests on `/websocket`, match requests on that path: + +```ruby +# config/routes.rb +Example::Application.routes.draw do + match "/websocket", :to => ActionCable.server, via: [:get, :post] +end +``` + +You can use `App.cable.createConsumer('ws://' + window.location.host + '/websocket')` to connect to the cable server. + +For every instance of your server you create and for every worker your server spawns, you will also have a new instance of ActionCable, but the use of Redis keeps messages synced across connections. + +### Notes + Beware that currently the cable server will _not_ auto-reload any changes in the framework. As we've discussed, long-running cable connections mean long-running objects. We don't yet have a way of reloading the classes of those objects in a safe manner. So when you change your channels, or the model your channels use, you must restart the cable server. -Note: We'll get all this abstracted properly when the framework is integrated into Rails. +We'll get all this abstracted properly when the framework is integrated into Rails. ## Dependencies Action Cable is currently tied to Redis through its use of the pubsub feature to route -messages back and forth over the websocket cable connection. This dependency may well +messages back and forth over the WebSocket cable connection. This dependency may well be alleviated in the future, but for the moment that's what it is. So be sure to have Redis installed and running. The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [celluloid](https://github.com/celluloid/celluloid). - ## Deployment Action Cable is powered by a combination of EventMachine and threads. The @@ -7,5 +7,6 @@ Rake::TestTask.new(:test) do |t| t.libs << "test" t.pattern = 'test/**/*_test.rb' t.verbose = true + t.warning = false end Rake::Task['test'].comment = "Run tests" diff --git a/actioncable.gemspec b/actioncable.gemspec index 02350186db..2f4ae41dc1 100644 --- a/actioncable.gemspec +++ b/actioncable.gemspec @@ -1,11 +1,11 @@ -$:.push File.expand_path("../lib", __FILE__) +$:.unshift File.expand_path("../lib", __FILE__) require 'action_cable/version' Gem::Specification.new do |s| s.name = 'actioncable' s.version = ActionCable::VERSION - s.summary = 'Websockets framework for Rails.' - s.description = 'Structure many real-time application concerns into channels over a single websockets connection.' + s.summary = 'WebSocket framework for Rails.' + s.description = 'Structure many real-time application concerns into channels over a single WebSocket connection.' s.license = 'MIT' s.author = ['Pratik Naik', 'David Heinemeier Hansson'] @@ -28,7 +28,7 @@ Gem::Specification.new do |s| s.add_development_dependency 'puma' s.add_development_dependency 'mocha' - s.files = Dir['README', 'lib/**/*'] + s.files = Dir['README.md', 'lib/**/*'] s.has_rdoc = false s.require_path = 'lib' diff --git a/gemfiles/rails_42.gemfile b/gemfiles/rails_42.gemfile new file mode 100644 index 0000000000..8ca60d69db --- /dev/null +++ b/gemfiles/rails_42.gemfile @@ -0,0 +1,5 @@ +source 'https://rubygems.org' + +gem 'rails', '~> 4.2.4' + +gemspec path: '..' diff --git a/lib/action_cable.rb b/lib/action_cable.rb index d2c5251634..89ffa1fda7 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -1,32 +1,21 @@ -require 'eventmachine' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - -require 'set' - require 'active_support' -require 'active_support/json' -require 'active_support/concern' -require 'active_support/core_ext/hash/indifferent_access' -require 'active_support/core_ext/module/delegation' -require 'active_support/callbacks' - -require 'faye/websocket' -require 'celluloid' -require 'em-hiredis' -require 'redis' - -require 'action_cable/engine' if defined?(Rails) +require 'active_support/rails' require 'action_cable/version' module ActionCable - autoload :Server, 'action_cable/server' - autoload :Connection, 'action_cable/connection' - autoload :Channel, 'action_cable/channel' - autoload :RemoteConnections, 'action_cable/remote_connections' + extend ActiveSupport::Autoload # Singleton instance of the server module_function def server @server ||= ActionCable::Server::Base.new end + + eager_autoload do + autoload :Server + autoload :Connection + autoload :Channel + autoload :RemoteConnections + end end + +require 'action_cable/engine' if defined?(Rails) diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb index 3b973ba0a7..7ae262ce5f 100644 --- a/lib/action_cable/channel.rb +++ b/lib/action_cable/channel.rb @@ -1,10 +1,14 @@ module ActionCable module Channel - autoload :Base, 'action_cable/channel/base' - autoload :Broadcasting, 'action_cable/channel/broadcasting' - autoload :Callbacks, 'action_cable/channel/callbacks' - autoload :Naming, 'action_cable/channel/naming' - autoload :PeriodicTimers, 'action_cable/channel/periodic_timers' - autoload :Streams, 'action_cable/channel/streams' + extend ActiveSupport::Autoload + + eager_autoload do + autoload :Base + autoload :Broadcasting + autoload :Callbacks + autoload :Naming + autoload :PeriodicTimers + autoload :Streams + end end end diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index 2f1b4a187d..221730dbc4 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -1,6 +1,8 @@ +require 'set' + module ActionCable module Channel - # The channel provides the basic structure of grouping behavior into logical units when communicating over the websocket connection. + # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection. # You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply # responding to the subscriber's direct requests. # @@ -71,6 +73,8 @@ module ActionCable include Naming include Broadcasting + SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE = 'confirm_subscription'.freeze + on_subscribe :subscribed on_unsubscribe :unsubscribed @@ -118,6 +122,10 @@ module ActionCable @identifier = identifier @params = params + # When a channel is streaming via redis pubsub, we want to delay the confirmation + # transmission until redis pubsub subscription is confirmed. + @defer_subscription_confirmation = false + delegate_connection_identifiers subscribe_to_channel end @@ -139,7 +147,6 @@ module ActionCable # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel run_unsubscribe_callbacks - logger.info "#{self.class.name} unsubscribed" end @@ -160,9 +167,22 @@ module ActionCable # the proper channel identifier marked as the recipient. def transmit(data, via: nil) logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } - connection.transmit({ identifier: @identifier, message: data }.to_json) + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + end + + + protected + def defer_subscription_confirmation! + @defer_subscription_confirmation = true end + def defer_subscription_confirmation? + @defer_subscription_confirmation + end + + def subscription_confirmation_sent? + @subscription_confirmation_sent + end private def delegate_connection_identifiers @@ -175,8 +195,8 @@ module ActionCable def subscribe_to_channel - logger.info "#{self.class.name} subscribing" run_subscribe_callbacks + transmit_subscription_confirmation unless defer_subscription_confirmation? end @@ -213,6 +233,16 @@ module ActionCable def run_unsubscribe_callbacks self.class.on_unsubscribe_callbacks.each { |callback| send(callback) } end + + def transmit_subscription_confirmation + unless subscription_confirmation_sent? + logger.info "#{self.class.name} is transmitting the subscription confirmation" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE) + + @subscription_confirmation_sent = true + end + end + end end end diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb index 2d1506ee98..b5ffa17f72 100644 --- a/lib/action_cable/channel/streams.rb +++ b/lib/action_cable/channel/streams.rb @@ -69,12 +69,18 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. def stream_from(broadcasting, callback = nil) - callback ||= default_stream_callback(broadcasting) + # Hold off the confirmation until pubsub#subscribe is successful + defer_subscription_confirmation! + callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - pubsub.subscribe broadcasting, &callback - logger.info "#{self.class.name} is streaming from #{broadcasting}" + EM.next_tick do + pubsub.subscribe(broadcasting, &callback).callback do |reply| + transmit_subscription_confirmation + logger.info "#{self.class.name} is streaming from #{broadcasting}" + end + end end # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index c63621c519..b672e00682 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,13 +1,16 @@ module ActionCable module Connection - autoload :Authorization, 'action_cable/connection/authorization' - autoload :Base, 'action_cable/connection/base' - autoload :Heartbeat, 'action_cable/connection/heartbeat' - autoload :Identification, 'action_cable/connection/identification' - autoload :InternalChannel, 'action_cable/connection/internal_channel' - autoload :MessageBuffer, 'action_cable/connection/message_buffer' - autoload :WebSocket, 'action_cable/connection/web_socket' - autoload :Subscriptions, 'action_cable/connection/subscriptions' - autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' + extend ActiveSupport::Autoload + + eager_autoload do + autoload :Authorization + autoload :Base + autoload :Identification + autoload :InternalChannel + autoload :MessageBuffer + autoload :WebSocket + autoload :Subscriptions + autoload :TaggedLoggerProxy + end end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 84393845c4..a629f29643 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -1,8 +1,8 @@ -require 'action_dispatch/http/request' +require 'action_dispatch' module ActionCable module Connection - # For every websocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent + # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. @@ -37,8 +37,8 @@ module ActionCable # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key. # - # Second, we rely on the fact that the websocket connection is established with the cookies from the domain being sent along. This makes - # it easy to use signed cookies that were set when logging in via a web interface to authorize the websocket connection. + # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes + # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. # # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log. # @@ -59,19 +59,18 @@ module ActionCable @logger = new_tagged_logger || server.logger @websocket = ActionCable::Connection::WebSocket.new(env) - @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @started_at = Time.now end - # Called by the server when a new websocket connection is established. This configures the callbacks intended for overwriting by the user. + # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. def process logger.info started_request_message - if websocket.possible? + if websocket.possible? && allow_request_origin? websocket.on(:open) { |event| send_async :on_open } websocket.on(:message) { |event| on_message event.data } websocket.on(:close) { |event| send_async :on_close } @@ -88,19 +87,18 @@ module ActionCable if websocket.alive? subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) else - logger.error "Received data without a live websocket (#{data.inspect})" + logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" end end - # Send raw data straight back down the websocket. This is not intended to be called directly. Use the #transmit available on the + # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. def transmit(data) websocket.transmit data end - # Close the websocket connection. + # Close the WebSocket connection. def close - logger.error "Closing connection" websocket.close end @@ -112,12 +110,21 @@ module ActionCable # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. # This can be returned by a health check against the connection. def statistics - { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers } + { + identifier: connection_identifier, + started_at: @started_at, + subscriptions: subscriptions.identifiers, + request_id: @env['action_dispatch.request_id'] + } + end + + def beat + transmit ActiveSupport::JSON.encode(identifier: '_ping', message: Time.now.to_i) end protected - # The request that initiated the websocket connection is available here. This gives access to the environment, cookies, etc. + # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @request ||= begin environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application @@ -125,7 +132,7 @@ module ActionCable end end - # The cookies of the request that initiated the websocket connection. Useful for performing authorization checks. + # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks. def cookies request.cookie_jar end @@ -133,16 +140,15 @@ module ActionCable private attr_reader :websocket - attr_reader :heartbeat, :subscriptions, :message_buffer + attr_reader :subscriptions, :message_buffer def on_open - server.add_connection(self) - connect if respond_to?(:connect) subscribe_to_internal_channel - heartbeat.start + beat message_buffer.process! + server.add_connection(self) rescue ActionCable::Connection::Authorization::UnauthorizedError respond_to_invalid_request close @@ -159,12 +165,22 @@ module ActionCable subscriptions.unsubscribe_from_all unsubscribe_from_internal_channel - heartbeat.stop disconnect if respond_to?(:disconnect) end + def allow_request_origin? + return true if server.config.disable_request_forgery_protection + + if Array(server.config.allowed_request_origins).include? env['HTTP_ORIGIN'] + true + else + logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}") + false + end + end + def respond_to_successful_request websocket.rack_response end @@ -187,17 +203,17 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [Websocket]' : '', + websocket.possible? ? ' [WebSocket]' : '', request.ip, - Time.now.to_default_s ] + Time.now.to_s ] end def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [Websocket]' : '', + websocket.possible? ? ' [WebSocket]' : '', request.ip, - Time.now.to_default_s ] + Time.now.to_s ] end end end diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb deleted file mode 100644 index 2918938ba5..0000000000 --- a/lib/action_cable/connection/heartbeat.rb +++ /dev/null @@ -1,30 +0,0 @@ -module ActionCable - module Connection - # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically - # disconnect. - class Heartbeat - BEAT_INTERVAL = 3 - - def initialize(connection) - @connection = connection - end - - def start - beat - @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } - end - - def stop - EventMachine.cancel_timer(@timer) if @timer - end - - private - attr_reader :connection - - def beat - connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - end - end -end diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb index 701e6885ad..431493aa70 100644 --- a/lib/action_cable/connection/identification.rb +++ b/lib/action_cable/connection/identification.rb @@ -1,3 +1,5 @@ +require 'set' + module ActionCable module Connection module Identification @@ -22,12 +24,22 @@ module ActionCable # Return a single connection identifier that combines the value of all the registered identifiers into a single gid. def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + unless defined? @connection_identifier + @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + end + + @connection_identifier end private def connection_gid(ids) - ids.map { |o| (o.try(:to_global_id) || o).to_s }.sort.join(":") + ids.map do |o| + if o.respond_to? :to_global_id + o.to_global_id + else + o.to_s + end + end.sort.join(":") end end end diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index b00e21824c..c065a24ab7 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_redis_subscriptions ||= [] @_internal_redis_subscriptions << [ internal_redis_channel, callback ] - pubsub.subscribe(internal_redis_channel, &callback) + EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_redis_subscriptions.present? - @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } + @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } end end diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb index d5a8e9eba9..25cff75b41 100644 --- a/lib/action_cable/connection/message_buffer.rb +++ b/lib/action_cable/connection/message_buffer.rb @@ -1,6 +1,6 @@ module ActionCable module Connection - # Allows us to buffer messages received from the websocket before the Connection has been fully initialized and is ready to receive them. + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. # Entirely internal operation and should not be used directly by the user. class MessageBuffer def initialize(connection) @@ -50,4 +50,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index 69e3f60706..229be2a316 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb index 854f613f1c..34063c1d42 100644 --- a/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/lib/action_cable/connection/tagged_logger_proxy.rb @@ -4,6 +4,8 @@ module ActionCable # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests. # The connection is long-lived, so it needs its own set of tags for its independent duration. class TaggedLoggerProxy + attr_reader :tags + def initialize(logger, tags:) @logger = logger @tags = tags.flatten @@ -22,7 +24,8 @@ module ActionCable protected def log(type, message) - @logger.tagged(*@tags) { @logger.send type, message } + current_tags = tags - @logger.formatter.current_tags + @logger.tagged(*current_tags) { @logger.send type, message } end end end diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb index 135a28cfe4..169b683b8c 100644 --- a/lib/action_cable/connection/web_socket.rb +++ b/lib/action_cable/connection/web_socket.rb @@ -1,3 +1,5 @@ +require 'faye/websocket' + module ActionCable module Connection # Decorate the Faye::WebSocket with helpers we need. diff --git a/lib/action_cable/engine.rb b/lib/action_cable/engine.rb index 6c943c7971..613a9b99f2 100644 --- a/lib/action_cable/engine.rb +++ b/lib/action_cable/engine.rb @@ -1,4 +1,22 @@ +require 'rails/engine' +require 'active_support/ordered_options' + module ActionCable class Engine < ::Rails::Engine + config.action_cable = ActiveSupport::OrderedOptions.new + + initializer "action_cable.logger" do + ActiveSupport.on_load(:action_cable) { self.logger ||= ::Rails.logger } + end + + initializer "action_cable.set_configs" do |app| + options = app.config.action_cable + + options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development? + + ActiveSupport.on_load(:action_cable) do + options.each { |k,v| send("#{k}=", v) } + end + end end end diff --git a/lib/action_cable/process/logging.rb b/lib/action_cable/process/logging.rb index bcceff4bec..72b1a080d1 100644 --- a/lib/action_cable/process/logging.rb +++ b/lib/action_cable/process/logging.rb @@ -1,3 +1,7 @@ +require 'action_cable/server' +require 'eventmachine' +require 'celluloid' + EM.error_handler do |e| puts "Error raised inside the event loop: #{e.message}" puts e.backtrace.join("\n") diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 919ebd94de..a2a89d5f1e 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,11 +1,19 @@ +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module Server - autoload :Base, 'action_cable/server/base' - autoload :Broadcasting, 'action_cable/server/broadcasting' - autoload :Connections, 'action_cable/server/connections' - autoload :Configuration, 'action_cable/server/configuration' + extend ActiveSupport::Autoload + + eager_autoload do + autoload :Base + autoload :Broadcasting + autoload :Connections + autoload :Configuration - autoload :Worker, 'action_cable/server/worker' - autoload :ClearDatabaseConnections, 'action_cable/server/worker/clear_database_connections' + autoload :Worker + autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management' + end end end diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb index 43849928b9..f1585dc776 100644 --- a/lib/action_cable/server/base.rb +++ b/lib/action_cable/server/base.rb @@ -1,3 +1,5 @@ +require 'em-hiredis' + module ActionCable module Server # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but @@ -18,6 +20,7 @@ module ActionCable # Called by rack to setup the server. def call(env) + setup_heartbeat_timer config.connection_class.new(self, env).process end @@ -65,5 +68,7 @@ module ActionCable config.connection_class.identifiers end end + + ActiveSupport.run_load_hooks(:action_cable, Base.config) end end diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 037b98951e..6e0fbae387 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -1,3 +1,5 @@ +require 'redis' + module ActionCable module Server # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these @@ -44,9 +46,9 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.broadcasting_redis.publish broadcasting, message.to_json + server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message) end end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/server/configuration.rb b/lib/action_cable/server/configuration.rb index ac9fa7b085..b22de273b8 100644 --- a/lib/action_cable/server/configuration.rb +++ b/lib/action_cable/server/configuration.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module ActionCable module Server # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points @@ -6,6 +8,7 @@ module ActionCable attr_accessor :logger, :log_tags attr_accessor :connection_class, :worker_pool_size attr_accessor :redis_path, :channels_path + attr_accessor :disable_request_forgery_protection, :allowed_request_origins def initialize @logger = Rails.logger @@ -16,6 +19,8 @@ module ActionCable @redis_path = Rails.root.join('config/redis/cable.yml') @channels_path = Rails.root.join('app/channels') + + @disable_request_forgery_protection = false end def channel_paths diff --git a/lib/action_cable/server/connections.rb b/lib/action_cable/server/connections.rb index 15d7c3c8c7..47dcea8c20 100644 --- a/lib/action_cable/server/connections.rb +++ b/lib/action_cable/server/connections.rb @@ -4,6 +4,8 @@ module ActionCable # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. # As such, this is primarily for internal use. module Connections + BEAT_INTERVAL = 3 + def connections @connections ||= [] end @@ -16,9 +18,20 @@ module ActionCable connections.delete connection end + # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you + # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # disconnect. + def setup_heartbeat_timer + EM.next_tick do + @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do + EM.next_tick { connections.map(&:beat) } + end + end + end + def open_connections_statistics connections.map(&:statistics) end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb index d7823ecf93..e063b2a2e1 100644 --- a/lib/action_cable/server/worker.rb +++ b/lib/action_cable/server/worker.rb @@ -1,3 +1,6 @@ +require 'celluloid' +require 'active_support/callbacks' + module ActionCable module Server # Worker used by Server.send_async to do connection work in threads. Only for internal use. @@ -5,10 +8,13 @@ module ActionCable include ActiveSupport::Callbacks include Celluloid + attr_reader :connection define_callbacks :work - include ClearDatabaseConnections + include ActiveRecordConnectionManagement def invoke(receiver, method, *args) + @connection = receiver + run_callbacks :work do receiver.send method, *args end @@ -20,6 +26,8 @@ module ActionCable end def run_periodic_timer(channel, callback) + @connection = channel.connection + run_callbacks :work do callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end @@ -31,4 +39,4 @@ module ActionCable end end end -end
\ No newline at end of file +end diff --git a/lib/action_cable/server/worker/clear_database_connections.rb b/lib/action_cable/server/worker/active_record_connection_management.rb index 722d363a41..1ede0095f8 100644 --- a/lib/action_cable/server/worker/clear_database_connections.rb +++ b/lib/action_cable/server/worker/active_record_connection_management.rb @@ -2,7 +2,7 @@ module ActionCable module Server class Worker # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections. - module ClearDatabaseConnections + module ActiveRecordConnectionManagement extend ActiveSupport::Concern included do @@ -12,7 +12,7 @@ module ActionCable end def with_database_connections - yield + ActiveRecord::Base.logger.tagged(*connection.logger.tags) { yield } ensure ActiveRecord::Base.clear_active_connections! end diff --git a/lib/assets/javascripts/cable.coffee b/lib/assets/javascripts/cable.coffee index 0bd1757505..476d90ef72 100644 --- a/lib/assets/javascripts/cable.coffee +++ b/lib/assets/javascripts/cable.coffee @@ -3,6 +3,8 @@ @Cable = PING_IDENTIFIER: "_ping" + INTERNAL_MESSAGES: + SUBSCRIPTION_CONFIRMATION: 'confirm_subscription' createConsumer: (url) -> new Cable.Consumer url diff --git a/lib/assets/javascripts/cable/connection.coffee b/lib/assets/javascripts/cable/connection.coffee index 2259ddcedd..33159130c7 100644 --- a/lib/assets/javascripts/cable/connection.coffee +++ b/lib/assets/javascripts/cable/connection.coffee @@ -1,5 +1,7 @@ # Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. class Cable.Connection + @reopenDelay: 500 + constructor: (@consumer) -> @open() @@ -10,19 +12,25 @@ class Cable.Connection else false - open: -> - if @isOpen() - throw new Error("Must close existing connection before opening") + open: => + if @webSocket and not @isState("closed") + throw new Error("Existing connection must be closed before opening") else @webSocket = new WebSocket(@consumer.url) @installEventHandlers() + true close: -> @webSocket?.close() reopen: -> - @close() - @open() + if @isState("closed") + @open() + else + try + @close() + finally + setTimeout(@open, @constructor.reopenDelay) isOpen: -> @isState("open") @@ -40,11 +48,18 @@ class Cable.Connection for eventName of @events handler = @events[eventName].bind(this) @webSocket["on#{eventName}"] = handler + return events: message: (event) -> - {identifier, message} = JSON.parse(event.data) - @consumer.subscriptions.notify(identifier, "received", message) + {identifier, message, type} = JSON.parse(event.data) + + if type? + switch type + when Cable.INTERNAL_MESSAGES.SUBSCRIPTION_CONFIRMATION + @consumer.subscriptions.notify(identifier, "connected") + else + @consumer.subscriptions.notify(identifier, "received", message) open: -> @disconnected = false diff --git a/lib/assets/javascripts/cable/connection_monitor.coffee b/lib/assets/javascripts/cable/connection_monitor.coffee index 30ce11957c..bf99dee34d 100644 --- a/lib/assets/javascripts/cable/connection_monitor.coffee +++ b/lib/assets/javascripts/cable/connection_monitor.coffee @@ -1,15 +1,13 @@ # Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting # revival reconnections if things go astray. Internal class, not intended for direct user manipulation. class Cable.ConnectionMonitor - identifier: Cable.PING_IDENTIFIER - - pollInterval: - min: 2 + @pollInterval: + min: 3 max: 30 - staleThreshold: - startedAt: 4 - pingedAt: 8 + @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + + identifier: Cable.PING_IDENTIFIER constructor: (@consumer) -> @consumer.subscriptions.add(this) @@ -18,12 +16,10 @@ class Cable.ConnectionMonitor connected: -> @reset() @pingedAt = now() + delete @disconnectedAt disconnected: -> - if @reconnectAttempts++ is 0 - setTimeout => - @consumer.connection.open() unless @consumer.connection.isOpen() - , 200 + @disconnectedAt = now() received: -> @pingedAt = now() @@ -50,20 +46,21 @@ class Cable.ConnectionMonitor , @getInterval() getInterval: -> - {min, max} = @pollInterval - interval = 4 * Math.log(@reconnectAttempts + 1) + {min, max} = @constructor.pollInterval + interval = 5 * Math.log(@reconnectAttempts + 1) clamp(interval, min, max) * 1000 reconnectIfStale: -> if @connectionIsStale() @reconnectAttempts++ - @consumer.connection.reopen() + unless @disconnectedRecently() + @consumer.connection.reopen() connectionIsStale: -> - if @pingedAt - secondsSince(@pingedAt) > @staleThreshold.pingedAt - else - secondsSince(@startedAt) > @staleThreshold.startedAt + secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold + + disconnectedRecently: -> + @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold visibilityDidChange: => if document.visibilityState is "visible" diff --git a/lib/assets/javascripts/cable/subscriptions.coffee b/lib/assets/javascripts/cable/subscriptions.coffee index eeaa697081..4efb384ee2 100644 --- a/lib/assets/javascripts/cable/subscriptions.coffee +++ b/lib/assets/javascripts/cable/subscriptions.coffee @@ -21,13 +21,11 @@ class Cable.Subscriptions add: (subscription) -> @subscriptions.push(subscription) @notify(subscription, "initialized") - if @sendCommand(subscription, "subscribe") - @notify(subscription, "connected") + @sendCommand(subscription, "subscribe") reload: -> for subscription in @subscriptions - if @sendCommand(subscription, "subscribe") - @notify(subscription, "connected") + @sendCommand(subscription, "subscribe") remove: (subscription) -> @subscriptions = (s for s in @subscriptions when s isnt subscription) diff --git a/test/channel/base_test.rb b/test/channel/base_test.rb index e7944ff06b..7eb8e15845 100644 --- a/test/channel/base_test.rb +++ b/test/channel/base_test.rb @@ -23,6 +23,11 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase on_subscribe :toggle_subscribed on_unsubscribe :toggle_subscribed + def initialize(*) + @subscribed = false + super + end + def subscribed @room = Room.new params[:id] @actions = [] @@ -134,4 +139,10 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" } assert_equal expected, @connection.last_transmission end + + test "subscription confirmation" do + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + assert_equal expected, @connection.last_transmission + end + end diff --git a/test/channel/stream_test.rb b/test/channel/stream_test.rb index b0a6f49072..5e4e01abbf 100644 --- a/test/channel/stream_test.rb +++ b/test/channel/stream_test.rb @@ -2,7 +2,7 @@ require 'test_helper' require 'stubs/test_connection' require 'stubs/room' -class ActionCable::Channel::StreamTest < ActiveSupport::TestCase +class ActionCable::Channel::StreamTest < ActionCable::TestCase class ChatChannel < ActionCable::Channel::Base def subscribed if params[:id] @@ -10,23 +10,71 @@ class ActionCable::Channel::StreamTest < ActiveSupport::TestCase stream_from "test_room_#{@room.id}" end end - end - setup do - @connection = TestConnection.new + def send_confirmation + transmit_subscription_confirmation + end + end test "streaming start and stop" do - @connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1") } - channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } + run_in_eventmachine do + connection = TestConnection.new + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) } + channel = ChatChannel.new connection, "{id: 1}", { id: 1 } - @connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) } - channel.unsubscribe_from_channel + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) } + channel.unsubscribe_from_channel + end end test "stream_for" do - @connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire") } - channel = ChatChannel.new @connection, "" - channel.stream_for Room.new(1) + run_in_eventmachine do + connection = TestConnection.new + EM.next_tick do + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) } + end + + channel = ChatChannel.new connection, "" + channel.stream_for Room.new(1) + end + end + + test "stream_from subscription confirmation" do + EM.run do + connection = TestConnection.new + connection.expects(:pubsub).returns EM::Hiredis.connect.pubsub + + channel = ChatChannel.new connection, "{id: 1}", { id: 1 } + assert_nil connection.last_transmission + + EM::Timer.new(0.1) do + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + + EM.run_deferred_callbacks + EM.stop + end + end end + + test "subscription confirmation should only be sent out once" do + EM.run do + connection = TestConnection.new + connection.stubs(:pubsub).returns EM::Hiredis.connect.pubsub + + channel = ChatChannel.new connection, "test_channel" + channel.send_confirmation + channel.send_confirmation + + EM.run_deferred_callbacks + + expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" + + assert_equal 1, connection.transmissions.size + EM.stop + end + end + end diff --git a/test/connection/authorization_test.rb b/test/connection/authorization_test.rb index 09dfead8c8..762c90fbbc 100644 --- a/test/connection/authorization_test.rb +++ b/test/connection/authorization_test.rb @@ -1,7 +1,7 @@ require 'test_helper' require 'stubs/test_server' -class ActionCable::Connection::AuthorizationTest < ActiveSupport::TestCase +class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base attr_reader :websocket @@ -10,17 +10,15 @@ class ActionCable::Connection::AuthorizationTest < ActiveSupport::TestCase end end - setup do - @server = TestServer.new - - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' - @connection = Connection.new(@server, env) - end - test "unauthorized connection" do - @connection.websocket.expects(:close) + run_in_eventmachine do + server = TestServer.new + env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' - @connection.process - @connection.send :on_open + connection = Connection.new(server, env) + connection.websocket.expects(:close) + connection.process + connection.send :on_open + end end end diff --git a/test/connection/base_test.rb b/test/connection/base_test.rb index 2f008652ee..da6041db4a 100644 --- a/test/connection/base_test.rb +++ b/test/connection/base_test.rb @@ -1,9 +1,9 @@ require 'test_helper' require 'stubs/test_server' -class ActionCable::Connection::BaseTest < ActiveSupport::TestCase +class ActionCable::Connection::BaseTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base - attr_reader :websocket, :heartbeat, :subscriptions, :message_buffer, :connected + attr_reader :websocket, :subscriptions, :message_buffer, :connected def connect @connected = true @@ -12,68 +12,107 @@ class ActionCable::Connection::BaseTest < ActiveSupport::TestCase def disconnect @connected = false end + + def send_async(method, *args) + # Bypass Celluloid + send method, *args + end end setup do @server = TestServer.new - - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' - @connection = Connection.new(@server, env) - @response = @connection.process + @server.config.allowed_request_origins = %w( http://rubyonrails.com ) end test "making a connection with invalid headers" do - connection = ActionCable::Connection::Base.new(@server, Rack::MockRequest.env_for("/test")) - response = connection.process - assert_equal 404, response[0] + run_in_eventmachine do + connection = ActionCable::Connection::Base.new(@server, Rack::MockRequest.env_for("/test")) + response = connection.process + assert_equal 404, response[0] + end end test "websocket connection" do - assert @connection.websocket.possible? - assert @connection.websocket.alive? + run_in_eventmachine do + connection = open_connection + connection.process + + assert connection.websocket.possible? + assert connection.websocket.alive? + end end test "rack response" do - assert_equal [ -1, {}, [] ], @response + run_in_eventmachine do + connection = open_connection + response = connection.process + + assert_equal [ -1, {}, [] ], response + end end test "on connection open" do - assert ! @connection.connected - - EventMachine.expects(:add_periodic_timer) - @connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) - @connection.message_buffer.expects(:process!) - - @connection.send :on_open - - assert_equal [ @connection ], @server.connections - assert @connection.connected + run_in_eventmachine do + connection = open_connection + connection.process + + connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) + connection.message_buffer.expects(:process!) + + # Allow EM to run on_open callback + EM.next_tick do + assert_equal [ connection ], @server.connections + assert connection.connected + end + end end test "on connection close" do - # Setup the connection - EventMachine.stubs(:add_periodic_timer).returns(true) - @connection.send :on_open - assert @connection.connected + run_in_eventmachine do + connection = open_connection + connection.process + + # Setup the connection + EventMachine.stubs(:add_periodic_timer).returns(true) + connection.send :on_open + assert connection.connected - EventMachine.expects(:cancel_timer) - @connection.subscriptions.expects(:unsubscribe_from_all) - @connection.send :on_close + connection.subscriptions.expects(:unsubscribe_from_all) + connection.send :on_close - assert ! @connection.connected - assert_equal [], @server.connections + assert ! connection.connected + assert_equal [], @server.connections + end end test "connection statistics" do - statistics = @connection.statistics + run_in_eventmachine do + connection = open_connection + connection.process - assert statistics[:identifier].blank? - assert_kind_of Time, statistics[:started_at] - assert_equal [], statistics[:subscriptions] + statistics = connection.statistics + + assert statistics[:identifier].blank? + assert_kind_of Time, statistics[:started_at] + assert_equal [], statistics[:subscriptions] + end end test "explicitly closing a connection" do - @connection.websocket.expects(:close) - @connection.close + run_in_eventmachine do + connection = open_connection + connection.process + + connection.websocket.expects(:close) + connection.close + end end + + private + def open_connection + env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', + 'HTTP_ORIGIN' => 'http://rubyonrails.com' + + Connection.new(@server, env) + end end diff --git a/test/connection/cross_site_forgery_test.rb b/test/connection/cross_site_forgery_test.rb new file mode 100644 index 0000000000..166abb7b38 --- /dev/null +++ b/test/connection/cross_site_forgery_test.rb @@ -0,0 +1,68 @@ +require 'test_helper' +require 'stubs/test_server' + +class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase + HOST = 'rubyonrails.com' + + class Connection < ActionCable::Connection::Base + def send_async(method, *args) + # Bypass Celluloid + send method, *args + end + end + + setup do + @server = TestServer.new + @server.config.allowed_request_origins = %w( http://rubyonrails.com ) + end + + teardown do + @server.config.disable_request_forgery_protection = false + @server.config.allowed_request_origins = [] + end + + test "disable forgery protection" do + @server.config.disable_request_forgery_protection = true + assert_origin_allowed 'http://rubyonrails.com' + assert_origin_allowed 'http://hax.com' + end + + test "explicitly specified a single allowed origin" do + @server.config.allowed_request_origins = 'http://hax.com' + assert_origin_not_allowed 'http://rubyonrails.com' + assert_origin_allowed 'http://hax.com' + end + + test "explicitly specified multiple allowed origins" do + @server.config.allowed_request_origins = %w( http://rubyonrails.com http://www.rubyonrails.com ) + assert_origin_allowed 'http://rubyonrails.com' + assert_origin_allowed 'http://www.rubyonrails.com' + assert_origin_not_allowed 'http://hax.com' + end + + private + def assert_origin_allowed(origin) + response = connect_with_origin origin + assert_equal -1, response[0] + end + + def assert_origin_not_allowed(origin) + response = connect_with_origin origin + assert_equal 404, response[0] + end + + def connect_with_origin(origin) + response = nil + + run_in_eventmachine do + response = Connection.new(@server, env_for_origin(origin)).process + end + + response + end + + def env_for_origin(origin) + Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'SERVER_NAME' => HOST, + 'HTTP_ORIGIN' => origin + end +end diff --git a/test/connection/identifier_test.rb b/test/connection/identifier_test.rb index 745cf308d0..02e6b21845 100644 --- a/test/connection/identifier_test.rb +++ b/test/connection/identifier_test.rb @@ -2,7 +2,7 @@ require 'test_helper' require 'stubs/test_server' require 'stubs/user' -class ActionCable::Connection::IdentifierTest < ActiveSupport::TestCase +class ActionCable::Connection::IdentifierTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base identified_by :current_user attr_reader :websocket @@ -14,59 +14,59 @@ class ActionCable::Connection::IdentifierTest < ActiveSupport::TestCase end end - setup do - @server = TestServer.new - - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' - @connection = Connection.new(@server, env) - end - test "connection identifier" do - open_connection_with_stubbed_pubsub - assert_equal "User#lifo", @connection.connection_identifier - end - - test "should subscribe to internal channel on open" do - pubsub = mock('pubsub') - pubsub.expects(:subscribe).with('action_cable/User#lifo') - @server.expects(:pubsub).returns(pubsub) - - open_connection + run_in_eventmachine do + open_connection_with_stubbed_pubsub + assert_equal "User#lifo", @connection.connection_identifier + end end - test "should unsubscribe from internal channel on close" do - open_connection_with_stubbed_pubsub + test "should subscribe to internal channel on open and unsubscribe on close" do + run_in_eventmachine do + pubsub = mock('pubsub') + pubsub.expects(:subscribe).with('action_cable/User#lifo') + pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc)) - pubsub = mock('pubsub') - pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc)) - @server.expects(:pubsub).returns(pubsub) + server = TestServer.new + server.stubs(:pubsub).returns(pubsub) - close_connection + open_connection server: server + close_connection + end end test "processing disconnect message" do - open_connection_with_stubbed_pubsub + run_in_eventmachine do + open_connection_with_stubbed_pubsub - @connection.websocket.expects(:close) - message = { 'type' => 'disconnect' }.to_json - @connection.process_internal_message message + @connection.websocket.expects(:close) + message = ActiveSupport::JSON.encode('type' => 'disconnect') + @connection.process_internal_message message + end end test "processing invalid message" do - open_connection_with_stubbed_pubsub + run_in_eventmachine do + open_connection_with_stubbed_pubsub - @connection.websocket.expects(:close).never - message = { 'type' => 'unknown' }.to_json - @connection.process_internal_message message + @connection.websocket.expects(:close).never + message = ActiveSupport::JSON.encode('type' => 'unknown') + @connection.process_internal_message message + end end protected def open_connection_with_stubbed_pubsub - @server.stubs(:pubsub).returns(stub_everything('pubsub')) - open_connection + server = TestServer.new + server.stubs(:pubsub).returns(stub_everything('pubsub')) + + open_connection server: server end - def open_connection + def open_connection(server:) + env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + @connection = Connection.new(server, env) + @connection.process @connection.send :on_open end diff --git a/test/connection/string_identifier_test.rb b/test/connection/string_identifier_test.rb index 87a9025008..ab69df57b3 100644 --- a/test/connection/string_identifier_test.rb +++ b/test/connection/string_identifier_test.rb @@ -1,34 +1,39 @@ require 'test_helper' require 'stubs/test_server' -class ActionCable::Connection::StringIdentifierTest < ActiveSupport::TestCase +class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base identified_by :current_token def connect self.current_token = "random-string" end - end - - setup do - @server = TestServer.new - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' - @connection = Connection.new(@server, env) + def send_async(method, *args) + # Bypass Celluloid + send method, *args + end end test "connection identifier" do - open_connection_with_stubbed_pubsub - assert_equal "random-string", @connection.connection_identifier + run_in_eventmachine do + open_connection_with_stubbed_pubsub + assert_equal "random-string", @connection.connection_identifier + end end protected def open_connection_with_stubbed_pubsub + @server = TestServer.new @server.stubs(:pubsub).returns(stub_everything('pubsub')) + open_connection end def open_connection + env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + @connection = Connection.new(@server, env) + @connection.process @connection.send :on_open end diff --git a/test/connection/subscriptions_test.rb b/test/connection/subscriptions_test.rb index 24fe8f9300..4f6760827e 100644 --- a/test/connection/subscriptions_test.rb +++ b/test/connection/subscriptions_test.rb @@ -1,8 +1,13 @@ require 'test_helper' -class ActionCable::Connection::SubscriptionsTest < ActiveSupport::TestCase +class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base attr_reader :websocket + + def send_async(method, *args) + # Bypass Celluloid + send method, *args + end end class ChatChannel < ActionCable::Channel::Base @@ -22,59 +27,76 @@ class ActionCable::Connection::SubscriptionsTest < ActiveSupport::TestCase @server = TestServer.new @server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel) - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' - @connection = Connection.new(@server, env) - - @subscriptions = ActionCable::Connection::Subscriptions.new(@connection) - @chat_identifier = { id: 1, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel' }.to_json + @chat_identifier = ActiveSupport::JSON.encode(id: 1, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel') end test "subscribe command" do - channel = subscribe_to_chat_channel + run_in_eventmachine do + setup_connection + channel = subscribe_to_chat_channel - assert_kind_of ChatChannel, channel - assert_equal 1, channel.room.id + assert_kind_of ChatChannel, channel + assert_equal 1, channel.room.id + end end test "subscribe command without an identifier" do - @subscriptions.execute_command 'command' => 'subscribe' - assert @subscriptions.identifiers.empty? + run_in_eventmachine do + setup_connection + + @subscriptions.execute_command 'command' => 'subscribe' + assert @subscriptions.identifiers.empty? + end end test "unsubscribe command" do - subscribe_to_chat_channel + run_in_eventmachine do + setup_connection + subscribe_to_chat_channel - channel = subscribe_to_chat_channel - channel.expects(:unsubscribe_from_channel) + channel = subscribe_to_chat_channel + channel.expects(:unsubscribe_from_channel) - @subscriptions.execute_command 'command' => 'unsubscribe', 'identifier' => @chat_identifier - assert @subscriptions.identifiers.empty? + @subscriptions.execute_command 'command' => 'unsubscribe', 'identifier' => @chat_identifier + assert @subscriptions.identifiers.empty? + end end test "unsubscribe command without an identifier" do - @subscriptions.execute_command 'command' => 'unsubscribe' - assert @subscriptions.identifiers.empty? + run_in_eventmachine do + setup_connection + + @subscriptions.execute_command 'command' => 'unsubscribe' + assert @subscriptions.identifiers.empty? + end end test "message command" do - channel = subscribe_to_chat_channel + run_in_eventmachine do + setup_connection + channel = subscribe_to_chat_channel - data = { 'content' => 'Hello World!', 'action' => 'speak' } - @subscriptions.execute_command 'command' => 'message', 'identifier' => @chat_identifier, 'data' => data.to_json + data = { 'content' => 'Hello World!', 'action' => 'speak' } + @subscriptions.execute_command 'command' => 'message', 'identifier' => @chat_identifier, 'data' => ActiveSupport::JSON.encode(data) - assert_equal [ data ], channel.lines + assert_equal [ data ], channel.lines + end end test "unsubscrib from all" do - channel1 = subscribe_to_chat_channel + run_in_eventmachine do + setup_connection - channel2_id = { id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel' }.to_json - channel2 = subscribe_to_chat_channel(channel2_id) + channel1 = subscribe_to_chat_channel - channel1.expects(:unsubscribe_from_channel) - channel2.expects(:unsubscribe_from_channel) + channel2_id = ActiveSupport::JSON.encode(id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel') + channel2 = subscribe_to_chat_channel(channel2_id) - @subscriptions.unsubscribe_from_all + channel1.expects(:unsubscribe_from_channel) + channel2.expects(:unsubscribe_from_channel) + + @subscriptions.unsubscribe_from_all + end end private @@ -84,4 +106,11 @@ class ActionCable::Connection::SubscriptionsTest < ActiveSupport::TestCase @subscriptions.send :find, 'identifier' => identifier end + + def setup_connection + env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + @connection = Connection.new(@server, env) + + @subscriptions = ActionCable::Connection::Subscriptions.new(@connection) + end end diff --git a/test/stubs/test_server.rb b/test/stubs/test_server.rb index 2a7ac3e927..f9168f9b78 100644 --- a/test/stubs/test_server.rb +++ b/test/stubs/test_server.rb @@ -9,4 +9,7 @@ class TestServer @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) @config = OpenStruct.new(log_tags: []) end + + def send_async + end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 5640178f34..935e50e900 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -8,6 +8,7 @@ Bundler.setup Bundler.require :default, :test require 'puma' +require 'em-hiredis' require 'mocha/mini_test' require 'rack/mock' @@ -18,4 +19,29 @@ ActiveSupport.test_order = :sorted # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } +require 'celluloid' +$CELLULOID_DEBUG = false +$CELLULOID_TEST = false Celluloid.logger = Logger.new(StringIO.new) + +require 'faye/websocket' +class << Faye::WebSocket + remove_method :ensure_reactor_running + + # We don't want Faye to start the EM reactor in tests because it makes testing much harder. + # We want to be able to start and stop EM loop in tests to make things simpler. + def ensure_reactor_running + # no-op + end +end + +class ActionCable::TestCase < ActiveSupport::TestCase + def run_in_eventmachine + EM.run do + yield + + EM.run_deferred_callbacks + EM.stop + end + end +end diff --git a/test/worker_test.rb b/test/worker_test.rb index e1fa6f561b..69c4b6529d 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -11,6 +11,9 @@ class WorkerTest < ActiveSupport::TestCase def process(message) @last_action = [ :process, message ] end + + def connection + end end setup do |