From f61467ec5b90ebb75987a13f763b6a19548d84b3 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:16:54 +0200 Subject: Move server classes to its own namespace --- lib/action_cable/server/base.rb | 77 +++++++++++++++++++++++++++++++++++++++ lib/action_cable/server/worker.rb | 32 ++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 lib/action_cable/server/base.rb create mode 100644 lib/action_cable/server/worker.rb (limited to 'lib/action_cable/server') diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb new file mode 100644 index 0000000000..6abec92dc1 --- /dev/null +++ b/lib/action_cable/server/base.rb @@ -0,0 +1,77 @@ +module ActionCable + module Server + class Base + cattr_accessor(:logger, instance_reader: true) { Rails.logger } + + attr_accessor :registered_channels, :redis_config, :log_tags + + def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ]) + @redis_config = redis_config.with_indifferent_access + @registered_channels = Set.new(channels) + @worker_pool_size = worker_pool_size + @connection_class = connection + @log_tags = log_tags + + @connections = [] + + logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})" + end + + def call(env) + @connection_class.new(self, env).process + end + + def worker_pool + @worker_pool ||= ActionCable::Server::Worker.pool(size: @worker_pool_size) + end + + def pubsub + @pubsub ||= redis.pubsub + end + + def redis + @redis ||= begin + redis = EM::Hiredis.connect(@redis_config[:url]) + redis.on(:reconnect_failed) do + logger.info "[ActionCable] Redis reconnect failed." + # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." + # @connections.map &:close + end + redis + end + end + + def threaded_redis + @threaded_redis ||= Redis.new(redis_config) + end + + def remote_connections + @remote_connections ||= RemoteConnections.new(self) + end + + def broadcaster_for(channel) + Broadcaster.new(self, channel) + end + + def broadcast(channel, message) + broadcaster_for(channel).broadcast(message) + end + + def connection_identifiers + @connection_class.identifiers + end + + def add_connection(connection) + @connections << connection + end + + def remove_connection(connection) + @connections.delete connection + end + + def open_connections_statistics + @connections.map(&:statistics) + end + end + end +end \ No newline at end of file diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb new file mode 100644 index 0000000000..0491cb9ab0 --- /dev/null +++ b/lib/action_cable/server/worker.rb @@ -0,0 +1,32 @@ +module ActionCable + module Server + class Worker + include ActiveSupport::Callbacks + include Celluloid + + define_callbacks :work + + def invoke(receiver, method, *args) + run_callbacks :work do + receiver.send method, *args + end + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end + + def run_periodic_timer(channel, callback) + run_callbacks :work do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + end + end + + private + def logger + ActionCable::Server::Base.logger + end + end + end +end \ No newline at end of file -- cgit v1.2.3 From e1a99a83ca135523ff8513be756f156500999cb8 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:24:50 +0200 Subject: Make broadcasting a concern --- lib/action_cable/server/base.rb | 10 ++-------- lib/action_cable/server/broadcasting.rb | 28 ++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 8 deletions(-) create mode 100644 lib/action_cable/server/broadcasting.rb (limited to 'lib/action_cable/server') diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb index 6abec92dc1..6dcd282e4a 100644 --- a/lib/action_cable/server/base.rb +++ b/lib/action_cable/server/base.rb @@ -1,6 +1,8 @@ module ActionCable module Server class Base + include ActionCable::Server::Broadcasting + cattr_accessor(:logger, instance_reader: true) { Rails.logger } attr_accessor :registered_channels, :redis_config, :log_tags @@ -49,14 +51,6 @@ module ActionCable @remote_connections ||= RemoteConnections.new(self) end - def broadcaster_for(channel) - Broadcaster.new(self, channel) - end - - def broadcast(channel, message) - broadcaster_for(channel).broadcast(message) - end - def connection_identifiers @connection_class.identifiers end diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb new file mode 100644 index 0000000000..682064571f --- /dev/null +++ b/lib/action_cable/server/broadcasting.rb @@ -0,0 +1,28 @@ +module ActionCable + module Server + module Broadcasting + def broadcaster_for(channel) + Broadcaster.new(self, channel) + end + + def broadcast(channel, message) + broadcaster_for(channel).broadcast(message) + end + + class Broadcaster + attr_reader :server, :channel, :redis + delegate :logger, to: :server + + def initialize(server, channel) + @server, @channel = server, channel + @redis = @server.threaded_redis + end + + def broadcast(message) + logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" + redis.publish channel, message.to_json + end + end + end + end +end \ No newline at end of file -- cgit v1.2.3 From 8a2af53c8e83cd9258380fad4007e53f8721aa93 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:36:10 +0200 Subject: More redis used for broadcasting into broadcasting concern --- lib/action_cable/server/base.rb | 4 ---- lib/action_cable/server/broadcasting.rb | 32 ++++++++++++++++++-------------- 2 files changed, 18 insertions(+), 18 deletions(-) (limited to 'lib/action_cable/server') diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb index 6dcd282e4a..e8109b325d 100644 --- a/lib/action_cable/server/base.rb +++ b/lib/action_cable/server/base.rb @@ -43,10 +43,6 @@ module ActionCable end end - def threaded_redis - @threaded_redis ||= Redis.new(redis_config) - end - def remote_connections @remote_connections ||= RemoteConnections.new(self) end diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 682064571f..691ec1b486 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -1,28 +1,32 @@ module ActionCable module Server module Broadcasting + def broadcast(channel, message) + broadcaster_for(channel).broadcast(message) + end + def broadcaster_for(channel) Broadcaster.new(self, channel) end - def broadcast(channel, message) - broadcaster_for(channel).broadcast(message) - end + private + def redis_for_threads + @redis_for_threads ||= Redis.new(redis_config) + end - class Broadcaster - attr_reader :server, :channel, :redis - delegate :logger, to: :server + class Broadcaster + def initialize(server, channel) + @server, @channel = server, channel + end - def initialize(server, channel) - @server, @channel = server, channel - @redis = @server.threaded_redis - end + def broadcast(message, log: true) + server.logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" if log + server.redis_for_threads.publish channel, message.to_json + end - def broadcast(message) - logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" - redis.publish channel, message.to_json + private + attr_reader :server, :channel end - end end end end \ No newline at end of file -- cgit v1.2.3 From a5d6bc0eb527f6cfa61300e70fa9010544240cf9 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:38:05 +0200 Subject: Make the remote connection use the broadcaster as well --- lib/action_cable/server/broadcasting.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'lib/action_cable/server') diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 691ec1b486..0d591d03e4 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -19,8 +19,12 @@ module ActionCable @server, @channel = server, channel end - def broadcast(message, log: true) - server.logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" if log + def broadcast(message) + server.logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" + broadcast_without_logging(message) + end + + def broadcast_without_logging(message) server.redis_for_threads.publish channel, message.to_json end -- cgit v1.2.3 From c2e2a94306e6b77b0a1dce9b453fbaa04a7f7446 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:42:49 +0200 Subject: Rejig for what's used --- lib/action_cable/server/broadcasting.rb | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'lib/action_cable/server') diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 0d591d03e4..3fbaa05039 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -9,12 +9,14 @@ module ActionCable Broadcaster.new(self, channel) end - private - def redis_for_threads - @redis_for_threads ||= Redis.new(redis_config) - end + def broadcasting_redis + @broadcasting_redis ||= Redis.new(redis_config) + end + private class Broadcaster + attr_reader :server, :channel + def initialize(server, channel) @server, @channel = server, channel end @@ -25,11 +27,8 @@ module ActionCable end def broadcast_without_logging(message) - server.redis_for_threads.publish channel, message.to_json + server.broadcasting_redis.publish channel, message.to_json end - - private - attr_reader :server, :channel end end end -- cgit v1.2.3 From 5c4f07d34e82310e2ce9029ddaafb6603435da73 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 21:17:16 +0200 Subject: Introduce Streams as the domain language for the pubsub channels Channels redeliver messages from --- lib/action_cable/server/broadcasting.rb | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'lib/action_cable/server') diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 3fbaa05039..868d418ece 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -1,12 +1,12 @@ module ActionCable module Server module Broadcasting - def broadcast(channel, message) - broadcaster_for(channel).broadcast(message) + def broadcast(broadcasting, message) + broadcaster_for(broadcasting).broadcast(message) end - def broadcaster_for(channel) - Broadcaster.new(self, channel) + def broadcaster_for(broadcasting) + Broadcaster.new(self, broadcasting) end def broadcasting_redis @@ -15,19 +15,19 @@ module ActionCable private class Broadcaster - attr_reader :server, :channel + attr_reader :server, :broadcasting - def initialize(server, channel) - @server, @channel = server, channel + def initialize(server, broadcasting) + @server, @broadcasting = server, broadcasting end def broadcast(message) - server.logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" broadcast_without_logging(message) end def broadcast_without_logging(message) - server.broadcasting_redis.publish channel, message.to_json + server.broadcasting_redis.publish broadcasting, message.to_json end end end -- cgit v1.2.3 From 10323716a134bb86708f6a65280215f8a7f18a1a Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Thu, 2 Jul 2015 15:41:50 +0200 Subject: Expose broadcast_without_logging at the top level --- lib/action_cable/server/broadcasting.rb | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'lib/action_cable/server') diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 868d418ece..b0e51b8ba8 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -5,6 +5,10 @@ module ActionCable broadcaster_for(broadcasting).broadcast(message) end + def broadcast_without_logging(broadcasting, message) + broadcaster_for(broadcasting).broadcast_without_logging(message) + end + def broadcaster_for(broadcasting) Broadcaster.new(self, broadcasting) end -- cgit v1.2.3