aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/CHANGELOG.md9
-rw-r--r--actioncable/MIT-LICENSE2
-rw-r--r--actioncable/README.md78
-rw-r--r--actioncable/actioncable.gemspec9
-rw-r--r--actioncable/lib/action_cable.rb3
-rw-r--r--actioncable/lib/action_cable/channel/base.rb4
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb2
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb6
-rw-r--r--actioncable/lib/action_cable/connection/base.rb4
-rw-r--r--actioncable/lib/action_cable/connection/identification.rb2
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb12
-rw-r--r--actioncable/lib/action_cable/engine.rb6
-rw-r--r--actioncable/lib/action_cable/gem_version.rb2
-rw-r--r--actioncable/lib/action_cable/process/logging.rb3
-rw-r--r--actioncable/lib/action_cable/remote_connections.rb2
-rw-r--r--actioncable/lib/action_cable/server/base.rb22
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb9
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb30
-rw-r--r--actioncable/lib/action_cable/server/worker.rb55
-rw-r--r--actioncable/lib/action_cable/subscription_adapter.rb8
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb22
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/base.rb28
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb26
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb101
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb42
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb53
-rw-r--r--actioncable/lib/assets/javascripts/action_cable/connection.coffee3
-rw-r--r--actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee5
-rw-r--r--actioncable/lib/assets/javascripts/action_cable/consumer.coffee6
-rw-r--r--actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee14
-rw-r--r--actioncable/test/channel/stream_test.rb10
-rw-r--r--actioncable/test/connection/authorization_test.rb1
-rw-r--r--actioncable/test/connection/base_test.rb1
-rw-r--r--actioncable/test/connection/cross_site_forgery_test.rb1
-rw-r--r--actioncable/test/connection/identifier_test.rb8
-rw-r--r--actioncable/test/connection/string_identifier_test.rb1
-rw-r--r--actioncable/test/connection/subscriptions_test.rb1
-rw-r--r--actioncable/test/stubs/test_adapter.rb10
-rw-r--r--actioncable/test/stubs/test_connection.rb4
-rw-r--r--actioncable/test/stubs/test_server.rb6
-rw-r--r--actioncable/test/subscription_adapter/async_test.rb17
-rw-r--r--actioncable/test/subscription_adapter/base_test.rb73
-rw-r--r--actioncable/test/subscription_adapter/common.rb142
-rw-r--r--actioncable/test/subscription_adapter/inline_test.rb17
-rw-r--r--actioncable/test/subscription_adapter/postgresql_test.rb32
-rw-r--r--actioncable/test/subscription_adapter/redis_test.rb10
-rw-r--r--actioncable/test/test_helper.rb10
-rw-r--r--actioncable/test/worker_test.rb7
48 files changed, 760 insertions, 159 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md
index 22126d82b7..a33b7b6b4b 100644
--- a/actioncable/CHANGELOG.md
+++ b/actioncable/CHANGELOG.md
@@ -1,3 +1,12 @@
+* Create notion of an `ActionCable::SubscriptionAdapter`.
+ Separate out Redis functionality into
+ `ActionCable::SubscriptionAdapter::Redis`, and add a
+ PostgreSQL adapter as well. Configuration file for
+ ActionCable was changed from`config/redis/cable.yml` to
+ `config/cable.yml`.
+
+ *Jon Moss*
+
## Rails 5.0.0.beta1 (December 18, 2015) ##
* Added to Rails!
diff --git a/actioncable/MIT-LICENSE b/actioncable/MIT-LICENSE
index a4910677eb..27a17cf41b 100644
--- a/actioncable/MIT-LICENSE
+++ b/actioncable/MIT-LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2015 Basecamp, LLC
+Copyright (c) 2015-2016 Basecamp, LLC
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
diff --git a/actioncable/README.md b/actioncable/README.md
index c7420d48bc..cad71ddf94 100644
--- a/actioncable/README.md
+++ b/actioncable/README.md
@@ -66,6 +66,13 @@ end
Here `identified_by` is a connection identifier that can be used to find the specific connection again or later.
Note that anything marked as an identifier will automatically create a delegate by the same name on any channel instances created off the connection.
+This relies on the fact that you will already have handled authentication of the user, and
+that a successful authentication sets a signed cookie with the `user_id`. This cookie is then
+automatically sent to the connection instance when a new connection is attempted, and you
+use that to set the `current_user`. By identifying the connection by this same current_user,
+you're also ensuring that you can later retrieve all open connections by a given user (and
+potentially disconnect them all if the user is deleted or deauthorized).
+
Then you should define your `ApplicationCable::Channel` class in Ruby. This is the place where you put
shared logic between your channels.
@@ -77,13 +84,6 @@ module ApplicationCable
end
```
-This relies on the fact that you will already have handled authentication of the user, and
-that a successful authentication sets a signed cookie with the `user_id`. This cookie is then
-automatically sent to the connection instance when a new connection is attempted, and you
-use that to set the `current_user`. By identifying the connection by this same current_user,
-you're also ensuring that you can later retrieve all open connections by a given user (and
-potentially disconnect them all if the user is deleted or deauthorized).
-
The client-side needs to setup a consumer instance of this connection. That's done like so:
```coffeescript
@@ -298,11 +298,12 @@ See the [rails/actioncable-examples](http://github.com/rails/actioncable-example
## Configuration
-Action Cable has two required configurations: the Redis connection and specifying allowed request origins.
+Action Cable has three required configurations: the Redis connection, allowed request origins, and the cable server url (which can optionally be set on the client side).
### Redis
-By default, `ActionCable::Server::Base` will look for a configuration file in `Rails.root.join('config/redis/cable.yml')`. The file must follow the following format:
+By default, `ActionCable::Server::Base` will look for a configuration file in `Rails.root.join('config/cable.yml')`.
+This file must specify a Redis url for each Rails environment. It may use the following format:
```yaml
production: &production
@@ -312,11 +313,10 @@ development: &development
test: *development
```
-This format allows you to specify one configuration per Rails environment. You can also change the location of the Redis config file in
-a Rails initializer with something like:
+You can also change the location of the Redis config file in a Rails initializer with something like:
```ruby
-Rails.application.paths.add "config/redis/cable", with: "somewhere/else/cable.yml"
+Rails.application.paths.add "config/cable", with: "somewhere/else/cable.yml"
```
### Allowed Request Origins
@@ -327,29 +327,31 @@ Action Cable will only accept requests from specified origins, which are passed
ActionCable.server.config.allowed_request_origins = ['http://rubyonrails.com', /http:\/\/ruby.*/]
```
+When running in the development environment, this defaults to "http://localhost:3000".
+
To disable and allow requests from any origin:
```ruby
ActionCable.server.config.disable_request_forgery_protection = true
```
-By default, Action Cable allows all requests from localhost:3000 when running in the development environment.
+### Consumer Configuration
-### Other Configurations
+Once you have decided how to run your cable server (see below), you must provide the server url (or path) to your client-side setup.
+There are two ways you can do this.
-The other common option to configure is the log tags applied to the per-connection logger. Here's close to what we're using in Basecamp:
+The first is to simply pass it in when creating your consumer. For a standalone server,
+this would be something like: `App.cable = ActionCable.createConsumer("ws://example.com:28080")`, and for an in-app server,
+something like: `App.cable = ActionCable.createConsumer("/cable")`.
-```ruby
-ActionCable.server.config.log_tags = [
- -> request { request.env['bc.account_id'] || "no-account" },
- :action_cable,
- -> request { request.uuid }
-]
-```
+The second option is to pass the server url through the `action_cable_meta_tag` in your layout.
+This uses a url or path typically set via `config.action_cable.url` in the environment configuration files, or defaults to "/cable".
-Your websocket url might change between environments. If you host your production server via https, you will need to use the wss scheme
+This method is especially useful if your websocket url might change between environments. If you host your production server via https, you will need to use the wss scheme
for your ActionCable server, but development might remain http and use the ws scheme. You might use localhost in development and your
-domain in production. In any case, to vary the websocket url between environments, add the following configuration to each environment:
+domain in production.
+
+In any case, to vary the websocket url between environments, add the following configuration to each environment:
```ruby
config.action_cable.url = "ws://example.com:28080"
@@ -367,6 +369,18 @@ And finally, create your consumer like so:
App.cable = ActionCable.createConsumer()
```
+### Other Configurations
+
+The other common option to configure is the log tags applied to the per-connection logger. Here's close to what we're using in Basecamp:
+
+```ruby
+ActionCable.server.config.log_tags = [
+ -> request { request.env['bc.account_id'] || "no-account" },
+ :action_cable,
+ -> request { request.uuid }
+]
+```
+
For a full list of all configuration options, see the `ActionCable::Server::Configuration` class.
Also note that your server must provide at least the same number of database connections as you have workers. The default worker pool is set to 100, so that means you have to make at least that available. You can change that in `config/database.yml` through the `pool` attribute.
@@ -394,8 +408,7 @@ Then you start the server using a binstub in bin/cable ala:
bundle exec puma -p 28080 cable/config.ru
```
-The above will start a cable server on port 28080. Remember to point your client-side setup against that using something like:
-`App.cable = ActionCable.createConsumer("ws://basecamp.dev:28080")`.
+The above will start a cable server on port 28080.
### In app
@@ -408,8 +421,6 @@ Example::Application.routes.draw do
end
```
-You can use `App.cable = ActionCable.createConsumer()` to connect to the cable server if `action_cable_meta_tag` is included in the layout. A custom path is specified as first argument to `createConsumer` (e.g. `App.cable = ActionCable.createConsumer("/websocket")`).
-
For every instance of your server you create and for every worker your server spawns, you will also have a new instance of ActionCable, but the use of Redis keeps messages synced across connections.
### Notes
@@ -427,16 +438,15 @@ messages back and forth over the WebSocket cable connection. This dependency may
be alleviated in the future, but for the moment that's what it is. So be sure to have
Redis installed and running.
-The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [celluloid](https://github.com/celluloid/celluloid).
+The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby).
## Deployment
-Action Cable is powered by a combination of EventMachine and threads. The
-framework plumbing needed for connection handling is handled in the
-EventMachine loop, but the actual channel, user-specified, work is handled
-in a normal Ruby thread. This means you can use all your regular Rails models
-with no problem, as long as you haven't committed any thread-safety sins.
+Action Cable is powered by a combination of websockets and threads. All of the
+connection management is handled internally by utilizing Ruby’s native thread
+support, which means you can use all your regular Rails models with no problems
+as long as you haven’t committed any thread-safety sins.
But this also means that Action Cable needs to run in its own server process.
So you'll have one set of server processes for your normal web work, and another
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
index 74c21bd24d..a36acc8f6f 100644
--- a/actioncable/actioncable.gemspec
+++ b/actioncable/actioncable.gemspec
@@ -21,12 +21,13 @@ Gem::Specification.new do |s|
s.add_dependency 'actionpack', version
s.add_dependency 'coffee-rails', '~> 4.1.0'
+ s.add_dependency 'eventmachine', '~> 1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1'
- s.add_dependency 'celluloid', '~> 0.17.2'
- s.add_dependency 'em-hiredis', '~> 0.3.0'
- s.add_dependency 'redis', '~> 3.0'
- s.add_development_dependency 'puma'
+ s.add_development_dependency 'em-hiredis', '~> 0.3.0'
s.add_development_dependency 'mocha'
+ s.add_development_dependency 'pg'
+ s.add_development_dependency 'puma'
+ s.add_development_dependency 'redis', '~> 3.0'
end
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb
index f27698765c..1dc66ef3ad 100644
--- a/actioncable/lib/action_cable.rb
+++ b/actioncable/lib/action_cable.rb
@@ -1,5 +1,5 @@
#--
-# Copyright (c) 2015 Basecamp, LLC
+# Copyright (c) 2015-2016 Basecamp, LLC
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
@@ -47,4 +47,5 @@ module ActionCable
autoload :Connection
autoload :Channel
autoload :RemoteConnections
+ autoload :SubscriptionAdapter
end
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index ce9d62635c..88cdc1cab1 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -133,8 +133,8 @@ module ActionCable
@identifier = identifier
@params = params
- # When a channel is streaming via redis pubsub, we want to delay the confirmation
- # transmission until redis pubsub subscription is confirmed.
+ # When a channel is streaming via pubsub, we want to delay the confirmation
+ # transmission until pubsub subscription is confirmed.
@defer_subscription_confirmation = false
@reject_subscription = nil
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 25fe8e5e54..7f0fb37afc 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -28,7 +28,7 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
- connection.worker_pool.async.run_periodic_timer(self, callback)
+ connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index b5ffa17f72..e2876ef6fa 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -76,10 +76,10 @@ module ActionCable
streams << [ broadcasting, callback ]
EM.next_tick do
- pubsub.subscribe(broadcasting, &callback).callback do |reply|
+ pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
- end
+ end)
end
end
@@ -92,7 +92,7 @@ module ActionCable
def stop_all_streams
streams.each do |broadcasting, callback|
- pubsub.unsubscribe_proc broadcasting, callback
+ pubsub.unsubscribe broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
end.clear
end
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index 977856d656..bb8850aaa0 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -60,7 +60,7 @@ module ActionCable
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
- @_internal_redis_subscriptions = nil
+ @_internal_subscriptions = nil
@started_at = Time.now
end
@@ -103,7 +103,7 @@ module ActionCable
# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
- worker_pool.async.invoke(self, method, *arguments)
+ worker_pool.async_invoke(self, method, *arguments)
end
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb
index 2d75ff8d6d..885ff3f102 100644
--- a/actioncable/lib/action_cable/connection/identification.rb
+++ b/actioncable/lib/action_cable/connection/identification.rb
@@ -11,7 +11,7 @@ module ActionCable
end
class_methods do
- # Mark a key as being a connection identifier index that can then used to find the specific connection again later.
+ # Mark a key as being a connection identifier index that can then be used to find the specific connection again later.
# Common identifiers are current_user and current_account, but could be anything really.
#
# Note that anything marked as an identifier will automatically create a delegate by the same name on any
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index c065a24ab7..54ed7672d2 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -5,24 +5,24 @@ module ActionCable
extend ActiveSupport::Concern
private
- def internal_redis_channel
+ def internal_channel
"action_cable/#{connection_identifier}"
end
def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
- @_internal_redis_subscriptions ||= []
- @_internal_redis_subscriptions << [ internal_redis_channel, callback ]
+ @_internal_subscriptions ||= []
+ @_internal_subscriptions << [ internal_channel, callback ]
- EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
+ EM.next_tick { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
- if @_internal_redis_subscriptions.present?
- @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
+ if @_internal_subscriptions.present?
+ @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
index 2d3caa5b0a..f5e233e091 100644
--- a/actioncable/lib/action_cable/engine.rb
+++ b/actioncable/lib/action_cable/engine.rb
@@ -24,11 +24,11 @@ module ActionCable
options = app.config.action_cable
options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?
- app.paths.add "config/redis/cable", with: "config/redis/cable.yml"
+ app.paths.add "config/cable", with: "config/cable.yml"
ActiveSupport.on_load(:action_cable) do
- if (redis_cable_path = Pathname.new(app.config.paths["config/redis/cable"].first)).exist?
- self.redis = Rails.application.config_for(redis_cable_path).with_indifferent_access
+ if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist?
+ self.cable = Rails.application.config_for(config_path).with_indifferent_access
end
options.each { |k,v| send("#{k}=", v) }
diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb
index b1286aea6f..c652fb91ae 100644
--- a/actioncable/lib/action_cable/gem_version.rb
+++ b/actioncable/lib/action_cable/gem_version.rb
@@ -8,7 +8,7 @@ module ActionCable
MAJOR = 5
MINOR = 0
TINY = 0
- PRE = "beta1"
+ PRE = "beta1.1"
STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
end
diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb
index 72b1a080d1..dce637b3ca 100644
--- a/actioncable/lib/action_cable/process/logging.rb
+++ b/actioncable/lib/action_cable/process/logging.rb
@@ -1,10 +1,7 @@
require 'action_cable/server'
require 'eventmachine'
-require 'celluloid'
EM.error_handler do |e|
puts "Error raised inside the event loop: #{e.message}"
puts e.backtrace.join("\n")
end
-
-Celluloid.logger = ActionCable.server.logger
diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb
index 1230d905ad..aa2fc95d2f 100644
--- a/actioncable/lib/action_cable/remote_connections.rb
+++ b/actioncable/lib/action_cable/remote_connections.rb
@@ -39,7 +39,7 @@ module ActionCable
# Uses the internal channel to disconnect the connection.
def disconnect
- server.broadcast internal_redis_channel, type: 'disconnect'
+ server.broadcast internal_channel, type: 'disconnect'
end
# Returns all the identifiers that were applied to this connection.
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index 740e4b301e..3385a4c9f3 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,8 +1,3 @@
-# FIXME: Cargo culted fix from https://github.com/celluloid/celluloid-pool/issues/10
-require 'celluloid/current'
-
-require 'em-hiredis'
-
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -39,7 +34,7 @@ module ActionCable
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
- @worker_pool ||= ActionCable::Server::Worker.pool(size: config.worker_pool_size)
+ @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
end
# Requires and returns a hash of all the channel class constants keyed by name.
@@ -50,20 +45,9 @@ module ActionCable
end
end
- # The redis pubsub adapter used for all streams/broadcasting.
+ # Adapter used for all streams/broadcasting.
def pubsub
- @pubsub ||= redis.pubsub
- end
-
- # The EventMachine Redis instance used by the pubsub adapter.
- def redis
- @redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- logger.info "[ActionCable] Redis reconnect failed."
- # logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
- # @connections.map &:close
- end
- end
+ @pubsub ||= config.pubsub_adapter.new(self)
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index c759239a0e..4a26ed9269 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -1,5 +1,3 @@
-require 'redis'
-
module ActionCable
module Server
# Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
@@ -31,11 +29,6 @@ module ActionCable
Broadcaster.new(self, broadcasting)
end
- # The redis instance used for broadcasting. Not intended for direct user use.
- def broadcasting_redis
- @broadcasting_redis ||= Redis.new(config.redis)
- end
-
private
class Broadcaster
attr_reader :server, :broadcasting
@@ -46,7 +39,7 @@ module ActionCable
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
- server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message)
+ server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index 935133cbba..9a248933c4 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -5,9 +5,9 @@ module ActionCable
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
- attr_accessor :redis, :channels_path
+ attr_accessor :channel_load_paths
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
- attr_accessor :url
+ attr_accessor :cable, :url
def initialize
@log_tags = []
@@ -15,13 +15,15 @@ module ActionCable
@connection_class = ApplicationCable::Connection
@worker_pool_size = 100
- @channels_path = Rails.root.join('app/channels')
+ @channel_load_paths = [Rails.root.join('app/channels')]
@disable_request_forgery_protection = false
end
def channel_paths
- @channels ||= Dir["#{channels_path}/**/*_channel.rb"]
+ @channel_paths ||= channel_load_paths.flat_map do |path|
+ Dir["#{path}/**/*_channel.rb"]
+ end
end
def channel_class_names
@@ -29,7 +31,25 @@ module ActionCable
Pathname.new(channel_path).basename.to_s.split('.').first.camelize
end
end
+
+ # Returns constant of subscription adapter specified in config/cable.yml.
+ # If the adapter cannot be found, this will default to the Redis adapter.
+ # Also makes sure proper dependencies are required.
+ def pubsub_adapter
+ adapter = (cable.fetch('adapter') { 'redis' })
+ path_to_adapter = "action_cable/subscription_adapter/#{adapter}"
+ begin
+ require path_to_adapter
+ rescue Gem::LoadError => e
+ raise Gem::LoadError, "Specified '#{adapter}' for Action Cable pubsub adapter, but the gem is not loaded. Add `gem '#{e.name}'` to your Gemfile (and ensure its version is at the minimum required by Action Cable)."
+ rescue LoadError => e
+ raise LoadError, "Could not load '#{path_to_adapter}'. Make sure that the adapter in config/cable.yml is valid. If you use an adapter other than 'postgresql' or 'redis' add the necessary adapter gem to the Gemfile.", e.backtrace
+ end
+
+ adapter = adapter.camelize
+ adapter = 'PostgreSQL' if adapter == 'Postgresql'
+ "ActionCable::SubscriptionAdapter::#{adapter}".constantize
+ end
end
end
end
-
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index e063b2a2e1..3b6c6d44a1 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -1,39 +1,68 @@
-require 'celluloid'
require 'active_support/callbacks'
+require 'active_support/core_ext/module/attribute_accessors_per_thread'
+require 'concurrent'
module ActionCable
module Server
# Worker used by Server.send_async to do connection work in threads. Only for internal use.
class Worker
include ActiveSupport::Callbacks
- include Celluloid
- attr_reader :connection
+ thread_mattr_accessor :connection
define_callbacks :work
include ActiveRecordConnectionManagement
+ def initialize(max_size: 5)
+ @pool = Concurrent::ThreadPoolExecutor.new(
+ min_threads: 1,
+ max_threads: max_size,
+ max_queue: 0,
+ )
+ end
+
+ def async_invoke(receiver, method, *args)
+ @pool.post do
+ invoke(receiver, method, *args)
+ end
+ end
+
def invoke(receiver, method, *args)
- @connection = receiver
+ begin
+ self.connection = receiver
- run_callbacks :work do
- receiver.send method, *args
+ run_callbacks :work do
+ receiver.send method, *args
+ end
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
+
+ receiver.handle_exception if receiver.respond_to?(:handle_exception)
+ ensure
+ self.connection = nil
end
- rescue Exception => e
- logger.error "There was an exception - #{e.class}(#{e.message})"
- logger.error e.backtrace.join("\n")
+ end
- receiver.handle_exception if receiver.respond_to?(:handle_exception)
+ def async_run_periodic_timer(channel, callback)
+ @pool.post do
+ run_periodic_timer(channel, callback)
+ end
end
def run_periodic_timer(channel, callback)
- @connection = channel.connection
+ begin
+ self.connection = channel.connection
- run_callbacks :work do
- callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
+ run_callbacks :work do
+ callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
+ end
+ ensure
+ self.connection = nil
end
end
private
+
def logger
ActionCable.server.logger
end
diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb
new file mode 100644
index 0000000000..72e62f3daf
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter.rb
@@ -0,0 +1,8 @@
+module ActionCable
+ module SubscriptionAdapter
+ extend ActiveSupport::Autoload
+
+ autoload :Base
+ autoload :SubscriberMap
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
new file mode 100644
index 0000000000..85d4892e4c
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -0,0 +1,22 @@
+require 'action_cable/subscription_adapter/inline'
+
+module ActionCable
+ module SubscriptionAdapter
+ class Async < Inline # :nodoc:
+ private
+ def subscriber_map
+ @subscriber_map ||= AsyncSubscriberMap.new
+ end
+
+ class AsyncSubscriberMap < SubscriberMap
+ def add_subscriber(*)
+ ::EM.next_tick { super }
+ end
+
+ def invoke_callback(*)
+ ::EM.next_tick { super }
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
new file mode 100644
index 0000000000..796db5ffa3
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -0,0 +1,28 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Base
+ attr_reader :logger, :server
+
+ def initialize(server)
+ @server = server
+ @logger = @server.logger
+ end
+
+ def broadcast(channel, payload)
+ raise NotImplementedError
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ raise NotImplementedError
+ end
+
+ def unsubscribe(channel, message_callback)
+ raise NotImplementedError
+ end
+
+ def shutdown
+ raise NotImplementedError
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
new file mode 100644
index 0000000000..4a2a8d23a2
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -0,0 +1,26 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Inline < Base # :nodoc:
+ def broadcast(channel, payload)
+ subscriber_map.broadcast(channel, payload)
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ subscriber_map.add_subscriber(channel, callback, success_callback)
+ end
+
+ def unsubscribe(channel, callback)
+ subscriber_map.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ # nothing to do
+ end
+
+ private
+ def subscriber_map
+ @subscriber_map ||= SubscriberMap.new
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
new file mode 100644
index 0000000000..78f8aeb599
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -0,0 +1,101 @@
+gem 'pg', '~> 0.18'
+require 'pg'
+require 'thread'
+
+module ActionCable
+ module SubscriptionAdapter
+ class PostgreSQL < Base # :nodoc:
+ def broadcast(channel, payload)
+ with_connection do |pg_conn|
+ pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
+ end
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ listener.add_subscriber(channel, callback, success_callback)
+ end
+
+ def unsubscribe(channel, callback)
+ listener.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ listener.shutdown
+ end
+
+ def with_connection(&block) # :nodoc:
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
+
+ yield pg_conn
+ end
+ end
+
+ private
+ def listener
+ @listener ||= Listener.new(self)
+ end
+
+ class Listener < SubscriberMap
+ def initialize(adapter)
+ super()
+
+ @adapter = adapter
+ @queue = Queue.new
+
+ @thread = Thread.new do
+ Thread.current.abort_on_exception = true
+ listen
+ end
+ end
+
+ def listen
+ @adapter.with_connection do |pg_conn|
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ ::EM.next_tick(&callback) if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
+ end
+
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
+ end
+ end
+ end
+ end
+ end
+
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
+
+ def add_channel(channel, on_success)
+ @queue.push([:listen, channel, on_success])
+ end
+
+ def remove_channel(channel)
+ @queue.push([:unlisten, channel])
+ end
+
+ def invoke_callback(*)
+ ::EM.next_tick { super }
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
new file mode 100644
index 0000000000..3b86354621
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -0,0 +1,42 @@
+gem 'em-hiredis', '~> 0.3.0'
+gem 'redis', '~> 3.0'
+require 'em-hiredis'
+require 'redis'
+
+module ActionCable
+ module SubscriptionAdapter
+ class Redis < Base # :nodoc:
+ def broadcast(channel, payload)
+ redis_connection_for_broadcasts.publish(channel, payload)
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback { |reply| success_callback.call } if success_callback
+ end
+ end
+
+ def unsubscribe(channel, message_callback)
+ redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+
+ private
+ def redis_connection_for_subscriptions
+ @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
+ end
+ end
+
+ def redis_connection_for_broadcasts
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
new file mode 100644
index 0000000000..37eed09793
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
@@ -0,0 +1,53 @@
+module ActionCable
+ module SubscriptionAdapter
+ class SubscriberMap
+ def initialize
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ end
+
+ def add_subscriber(channel, subscriber, on_success)
+ @sync.synchronize do
+ new_channel = !@subscribers.key?(channel)
+
+ @subscribers[channel] << subscriber
+
+ if new_channel
+ add_channel channel, on_success
+ elsif on_success
+ on_success.call
+ end
+ end
+ end
+
+ def remove_subscriber(channel, subscriber)
+ @sync.synchronize do
+ @subscribers[channel].delete(subscriber)
+
+ if @subscribers[channel].empty?
+ @subscribers.delete channel
+ remove_channel channel
+ end
+ end
+ end
+
+ def broadcast(channel, message)
+ list = @sync.synchronize { @subscribers[channel].dup }
+ list.each do |subscriber|
+ invoke_callback(subscriber, message)
+ end
+ end
+
+ def add_channel(channel, on_success)
+ on_success.call if on_success
+ end
+
+ def remove_channel(channel)
+ end
+
+ def invoke_callback(callback, message)
+ callback.call message
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/assets/javascripts/action_cable/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/connection.coffee
index 2f69a9b26c..fbd7dbd35b 100644
--- a/actioncable/lib/assets/javascripts/action_cable/connection.coffee
+++ b/actioncable/lib/assets/javascripts/action_cable/connection.coffee
@@ -79,6 +79,3 @@ class ActionCable.Connection
return if @disconnected
@disconnected = true
@consumer.subscriptions.notifyAll("disconnected")
-
- toJSON: ->
- state: @getState()
diff --git a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee
index b594802be1..99b9a1c6d5 100644
--- a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee
+++ b/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee
@@ -69,11 +69,6 @@ class ActionCable.ConnectionMonitor
@consumer.connection.reopen()
, 200
- toJSON: ->
- interval = @getInterval()
- connectionIsStale = @connectionIsStale()
- {@startedAt, @stoppedAt, @pingedAt, @reconnectAttempts, connectionIsStale, interval}
-
now = ->
new Date().getTime()
diff --git a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/consumer.coffee
index 5cf8978d77..fcd8d0fb6c 100644
--- a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee
+++ b/actioncable/lib/assets/javascripts/action_cable/consumer.coffee
@@ -23,9 +23,3 @@ class ActionCable.Consumer
send: (data) ->
@connection.send(data)
-
- inspect: ->
- JSON.stringify(this, null, 2)
-
- toJSON: ->
- {@url, @subscriptions, @connection, @connectionMonitor}
diff --git a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee
index 0316f76a24..ae041ffa2b 100644
--- a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee
+++ b/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee
@@ -9,7 +9,6 @@
class ActionCable.Subscriptions
constructor: (@consumer) ->
@subscriptions = []
- @history = []
create: (channelName, mixin) ->
channel = channelName
@@ -57,22 +56,9 @@ class ActionCable.Subscriptions
for subscription in subscriptions
subscription[callbackName]?(args...)
- if callbackName in ["initialized", "connected", "disconnected", "rejected"]
- {identifier} = subscription
- @record(notification: {identifier, callbackName, args})
-
sendCommand: (subscription, command) ->
{identifier} = subscription
if identifier is ActionCable.INTERNAL.identifiers.ping
@consumer.connection.isOpen()
else
@consumer.send({command, identifier})
-
- record: (data) ->
- data.time = new Date()
- @history = @history.slice(-19)
- @history.push(data)
-
- toJSON: ->
- history: @history
- identifiers: (subscription.identifier for subscription in @subscriptions)
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index 1424ded04c..3fa2b291b7 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -20,10 +20,10 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "streaming start and stop" do
run_in_eventmachine do
connection = TestConnection.new
- connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
- connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
channel.unsubscribe_from_channel
end
end
@@ -32,7 +32,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
run_in_eventmachine do
connection = TestConnection.new
EM.next_tick do
- connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
end
channel = ChatChannel.new connection, ""
@@ -43,13 +43,14 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "stream_from subscription confirmation" do
EM.run do
connection = TestConnection.new
- connection.expects(:pubsub).returns EM::Hiredis.connect.pubsub
ChatChannel.new connection, "{id: 1}", { id: 1 }
assert_nil connection.last_transmission
EM::Timer.new(0.1) do
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
+ connection.transmit(expected)
+
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
EM.run_deferred_callbacks
@@ -61,7 +62,6 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "subscription confirmation should only be sent out once" do
EM.run do
connection = TestConnection.new
- connection.stubs(:pubsub).returns EM::Hiredis.connect.pubsub
channel = ChatChannel.new connection, "test_channel"
channel.send_confirmation
diff --git a/actioncable/test/connection/authorization_test.rb b/actioncable/test/connection/authorization_test.rb
index 68668b2835..87d0e79ef3 100644
--- a/actioncable/test/connection/authorization_test.rb
+++ b/actioncable/test/connection/authorization_test.rb
@@ -10,7 +10,6 @@ class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase
end
def send_async(method, *args)
- # Bypass Celluloid
send method, *args
end
end
diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb
index da6041db4a..182562db82 100644
--- a/actioncable/test/connection/base_test.rb
+++ b/actioncable/test/connection/base_test.rb
@@ -14,7 +14,6 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
end
def send_async(method, *args)
- # Bypass Celluloid
send method, *args
end
end
diff --git a/actioncable/test/connection/cross_site_forgery_test.rb b/actioncable/test/connection/cross_site_forgery_test.rb
index d445e08f2a..a29f65fb97 100644
--- a/actioncable/test/connection/cross_site_forgery_test.rb
+++ b/actioncable/test/connection/cross_site_forgery_test.rb
@@ -6,7 +6,6 @@ class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
def send_async(method, *args)
- # Bypass Celluloid
send method, *args
end
end
diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb
index 02e6b21845..a110dfdee0 100644
--- a/actioncable/test/connection/identifier_test.rb
+++ b/actioncable/test/connection/identifier_test.rb
@@ -23,9 +23,9 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
test "should subscribe to internal channel on open and unsubscribe on close" do
run_in_eventmachine do
- pubsub = mock('pubsub')
- pubsub.expects(:subscribe).with('action_cable/User#lifo')
- pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc))
+ pubsub = mock('pubsub_adapter')
+ pubsub.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc))
+ pubsub.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc))
server = TestServer.new
server.stubs(:pubsub).returns(pubsub)
@@ -58,7 +58,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
protected
def open_connection_with_stubbed_pubsub
server = TestServer.new
- server.stubs(:pubsub).returns(stub_everything('pubsub'))
+ server.stubs(:adapter).returns(stub_everything('adapter'))
open_connection server: server
end
diff --git a/actioncable/test/connection/string_identifier_test.rb b/actioncable/test/connection/string_identifier_test.rb
index ab69df57b3..9d0bda83ef 100644
--- a/actioncable/test/connection/string_identifier_test.rb
+++ b/actioncable/test/connection/string_identifier_test.rb
@@ -10,7 +10,6 @@ class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase
end
def send_async(method, *args)
- # Bypass Celluloid
send method, *args
end
end
diff --git a/actioncable/test/connection/subscriptions_test.rb b/actioncable/test/connection/subscriptions_test.rb
index 4f6760827e..62e41484fe 100644
--- a/actioncable/test/connection/subscriptions_test.rb
+++ b/actioncable/test/connection/subscriptions_test.rb
@@ -5,7 +5,6 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase
attr_reader :websocket
def send_async(method, *args)
- # Bypass Celluloid
send method, *args
end
end
diff --git a/actioncable/test/stubs/test_adapter.rb b/actioncable/test/stubs/test_adapter.rb
new file mode 100644
index 0000000000..bbd142b287
--- /dev/null
+++ b/actioncable/test/stubs/test_adapter.rb
@@ -0,0 +1,10 @@
+class SuccessAdapter < ActionCable::SubscriptionAdapter::Base
+ def broadcast(channel, payload)
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ end
+
+ def unsubscribe(channel, callback)
+ end
+end
diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb
index 384abc5e76..da98201900 100644
--- a/actioncable/test/stubs/test_connection.rb
+++ b/actioncable/test/stubs/test_connection.rb
@@ -11,6 +11,10 @@ class TestConnection
@transmissions = []
end
+ def pubsub
+ SuccessAdapter.new(TestServer.new)
+ end
+
def transmit(data)
@transmissions << data
end
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index f9168f9b78..6e6541a952 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -7,7 +7,11 @@ class TestServer
def initialize
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
- @config = OpenStruct.new(log_tags: [])
+ @config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter)
+ end
+
+ def pubsub
+ @config.subscription_adapter.new(self)
end
def send_async
diff --git a/actioncable/test/subscription_adapter/async_test.rb b/actioncable/test/subscription_adapter/async_test.rb
new file mode 100644
index 0000000000..8f413f14c2
--- /dev/null
+++ b/actioncable/test/subscription_adapter/async_test.rb
@@ -0,0 +1,17 @@
+require 'test_helper'
+require_relative './common'
+
+class AsyncAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def setup
+ super
+
+ @tx_adapter.shutdown
+ @tx_adapter = @rx_adapter
+ end
+
+ def cable_config
+ { adapter: 'async' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/base_test.rb b/actioncable/test/subscription_adapter/base_test.rb
new file mode 100644
index 0000000000..7a7ae131e6
--- /dev/null
+++ b/actioncable/test/subscription_adapter/base_test.rb
@@ -0,0 +1,73 @@
+require 'test_helper'
+require 'stubs/test_server'
+
+class ActionCable::SubscriptionAdapter::BaseTest < ActionCable::TestCase
+ ## TEST THAT ERRORS ARE RETURNED FOR INHERITORS THAT DON'T OVERRIDE METHODS
+
+ class BrokenAdapter < ActionCable::SubscriptionAdapter::Base
+ end
+
+ setup do
+ @server = TestServer.new
+ @server.config.subscription_adapter = BrokenAdapter
+ @server.config.allowed_request_origins = %w( http://rubyonrails.com )
+ end
+
+ test "#broadcast returns NotImplementedError by default" do
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).broadcast('channel', 'payload')
+ end
+ end
+
+ test "#subscribe returns NotImplementedError by default" do
+ callback = lambda { puts 'callback' }
+ success_callback = lambda { puts 'success' }
+
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).subscribe('channel', callback, success_callback)
+ end
+ end
+
+ test "#unsubscribe returns NotImplementedError by default" do
+ callback = lambda { puts 'callback' }
+
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).unsubscribe('channel', callback)
+ end
+ end
+
+ # TEST METHODS THAT ARE REQUIRED OF THE ADAPTER'S BACKEND STORAGE OBJECT
+
+ test "#broadcast is implemented" do
+ broadcast = SuccessAdapter.new(@server).broadcast('channel', 'payload')
+
+ assert_respond_to(SuccessAdapter.new(@server), :broadcast)
+
+ assert_nothing_raised NotImplementedError do
+ broadcast
+ end
+ end
+
+ test "#subscribe is implemented" do
+ callback = lambda { puts 'callback' }
+ success_callback = lambda { puts 'success' }
+ subscribe = SuccessAdapter.new(@server).subscribe('channel', callback, success_callback)
+
+ assert_respond_to(SuccessAdapter.new(@server), :subscribe)
+
+ assert_nothing_raised NotImplementedError do
+ subscribe
+ end
+ end
+
+ test "#unsubscribe is implemented" do
+ callback = lambda { puts 'callback' }
+ unsubscribe = SuccessAdapter.new(@server).unsubscribe('channel', callback)
+
+ assert_respond_to(SuccessAdapter.new(@server), :unsubscribe)
+
+ assert_nothing_raised NotImplementedError do
+ unsubscribe
+ end
+ end
+end
diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb
new file mode 100644
index 0000000000..d4a13be889
--- /dev/null
+++ b/actioncable/test/subscription_adapter/common.rb
@@ -0,0 +1,142 @@
+require 'test_helper'
+require 'concurrent'
+
+require 'action_cable/process/logging'
+require 'active_support/core_ext/hash/indifferent_access'
+require 'pathname'
+
+module CommonSubscriptionAdapterTest
+ WAIT_WHEN_EXPECTING_EVENT = 3
+ WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
+
+ def setup
+ # TODO: ActionCable requires a *lot* of setup at the moment...
+ ::Object.const_set(:ApplicationCable, Module.new)
+ ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))
+
+ ::Object.const_set(:Rails, Module.new)
+ ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }
+
+ server = ActionCable::Server::Base.new
+ server.config = ActionCable::Server::Configuration.new
+ inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
+ server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])
+
+
+ # and now the "real" setup for our test:
+ spawn_eventmachine
+
+ server.config.cable = cable_config.with_indifferent_access
+
+ adapter_klass = server.config.pubsub_adapter
+
+ @rx_adapter = adapter_klass.new(server)
+ @tx_adapter = adapter_klass.new(server)
+ end
+
+ def teardown
+ @tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter
+ @rx_adapter.shutdown if @rx_adapter
+
+ begin
+ ::Object.send(:remove_const, :ApplicationCable)
+ rescue NameError
+ end
+ begin
+ ::Object.send(:remove_const, :Rails)
+ rescue NameError
+ end
+ end
+
+
+ def subscribe_as_queue(channel, adapter = @rx_adapter)
+ queue = Queue.new
+
+ callback = -> data { queue << data }
+ subscribed = Concurrent::Event.new
+ adapter.subscribe(channel, callback, Proc.new { subscribed.set })
+ subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
+ assert subscribed.set?
+
+ yield queue
+
+ sleep WAIT_WHEN_NOT_EXPECTING_EVENT
+ assert_empty queue
+ ensure
+ adapter.unsubscribe(channel, callback) if subscribed.set?
+ end
+
+
+ def test_subscribe_and_unsubscribe
+ subscribe_as_queue('channel') do |queue|
+ end
+ end
+
+ def test_basic_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('channel', 'hello world')
+
+ assert_equal 'hello world', queue.pop
+ end
+ end
+
+ def test_broadcast_after_unsubscribe
+ keep_queue = nil
+ subscribe_as_queue('channel') do |queue|
+ keep_queue = queue
+
+ @tx_adapter.broadcast('channel', 'hello world')
+
+ assert_equal 'hello world', queue.pop
+ end
+
+ @tx_adapter.broadcast('channel', 'hello void')
+
+ sleep WAIT_WHEN_NOT_EXPECTING_EVENT
+ assert_empty keep_queue
+ end
+
+ def test_multiple_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('channel', 'bananas')
+ @tx_adapter.broadcast('channel', 'apples')
+
+ received = []
+ 2.times { received << queue.pop }
+ assert_equal ['apples', 'bananas'], received.sort
+ end
+ end
+
+ def test_identical_subscriptions
+ subscribe_as_queue('channel') do |queue|
+ subscribe_as_queue('channel') do |queue_2|
+ @tx_adapter.broadcast('channel', 'hello')
+
+ assert_equal 'hello', queue_2.pop
+ end
+
+ assert_equal 'hello', queue.pop
+ end
+ end
+
+ def test_simultaneous_subscriptions
+ subscribe_as_queue('channel') do |queue|
+ subscribe_as_queue('other channel') do |queue_2|
+ @tx_adapter.broadcast('channel', 'apples')
+ @tx_adapter.broadcast('other channel', 'oranges')
+
+ assert_equal 'apples', queue.pop
+ assert_equal 'oranges', queue_2.pop
+ end
+ end
+ end
+
+ def test_channel_filtered_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('other channel', 'one')
+ @tx_adapter.broadcast('channel', 'two')
+
+ assert_equal 'two', queue.pop
+ end
+ end
+end
diff --git a/actioncable/test/subscription_adapter/inline_test.rb b/actioncable/test/subscription_adapter/inline_test.rb
new file mode 100644
index 0000000000..75ea51e6b3
--- /dev/null
+++ b/actioncable/test/subscription_adapter/inline_test.rb
@@ -0,0 +1,17 @@
+require 'test_helper'
+require_relative './common'
+
+class InlineAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def setup
+ super
+
+ @tx_adapter.shutdown
+ @tx_adapter = @rx_adapter
+ end
+
+ def cable_config
+ { adapter: 'inline' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/postgresql_test.rb b/actioncable/test/subscription_adapter/postgresql_test.rb
new file mode 100644
index 0000000000..64c632b0cd
--- /dev/null
+++ b/actioncable/test/subscription_adapter/postgresql_test.rb
@@ -0,0 +1,32 @@
+require 'test_helper'
+require_relative './common'
+
+require 'active_record'
+
+class PostgresqlAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def setup
+ database_config = { 'adapter' => 'postgresql', 'database' => 'activerecord_unittest' }
+ ar_tests = File.expand_path('../../../activerecord/test', __dir__)
+ if Dir.exist?(ar_tests)
+ require File.join(ar_tests, 'config')
+ require File.join(ar_tests, 'support/config')
+ local_config = ARTest.config['arunit']
+ database_config.update local_config if local_config
+ end
+ ActiveRecord::Base.establish_connection database_config
+
+ super
+ end
+
+ def teardown
+ super
+
+ ActiveRecord::Base.clear_all_connections!
+ end
+
+ def cable_config
+ { adapter: 'postgresql' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/redis_test.rb b/actioncable/test/subscription_adapter/redis_test.rb
new file mode 100644
index 0000000000..8d52832c87
--- /dev/null
+++ b/actioncable/test/subscription_adapter/redis_test.rb
@@ -0,0 +1,10 @@
+require 'test_helper'
+require_relative './common'
+
+class RedisAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def cable_config
+ { adapter: 'redis', url: 'redis://127.0.0.1:6379/12' }
+ end
+end
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 12dcd98402..6636ce078b 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -5,7 +5,6 @@ require 'active_support/testing/autorun'
require 'puma'
-require 'em-hiredis'
require 'mocha/setup'
@@ -14,11 +13,6 @@ require 'rack/mock'
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
-$CELLULOID_DEBUG = false
-$CELLULOID_TEST = false
-require 'celluloid'
-Celluloid.logger = Logger.new(StringIO.new)
-
require 'faye/websocket'
class << Faye::WebSocket
remove_method :ensure_reactor_running
@@ -39,4 +33,8 @@ class ActionCable::TestCase < ActiveSupport::TestCase
EM.stop
end
end
+
+ def spawn_eventmachine
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ end
end
diff --git a/actioncable/test/worker_test.rb b/actioncable/test/worker_test.rb
index 69c4b6529d..4a699cde27 100644
--- a/actioncable/test/worker_test.rb
+++ b/actioncable/test/worker_test.rb
@@ -13,12 +13,15 @@ class WorkerTest < ActiveSupport::TestCase
end
def connection
+ self
+ end
+
+ def logger
+ ActionCable.server.logger
end
end
setup do
- Celluloid.boot
-
@worker = ActionCable::Server::Worker.new
@receiver = Receiver.new
end