diff options
Diffstat (limited to 'actioncable/lib/action_cable/channel')
-rw-r--r-- | actioncable/lib/action_cable/channel/base.rb | 6 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 32 |
2 files changed, 24 insertions, 14 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 464d0581dd..845b747fc5 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -198,7 +198,7 @@ module ActionCable payload = { channel_class: self.class.name, data: data, via: via } ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + connection.transmit identifier: @identifier, message: data end end @@ -274,7 +274,7 @@ module ActionCable logger.info "#{self.class.name} is transmitting the subscription confirmation" ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation] @subscription_confirmation_sent = true end end @@ -289,7 +289,7 @@ module ActionCable logger.info "#{self.class.name} is transmitting the subscription rejection" ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection] end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 23d7320a28..f654ce0bfa 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -46,9 +46,7 @@ module ActionCable # def subscribed # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) - # + # stream_for @room, coder: ActiveSupport::JSON do |message| # if message['originated_at'].present? # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # @@ -71,16 +69,23 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - def stream_from(broadcasting, callback = nil) + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - callback ||= default_stream_callback(broadcasting) - streams << [ broadcasting, callback ] + if handler = callback || block + handler = -> message { handler.(coder.decode(message)) } if coder + else + handler = default_stream_handler(broadcasting, coder: coder) + end + + streams << [ broadcasting, handler ] connection.server.event_loop.post do - pubsub.subscribe(broadcasting, callback, lambda do + pubsub.subscribe(broadcasting, handler, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -90,8 +95,11 @@ module ActionCable # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. - def stream_for(model, callback = nil) - stream_from(broadcasting_for([ channel_name, model ]), callback) + # + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_for(model, callback = nil, coder: nil, &block) + stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end # Unsubscribes all streams associated with this channel from the pubsub queue. @@ -109,9 +117,11 @@ module ActionCable @_streams ||= [] end - def default_stream_callback(broadcasting) + def default_stream_handler(broadcasting, coder:) + coder ||= ActiveSupport::JSON + -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + transmit coder.decode(message), via: "streamed from #{broadcasting}" end end end |