diff options
Diffstat (limited to 'actioncable/lib/action_cable')
16 files changed, 154 insertions, 34 deletions
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 0f6e854520..b414255707 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,7 +27,7 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do + active_periodic_timers << connection.server.event_loop.timer(options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 431a5c1063..23d7320a28 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -79,7 +79,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - Concurrent.global_io_executor.post do + connection.server.event_loop.post do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 902efb07e2..5f813cf8e0 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -8,6 +8,8 @@ module ActionCable autoload :ClientSocket autoload :Identification autoload :InternalChannel + autoload :FayeClientSocket + autoload :FayeEventLoop autoload :MessageBuffer autoload :Stream autoload :StreamEventLoop diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index afe0d958d7..7e7e777eaa 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,7 +49,7 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger, :worker_pool - delegate :stream_event_loop, :pubsub, to: :server + delegate :event_loop, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @@ -57,7 +57,7 @@ module ActionCable @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index f6b11e93f0..9e4dbcd6e6 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -29,10 +29,10 @@ module ActionCable attr_reader :env, :url - def initialize(env, event_target, stream_event_loop) - @env = env - @event_target = event_target - @stream_event_loop = stream_event_loop + def initialize(env, event_target, event_loop) + @env = env + @event_target = event_target + @event_loop = event_loop @url = ClientSocket.determine_url(@env) @@ -49,7 +49,7 @@ module ActionCable @driver.on(:close) { |e| begin_close(e.reason, e.code) } @driver.on(:error) { |e| emit_error(e.message) } - @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) + @stream = ActionCable::Connection::Stream.new(@event_loop, self) end def start_driver diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb new file mode 100644 index 0000000000..c9139b6858 --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb @@ -0,0 +1,42 @@ +require 'faye/websocket' + +module ActionCable + module Connection + class FayeClientSocket + def initialize(env, event_target, stream_event_loop) + @env = env + @event_target = event_target + + @faye = nil + end + + def alive? + @faye && @faye.ready_state == Faye::WebSocket::API::OPEN + end + + def transmit(data) + connect + @faye.send data + end + + def close + @faye && @faye.close + end + + def rack_response + connect + @faye.rack_response + end + + private + def connect + return if @faye + @faye = Faye::WebSocket.new(@env) + + @faye.on(:open) { |event| @event_target.on_open } + @faye.on(:message) { |event| @event_target.on_message(event.data) } + @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb new file mode 100644 index 0000000000..8b70f3d84e --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb @@ -0,0 +1,44 @@ +require 'thread' + +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module Connection + class FayeEventLoop + @@mutex = Mutex.new + + def timer(interval, &block) + ensure_reactor_running + EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) + end + + def post(task = nil, &block) + task ||= block + + ensure_reactor_running + ::EM.next_tick(&task) + end + + private + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + + class EMTimer + def initialize(inner) + @inner = inner + end + + def shutdown + inner.cancel + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 27826792b3..3c5d39f59a 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } + server.event_loop.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index e6335082d2..2abad09c03 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -11,7 +11,16 @@ module ActionCable @todo = Queue.new @spawn_mutex = Mutex.new - spawn + end + + def timer(interval, &block) + Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute) + end + + def post(task = nil, &block) + task ||= block + + Concurrent.global_io_executor << task end def attach(io, stream) diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 5e89fb9b72..0bec9b6a96 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -4,8 +4,8 @@ module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, stream_event_loop) - @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil + def initialize(env, event_target, event_loop, client_socket_class) + @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil end def possible? diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index d9a2653cc2..778f5ffeed 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,4 +1,4 @@ -require 'thread' +require 'monitor' module ActionCable module Server @@ -18,8 +18,8 @@ module ActionCable attr_reader :mutex def initialize - @mutex = Mutex.new - @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil + @mutex = Monitor.new + @remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil end # Called by Rack to setup the server. @@ -48,8 +48,8 @@ module ActionCable @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } end - def stream_event_loop - @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new } + def event_loop + @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } end # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 9a7301287c..5fe71caed2 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -4,7 +4,7 @@ module ActionCable # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :connection_class, :worker_pool_size + attr_accessor :use_faye, :connection_class, :worker_pool_size attr_accessor :disable_request_forgery_protection, :allowed_request_origins attr_accessor :cable, :url, :mount_path @@ -43,6 +43,22 @@ module ActionCable adapter = 'PostgreSQL' if adapter == 'Postgresql' "ActionCable::SubscriptionAdapter::#{adapter}".constantize end + + def event_loop_class + if use_faye + ActionCable::Connection::FayeEventLoop + else + ActionCable::Connection::StreamEventLoop + end + end + + def client_socket_class + if use_faye + ActionCable::Connection::FayeClientSocket + else + ActionCable::Connection::ClientSocket + end + end end end end diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 4dc8934b25..5e61b4e335 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -21,9 +21,9 @@ module ActionCable # then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do - Concurrent.global_io_executor.post { connections.map(&:beat) } - end.tap(&:execute) + @heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do + event_loop.post { connections.map(&:beat) } + end end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index cca6894289..10b3ac8cd8 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -5,16 +5,21 @@ module ActionCable class Async < Inline # :nodoc: private def new_subscriber_map - AsyncSubscriberMap.new + AsyncSubscriberMap.new(server.event_loop) end class AsyncSubscriberMap < SubscriberMap + def initialize(event_loop) + @event_loop = event_loop + super() + end + def add_subscriber(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index abaeb92e54..66c7852f6e 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -42,14 +42,15 @@ module ActionCable private def listener - @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @queue = Queue.new @thread = Thread.new do @@ -68,7 +69,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - Concurrent.global_io_executor << callback if callback + @event_loop.post(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -98,7 +99,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 6b4236e7d3..65434f7107 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -38,7 +38,7 @@ module ActionCable private def listener - @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end def redis_connection_for_broadcasts @@ -52,10 +52,11 @@ module ActionCable end class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } @subscription_lock = Mutex.new @@ -84,7 +85,7 @@ module ActionCable if callbacks = @subscribe_callbacks[chan] next_callback = callbacks.shift - Concurrent.global_io_executor << next_callback if next_callback + @event_loop.post(&next_callback) if next_callback @subscribe_callbacks.delete(chan) if callbacks.empty? end end @@ -133,7 +134,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end private |