aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib
diff options
context:
space:
mode:
authorkp <keith.j.payne@gmail.com>2016-02-10 17:44:54 +0000
committerkp <keith.j.payne@gmail.com>2016-02-10 17:44:54 +0000
commitec4ae308a76bd86fb6eaf7fa8ee025af0063ee30 (patch)
tree741019b3fbf474ff4cdeefdbeb6ff1f598ffcdfc /actioncable/lib
parent8641de93eb98d4ebdb0db2530c8c79c0c4e2f95e (diff)
parent688996da7b25080a1a2ef74f5b4789f3e5eb670d (diff)
downloadrails-ec4ae308a76bd86fb6eaf7fa8ee025af0063ee30.tar.gz
rails-ec4ae308a76bd86fb6eaf7fa8ee025af0063ee30.tar.bz2
rails-ec4ae308a76bd86fb6eaf7fa8ee025af0063ee30.zip
Merge remote-tracking branch 'origin/master' into actioncable_logging
Diffstat (limited to 'actioncable/lib')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb33
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb26
-rw-r--r--actioncable/lib/action_cable/connection/base.rb2
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb6
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb68
-rw-r--r--actioncable/lib/action_cable/gem_version.rb2
-rw-r--r--actioncable/lib/action_cable/remote_connections.rb12
-rw-r--r--actioncable/lib/action_cable/server/base.rb23
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb22
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/evented_redis.rb75
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb7
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb163
-rw-r--r--actioncable/lib/assets/javascripts/action_cable.coffee.erb23
-rw-r--r--actioncable/lib/assets/javascripts/action_cable/connection.coffee81
-rw-r--r--actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee79
-rw-r--r--actioncable/lib/assets/javascripts/action_cable/consumer.coffee25
-rw-r--r--actioncable/lib/assets/javascripts/action_cable/subscription.coffee68
-rw-r--r--actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee64
-rw-r--r--actioncable/lib/rails/generators/channel/templates/channel.rb2
21 files changed, 350 insertions, 446 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 88cdc1cab1..874ebe2e71 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -32,8 +32,11 @@ module ActionCable
#
# == Action processing
#
- # Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's a remote-procedure call model. You can
- # declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client.
+ # Unlike subclasses of ActionController::Base, channels do not follow a REST
+ # constraint form for their actions. Instead, ActionCable operates through a
+ # remote-procedure call model. You can declare any public method on the
+ # channel (optionally taking a <tt>data</tt> argument), and this method is
+ # automatically exposed as callable to the client.
#
# Example:
#
@@ -60,18 +63,22 @@ module ActionCable
# end
# end
#
- # In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away
- # are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then
- # uses as part of its model call. #away does not, it's simply a trigger action.
+ # In this example, subscribed/unsubscribed are not callable methods, as they
+ # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt>
+ # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not
+ # callable as it's a private method. You'll see that appear accepts a data
+ # parameter, which it then uses as part of its model call. <tt>#away</tt>
+ # does not, since it's simply a trigger action.
#
- # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection.
- # All such identifiers will automatically create a delegation method of the same name on the channel instance.
+ # Also note that in this example, <tt>current_user</tt> is available because
+ # it was marked as an identifying attribute on the connection. All such
+ # identifiers will automatically create a delegation method of the same name
+ # on the channel instance.
#
# == Rejecting subscription requests
#
- # A channel can reject a subscription request in the #subscribed callback by invoking #reject!
- #
- # Example:
+ # A channel can reject a subscription request in the #subscribed callback by
+ # invoking the #reject method:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
@@ -80,8 +87,10 @@ module ActionCable
# end
# end
#
- # In this example, the subscription will be rejected if the current_user does not have access to the chat room.
- # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request.
+ # In this example, the subscription will be rejected if the
+ # <tt>current_user</tt> does not have access to the chat room. On the
+ # client-side, the <tt>Channel#rejected</tt> callback will get invoked when
+ # the server rejects the subscription request.
class Base
include Callbacks
include PeriodicTimers
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index a26373e387..3158f30814 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -41,22 +41,23 @@ module ActionCable
# Example below shows how you can use this to provide performance introspection in the process:
#
# class ChatChannel < ApplicationCable::Channel
- # def subscribed
- # @room = Chat::Room[params[:room_number]]
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
#
- # stream_for @room, -> (encoded_message) do
- # message = ActiveSupport::JSON.decode(encoded_message)
+ # stream_for @room, -> (encoded_message) do
+ # message = ActiveSupport::JSON.decode(encoded_message)
#
- # if message['originated_at'].present?
- # elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
+ # if message['originated_at'].present?
+ # elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
- # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
- # logger.info "Message took #{elapsed_time}s to arrive"
- # end
+ # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
+ # logger.info "Message took #{elapsed_time}s to arrive"
+ # end
#
- # transmit message
- # end
- # end
+ # transmit message
+ # end
+ # end
+ # end
#
# You can stop streaming from all broadcasts by calling #stop_all_streams.
module Streams
@@ -90,6 +91,7 @@ module ActionCable
stream_from(broadcasting_for([ channel_name, model ]), callback)
end
+ # Unsubscribes all streams associated with this channel from the pubsub queue.
def stop_all_streams
streams.each do |broadcasting, callback|
pubsub.unsubscribe broadcasting, callback
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index de7ff96d7b..e23789978c 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -129,7 +129,7 @@ module ActionCable
# ignore
end
- def on_close # :nodoc:
+ def on_close(reason, code) # :nodoc:
send_async :handle_close
end
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index 62dd753646..ef937d7c16 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -37,6 +37,7 @@ module ActionCable
@url = ClientSocket.determine_url(@env)
@driver = @driver_started = nil
+ @close_params = ['', 1006]
@ready_state = CONNECTING
@@ -142,10 +143,7 @@ module ActionCable
return if @ready_state == CLOSED
@ready_state = CLOSED
- reason = @close_params ? @close_params[0] : ''
- code = @close_params ? @close_params[1] : 1006
-
- @event_target.on_close(code, reason)
+ @event_target.on_close(*@close_params)
end
end
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index f773814973..e6335082d2 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -1,18 +1,17 @@
require 'nio'
+require 'thread'
module ActionCable
module Connection
class StreamEventLoop
def initialize
- @nio = NIO::Selector.new
+ @nio = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
- Thread.new do
- Thread.current.abort_on_exception = true
- run
- end
+ @spawn_mutex = Mutex.new
+ spawn
end
def attach(io, stream)
@@ -20,34 +19,53 @@ module ActionCable
@map[io] = stream
@nio.register(io, :r)
end
- @nio.wakeup
+ wakeup
end
def detach(io, stream)
@todo << lambda do
- @nio.deregister(io)
+ @nio.deregister io
@map.delete io
end
- @nio.wakeup
+ wakeup
end
def stop
@stopping = true
- @nio.wakeup
+ wakeup if @nio
end
- def run
- loop do
- if @stopping
- @nio.close
- break
- end
+ private
+ def spawn
+ return if @thread && @thread.status
+
+ @spawn_mutex.synchronize do
+ return if @thread && @thread.status
+
+ @nio ||= NIO::Selector.new
+ @thread = Thread.new { run }
- until @todo.empty?
- @todo.pop(true).call
+ return true
end
+ end
+
+ def wakeup
+ spawn || @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ next unless monitors = @nio.select
- if monitors = @nio.select
monitors.each do |monitor|
io = monitor.io
stream = @map[io]
@@ -56,13 +74,21 @@ module ActionCable
stream.receive io.read_nonblock(4096)
rescue IO::WaitReadable
next
- rescue EOFError
- stream.close
+ rescue
+ # We expect one of EOFError or Errno::ECONNRESET in
+ # normal operation (when the client goes away). But if
+ # anything else goes wrong, this is still the best way
+ # to handle it.
+ begin
+ stream.close
+ rescue
+ @nio.deregister io
+ @map.delete io
+ end
end
end
end
end
- end
end
end
end
diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb
index c652fb91ae..a71603e61a 100644
--- a/actioncable/lib/action_cable/gem_version.rb
+++ b/actioncable/lib/action_cable/gem_version.rb
@@ -8,7 +8,7 @@ module ActionCable
MAJOR = 5
MINOR = 0
TINY = 0
- PRE = "beta1.1"
+ PRE = "beta2"
STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
end
diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb
index aa2fc95d2f..7ec121308a 100644
--- a/actioncable/lib/action_cable/remote_connections.rb
+++ b/actioncable/lib/action_cable/remote_connections.rb
@@ -1,6 +1,7 @@
module ActionCable
- # If you need to disconnect a given connection, you go through the RemoteConnections. You find the connections you're looking for by
- # searching the identifier declared on the connection. Example:
+ # If you need to disconnect a given connection, you can go through the
+ # RemoteConnections. You can find the connections you're looking for by
+ # searching for the identifier declared on the connection. For example:
#
# module ApplicationCable
# class Connection < ActionCable::Connection::Base
@@ -11,8 +12,9 @@ module ActionCable
#
# ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect
#
- # That will disconnect all the connections established for User.find(1) across all servers running on all machines (because it uses
- # the internal channel that all these servers are subscribed to).
+ # This will disconnect all the connections established for
+ # <tt>User.find(1)</tt> across all servers running on all machines, because
+ # it uses the internal channel that all these servers are subscribed to.
class RemoteConnections
attr_reader :server
@@ -25,7 +27,7 @@ module ActionCable
end
private
- # Represents a single remote connection found via ActionCable.server.remote_connections.where(*).
+ # Represents a single remote connection found via <tt>ActionCable.server.remote_connections.where(*)</tt>.
# Exists for the solely for the purpose of calling #disconnect on that connection.
class RemoteConnection
class InvalidIdentifiersError < StandardError; end
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index b00abd208c..fe48c112df 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,3 +1,5 @@
+require 'thread'
+
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -13,7 +15,12 @@ module ActionCable
def self.logger; config.logger; end
delegate :logger, to: :config
+ attr_reader :mutex
+
def initialize
+ @mutex = Mutex.new
+
+ @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
# Called by rack to setup the server.
@@ -29,29 +36,31 @@ module ActionCable
# Gateway to RemoteConnections. See that class for details.
def remote_connections
- @remote_connections ||= RemoteConnections.new(self)
+ @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end
def stream_event_loop
- @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
+ @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
- @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
+ @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
# Requires and returns a hash of all the channel class constants keyed by name.
def channel_classes
- @channel_classes ||= begin
- config.channel_paths.each { |channel_path| require channel_path }
- config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ @channel_classes || @mutex.synchronize do
+ @channel_classes ||= begin
+ config.channel_paths.each { |channel_path| require channel_path }
+ config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ end
end
end
# Adapter used for all streams/broadcasting.
def pubsub
- @pubsub ||= config.pubsub_adapter.new(self)
+ @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index 4a26ed9269..7e8aef45f4 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -4,19 +4,19 @@ module ActionCable
# broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example:
#
# class WebNotificationsChannel < ApplicationCable::Channel
- # def subscribed
- # stream_from "web_notifications_#{current_user.id}"
- # end
- # end
+ # def subscribed
+ # stream_from "web_notifications_#{current_user.id}"
+ # end
+ # end
#
- # # Somewhere in your app this is called, perhaps from a NewCommentJob
- # ActionCable.server.broadcast \
- # "web_notifications_1", { title: 'New things!', body: 'All shit fit for print' }
+ # # Somewhere in your app this is called, perhaps from a NewCommentJob
+ # ActionCable.server.broadcast \
+ # "web_notifications_1", { title: "New things!", body: "All that's fit for print" }
#
- # # Client-side coffescript, which assumes you've already requested the right to send web notifications
- # App.cable.subscriptions.create "WebNotificationsChannel",
- # received: (data) ->
- # new Notification data['title'], body: data['body']
+ # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications
+ # App.cable.subscriptions.create "WebNotificationsChannel",
+ # received: (data) ->
+ # new Notification data['title'], body: data['body']
module Broadcasting
# Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded.
def broadcast(broadcasting, message)
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index c88b03947a..cca6894289 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -4,8 +4,8 @@ module ActionCable
module SubscriptionAdapter
class Async < Inline # :nodoc:
private
- def subscriber_map
- @subscriber_map ||= AsyncSubscriberMap.new
+ def new_subscriber_map
+ AsyncSubscriberMap.new
end
class AsyncSubscriberMap < SubscriberMap
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
new file mode 100644
index 0000000000..af04a58c70
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -0,0 +1,75 @@
+require 'thread'
+
+gem 'em-hiredis', '~> 0.3.0'
+gem 'redis', '~> 3.0'
+require 'em-hiredis'
+require 'redis'
+
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
+module ActionCable
+ module SubscriptionAdapter
+ class EventedRedis < Base # :nodoc:
+ @@mutex = Mutex.new
+
+ # Overwrite this factory method for EventMachine redis connections if you want to use a different Redis library than EM::Hiredis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } }
+
+ # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }
+
+ def initialize(*)
+ super
+ @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
+ end
+
+ def broadcast(channel, payload)
+ redis_connection_for_broadcasts.publish(channel, payload)
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback { |reply| success_callback.call } if success_callback
+ end
+ end
+
+ def unsubscribe(channel, message_callback)
+ redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+
+ private
+ def redis_connection_for_subscriptions
+ ensure_reactor_running
+ @redis_connection_for_subscriptions || @server.mutex.synchronize do
+ @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
+ end
+ end
+ end
+
+ def redis_connection_for_broadcasts
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable)
+ end
+ end
+
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
index 4a2a8d23a2..81357faead 100644
--- a/actioncable/lib/action_cable/subscription_adapter/inline.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -1,6 +1,11 @@
module ActionCable
module SubscriptionAdapter
class Inline < Base # :nodoc:
+ def initialize(*)
+ super
+ @subscriber_map = nil
+ end
+
def broadcast(channel, payload)
subscriber_map.broadcast(channel, payload)
end
@@ -19,7 +24,11 @@ module ActionCable
private
def subscriber_map
- @subscriber_map ||= SubscriberMap.new
+ @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map }
+ end
+
+ def new_subscriber_map
+ SubscriberMap.new
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 3ce1bbed68..abaeb92e54 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -5,6 +5,11 @@ require 'thread'
module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
+ def initialize(*)
+ super
+ @listener = nil
+ end
+
def broadcast(channel, payload)
with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
@@ -37,7 +42,7 @@ module ActionCable
private
def listener
- @listener ||= Listener.new(self)
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
class Listener < SubscriberMap
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index a035e3988d..ba4934a264 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,57 +1,166 @@
require 'thread'
-gem 'em-hiredis', '~> 0.3.0'
gem 'redis', '~> 3.0'
-require 'em-hiredis'
require 'redis'
-EventMachine.epoll if EventMachine.epoll?
-EventMachine.kqueue if EventMachine.kqueue?
-
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
- @@mutex = Mutex.new
+ # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }
+
+ def initialize(*)
+ super
+ @listener = nil
+ @redis_connection_for_broadcasts = nil
+ end
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
- def subscribe(channel, message_callback, success_callback = nil)
- redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
- result.callback { |reply| success_callback.call } if success_callback
- end
+ def subscribe(channel, callback, success_callback = nil)
+ listener.add_subscriber(channel, callback, success_callback)
end
- def unsubscribe(channel, message_callback)
- redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ def unsubscribe(channel, callback)
+ listener.remove_subscriber(channel, callback)
end
def shutdown
- redis_connection_for_subscriptions.pubsub.close_connection
- @redis_connection_for_subscriptions = nil
+ @listener.shutdown if @listener
+ end
+
+ def redis_connection_for_subscriptions
+ ::Redis.new(@server.config.cable)
end
private
- def redis_connection_for_subscriptions
- ensure_reactor_running
- @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
- end
- end
+ def listener
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
def redis_connection_for_broadcasts
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable)
+ end
end
- def ensure_reactor_running
- return if EventMachine.reactor_running?
- @@mutex.synchronize do
- Thread.new { EventMachine.run } unless EventMachine.reactor_running?
- Thread.pass until EventMachine.reactor_running?
+ class Listener < SubscriberMap
+ def initialize(adapter)
+ super()
+
+ @adapter = adapter
+
+ @subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
+ @subscription_lock = Mutex.new
+
+ @raw_client = nil
+
+ @when_connected = []
+
+ @thread = nil
+ end
+
+ def listen(conn)
+ conn.without_reconnect do
+ original_client = conn.client
+
+ conn.subscribe('_action_cable_internal') do |on|
+ on.subscribe do |chan, count|
+ @subscription_lock.synchronize do
+ if count == 1
+ @raw_client = original_client
+
+ until @when_connected.empty?
+ @when_connected.shift.call
+ end
+ end
+
+ if callbacks = @subscribe_callbacks[chan]
+ next_callback = callbacks.shift
+ Concurrent.global_io_executor << next_callback if next_callback
+ @subscribe_callbacks.delete(chan) if callbacks.empty?
+ end
+ end
+ end
+
+ on.message do |chan, message|
+ broadcast(chan, message)
+ end
+
+ on.unsubscribe do |chan, count|
+ if count == 0
+ @subscription_lock.synchronize do
+ @raw_client = nil
+ end
+ end
+ end
+ end
+ end
+ end
+
+ def shutdown
+ @subscription_lock.synchronize do
+ return if @thread.nil?
+
+ when_connected do
+ send_command('unsubscribe')
+ @raw_client = nil
+ end
+ end
+
+ Thread.pass while @thread.alive?
+ end
+
+ def add_channel(channel, on_success)
+ @subscription_lock.synchronize do
+ ensure_listener_running
+ @subscribe_callbacks[channel] << on_success
+ when_connected { send_command('subscribe', channel) }
+ end
+ end
+
+ def remove_channel(channel)
+ @subscription_lock.synchronize do
+ when_connected { send_command('unsubscribe', channel) }
+ end
+ end
+
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
end
+
+ private
+ def ensure_listener_running
+ @thread ||= Thread.new do
+ Thread.current.abort_on_exception = true
+
+ conn = @adapter.redis_connection_for_subscriptions
+ listen conn
+ end
+ end
+
+ def when_connected(&block)
+ if @raw_client
+ block.call
+ else
+ @when_connected << block
+ end
+ end
+
+ def send_command(*command)
+ @raw_client.write(command)
+
+ very_raw_connection =
+ @raw_client.connection.instance_variable_defined?(:@connection) &&
+ @raw_client.connection.instance_variable_get(:@connection)
+
+ if very_raw_connection && very_raw_connection.respond_to?(:flush)
+ very_raw_connection.flush
+ end
+ end
end
end
end
diff --git a/actioncable/lib/assets/javascripts/action_cable.coffee.erb b/actioncable/lib/assets/javascripts/action_cable.coffee.erb
deleted file mode 100644
index 7daea4ebcd..0000000000
--- a/actioncable/lib/assets/javascripts/action_cable.coffee.erb
+++ /dev/null
@@ -1,23 +0,0 @@
-#= require_self
-#= require action_cable/consumer
-
-@ActionCable =
- INTERNAL: <%= ActionCable::INTERNAL.to_json %>
-
- createConsumer: (url = @getConfig("url")) ->
- new ActionCable.Consumer @createWebSocketURL(url)
-
- getConfig: (name) ->
- element = document.head.querySelector("meta[name='action-cable-#{name}']")
- element?.getAttribute("content")
-
- createWebSocketURL: (url) ->
- if url and not /^wss?:/i.test(url)
- a = document.createElement("a")
- a.href = url
- # Fix populating Location properties in IE. Otherwise, protocol will be blank.
- a.href = a.href
- a.protocol = a.protocol.replace("http", "ws")
- a.href
- else
- url
diff --git a/actioncable/lib/assets/javascripts/action_cable/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/connection.coffee
deleted file mode 100644
index fbd7dbd35b..0000000000
--- a/actioncable/lib/assets/javascripts/action_cable/connection.coffee
+++ /dev/null
@@ -1,81 +0,0 @@
-# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation.
-
-{message_types} = ActionCable.INTERNAL
-
-class ActionCable.Connection
- @reopenDelay: 500
-
- constructor: (@consumer) ->
- @open()
-
- send: (data) ->
- if @isOpen()
- @webSocket.send(JSON.stringify(data))
- true
- else
- false
-
- open: =>
- if @webSocket and not @isState("closed")
- throw new Error("Existing connection must be closed before opening")
- else
- @webSocket = new WebSocket(@consumer.url)
- @installEventHandlers()
- true
-
- close: ->
- @webSocket?.close()
-
- reopen: ->
- if @isState("closed")
- @open()
- else
- try
- @close()
- finally
- setTimeout(@open, @constructor.reopenDelay)
-
- isOpen: ->
- @isState("open")
-
- # Private
-
- isState: (states...) ->
- @getState() in states
-
- getState: ->
- return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState
- null
-
- installEventHandlers: ->
- for eventName of @events
- handler = @events[eventName].bind(this)
- @webSocket["on#{eventName}"] = handler
- return
-
- events:
- message: (event) ->
- {identifier, message, type} = JSON.parse(event.data)
-
- switch type
- when message_types.confirmation
- @consumer.subscriptions.notify(identifier, "connected")
- when message_types.rejection
- @consumer.subscriptions.reject(identifier)
- else
- @consumer.subscriptions.notify(identifier, "received", message)
-
- open: ->
- @disconnected = false
- @consumer.subscriptions.reload()
-
- close: ->
- @disconnect()
-
- error: ->
- @disconnect()
-
- disconnect: ->
- return if @disconnected
- @disconnected = true
- @consumer.subscriptions.notifyAll("disconnected")
diff --git a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee
deleted file mode 100644
index 99b9a1c6d5..0000000000
--- a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee
+++ /dev/null
@@ -1,79 +0,0 @@
-# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting
-# revival reconnections if things go astray. Internal class, not intended for direct user manipulation.
-class ActionCable.ConnectionMonitor
- @pollInterval:
- min: 3
- max: 30
-
- @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings)
-
- identifier: ActionCable.INTERNAL.identifiers.ping
-
- constructor: (@consumer) ->
- @consumer.subscriptions.add(this)
- @start()
-
- connected: ->
- @reset()
- @pingedAt = now()
- delete @disconnectedAt
-
- disconnected: ->
- @disconnectedAt = now()
-
- received: ->
- @pingedAt = now()
-
- reset: ->
- @reconnectAttempts = 0
-
- start: ->
- @reset()
- delete @stoppedAt
- @startedAt = now()
- @poll()
- document.addEventListener("visibilitychange", @visibilityDidChange)
-
- stop: ->
- @stoppedAt = now()
- document.removeEventListener("visibilitychange", @visibilityDidChange)
-
- poll: ->
- setTimeout =>
- unless @stoppedAt
- @reconnectIfStale()
- @poll()
- , @getInterval()
-
- getInterval: ->
- {min, max} = @constructor.pollInterval
- interval = 5 * Math.log(@reconnectAttempts + 1)
- clamp(interval, min, max) * 1000
-
- reconnectIfStale: ->
- if @connectionIsStale()
- @reconnectAttempts++
- unless @disconnectedRecently()
- @consumer.connection.reopen()
-
- connectionIsStale: ->
- secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold
-
- disconnectedRecently: ->
- @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold
-
- visibilityDidChange: =>
- if document.visibilityState is "visible"
- setTimeout =>
- if @connectionIsStale() or not @consumer.connection.isOpen()
- @consumer.connection.reopen()
- , 200
-
- now = ->
- new Date().getTime()
-
- secondsSince = (time) ->
- (now() - time) / 1000
-
- clamp = (number, min, max) ->
- Math.max(min, Math.min(max, number))
diff --git a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/consumer.coffee
deleted file mode 100644
index fcd8d0fb6c..0000000000
--- a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee
+++ /dev/null
@@ -1,25 +0,0 @@
-#= require action_cable/connection
-#= require action_cable/connection_monitor
-#= require action_cable/subscriptions
-#= require action_cable/subscription
-
-# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established,
-# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates.
-# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription
-# method.
-#
-# The following example shows how this can be setup:
-#
-# @App = {}
-# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1"
-# App.appearance = App.cable.subscriptions.create "AppearanceChannel"
-#
-# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription.
-class ActionCable.Consumer
- constructor: (@url) ->
- @subscriptions = new ActionCable.Subscriptions this
- @connection = new ActionCable.Connection this
- @connectionMonitor = new ActionCable.ConnectionMonitor this
-
- send: (data) ->
- @connection.send(data)
diff --git a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee b/actioncable/lib/assets/javascripts/action_cable/subscription.coffee
deleted file mode 100644
index 339d676933..0000000000
--- a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee
+++ /dev/null
@@ -1,68 +0,0 @@
-# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer.
-# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding
-# Channel instance on the server side.
-#
-# An example demonstrates the basic functionality:
-#
-# App.appearance = App.cable.subscriptions.create "AppearanceChannel",
-# connected: ->
-# # Called once the subscription has been successfully completed
-#
-# appear: ->
-# @perform 'appear', appearing_on: @appearingOn()
-#
-# away: ->
-# @perform 'away'
-#
-# appearingOn: ->
-# $('main').data 'appearing-on'
-#
-# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server
-# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away).
-# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter.
-#
-# This is how the server component would look:
-#
-# class AppearanceChannel < ApplicationActionCable::Channel
-# def subscribed
-# current_user.appear
-# end
-#
-# def unsubscribed
-# current_user.disappear
-# end
-#
-# def appear(data)
-# current_user.appear on: data['appearing_on']
-# end
-#
-# def away
-# current_user.away
-# end
-# end
-#
-# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name.
-# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method.
-class ActionCable.Subscription
- constructor: (@subscriptions, params = {}, mixin) ->
- @identifier = JSON.stringify(params)
- extend(this, mixin)
- @subscriptions.add(this)
- @consumer = @subscriptions.consumer
-
- # Perform a channel action with the optional data passed as an attribute
- perform: (action, data = {}) ->
- data.action = action
- @send(data)
-
- send: (data) ->
- @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data))
-
- unsubscribe: ->
- @subscriptions.remove(this)
-
- extend = (object, properties) ->
- if properties?
- for key, value of properties
- object[key] = value
- object
diff --git a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee
deleted file mode 100644
index ae041ffa2b..0000000000
--- a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee
+++ /dev/null
@@ -1,64 +0,0 @@
-# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user
-# us ActionCable.Subscriptions#create, and it should be called through the consumer like so:
-#
-# @App = {}
-# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1"
-# App.appearance = App.cable.subscriptions.create "AppearanceChannel"
-#
-# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription.
-class ActionCable.Subscriptions
- constructor: (@consumer) ->
- @subscriptions = []
-
- create: (channelName, mixin) ->
- channel = channelName
- params = if typeof channel is "object" then channel else {channel}
- new ActionCable.Subscription this, params, mixin
-
- # Private
-
- add: (subscription) ->
- @subscriptions.push(subscription)
- @notify(subscription, "initialized")
- @sendCommand(subscription, "subscribe")
-
- remove: (subscription) ->
- @forget(subscription)
-
- unless @findAll(subscription.identifier).length
- @sendCommand(subscription, "unsubscribe")
-
- reject: (identifier) ->
- for subscription in @findAll(identifier)
- @forget(subscription)
- @notify(subscription, "rejected")
-
- forget: (subscription) ->
- @subscriptions = (s for s in @subscriptions when s isnt subscription)
-
- findAll: (identifier) ->
- s for s in @subscriptions when s.identifier is identifier
-
- reload: ->
- for subscription in @subscriptions
- @sendCommand(subscription, "subscribe")
-
- notifyAll: (callbackName, args...) ->
- for subscription in @subscriptions
- @notify(subscription, callbackName, args...)
-
- notify: (subscription, callbackName, args...) ->
- if typeof subscription is "string"
- subscriptions = @findAll(subscription)
- else
- subscriptions = [subscription]
-
- for subscription in subscriptions
- subscription[callbackName]?(args...)
-
- sendCommand: (subscription, command) ->
- {identifier} = subscription
- if identifier is ActionCable.INTERNAL.identifiers.ping
- @consumer.connection.isOpen()
- else
- @consumer.send({command, identifier})
diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb
index 6cf04ee61f..7bff3341c1 100644
--- a/actioncable/lib/rails/generators/channel/templates/channel.rb
+++ b/actioncable/lib/rails/generators/channel/templates/channel.rb
@@ -1,4 +1,4 @@
-# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading.
+# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading.
<% module_namespacing do -%>
class <%= class_name %>Channel < ApplicationCable::Channel
def subscribed