From 7fef6b01a3011438d48136e3f95bb9a823e87ec6 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Thu, 5 Feb 2015 16:35:11 +0530 Subject: No cramp and use celluloid workers to run callbacks --- lib/action_cable/channel/base.rb | 4 ++-- lib/action_cable/server.rb | 43 ++++++++++++++++++++++++++++++++-------- lib/action_cable/worker.rb | 19 ++++++++++++++++++ 3 files changed, 56 insertions(+), 10 deletions(-) create mode 100644 lib/action_cable/worker.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index ae8822d2a2..e311cc97e9 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -41,13 +41,13 @@ module ActionCable def subscribe self.class.on_subscribe_callbacks.each do |callback| - EM.next_tick { send(callback) } + send(callback) end end def unsubscribe self.class.on_unsubscribe_callbacks.each do |callback| - EM.next_tick { send(callback) } + send(callback) end end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index cdf8ea0f66..ea22f0014e 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,10 +1,11 @@ require 'set' +require 'faye/websocket' +require 'celluloid' -module ActionCable - class Server < Cramp::Websocket - on_data :received_data - on_finish :cleanup_subscriptions +Celluloid::Actor[:worker_pool] = ActionCable::Worker.pool(size: 100) +module ActionCable + class Server class_attribute :registered_channels self.registered_channels = Set.new @@ -12,12 +13,35 @@ module ActionCable def register_channels(*channel_classes) self.registered_channels += channel_classes end + + def call(env) + new(env).process + end end - def initialize(*) - @subscriptions = {} + def initialize(env) + @env = env + end + + def process + if Faye::WebSocket.websocket?(@env) + @subscriptions = {} + + @websocket = Faye::WebSocket.new(@env) - super + @websocket.on(:message) do |event| + message = event.data + Celluloid::Actor[:worker_pool].async.received_data(self, message) if message.is_a?(String) + end + + @websocket.on(:close) do |event| + Celluloid::Actor[:worker_pool].async.cleanup_subscriptions(self) + end + + @websocket.rack_response + else + invalid_request + end end def received_data(data) @@ -40,7 +64,7 @@ module ActionCable end def broadcast(data) - render data + @websocket.send data end private @@ -71,5 +95,8 @@ module ActionCable @subscriptions.delete(id_key) end + def invalid_request + [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + end end end diff --git a/lib/action_cable/worker.rb b/lib/action_cable/worker.rb new file mode 100644 index 0000000000..46b5f7edc0 --- /dev/null +++ b/lib/action_cable/worker.rb @@ -0,0 +1,19 @@ +module ActionCable + class Worker + include ActiveSupport::Callbacks + include Celluloid + + define_callbacks :work + + def received_data(connection, data) + run_callbacks :work do + connection.received_data(data) + end + end + + def cleanup_subscriptions(connection) + connection.cleanup_subscriptions + end + + end +end -- cgit v1.2.3