aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-03-02 11:20:19 +1030
committerMatthew Draper <matthew@trebex.net>2016-03-02 12:17:15 +1030
commita373be9da45d4bee684ea03420212780ec1ef4b1 (patch)
tree3798c6117d8944189c84bd5363d49dbf935ea407 /actioncable/lib/action_cable/connection
parent541e4abb4b3710a384aefac83cafd0ab878c60bf (diff)
downloadrails-a373be9da45d4bee684ea03420212780ec1ef4b1.tar.gz
rails-a373be9da45d4bee684ea03420212780ec1ef4b1.tar.bz2
rails-a373be9da45d4bee684ea03420212780ec1ef4b1.zip
Support faye-websocket + EventMachine as an option
Diffstat (limited to 'actioncable/lib/action_cable/connection')
-rw-r--r--actioncable/lib/action_cable/connection/base.rb4
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb10
-rw-r--r--actioncable/lib/action_cable/connection/faye_client_socket.rb42
-rw-r--r--actioncable/lib/action_cable/connection/faye_event_loop.rb44
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb11
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb4
7 files changed, 107 insertions, 12 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index afe0d958d7..7e7e777eaa 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -49,7 +49,7 @@ module ActionCable
include Authorization
attr_reader :server, :env, :subscriptions, :logger, :worker_pool
- delegate :stream_event_loop, :pubsub, to: :server
+ delegate :event_loop, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@@ -57,7 +57,7 @@ module ActionCable
@worker_pool = server.worker_pool
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index f6b11e93f0..9e4dbcd6e6 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -29,10 +29,10 @@ module ActionCable
attr_reader :env, :url
- def initialize(env, event_target, stream_event_loop)
- @env = env
- @event_target = event_target
- @stream_event_loop = stream_event_loop
+ def initialize(env, event_target, event_loop)
+ @env = env
+ @event_target = event_target
+ @event_loop = event_loop
@url = ClientSocket.determine_url(@env)
@@ -49,7 +49,7 @@ module ActionCable
@driver.on(:close) { |e| begin_close(e.reason, e.code) }
@driver.on(:error) { |e| emit_error(e.message) }
- @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+ @stream = ActionCable::Connection::Stream.new(@event_loop, self)
end
def start_driver
diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb
new file mode 100644
index 0000000000..c9139b6858
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb
@@ -0,0 +1,42 @@
+require 'faye/websocket'
+
+module ActionCable
+ module Connection
+ class FayeClientSocket
+ def initialize(env, event_target, stream_event_loop)
+ @env = env
+ @event_target = event_target
+
+ @faye = nil
+ end
+
+ def alive?
+ @faye && @faye.ready_state == Faye::WebSocket::API::OPEN
+ end
+
+ def transmit(data)
+ connect
+ @faye.send data
+ end
+
+ def close
+ @faye && @faye.close
+ end
+
+ def rack_response
+ connect
+ @faye.rack_response
+ end
+
+ private
+ def connect
+ return if @faye
+ @faye = Faye::WebSocket.new(@env)
+
+ @faye.on(:open) { |event| @event_target.on_open }
+ @faye.on(:message) { |event| @event_target.on_message(event.data) }
+ @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb
new file mode 100644
index 0000000000..8b70f3d84e
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb
@@ -0,0 +1,44 @@
+require 'thread'
+
+require 'eventmachine'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
+module ActionCable
+ module Connection
+ class FayeEventLoop
+ @@mutex = Mutex.new
+
+ def timer(interval, &block)
+ ensure_reactor_running
+ EMTimer.new(::EM::PeriodicTimer.new(interval, &block))
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
+ ensure_reactor_running
+ ::EM.next_tick(&task)
+ end
+
+ private
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+
+ class EMTimer
+ def initialize(inner)
+ @inner = inner
+ end
+
+ def shutdown
+ inner.cancel
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 27826792b3..3c5d39f59a 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
+ server.event_loop.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index e6335082d2..2abad09c03 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -11,7 +11,16 @@ module ActionCable
@todo = Queue.new
@spawn_mutex = Mutex.new
- spawn
+ end
+
+ def timer(interval, &block)
+ Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
+ Concurrent.global_io_executor << task
end
def attach(io, stream)
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 5e89fb9b72..0bec9b6a96 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -4,8 +4,8 @@ module ActionCable
module Connection
# Wrap the real socket to minimize the externally-presented API
class WebSocket
- def initialize(env, event_target, stream_event_loop)
- @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
+ def initialize(env, event_target, event_loop, client_socket_class)
+ @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil
end
def possible?