aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable/server.rb
diff options
context:
space:
mode:
authorPratik Naik <pratiknaik@gmail.com>2015-04-04 00:26:14 -0500
committerPratik Naik <pratiknaik@gmail.com>2015-04-04 00:26:14 -0500
commit354018bf9b5f5bf0fbbc6e6efddc719e7523b39d (patch)
treecc72b903c615e4061090d7fd75ba43b19777764f /lib/action_cable/server.rb
parentf29a14207aa3084cb2f0a73cb5672729aa0c6d62 (diff)
downloadrails-354018bf9b5f5bf0fbbc6e6efddc719e7523b39d.tar.gz
rails-354018bf9b5f5bf0fbbc6e6efddc719e7523b39d.tar.bz2
rails-354018bf9b5f5bf0fbbc6e6efddc719e7523b39d.zip
Separate connection and server classes
Diffstat (limited to 'lib/action_cable/server.rb')
-rw-r--r--lib/action_cable/server.rb154
1 files changed, 10 insertions, 144 deletions
diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb
index ebf98171c1..6e9265dc06 100644
--- a/lib/action_cable/server.rb
+++ b/lib/action_cable/server.rb
@@ -1,157 +1,23 @@
module ActionCable
class Server
- class_attribute :registered_channels
- self.registered_channels = Set.new
-
- class_attribute :worker_pool_size
- self.worker_pool_size = 100
-
cattr_accessor(:logger, instance_reader: true) { Rails.logger }
- PING_INTERVAL = 3
-
- class << self
- def register_channels(*channel_classes)
- self.registered_channels += channel_classes
- end
-
- def call(env)
- new(env).process
- end
-
- def worker_pool
- @worker_pool ||= ActionCable::Worker.pool(size: worker_pool_size)
- end
- end
-
- attr_reader :env
-
- def initialize(env)
- @env = env
- @accept_messages = false
- @pending_messages = []
- end
-
- def process
- if Faye::WebSocket.websocket?(@env)
- @subscriptions = {}
-
- @websocket = Faye::WebSocket.new(@env)
-
- @websocket.on(:open) do |event|
- broadcast_ping_timestamp
- @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp }
- worker_pool.async.invoke(self, :initialize_client)
- end
-
- @websocket.on(:message) do |event|
- message = event.data
-
- if message.is_a?(String)
- if @accept_messages
- worker_pool.async.invoke(self, :received_data, message)
- else
- @pending_messages << message
- end
- end
- end
+ attr_accessor :registered_channels, :worker_pool
- @websocket.on(:close) do |event|
- worker_pool.async.invoke(self, :cleanup_subscriptions)
- worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect)
-
- EventMachine.cancel_timer(@ping_timer) if @ping_timer
- end
-
- @websocket.rack_response
- else
- invalid_request
- end
+ def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection)
+ @redis_config = redis_config
+ @registered_channels = Set.new(channels)
+ @worker_pool = ActionCable::Worker.pool(size: worker_pool_size)
+ @connection_class = connection
end
- def received_data(data)
- return unless websocket_alive?
-
- data = ActiveSupport::JSON.decode data
-
- case data['action']
- when 'subscribe'
- subscribe_channel(data)
- when 'unsubscribe'
- unsubscribe_channel(data)
- when 'message'
- process_message(data)
- end
+ def call(env)
+ @connection_class.new(self, env).process
end
- def cleanup_subscriptions
- @subscriptions.each do |id, channel|
- channel.unsubscribe
- end
+ def pubsub
+ @pubsub ||= EM::Hiredis.connect(@redis_config['url']).pubsub
end
- def broadcast(data)
- logger.info "Sending data: #{data}"
- @websocket.send data
- end
-
- def worker_pool
- self.class.worker_pool
- end
-
- def handle_exception
- logger.error "[ActionCable] Closing connection"
-
- @websocket.close
- end
-
- private
- def initialize_client
- connect if respond_to?(:connect)
- @accept_messages = true
-
- worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty?
- end
-
- def broadcast_ping_timestamp
- broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json)
- end
-
- def subscribe_channel(data)
- id_key = data['identifier']
- id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
-
- subscription_klass = registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] }
-
- if subscription_klass
- logger.info "Subscribing to channel: #{id_key}"
- @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options)
- else
- logger.error "Unable to subscribe to channel: #{id_key}"
- end
- end
-
- def process_message(message)
- if @subscriptions[message['identifier']]
- @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data'])
- else
- logger.error "Unable to process message: #{message}"
- end
- end
-
- def unsubscribe_channel(data)
- logger.info "Unsubscribing from channel: #{data['identifier']}"
- @subscriptions[data['identifier']].unsubscribe
- @subscriptions.delete(data['identifier'])
- end
-
- def invalid_request
- [404, {'Content-Type' => 'text/plain'}, ['Page not found']]
- end
-
- def websocket_alive?
- @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN
- end
-
end
end