aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/channel
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/channel')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb6
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb32
2 files changed, 24 insertions, 14 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 464d0581dd..845b747fc5 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -198,7 +198,7 @@ module ActionCable
payload = { channel_class: self.class.name, data: data, via: via }
ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
+ connection.transmit identifier: @identifier, message: data
end
end
@@ -274,7 +274,7 @@ module ActionCable
logger.info "#{self.class.name} is transmitting the subscription confirmation"
ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
+ connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]
@subscription_confirmation_sent = true
end
end
@@ -289,7 +289,7 @@ module ActionCable
logger.info "#{self.class.name} is transmitting the subscription rejection"
ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
+ connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]
end
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 23d7320a28..f654ce0bfa 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -46,9 +46,7 @@ module ActionCable
# def subscribed
# @room = Chat::Room[params[:room_number]]
#
- # stream_for @room, -> (encoded_message) do
- # message = ActiveSupport::JSON.decode(encoded_message)
- #
+ # stream_for @room, coder: ActiveSupport::JSON do |message|
# if message['originated_at'].present?
# elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
@@ -71,16 +69,23 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
- def stream_from(broadcasting, callback = nil)
+ # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
+ # Defaults to `coder: nil` which does no decoding, passes raw messages.
+ def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
- callback ||= default_stream_callback(broadcasting)
- streams << [ broadcasting, callback ]
+ if handler = callback || block
+ handler = -> message { handler.(coder.decode(message)) } if coder
+ else
+ handler = default_stream_handler(broadcasting, coder: coder)
+ end
+
+ streams << [ broadcasting, handler ]
connection.server.event_loop.post do
- pubsub.subscribe(broadcasting, callback, lambda do
+ pubsub.subscribe(broadcasting, handler, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
@@ -90,8 +95,11 @@ module ActionCable
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
# <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
# to the subscriber.
- def stream_for(model, callback = nil)
- stream_from(broadcasting_for([ channel_name, model ]), callback)
+ #
+ # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
+ # Defaults to `coder: nil` which does no decoding, passes raw messages.
+ def stream_for(model, callback = nil, coder: nil, &block)
+ stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
end
# Unsubscribes all streams associated with this channel from the pubsub queue.
@@ -109,9 +117,11 @@ module ActionCable
@_streams ||= []
end
- def default_stream_callback(broadcasting)
+ def default_stream_handler(broadcasting, coder:)
+ coder ||= ActiveSupport::JSON
+
-> (message) do
- transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
+ transmit coder.decode(message), via: "streamed from #{broadcasting}"
end
end
end