diff options
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r-- | actioncable/lib/action_cable/channel/periodic_timers.rb | 52 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 61 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/base.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/worker.rb | 30 |
4 files changed, 109 insertions, 36 deletions
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index b414255707..dab604440f 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -12,11 +12,42 @@ module ActionCable end module ClassMethods - # Allows you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful - # for sending a steady flow of updates to a client based off an object that was configured on subscription. - # It's an alternative to using streams if the channel is able to do the work internally. - def periodically(callback, every:) - self.periodic_timers += [ [ callback, every: every ] ] + # Periodically performs a task on the channel, like updating an online + # user counter, polling a backend for new status messages, sending + # regular "heartbeat" messages, or doing some internal work and giving + # progress updates. + # + # Pass a method name or lambda argument or provide a block to call. + # Specify the calling period in seconds using the <tt>every:</tt> + # keyword argument. + # + # periodically :transmit_progress, every: 5.seconds + # + # periodically every: 3.minutes do + # transmit action: :update_count, count: current_count + # end + # + def periodically(callback_or_method_name = nil, every:, &block) + callback = + if block_given? + raise ArgumentError, 'Pass a block or provide a callback arg, not both' if callback_or_method_name + block + else + case callback_or_method_name + when Proc + callback_or_method_name + when Symbol + -> { __send__ callback_or_method_name } + else + raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}" + end + end + + unless every.kind_of?(Numeric) && every > 0 + raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}" + end + + self.periodic_timers += [[ callback, every: every ]] end end @@ -27,14 +58,21 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << connection.server.event_loop.timer(options[:every]) do - connection.worker_pool.async_run_periodic_timer(self, callback) + active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every)) + end + end + + def start_periodic_timer(callback, every:) + connection.server.event_loop.timer every do + connection.worker_pool.async_invoke connection do + instance_exec(&callback) end end end def stop_periodic_timers active_periodic_timers.each { |timer| timer.shutdown } + active_periodic_timers.clear end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index f654ce0bfa..200c9d053c 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -2,7 +2,7 @@ module ActionCable module Channel # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not - # streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent. + # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent. # # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new @@ -73,15 +73,13 @@ module ActionCable # Defaults to `coder: nil` which does no decoding, passes raw messages. def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - if handler = callback || block - handler = -> message { handler.(coder.decode(message)) } if coder - else - handler = default_stream_handler(broadcasting, coder: coder) - end - + # Build a stream handler by wrapping the user-provided callback with + # a decoder or defaulting to a JSON-decoding retransmitter. + handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) streams << [ broadcasting, handler ] connection.server.event_loop.post do @@ -117,13 +115,60 @@ module ActionCable @_streams ||= [] end + # Always wrap the outermost handler to invoke the user handler on the + # worker pool rather than blocking the event loop. + def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) + handler = stream_handler(broadcasting, user_handler, coder: coder) + + -> message do + connection.worker_pool.async_invoke handler, :call, message, connection: connection + end + end + + # May be overridden to add instrumentation, logging, specialized error + # handling, or other forms of handler decoration. + # + # TODO: Tests demonstrating this. + def stream_handler(broadcasting, user_handler, coder: nil) + if user_handler + stream_decoder user_handler, coder: coder + else + default_stream_handler broadcasting, coder: coder + end + end + + # May be overridden to change the default stream handling behavior + # which decodes JSON and transmits to client. + # + # TODO: Tests demonstrating this. + # + # TODO: Room for optimization. Update transmit API to be coder-aware + # so we can no-op when pubsub and connection are both JSON-encoded. + # Then we can skip decode+encode if we're just proxying messages. def default_stream_handler(broadcasting, coder:) coder ||= ActiveSupport::JSON + stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting + end + + def stream_decoder(handler = identity_handler, coder:) + if coder + -> message { handler.(coder.decode(message)) } + else + handler + end + end + + def stream_transmitter(handler = identity_handler, broadcasting:) + via = "streamed from #{broadcasting}" -> (message) do - transmit coder.decode(message), via: "streamed from #{broadcasting}" + transmit handler.(message), via: via end end + + def identity_handler + -> message { message } + end end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 9a7dfbe761..cc4e0f8c8b 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -40,7 +40,7 @@ module ActionCable # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. # - # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log. + # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log. # # Pretty simple, eh? class Base diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 49cbaec0c0..a638ff72e7 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -12,8 +12,10 @@ module ActionCable define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, @@ -23,11 +25,11 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @pool.kill + @executor.kill end def stopping? - @pool.shuttingdown? + @executor.shuttingdown? end def work(connection) @@ -40,14 +42,14 @@ module ActionCable self.connection = nil end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) end end - def invoke(receiver, method, *args) - work(receiver) do + def invoke(receiver, method, *args, connection:) + work(connection) do begin receiver.send method, *args rescue Exception => e @@ -59,18 +61,6 @@ module ActionCable end end - def async_run_periodic_timer(channel, callback) - @pool.post do - run_periodic_timer(channel, callback) - end - end - - def run_periodic_timer(channel, callback) - work(channel.connection) do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - end - private def logger |