diff options
author | Jeremy Daer <jeremydaer@gmail.com> | 2016-03-31 07:08:49 -0700 |
---|---|---|
committer | Jeremy Daer <jeremydaer@gmail.com> | 2016-03-31 07:08:49 -0700 |
commit | 9a83ef38912063db127a3918b0f155d5d3acfe3b (patch) | |
tree | e6225bc33dcc1dcefdfefca4537bde07bc8df94a /actioncable/lib/action_cable/channel/streams.rb | |
parent | 903f447e436a7c909c3afc552f27bbbc1b4770c8 (diff) | |
parent | b168eb5819fa5fea940c9865d5c9a3ec5ba2a7ec (diff) | |
download | rails-9a83ef38912063db127a3918b0f155d5d3acfe3b.tar.gz rails-9a83ef38912063db127a3918b0f155d5d3acfe3b.tar.bz2 rails-9a83ef38912063db127a3918b0f155d5d3acfe3b.zip |
Merge pull request #24233 from jeremy/cable/encoding
Cable message encoding
Diffstat (limited to 'actioncable/lib/action_cable/channel/streams.rb')
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 23d7320a28..f654ce0bfa 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -46,9 +46,7 @@ module ActionCable # def subscribed # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) - # + # stream_for @room, coder: ActiveSupport::JSON do |message| # if message['originated_at'].present? # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # @@ -71,16 +69,23 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - def stream_from(broadcasting, callback = nil) + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - callback ||= default_stream_callback(broadcasting) - streams << [ broadcasting, callback ] + if handler = callback || block + handler = -> message { handler.(coder.decode(message)) } if coder + else + handler = default_stream_handler(broadcasting, coder: coder) + end + + streams << [ broadcasting, handler ] connection.server.event_loop.post do - pubsub.subscribe(broadcasting, callback, lambda do + pubsub.subscribe(broadcasting, handler, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -90,8 +95,11 @@ module ActionCable # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. - def stream_for(model, callback = nil) - stream_from(broadcasting_for([ channel_name, model ]), callback) + # + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_for(model, callback = nil, coder: nil, &block) + stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end # Unsubscribes all streams associated with this channel from the pubsub queue. @@ -109,9 +117,11 @@ module ActionCable @_streams ||= [] end - def default_stream_callback(broadcasting) + def default_stream_handler(broadcasting, coder:) + coder ||= ActiveSupport::JSON + -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + transmit coder.decode(message), via: "streamed from #{broadcasting}" end end end |