aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/action_cable.rb11
-rw-r--r--lib/action_cable/channel/base.rb11
-rw-r--r--lib/action_cable/server.rb53
-rw-r--r--lib/action_cable/worker.rb21
4 files changed, 82 insertions, 14 deletions
diff --git a/lib/action_cable.rb b/lib/action_cable.rb
index 7df2a8c5eb..e7d8f4cbb1 100644
--- a/lib/action_cable.rb
+++ b/lib/action_cable.rb
@@ -1,12 +1,21 @@
-require 'cramp'
+require 'eventmachine'
+EM.epoll
+
+require 'set'
+
require 'active_support'
require 'active_support/json'
require 'active_support/concern'
require 'active_support/core_ext/hash/indifferent_access'
+require 'active_support/callbacks'
+
+require 'faye/websocket'
+require 'celluloid'
module ActionCable
VERSION = '0.0.1'
autoload :Channel, 'action_cable/channel'
+ autoload :Worker, 'action_cable/worker'
autoload :Server, 'action_cable/server'
end
diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb
index ae8822d2a2..832c8cc314 100644
--- a/lib/action_cable/channel/base.rb
+++ b/lib/action_cable/channel/base.rb
@@ -41,13 +41,13 @@ module ActionCable
def subscribe
self.class.on_subscribe_callbacks.each do |callback|
- EM.next_tick { send(callback) }
+ send(callback)
end
end
def unsubscribe
self.class.on_unsubscribe_callbacks.each do |callback|
- EM.next_tick { send(callback) }
+ send(callback)
end
end
@@ -67,7 +67,7 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
@_active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
- callback.respond_to?(:call) ? instance_exec(&callback) : send(callback)
+ worker_pool.async.run_periodic_timer(self, callback)
end
end
end
@@ -75,6 +75,11 @@ module ActionCable
def stop_periodic_timers
@_active_periodic_timers.each {|t| t.cancel }
end
+
+ def worker_pool
+ connection.worker_pool
+ end
+
end
end
diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb
index cdf8ea0f66..e657f6d636 100644
--- a/lib/action_cable/server.rb
+++ b/lib/action_cable/server.rb
@@ -1,23 +1,49 @@
-require 'set'
-
module ActionCable
- class Server < Cramp::Websocket
- on_data :received_data
- on_finish :cleanup_subscriptions
-
+ class Server
class_attribute :registered_channels
self.registered_channels = Set.new
+ class_attribute :worker_pool_size
+ self.worker_pool_size = 100
+
class << self
def register_channels(*channel_classes)
self.registered_channels += channel_classes
end
+
+ def call(env)
+ new(env).process
+ end
+
+ def worker_pool
+ @worker_pool ||= ActionCable::Worker.pool(size: worker_pool_size)
+ end
+ end
+
+ def initialize(env)
+ @env = env
end
- def initialize(*)
- @subscriptions = {}
+ def process
+ if Faye::WebSocket.websocket?(@env)
+ @subscriptions = {}
- super
+ @websocket = Faye::WebSocket.new(@env)
+
+ @websocket.on(:message) do |event|
+ message = event.data
+ worker_pool.async.invoke(self, :received_data, message) if message.is_a?(String)
+ end
+
+ @websocket.on(:close) do |event|
+ worker_pool.async.invoke(self, :cleanup_subscriptions)
+ worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect)
+ end
+
+ @websocket.rack_response
+ else
+ invalid_request
+ end
end
def received_data(data)
@@ -40,7 +66,11 @@ module ActionCable
end
def broadcast(data)
- render data
+ @websocket.send data
+ end
+
+ def worker_pool
+ self.class.worker_pool
end
private
@@ -71,5 +101,8 @@ module ActionCable
@subscriptions.delete(id_key)
end
+ def invalid_request
+ [404, {'Content-Type' => 'text/plain'}, ['Page not found']]
+ end
end
end
diff --git a/lib/action_cable/worker.rb b/lib/action_cable/worker.rb
new file mode 100644
index 0000000000..6687af43a0
--- /dev/null
+++ b/lib/action_cable/worker.rb
@@ -0,0 +1,21 @@
+module ActionCable
+ class Worker
+ include ActiveSupport::Callbacks
+ include Celluloid
+
+ define_callbacks :work
+
+ def invoke(receiver, method, *args)
+ run_callbacks :work do
+ receiver.send method, *args
+ end
+ end
+
+ def run_periodic_timer(channel, callback)
+ run_callbacks :work do
+ callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
+ end
+ end
+
+ end
+end