aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/channel/streams.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/channel/streams.rb')
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb34
1 files changed, 18 insertions, 16 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index b5ffa17f72..3158f30814 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -41,22 +41,23 @@ module ActionCable
# Example below shows how you can use this to provide performance introspection in the process:
#
# class ChatChannel < ApplicationCable::Channel
- # def subscribed
- # @room = Chat::Room[params[:room_number]]
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
#
- # stream_for @room, -> (encoded_message) do
- # message = ActiveSupport::JSON.decode(encoded_message)
+ # stream_for @room, -> (encoded_message) do
+ # message = ActiveSupport::JSON.decode(encoded_message)
#
- # if message['originated_at'].present?
- # elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
+ # if message['originated_at'].present?
+ # elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
- # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
- # logger.info "Message took #{elapsed_time}s to arrive"
- # end
+ # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
+ # logger.info "Message took #{elapsed_time}s to arrive"
+ # end
#
- # transmit message
- # end
- # end
+ # transmit message
+ # end
+ # end
+ # end
#
# You can stop streaming from all broadcasts by calling #stop_all_streams.
module Streams
@@ -75,11 +76,11 @@ module ActionCable
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- EM.next_tick do
- pubsub.subscribe(broadcasting, &callback).callback do |reply|
+ Concurrent.global_io_executor.post do
+ pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
- end
+ end)
end
end
@@ -90,9 +91,10 @@ module ActionCable
stream_from(broadcasting_for([ channel_name, model ]), callback)
end
+ # Unsubscribes all streams associated with this channel from the pubsub queue.
def stop_all_streams
streams.each do |broadcasting, callback|
- pubsub.unsubscribe_proc broadcasting, callback
+ pubsub.unsubscribe broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
end.clear
end