aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib')
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb4
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb4
-rw-r--r--actioncable/lib/action_cable/connection.rb5
-rw-r--r--actioncable/lib/action_cable/connection/base.rb32
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb152
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb59
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb68
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb22
-rw-r--r--actioncable/lib/action_cable/process/logging.rb7
-rw-r--r--actioncable/lib/action_cable/server.rb4
-rw-r--r--actioncable/lib/action_cable/server/base.rb4
-rw-r--r--actioncable/lib/action_cable/server/connections.rb8
-rw-r--r--actioncable/lib/action_cable/subscription_adapter.rb5
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb22
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/base.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb26
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb71
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb25
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb53
20 files changed, 499 insertions, 80 deletions
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 7f0fb37afc..56597d02d7 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -27,14 +27,14 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
- active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
+ active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
end
def stop_periodic_timers
- active_periodic_timers.each { |timer| timer.cancel }
+ active_periodic_timers.each { |timer| timer.shutdown }
end
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 589946c3db..a26373e387 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -75,8 +75,8 @@ module ActionCable
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- EM.next_tick do
- pubsub.subscribe(broadcasting, callback, lambda do |reply|
+ Concurrent.global_io_executor.post do
+ pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
index b672e00682..902efb07e2 100644
--- a/actioncable/lib/action_cable/connection.rb
+++ b/actioncable/lib/action_cable/connection.rb
@@ -5,12 +5,15 @@ module ActionCable
eager_autoload do
autoload :Authorization
autoload :Base
+ autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
autoload :MessageBuffer
- autoload :WebSocket
+ autoload :Stream
+ autoload :StreamEventLoop
autoload :Subscriptions
autoload :TaggedLoggerProxy
+ autoload :WebSocket
end
end
end
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index bb8850aaa0..0016d1a1a4 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -49,14 +49,14 @@ module ActionCable
include Authorization
attr_reader :server, :env, :subscriptions, :logger
- delegate :worker_pool, :pubsub, to: :server
+ delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@@ -70,10 +70,6 @@ module ActionCable
logger.info started_request_message
if websocket.possible? && allow_request_origin?
- websocket.on(:open) { |event| send_async :on_open }
- websocket.on(:message) { |event| on_message event.data }
- websocket.on(:close) { |event| send_async :on_close }
-
respond_to_successful_request
else
respond_to_invalid_request
@@ -121,6 +117,22 @@ module ActionCable
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
+ def on_open # :nodoc:
+ send_async :handle_open
+ end
+
+ def on_message(message) # :nodoc:
+ message_buffer.append message
+ end
+
+ def on_error(message) # :nodoc:
+ # ignore
+ end
+
+ def on_close # :nodoc:
+ send_async :handle_close
+ end
+
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@@ -139,7 +151,7 @@ module ActionCable
attr_reader :message_buffer
private
- def on_open
+ def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
@@ -150,11 +162,7 @@ module ActionCable
respond_to_invalid_request
end
- def on_message(message)
- message_buffer.append message
- end
-
- def on_close
+ def handle_close
logger.info finished_request_message
server.remove_connection(self)
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
new file mode 100644
index 0000000000..62dd753646
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -0,0 +1,152 @@
+require 'websocket/driver'
+
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class ClientSocket # :nodoc:
+ def self.determine_url(env)
+ scheme = secure_request?(env) ? 'wss:' : 'ws:'
+ "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
+ end
+
+ def self.secure_request?(env)
+ return true if env['HTTPS'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SSL'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https'
+ return true if env['HTTP_X_FORWARDED_PROTO'] == 'https'
+ return true if env['rack.url_scheme'] == 'https'
+
+ return false
+ end
+
+ CONNECTING = 0
+ OPEN = 1
+ CLOSING = 2
+ CLOSED = 3
+
+ attr_reader :env, :url
+
+ def initialize(env, event_target, stream_event_loop)
+ @env = env
+ @event_target = event_target
+ @stream_event_loop = stream_event_loop
+
+ @url = ClientSocket.determine_url(@env)
+
+ @driver = @driver_started = nil
+
+ @ready_state = CONNECTING
+
+ # The driver calls +env+, +url+, and +write+
+ @driver = ::WebSocket::Driver.rack(self)
+
+ @driver.on(:open) { |e| open }
+ @driver.on(:message) { |e| receive_message(e.data) }
+ @driver.on(:close) { |e| begin_close(e.reason, e.code) }
+ @driver.on(:error) { |e| emit_error(e.message) }
+
+ @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+
+ if callback = @env['async.callback']
+ callback.call([101, {}, @stream])
+ end
+ end
+
+ def start_driver
+ return if @driver.nil? || @driver_started
+ @driver_started = true
+ @driver.start
+ end
+
+ def rack_response
+ start_driver
+ [ -1, {}, [] ]
+ end
+
+ def write(data)
+ @stream.write(data)
+ end
+
+ def transmit(message)
+ return false if @ready_state > OPEN
+ case message
+ when Numeric then @driver.text(message.to_s)
+ when String then @driver.text(message)
+ when Array then @driver.binary(message)
+ else false
+ end
+ end
+
+ def close(code = nil, reason = nil)
+ code ||= 1000
+ reason ||= ''
+
+ unless code == 1000 or (code >= 3000 and code <= 4999)
+ raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
+ "The code must be either 1000, or between 3000 and 4999. " +
+ "#{code} is neither."
+ end
+
+ @ready_state = CLOSING unless @ready_state == CLOSED
+ @driver.close(reason, code)
+ end
+
+ def parse(data)
+ @driver.parse(data)
+ end
+
+ def client_gone
+ finalize_close
+ end
+
+ def alive?
+ @ready_state == OPEN
+ end
+
+ private
+ def open
+ return unless @ready_state == CONNECTING
+ @ready_state = OPEN
+
+ @event_target.on_open
+ end
+
+ def receive_message(data)
+ return unless @ready_state == OPEN
+
+ @event_target.on_message(data)
+ end
+
+ def emit_error(message)
+ return if @ready_state >= CLOSING
+
+ @event_target.on_error(message)
+ end
+
+ def begin_close(reason, code)
+ return if @ready_state == CLOSED
+ @ready_state = CLOSING
+ @close_params = [reason, code]
+
+ if @stream
+ @stream.shutdown
+ else
+ finalize_close
+ end
+ end
+
+ def finalize_close
+ return if @ready_state == CLOSED
+ @ready_state = CLOSED
+
+ reason = @close_params ? @close_params[0] : ''
+ code = @close_params ? @close_params[1] : 1006
+
+ @event_target.on_close(code, reason)
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 54ed7672d2..27826792b3 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- EM.next_tick { pubsub.subscribe(internal_channel, callback) }
+ Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
new file mode 100644
index 0000000000..ace250cd16
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -0,0 +1,59 @@
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class Stream
+ def initialize(event_loop, socket)
+ @event_loop = event_loop
+ @socket_object = socket
+ @stream_send = socket.env['stream.send']
+
+ @rack_hijack_io = nil
+
+ hijack_rack_socket
+ end
+
+ def each(&callback)
+ @stream_send ||= callback
+ end
+
+ def close
+ shutdown
+ @socket_object.client_gone
+ end
+
+ def shutdown
+ clean_rack_hijack
+ end
+
+ def write(data)
+ return @rack_hijack_io.write(data) if @rack_hijack_io
+ return @stream_send.call(data) if @stream_send
+ rescue EOFError
+ @socket_object.client_gone
+ end
+
+ def receive(data)
+ @socket_object.parse(data)
+ end
+
+ private
+ def hijack_rack_socket
+ return unless @socket_object.env['rack.hijack']
+
+ @socket_object.env['rack.hijack'].call
+ @rack_hijack_io = @socket_object.env['rack.hijack_io']
+
+ @event_loop.attach(@rack_hijack_io, self)
+ end
+
+ def clean_rack_hijack
+ return unless @rack_hijack_io
+ @event_loop.detach(@rack_hijack_io, self)
+ @rack_hijack_io = nil
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
new file mode 100644
index 0000000000..f773814973
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -0,0 +1,68 @@
+require 'nio'
+
+module ActionCable
+ module Connection
+ class StreamEventLoop
+ def initialize
+ @nio = NIO::Selector.new
+ @map = {}
+ @stopping = false
+ @todo = Queue.new
+
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ run
+ end
+ end
+
+ def attach(io, stream)
+ @todo << lambda do
+ @map[io] = stream
+ @nio.register(io, :r)
+ end
+ @nio.wakeup
+ end
+
+ def detach(io, stream)
+ @todo << lambda do
+ @nio.deregister(io)
+ @map.delete io
+ end
+ @nio.wakeup
+ end
+
+ def stop
+ @stopping = true
+ @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ if monitors = @nio.select
+ monitors.each do |monitor|
+ io = monitor.io
+ stream = @map[io]
+
+ begin
+ stream.receive io.read_nonblock(4096)
+ rescue IO::WaitReadable
+ next
+ rescue EOFError
+ stream.close
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 670d5690ae..5e89fb9b72 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -1,13 +1,11 @@
-require 'faye/websocket'
+require 'websocket/driver'
module ActionCable
module Connection
- # Decorate the Faye::WebSocket with helpers we need.
+ # Wrap the real socket to minimize the externally-presented API
class WebSocket
- delegate :rack_response, :close, :on, to: :websocket
-
- def initialize(env)
- @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
+ def initialize(env, event_target, stream_event_loop)
+ @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
end
def possible?
@@ -15,11 +13,19 @@ module ActionCable
end
def alive?
- websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
+ websocket && websocket.alive?
end
def transmit(data)
- websocket.send data
+ websocket.transmit data
+ end
+
+ def close
+ websocket.close
+ end
+
+ def rack_response
+ websocket.rack_response
end
protected
diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb
deleted file mode 100644
index dce637b3ca..0000000000
--- a/actioncable/lib/action_cable/process/logging.rb
+++ /dev/null
@@ -1,7 +0,0 @@
-require 'action_cable/server'
-require 'eventmachine'
-
-EM.error_handler do |e|
- puts "Error raised inside the event loop: #{e.message}"
- puts e.backtrace.join("\n")
-end
diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb
index a2a89d5f1e..bd6a3826a3 100644
--- a/actioncable/lib/action_cable/server.rb
+++ b/actioncable/lib/action_cable/server.rb
@@ -1,7 +1,3 @@
-require 'eventmachine'
-EventMachine.epoll if EventMachine.epoll?
-EventMachine.kqueue if EventMachine.kqueue?
-
module ActionCable
module Server
extend ActiveSupport::Autoload
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index 3385a4c9f3..b00abd208c 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -32,6 +32,10 @@ module ActionCable
@remote_connections ||= RemoteConnections.new(self)
end
+ def stream_event_loop
+ @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
+ end
+
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
@worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
index 47dcea8c20..8671dd5ebd 100644
--- a/actioncable/lib/action_cable/server/connections.rb
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -22,11 +22,9 @@ module ActionCable
# then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
- EM.next_tick do
- @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
- EM.next_tick { connections.map(&:beat) }
- end
- end
+ @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
+ Concurrent.global_io_executor.post { connections.map(&:beat) }
+ end.tap(&:execute)
end
def open_connections_statistics
diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb
index e770f4fb00..72e62f3daf 100644
--- a/actioncable/lib/action_cable/subscription_adapter.rb
+++ b/actioncable/lib/action_cable/subscription_adapter.rb
@@ -1,5 +1,8 @@
module ActionCable
module SubscriptionAdapter
- autoload :Base, 'action_cable/subscription_adapter/base'
+ extend ActiveSupport::Autoload
+
+ autoload :Base
+ autoload :SubscriberMap
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
new file mode 100644
index 0000000000..c88b03947a
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -0,0 +1,22 @@
+require 'action_cable/subscription_adapter/inline'
+
+module ActionCable
+ module SubscriptionAdapter
+ class Async < Inline # :nodoc:
+ private
+ def subscriber_map
+ @subscriber_map ||= AsyncSubscriberMap.new
+ end
+
+ class AsyncSubscriberMap < SubscriberMap
+ def add_subscriber(*)
+ Concurrent.global_io_executor.post { super }
+ end
+
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
index 11910803e8..796db5ffa3 100644
--- a/actioncable/lib/action_cable/subscription_adapter/base.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -19,6 +19,10 @@ module ActionCable
def unsubscribe(channel, message_callback)
raise NotImplementedError
end
+
+ def shutdown
+ raise NotImplementedError
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
new file mode 100644
index 0000000000..4a2a8d23a2
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -0,0 +1,26 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Inline < Base # :nodoc:
+ def broadcast(channel, payload)
+ subscriber_map.broadcast(channel, payload)
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ subscriber_map.add_subscriber(channel, callback, success_callback)
+ end
+
+ def unsubscribe(channel, callback)
+ subscriber_map.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ # nothing to do
+ end
+
+ private
+ def subscriber_map
+ @subscriber_map ||= SubscriberMap.new
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 6465663c97..3ce1bbed68 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -12,11 +12,15 @@ module ActionCable
end
def subscribe(channel, callback, success_callback = nil)
- listener.subscribe_to(channel, callback, success_callback)
+ listener.add_subscriber(channel, callback, success_callback)
end
def unsubscribe(channel, callback)
- listener.unsubscribe_from(channel, callback)
+ listener.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ listener.shutdown
end
def with_connection(&block) # :nodoc:
@@ -36,14 +40,14 @@ module ActionCable
@listener ||= Listener.new(self)
end
- class Listener
+ class Listener < SubscriberMap
def initialize(adapter)
+ super()
+
@adapter = adapter
- @subscribers = Hash.new { |h,k| h[k] = [] }
- @sync = Mutex.new
@queue = Queue.new
- Thread.new do
+ @thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
@@ -51,46 +55,45 @@ module ActionCable
def listen
@adapter.with_connection do |pg_conn|
- loop do
- until @queue.empty?
- action, channel, callback = @queue.pop(true)
- escaped_channel = pg_conn.escape_identifier(channel)
-
- if action == :listen
- pg_conn.exec("LISTEN #{escaped_channel}")
- ::EM.next_tick(&callback) if callback
- elsif action == :unlisten
- pg_conn.exec("UNLISTEN #{escaped_channel}")
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ Concurrent.global_io_executor << callback if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
end
- end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- @subscribers[chan].each do |callback|
- ::EM.next_tick { callback.call(message) }
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
end
end
end
end
end
- def subscribe_to(channel, callback, success_callback)
- @sync.synchronize do
- if @subscribers[channel].empty?
- @queue.push([:listen, channel, success_callback])
- end
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
- @subscribers[channel] << callback
- end
+ def add_channel(channel, on_success)
+ @queue.push([:listen, channel, on_success])
end
- def unsubscribe_from(channel, callback)
- @sync.synchronize do
- @subscribers[channel].delete(callback)
+ def remove_channel(channel)
+ @queue.push([:unlisten, channel])
+ end
- if @subscribers[channel].empty?
- @queue.push([:unlisten, channel])
- end
- end
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index d149f28b1f..a035e3988d 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,27 +1,40 @@
+require 'thread'
+
gem 'em-hiredis', '~> 0.3.0'
gem 'redis', '~> 3.0'
require 'em-hiredis'
require 'redis'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
+ @@mutex = Mutex.new
+
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
def subscribe(channel, message_callback, success_callback = nil)
redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
- result.callback(&success_callback) if success_callback
+ result.callback { |reply| success_callback.call } if success_callback
end
end
def unsubscribe(channel, message_callback)
- hi_redis_conn.pubsub.unsubscribe_proc(channel, message_callback)
+ redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
end
private
def redis_connection_for_subscriptions
+ ensure_reactor_running
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
redis.on(:reconnect_failed) do
@logger.info "[ActionCable] Redis reconnect failed."
@@ -32,6 +45,14 @@ module ActionCable
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
end
+
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
new file mode 100644
index 0000000000..37eed09793
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
@@ -0,0 +1,53 @@
+module ActionCable
+ module SubscriptionAdapter
+ class SubscriberMap
+ def initialize
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ end
+
+ def add_subscriber(channel, subscriber, on_success)
+ @sync.synchronize do
+ new_channel = !@subscribers.key?(channel)
+
+ @subscribers[channel] << subscriber
+
+ if new_channel
+ add_channel channel, on_success
+ elsif on_success
+ on_success.call
+ end
+ end
+ end
+
+ def remove_subscriber(channel, subscriber)
+ @sync.synchronize do
+ @subscribers[channel].delete(subscriber)
+
+ if @subscribers[channel].empty?
+ @subscribers.delete channel
+ remove_channel channel
+ end
+ end
+ end
+
+ def broadcast(channel, message)
+ list = @sync.synchronize { @subscribers[channel].dup }
+ list.each do |subscriber|
+ invoke_callback(subscriber, message)
+ end
+ end
+
+ def add_channel(channel, on_success)
+ on_success.call if on_success
+ end
+
+ def remove_channel(channel)
+ end
+
+ def invoke_callback(callback, message)
+ callback.call message
+ end
+ end
+ end
+end