diff options
author | Jeremy Daer <jeremydaer@gmail.com> | 2016-03-11 16:32:02 -0700 |
---|---|---|
committer | Jeremy Daer <jeremydaer@gmail.com> | 2016-03-31 07:08:16 -0700 |
commit | b168eb5819fa5fea940c9865d5c9a3ec5ba2a7ec (patch) | |
tree | e6225bc33dcc1dcefdfefca4537bde07bc8df94a /actioncable/lib/action_cable/channel | |
parent | 903f447e436a7c909c3afc552f27bbbc1b4770c8 (diff) | |
download | rails-b168eb5819fa5fea940c9865d5c9a3ec5ba2a7ec.tar.gz rails-b168eb5819fa5fea940c9865d5c9a3ec5ba2a7ec.tar.bz2 rails-b168eb5819fa5fea940c9865d5c9a3ec5ba2a7ec.zip |
Cable message encoding
* Introduce a connection coder responsible for encoding Cable messages
as WebSocket messages, defaulting to `ActiveSupport::JSON` and duck-
typing to any object responding to `#encode` and `#decode`.
* Consolidate encoding responsibility to the connection. No longer
explicitly JSON-encode from channels or other sources. Pass Cable
messages as Hashes to `#transmit` and rely on it to encode.
* Introduce stream encoders responsible for decoding pubsub messages.
Preserve the currently raw encoding, but make it easy to use JSON.
Same duck type as the connection encoder.
* Revert recent data normalization/quoting (#23649) which treated
`identifier` and `data` values as nested JSON objects rather than as
opaque JSON-encoded strings. That dealt us an awkward hand where we'd
decode JSON stringsā¦ or not, but always encode as JSON. Embedding
JSON object values directly is preferably, no extra JSON encoding,
but that should be a purposeful protocol version change rather than
ambiguously, inadvertently supporting multiple message formats.
Diffstat (limited to 'actioncable/lib/action_cable/channel')
-rw-r--r-- | actioncable/lib/action_cable/channel/base.rb | 6 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 32 |
2 files changed, 24 insertions, 14 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 464d0581dd..845b747fc5 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -198,7 +198,7 @@ module ActionCable payload = { channel_class: self.class.name, data: data, via: via } ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + connection.transmit identifier: @identifier, message: data end end @@ -274,7 +274,7 @@ module ActionCable logger.info "#{self.class.name} is transmitting the subscription confirmation" ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation] @subscription_confirmation_sent = true end end @@ -289,7 +289,7 @@ module ActionCable logger.info "#{self.class.name} is transmitting the subscription rejection" ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection] end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 23d7320a28..f654ce0bfa 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -46,9 +46,7 @@ module ActionCable # def subscribed # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) - # + # stream_for @room, coder: ActiveSupport::JSON do |message| # if message['originated_at'].present? # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # @@ -71,16 +69,23 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - def stream_from(broadcasting, callback = nil) + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - callback ||= default_stream_callback(broadcasting) - streams << [ broadcasting, callback ] + if handler = callback || block + handler = -> message { handler.(coder.decode(message)) } if coder + else + handler = default_stream_handler(broadcasting, coder: coder) + end + + streams << [ broadcasting, handler ] connection.server.event_loop.post do - pubsub.subscribe(broadcasting, callback, lambda do + pubsub.subscribe(broadcasting, handler, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -90,8 +95,11 @@ module ActionCable # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. - def stream_for(model, callback = nil) - stream_from(broadcasting_for([ channel_name, model ]), callback) + # + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_for(model, callback = nil, coder: nil, &block) + stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end # Unsubscribes all streams associated with this channel from the pubsub queue. @@ -109,9 +117,11 @@ module ActionCable @_streams ||= [] end - def default_stream_callback(broadcasting) + def default_stream_handler(broadcasting, coder:) + coder ||= ActiveSupport::JSON + -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + transmit coder.decode(message), via: "streamed from #{broadcasting}" end end end |