aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib')
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb52
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb64
-rw-r--r--actioncable/lib/action_cable/connection/base.rb2
-rw-r--r--actioncable/lib/action_cable/engine.rb2
-rw-r--r--actioncable/lib/action_cable/gem_version.rb2
-rw-r--r--actioncable/lib/action_cable/remote_connections.rb2
-rw-r--r--actioncable/lib/action_cable/server/worker.rb30
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/evented_redis.rb4
8 files changed, 116 insertions, 42 deletions
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index b414255707..dab604440f 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -12,11 +12,42 @@ module ActionCable
end
module ClassMethods
- # Allows you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful
- # for sending a steady flow of updates to a client based off an object that was configured on subscription.
- # It's an alternative to using streams if the channel is able to do the work internally.
- def periodically(callback, every:)
- self.periodic_timers += [ [ callback, every: every ] ]
+ # Periodically performs a task on the channel, like updating an online
+ # user counter, polling a backend for new status messages, sending
+ # regular "heartbeat" messages, or doing some internal work and giving
+ # progress updates.
+ #
+ # Pass a method name or lambda argument or provide a block to call.
+ # Specify the calling period in seconds using the <tt>every:</tt>
+ # keyword argument.
+ #
+ # periodically :transmit_progress, every: 5.seconds
+ #
+ # periodically every: 3.minutes do
+ # transmit action: :update_count, count: current_count
+ # end
+ #
+ def periodically(callback_or_method_name = nil, every:, &block)
+ callback =
+ if block_given?
+ raise ArgumentError, 'Pass a block or provide a callback arg, not both' if callback_or_method_name
+ block
+ else
+ case callback_or_method_name
+ when Proc
+ callback_or_method_name
+ when Symbol
+ -> { __send__ callback_or_method_name }
+ else
+ raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}"
+ end
+ end
+
+ unless every.kind_of?(Numeric) && every > 0
+ raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}"
+ end
+
+ self.periodic_timers += [[ callback, every: every ]]
end
end
@@ -27,14 +58,21 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
- active_periodic_timers << connection.server.event_loop.timer(options[:every]) do
- connection.worker_pool.async_run_periodic_timer(self, callback)
+ active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every))
+ end
+ end
+
+ def start_periodic_timer(callback, every:)
+ connection.server.event_loop.timer every do
+ connection.worker_pool.async_invoke connection do
+ instance_exec(&callback)
end
end
end
def stop_periodic_timers
active_periodic_timers.each { |timer| timer.shutdown }
+ active_periodic_timers.clear
end
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 8b46ac216a..200c9d053c 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -2,7 +2,7 @@ module ActionCable
module Channel
# Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data
# placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
- # streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent.
+ # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent.
#
# Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
# the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
@@ -73,18 +73,13 @@ module ActionCable
# Defaults to `coder: nil` which does no decoding, passes raw messages.
def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
+
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
- if user_handler = callback || block
- user_handler = -> message { handler.(coder.decode(message)) } if coder
- handler = -> message do
- connection.worker_pool.async_invoke(user_handler, :call, message)
- end
- else
- handler = default_stream_handler(broadcasting, coder: coder)
- end
-
+ # Build a stream handler by wrapping the user-provided callback with
+ # a decoder or defaulting to a JSON-decoding retransmitter.
+ handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
streams << [ broadcasting, handler ]
connection.server.event_loop.post do
@@ -120,13 +115,60 @@ module ActionCable
@_streams ||= []
end
+ # Always wrap the outermost handler to invoke the user handler on the
+ # worker pool rather than blocking the event loop.
+ def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
+ handler = stream_handler(broadcasting, user_handler, coder: coder)
+
+ -> message do
+ connection.worker_pool.async_invoke handler, :call, message, connection: connection
+ end
+ end
+
+ # May be overridden to add instrumentation, logging, specialized error
+ # handling, or other forms of handler decoration.
+ #
+ # TODO: Tests demonstrating this.
+ def stream_handler(broadcasting, user_handler, coder: nil)
+ if user_handler
+ stream_decoder user_handler, coder: coder
+ else
+ default_stream_handler broadcasting, coder: coder
+ end
+ end
+
+ # May be overridden to change the default stream handling behavior
+ # which decodes JSON and transmits to client.
+ #
+ # TODO: Tests demonstrating this.
+ #
+ # TODO: Room for optimization. Update transmit API to be coder-aware
+ # so we can no-op when pubsub and connection are both JSON-encoded.
+ # Then we can skip decode+encode if we're just proxying messages.
def default_stream_handler(broadcasting, coder:)
coder ||= ActiveSupport::JSON
+ stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
+ end
+
+ def stream_decoder(handler = identity_handler, coder:)
+ if coder
+ -> message { handler.(coder.decode(message)) }
+ else
+ handler
+ end
+ end
+
+ def stream_transmitter(handler = identity_handler, broadcasting:)
+ via = "streamed from #{broadcasting}"
-> (message) do
- transmit coder.decode(message), via: "streamed from #{broadcasting}"
+ transmit handler.(message), via: via
end
end
+
+ def identity_handler
+ -> message { message }
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index 9a7dfbe761..cc4e0f8c8b 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -40,7 +40,7 @@ module ActionCable
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
- # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
+ # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
#
# Pretty simple, eh?
class Base
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
index 7dc541d00c..8ce1b24962 100644
--- a/actioncable/lib/action_cable/engine.rb
+++ b/actioncable/lib/action_cable/engine.rb
@@ -4,7 +4,7 @@ require "action_cable/helpers/action_cable_helper"
require "active_support/core_ext/hash/indifferent_access"
module ActionCable
- class Railtie < Rails::Engine # :nodoc:
+ class Engine < Rails::Engine # :nodoc:
config.action_cable = ActiveSupport::OrderedOptions.new
config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path]
diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb
index 67adeefaff..5ca2f473b1 100644
--- a/actioncable/lib/action_cable/gem_version.rb
+++ b/actioncable/lib/action_cable/gem_version.rb
@@ -8,7 +8,7 @@ module ActionCable
MAJOR = 5
MINOR = 0
TINY = 0
- PRE = "beta3"
+ PRE = "beta4"
STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
end
diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb
index aeef8abc72..a528024427 100644
--- a/actioncable/lib/action_cable/remote_connections.rb
+++ b/actioncable/lib/action_cable/remote_connections.rb
@@ -28,7 +28,7 @@ module ActionCable
private
# Represents a single remote connection found via <tt>ActionCable.server.remote_connections.where(*)</tt>.
- # Exists for the solely for the purpose of calling #disconnect on that connection.
+ # Exists solely for the purpose of calling #disconnect on that connection.
class RemoteConnection
class InvalidIdentifiersError < StandardError; end
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index 49cbaec0c0..a638ff72e7 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -12,8 +12,10 @@ module ActionCable
define_callbacks :work
include ActiveRecordConnectionManagement
+ attr_reader :executor
+
def initialize(max_size: 5)
- @pool = Concurrent::ThreadPoolExecutor.new(
+ @executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
@@ -23,11 +25,11 @@ module ActionCable
# Stop processing work: any work that has not already started
# running will be discarded from the queue
def halt
- @pool.kill
+ @executor.kill
end
def stopping?
- @pool.shuttingdown?
+ @executor.shuttingdown?
end
def work(connection)
@@ -40,14 +42,14 @@ module ActionCable
self.connection = nil
end
- def async_invoke(receiver, method, *args)
- @pool.post do
- invoke(receiver, method, *args)
+ def async_invoke(receiver, method, *args, connection: receiver)
+ @executor.post do
+ invoke(receiver, method, *args, connection: connection)
end
end
- def invoke(receiver, method, *args)
- work(receiver) do
+ def invoke(receiver, method, *args, connection:)
+ work(connection) do
begin
receiver.send method, *args
rescue Exception => e
@@ -59,18 +61,6 @@ module ActionCable
end
end
- def async_run_periodic_timer(channel, callback)
- @pool.post do
- run_periodic_timer(channel, callback)
- end
- end
-
- def run_periodic_timer(channel, callback)
- work(channel.connection) do
- callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
- end
- end
-
private
def logger
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
index 256876cf30..4735a4bfa8 100644
--- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -53,6 +53,10 @@ module ActionCable
redis.on(:reconnect_failed) do
@logger.error "[ActionCable] Redis reconnect failed."
end
+
+ redis.on(:failed) do
+ @logger.error "[ActionCable] Redis connection has failed."
+ end
end
end
end