aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r--actioncable/lib/action_cable/channel.rb14
-rw-r--r--actioncable/lib/action_cable/channel/base.rb274
-rw-r--r--actioncable/lib/action_cable/channel/broadcasting.rb29
-rw-r--r--actioncable/lib/action_cable/channel/callbacks.rb35
-rw-r--r--actioncable/lib/action_cable/channel/naming.rb22
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb41
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb114
-rw-r--r--actioncable/lib/action_cable/connection.rb16
-rw-r--r--actioncable/lib/action_cable/connection/authorization.rb13
-rw-r--r--actioncable/lib/action_cable/connection/base.rb219
-rw-r--r--actioncable/lib/action_cable/connection/identification.rb46
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb45
-rw-r--r--actioncable/lib/action_cable/connection/message_buffer.rb53
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb75
-rw-r--r--actioncable/lib/action_cable/connection/tagged_logger_proxy.rb40
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb29
-rw-r--r--actioncable/lib/action_cable/engine.rb27
-rw-r--r--actioncable/lib/action_cable/helpers/action_cable_helper.rb29
-rw-r--r--actioncable/lib/action_cable/process/logging.rb12
-rw-r--r--actioncable/lib/action_cable/remote_connections.rb64
-rw-r--r--actioncable/lib/action_cable/server.rb19
-rw-r--r--actioncable/lib/action_cable/server/base.rb74
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb54
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb67
-rw-r--r--actioncable/lib/action_cable/server/connections.rb37
-rw-r--r--actioncable/lib/action_cable/server/worker.rb42
-rw-r--r--actioncable/lib/action_cable/server/worker/active_record_connection_management.rb22
-rw-r--r--actioncable/lib/action_cable/version.rb3
28 files changed, 1515 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/channel.rb b/actioncable/lib/action_cable/channel.rb
new file mode 100644
index 0000000000..7ae262ce5f
--- /dev/null
+++ b/actioncable/lib/action_cable/channel.rb
@@ -0,0 +1,14 @@
+module ActionCable
+ module Channel
+ extend ActiveSupport::Autoload
+
+ eager_autoload do
+ autoload :Base
+ autoload :Broadcasting
+ autoload :Callbacks
+ autoload :Naming
+ autoload :PeriodicTimers
+ autoload :Streams
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
new file mode 100644
index 0000000000..ca903a810d
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -0,0 +1,274 @@
+require 'set'
+
+module ActionCable
+ module Channel
+ # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection.
+ # You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply
+ # responding to the subscriber's direct requests.
+ #
+ # Channel instances are long-lived. A channel object will be instantiated when the cable consumer becomes a subscriber, and then
+ # lives until the consumer disconnects. This may be seconds, minutes, hours, or even days. That means you have to take special care
+ # not to do anything silly in a channel that would balloon its memory footprint or whatever. The references are forever, so they won't be released
+ # as is normally the case with a controller instance that gets thrown away after every request.
+ #
+ # Long-lived channels (and connections) also mean you're responsible for ensuring that the data is fresh. If you hold a reference to a user
+ # record, but the name is changed while that reference is held, you may be sending stale data if you don't take precautions to avoid it.
+ #
+ # The upside of long-lived channel instances is that you can use instance variables to keep reference to objects that future subscriber requests
+ # can interact with. Here's a quick example:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ # end
+ #
+ # def speak(data)
+ # @room.speak data, user: current_user
+ # end
+ # end
+ #
+ # The #speak action simply uses the Chat::Room object that was created when the channel was first subscribed to by the consumer when that
+ # subscriber wants to say something in the room.
+ #
+ # == Action processing
+ #
+ # Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's an remote-procedure call model. You can
+ # declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client.
+ #
+ # Example:
+ #
+ # class AppearanceChannel < ApplicationCable::Channel
+ # def subscribed
+ # @connection_token = generate_connection_token
+ # end
+ #
+ # def unsubscribed
+ # current_user.disappear @connection_token
+ # end
+ #
+ # def appear(data)
+ # current_user.appear @connection_token, on: data['appearing_on']
+ # end
+ #
+ # def away
+ # current_user.away @connection_token
+ # end
+ #
+ # private
+ # def generate_connection_token
+ # SecureRandom.hex(36)
+ # end
+ # end
+ #
+ # In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away
+ # are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then
+ # uses as part of its model call. #away does not, it's simply a trigger action.
+ #
+ # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection.
+ # All such identifiers will automatically create a delegation method of the same name on the channel instance.
+ #
+ # == Rejecting subscription requests
+ #
+ # A channel can reject a subscription request in the #subscribed callback by invoking #reject!
+ #
+ # Example:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ # reject unless current_user.can_access?(@room)
+ # end
+ # end
+ #
+ # In this example, the subscription will be rejected if the current_user does not have access to the chat room.
+ # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request.
+ class Base
+ include Callbacks
+ include PeriodicTimers
+ include Streams
+ include Naming
+ include Broadcasting
+
+ attr_reader :params, :connection, :identifier
+ delegate :logger, to: :connection
+
+ class << self
+ # A list of method names that should be considered actions. This
+ # includes all public instance methods on a channel, less
+ # any internal methods (defined on Base), adding back in
+ # any methods that are internal, but still exist on the class
+ # itself.
+ #
+ # ==== Returns
+ # * <tt>Set</tt> - A set of all methods that should be considered actions.
+ def action_methods
+ @action_methods ||= begin
+ # All public instance methods of this class, including ancestors
+ methods = (public_instance_methods(true) -
+ # Except for public instance methods of Base and its ancestors
+ ActionCable::Channel::Base.public_instance_methods(true) +
+ # Be sure to include shadowed public instance methods of this class
+ public_instance_methods(false)).uniq.map(&:to_s)
+ methods.to_set
+ end
+ end
+
+ protected
+ # action_methods are cached and there is sometimes need to refresh
+ # them. ::clear_action_methods! allows you to do that, so next time
+ # you run action_methods, they will be recalculated
+ def clear_action_methods!
+ @action_methods = nil
+ end
+
+ # Refresh the cached action_methods when a new action_method is added.
+ def method_added(name)
+ super
+ clear_action_methods!
+ end
+ end
+
+ def initialize(connection, identifier, params = {})
+ @connection = connection
+ @identifier = identifier
+ @params = params
+
+ # When a channel is streaming via redis pubsub, we want to delay the confirmation
+ # transmission until redis pubsub subscription is confirmed.
+ @defer_subscription_confirmation = false
+
+ delegate_connection_identifiers
+ subscribe_to_channel
+ end
+
+ # Extract the action name from the passed data and process it via the channel. The process will ensure
+ # that the action requested is a public method on the channel declared by the user (so not one of the callbacks
+ # like #subscribed).
+ def perform_action(data)
+ action = extract_action(data)
+
+ if processable_action?(action)
+ dispatch_action(action, data)
+ else
+ logger.error "Unable to process #{action_signature(action, data)}"
+ end
+ end
+
+ # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks.
+ # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
+ def unsubscribe_from_channel
+ run_callbacks :unsubscribe do
+ unsubscribed
+ end
+ end
+
+
+ protected
+ # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams
+ # you want this channel to be sending to the subscriber.
+ def subscribed
+ # Override in subclasses
+ end
+
+ # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking
+ # people as offline or the like.
+ def unsubscribed
+ # Override in subclasses
+ end
+
+ # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with
+ # the proper channel identifier marked as the recipient.
+ def transmit(data, via: nil)
+ logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via }
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
+ end
+
+ def defer_subscription_confirmation!
+ @defer_subscription_confirmation = true
+ end
+
+ def defer_subscription_confirmation?
+ @defer_subscription_confirmation
+ end
+
+ def subscription_confirmation_sent?
+ @subscription_confirmation_sent
+ end
+
+ def reject
+ @reject_subscription = true
+ end
+
+ def subscription_rejected?
+ @reject_subscription
+ end
+
+ private
+ def delegate_connection_identifiers
+ connection.identifiers.each do |identifier|
+ define_singleton_method(identifier) do
+ connection.send(identifier)
+ end
+ end
+ end
+
+
+ def subscribe_to_channel
+ run_callbacks :subscribe do
+ subscribed
+ end
+
+ if subscription_rejected?
+ reject_subscription
+ else
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
+ end
+
+
+ def extract_action(data)
+ (data['action'].presence || :receive).to_sym
+ end
+
+ def processable_action?(action)
+ self.class.action_methods.include?(action.to_s)
+ end
+
+ def dispatch_action(action, data)
+ logger.info action_signature(action, data)
+
+ if method(action).arity == 1
+ public_send action, data
+ else
+ public_send action
+ end
+ end
+
+ def action_signature(action, data)
+ "#{self.class.name}##{action}".tap do |signature|
+ if (arguments = data.except('action')).any?
+ signature << "(#{arguments.inspect})"
+ end
+ end
+ end
+
+ def transmit_subscription_confirmation
+ unless subscription_confirmation_sent?
+ logger.info "#{self.class.name} is transmitting the subscription confirmation"
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
+ @subscription_confirmation_sent = true
+ end
+ end
+
+ def reject_subscription
+ connection.subscriptions.remove_subscription self
+ transmit_subscription_rejection
+ end
+
+ def transmit_subscription_rejection
+ logger.info "#{self.class.name} is transmitting the subscription rejection"
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/broadcasting.rb b/actioncable/lib/action_cable/channel/broadcasting.rb
new file mode 100644
index 0000000000..afc23d7d1a
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/broadcasting.rb
@@ -0,0 +1,29 @@
+require 'active_support/core_ext/object/to_param'
+
+module ActionCable
+ module Channel
+ module Broadcasting
+ extend ActiveSupport::Concern
+
+ delegate :broadcasting_for, to: :class
+
+ class_methods do
+ # Broadcast a hash to a unique broadcasting for this <tt>model</tt> in this channel.
+ def broadcast_to(model, message)
+ ActionCable.server.broadcast(broadcasting_for([ channel_name, model ]), message)
+ end
+
+ def broadcasting_for(model) #:nodoc:
+ case
+ when model.is_a?(Array)
+ model.map { |m| broadcasting_for(m) }.join(':')
+ when model.respond_to?(:to_gid_param)
+ model.to_gid_param
+ else
+ model.to_param
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/callbacks.rb b/actioncable/lib/action_cable/channel/callbacks.rb
new file mode 100644
index 0000000000..295d750e86
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/callbacks.rb
@@ -0,0 +1,35 @@
+require 'active_support/callbacks'
+
+module ActionCable
+ module Channel
+ module Callbacks
+ extend ActiveSupport::Concern
+ include ActiveSupport::Callbacks
+
+ included do
+ define_callbacks :subscribe
+ define_callbacks :unsubscribe
+ end
+
+ class_methods do
+ def before_subscribe(*methods, &block)
+ set_callback(:subscribe, :before, *methods, &block)
+ end
+
+ def after_subscribe(*methods, &block)
+ set_callback(:subscribe, :after, *methods, &block)
+ end
+ alias_method :on_subscribe, :after_subscribe
+
+ def before_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :before, *methods, &block)
+ end
+
+ def after_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :after, *methods, &block)
+ end
+ alias_method :on_unsubscribe, :after_unsubscribe
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb
new file mode 100644
index 0000000000..4c9d53b15a
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/naming.rb
@@ -0,0 +1,22 @@
+module ActionCable
+ module Channel
+ module Naming
+ extend ActiveSupport::Concern
+
+ class_methods do
+ # Returns the name of the channel, underscored, without the <tt>Channel</tt> ending.
+ # If the channel is in a namespace, then the namespaces are represented by single
+ # colon separators in the channel name.
+ #
+ # ChatChannel.channel_name # => 'chat'
+ # Chats::AppearancesChannel.channel_name # => 'chats:appearances'
+ def channel_name
+ @channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore
+ end
+ end
+
+ # Delegates to the class' <tt>channel_name</tt>
+ delegate :channel_name, to: :class
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
new file mode 100644
index 0000000000..25fe8e5e54
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -0,0 +1,41 @@
+module ActionCable
+ module Channel
+ module PeriodicTimers
+ extend ActiveSupport::Concern
+
+ included do
+ class_attribute :periodic_timers, instance_reader: false
+ self.periodic_timers = []
+
+ after_subscribe :start_periodic_timers
+ after_unsubscribe :stop_periodic_timers
+ end
+
+ module ClassMethods
+ # Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful
+ # for sending a steady flow of updates to a client based off an object that was configured on subscription.
+ # It's an alternative to using streams if the channel is able to do the work internally.
+ def periodically(callback, every:)
+ self.periodic_timers += [ [ callback, every: every ] ]
+ end
+ end
+
+ private
+ def active_periodic_timers
+ @active_periodic_timers ||= []
+ end
+
+ def start_periodic_timers
+ self.class.periodic_timers.each do |callback, options|
+ active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
+ connection.worker_pool.async.run_periodic_timer(self, callback)
+ end
+ end
+ end
+
+ def stop_periodic_timers
+ active_periodic_timers.each { |timer| timer.cancel }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
new file mode 100644
index 0000000000..b5ffa17f72
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -0,0 +1,114 @@
+module ActionCable
+ module Channel
+ # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data
+ # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
+ # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later.
+ #
+ # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
+ # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
+ # comments on a given page:
+ #
+ # class CommentsChannel < ApplicationCable::Channel
+ # def follow(data)
+ # stream_from "comments_for_#{data['recording_id']}"
+ # end
+ #
+ # def unfollow
+ # stop_all_streams
+ # end
+ # end
+ #
+ # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there.
+ # That looks like so from that side of things:
+ #
+ # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell'
+ #
+ # If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel.
+ # The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE`
+ #
+ # class CommentsChannel < ApplicationCable::Channel
+ # def subscribed
+ # post = Post.find(params[:id])
+ # stream_for post
+ # end
+ # end
+ #
+ # You can then broadcast to this channel using:
+ #
+ # CommentsChannel.broadcast_to(@post, @comment)
+ #
+ # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out.
+ # Example below shows how you can use this to provide performance introspection in the process:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ #
+ # stream_for @room, -> (encoded_message) do
+ # message = ActiveSupport::JSON.decode(encoded_message)
+ #
+ # if message['originated_at'].present?
+ # elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
+ #
+ # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
+ # logger.info "Message took #{elapsed_time}s to arrive"
+ # end
+ #
+ # transmit message
+ # end
+ # end
+ #
+ # You can stop streaming from all broadcasts by calling #stop_all_streams.
+ module Streams
+ extend ActiveSupport::Concern
+
+ included do
+ on_unsubscribe :stop_all_streams
+ end
+
+ # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
+ # instead of the default of just transmitting the updates straight to the subscriber.
+ def stream_from(broadcasting, callback = nil)
+ # Hold off the confirmation until pubsub#subscribe is successful
+ defer_subscription_confirmation!
+
+ callback ||= default_stream_callback(broadcasting)
+ streams << [ broadcasting, callback ]
+
+ EM.next_tick do
+ pubsub.subscribe(broadcasting, &callback).callback do |reply|
+ transmit_subscription_confirmation
+ logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ end
+ end
+ end
+
+ # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
+ # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
+ # to the subscriber.
+ def stream_for(model, callback = nil)
+ stream_from(broadcasting_for([ channel_name, model ]), callback)
+ end
+
+ def stop_all_streams
+ streams.each do |broadcasting, callback|
+ pubsub.unsubscribe_proc broadcasting, callback
+ logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
+ end.clear
+ end
+
+ private
+ delegate :pubsub, to: :connection
+
+ def streams
+ @_streams ||= []
+ end
+
+ def default_stream_callback(broadcasting)
+ -> (message) do
+ transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
new file mode 100644
index 0000000000..b672e00682
--- /dev/null
+++ b/actioncable/lib/action_cable/connection.rb
@@ -0,0 +1,16 @@
+module ActionCable
+ module Connection
+ extend ActiveSupport::Autoload
+
+ eager_autoload do
+ autoload :Authorization
+ autoload :Base
+ autoload :Identification
+ autoload :InternalChannel
+ autoload :MessageBuffer
+ autoload :WebSocket
+ autoload :Subscriptions
+ autoload :TaggedLoggerProxy
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/authorization.rb b/actioncable/lib/action_cable/connection/authorization.rb
new file mode 100644
index 0000000000..070a70e4e2
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/authorization.rb
@@ -0,0 +1,13 @@
+module ActionCable
+ module Connection
+ module Authorization
+ class UnauthorizedError < StandardError; end
+
+ private
+ def reject_unauthorized_connection
+ logger.error "An unauthorized connection attempt was rejected"
+ raise UnauthorizedError
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
new file mode 100644
index 0000000000..7e9eec7508
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -0,0 +1,219 @@
+require 'action_dispatch'
+
+module ActionCable
+ module Connection
+ # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
+ # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
+ # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
+ # authentication and authorization.
+ #
+ # Here's a basic example:
+ #
+ # module ApplicationCable
+ # class Connection < ActionCable::Connection::Base
+ # identified_by :current_user
+ #
+ # def connect
+ # self.current_user = find_verified_user
+ # logger.add_tags current_user.name
+ # end
+ #
+ # def disconnect
+ # # Any cleanup work needed when the cable connection is cut.
+ # end
+ #
+ # protected
+ # def find_verified_user
+ # if current_user = User.find_by_identity cookies.signed[:identity_id]
+ # current_user
+ # else
+ # reject_unauthorized_connection
+ # end
+ # end
+ # end
+ # end
+ #
+ # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections
+ # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
+ # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
+ #
+ # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
+ # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
+ #
+ # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
+ #
+ # Pretty simple, eh?
+ class Base
+ include Identification
+ include InternalChannel
+ include Authorization
+
+ attr_reader :server, :env, :subscriptions
+ delegate :worker_pool, :pubsub, to: :server
+
+ attr_reader :logger
+
+ def initialize(server, env)
+ @server, @env = server, env
+
+ @logger = new_tagged_logger
+
+ @websocket = ActionCable::Connection::WebSocket.new(env)
+ @subscriptions = ActionCable::Connection::Subscriptions.new(self)
+ @message_buffer = ActionCable::Connection::MessageBuffer.new(self)
+
+ @started_at = Time.now
+ end
+
+ # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
+ # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
+ def process
+ logger.info started_request_message
+
+ if websocket.possible? && allow_request_origin?
+ websocket.on(:open) { |event| send_async :on_open }
+ websocket.on(:message) { |event| on_message event.data }
+ websocket.on(:close) { |event| send_async :on_close }
+
+ respond_to_successful_request
+ else
+ respond_to_invalid_request
+ end
+ end
+
+ # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded.
+ # The data is routed to the proper channel that the connection has subscribed to.
+ def receive(data_in_json)
+ if websocket.alive?
+ subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
+ else
+ logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
+ end
+ end
+
+ # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
+ # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
+ def transmit(data)
+ websocket.transmit data
+ end
+
+ # Close the WebSocket connection.
+ def close
+ websocket.close
+ end
+
+ # Invoke a method on the connection asynchronously through the pool of thread workers.
+ def send_async(method, *arguments)
+ worker_pool.async.invoke(self, method, *arguments)
+ end
+
+ # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
+ # This can be returned by a health check against the connection.
+ def statistics
+ {
+ identifier: connection_identifier,
+ started_at: @started_at,
+ subscriptions: subscriptions.identifiers,
+ request_id: @env['action_dispatch.request_id']
+ }
+ end
+
+ def beat
+ transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
+ end
+
+
+ protected
+ # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
+ def request
+ @request ||= begin
+ environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
+ ActionDispatch::Request.new(environment || env)
+ end
+ end
+
+ # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
+ def cookies
+ request.cookie_jar
+ end
+
+
+ private
+ attr_reader :websocket
+ attr_reader :message_buffer
+
+ def on_open
+ connect if respond_to?(:connect)
+ subscribe_to_internal_channel
+ beat
+
+ message_buffer.process!
+ server.add_connection(self)
+ rescue ActionCable::Connection::Authorization::UnauthorizedError
+ respond_to_invalid_request
+ end
+
+ def on_message(message)
+ message_buffer.append message
+ end
+
+ def on_close
+ logger.info finished_request_message
+
+ server.remove_connection(self)
+
+ subscriptions.unsubscribe_from_all
+ unsubscribe_from_internal_channel
+
+ disconnect if respond_to?(:disconnect)
+ end
+
+
+ def allow_request_origin?
+ return true if server.config.disable_request_forgery_protection
+
+ if Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env['HTTP_ORIGIN'] }
+ true
+ else
+ logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
+ false
+ end
+ end
+
+ def respond_to_successful_request
+ websocket.rack_response
+ end
+
+ def respond_to_invalid_request
+ close if websocket.alive?
+
+ logger.info finished_request_message
+ [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
+ end
+
+
+ # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
+ def new_tagged_logger
+ TaggedLoggerProxy.new server.logger,
+ tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
+ end
+
+ def started_request_message
+ 'Started %s "%s"%s for %s at %s' % [
+ request.request_method,
+ request.filtered_path,
+ websocket.possible? ? ' [WebSocket]' : '',
+ request.ip,
+ Time.now.to_s ]
+ end
+
+ def finished_request_message
+ 'Finished "%s"%s for %s at %s' % [
+ request.filtered_path,
+ websocket.possible? ? ' [WebSocket]' : '',
+ request.ip,
+ Time.now.to_s ]
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb
new file mode 100644
index 0000000000..2d75ff8d6d
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/identification.rb
@@ -0,0 +1,46 @@
+require 'set'
+
+module ActionCable
+ module Connection
+ module Identification
+ extend ActiveSupport::Concern
+
+ included do
+ class_attribute :identifiers
+ self.identifiers = Set.new
+ end
+
+ class_methods do
+ # Mark a key as being a connection identifier index that can then used to find the specific connection again later.
+ # Common identifiers are current_user and current_account, but could be anything really.
+ #
+ # Note that anything marked as an identifier will automatically create a delegate by the same name on any
+ # channel instances created off the connection.
+ def identified_by(*identifiers)
+ Array(identifiers).each { |identifier| attr_accessor identifier }
+ self.identifiers += identifiers
+ end
+ end
+
+ # Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
+ def connection_identifier
+ unless defined? @connection_identifier
+ @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ end
+
+ @connection_identifier
+ end
+
+ private
+ def connection_gid(ids)
+ ids.map do |o|
+ if o.respond_to? :to_gid_param
+ o.to_gid_param
+ else
+ o.to_s
+ end
+ end.sort.join(":")
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
new file mode 100644
index 0000000000..c065a24ab7
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -0,0 +1,45 @@
+module ActionCable
+ module Connection
+ # Makes it possible for the RemoteConnection to disconnect a specific connection.
+ module InternalChannel
+ extend ActiveSupport::Concern
+
+ private
+ def internal_redis_channel
+ "action_cable/#{connection_identifier}"
+ end
+
+ def subscribe_to_internal_channel
+ if connection_identifier.present?
+ callback = -> (message) { process_internal_message(message) }
+ @_internal_redis_subscriptions ||= []
+ @_internal_redis_subscriptions << [ internal_redis_channel, callback ]
+
+ EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
+ logger.info "Registered connection (#{connection_identifier})"
+ end
+ end
+
+ def unsubscribe_from_internal_channel
+ if @_internal_redis_subscriptions.present?
+ @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
+ end
+ end
+
+ def process_internal_message(message)
+ message = ActiveSupport::JSON.decode(message)
+
+ case message['type']
+ when 'disconnect'
+ logger.info "Removing connection (#{connection_identifier})"
+ websocket.close
+ end
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
+
+ close
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb
new file mode 100644
index 0000000000..25cff75b41
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/message_buffer.rb
@@ -0,0 +1,53 @@
+module ActionCable
+ module Connection
+ # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
+ # Entirely internal operation and should not be used directly by the user.
+ class MessageBuffer
+ def initialize(connection)
+ @connection = connection
+ @buffered_messages = []
+ end
+
+ def append(message)
+ if valid? message
+ if processing?
+ receive message
+ else
+ buffer message
+ end
+ else
+ connection.logger.error "Couldn't handle non-string message: #{message.class}"
+ end
+ end
+
+ def processing?
+ @processing
+ end
+
+ def process!
+ @processing = true
+ receive_buffered_messages
+ end
+
+ private
+ attr_reader :connection
+ attr_accessor :buffered_messages
+
+ def valid?(message)
+ message.is_a?(String)
+ end
+
+ def receive(message)
+ connection.send_async :receive, message
+ end
+
+ def buffer(message)
+ buffered_messages << message
+ end
+
+ def receive_buffered_messages
+ receive buffered_messages.shift until buffered_messages.empty?
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
new file mode 100644
index 0000000000..6199db4898
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -0,0 +1,75 @@
+require 'active_support/core_ext/hash/indifferent_access'
+
+module ActionCable
+ module Connection
+ # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
+ # the connection to the proper channel. Should not be used directly by the user.
+ class Subscriptions
+ def initialize(connection)
+ @connection = connection
+ @subscriptions = {}
+ end
+
+ def execute_command(data)
+ case data['command']
+ when 'subscribe' then add data
+ when 'unsubscribe' then remove data
+ when 'message' then perform_action data
+ else
+ logger.error "Received unrecognized command in #{data.inspect}"
+ end
+ rescue Exception => e
+ logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
+ end
+
+ def add(data)
+ id_key = data['identifier']
+ id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+
+ subscription_klass = connection.server.channel_classes[id_options[:channel]]
+
+ if subscription_klass
+ subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
+ else
+ logger.error "Subscription class not found (#{data.inspect})"
+ end
+ end
+
+ def remove(data)
+ logger.info "Unsubscribing from channel: #{data['identifier']}"
+ remove_subscription subscriptions[data['identifier']]
+ end
+
+ def remove_subscription(subscription)
+ subscription.unsubscribe_from_channel
+ subscriptions.delete(subscription.identifier)
+ end
+
+ def perform_action(data)
+ find(data).perform_action ActiveSupport::JSON.decode(data['data'])
+ end
+
+
+ def identifiers
+ subscriptions.keys
+ end
+
+ def unsubscribe_from_all
+ subscriptions.each { |id, channel| channel.unsubscribe_from_channel }
+ end
+
+
+ private
+ attr_reader :connection, :subscriptions
+ delegate :logger, to: :connection
+
+ def find(data)
+ if subscription = subscriptions[data['identifier']]
+ subscription
+ else
+ raise "Unable to find subscription with identifier: #{data['identifier']}"
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb
new file mode 100644
index 0000000000..e5319087fb
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb
@@ -0,0 +1,40 @@
+module ActionCable
+ module Connection
+ # Allows the use of per-connection tags against the server logger. This wouldn't work using the tradional
+ # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests.
+ # The connection is long-lived, so it needs its own set of tags for its independent duration.
+ class TaggedLoggerProxy
+ attr_reader :tags
+
+ def initialize(logger, tags:)
+ @logger = logger
+ @tags = tags.flatten
+ end
+
+ def add_tags(*tags)
+ @tags += tags.flatten
+ @tags = @tags.uniq
+ end
+
+ def tag(logger)
+ if logger.respond_to?(:tagged)
+ current_tags = tags - logger.formatter.current_tags
+ logger.tagged(*current_tags) { yield }
+ else
+ yield
+ end
+ end
+
+ %i( debug info warn error fatal unknown ).each do |severity|
+ define_method(severity) do |message|
+ log severity, message
+ end
+ end
+
+ protected
+ def log(type, message)
+ tag(@logger) { @logger.send type, message }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
new file mode 100644
index 0000000000..169b683b8c
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -0,0 +1,29 @@
+require 'faye/websocket'
+
+module ActionCable
+ module Connection
+ # Decorate the Faye::WebSocket with helpers we need.
+ class WebSocket
+ delegate :rack_response, :close, :on, to: :websocket
+
+ def initialize(env)
+ @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
+ end
+
+ def possible?
+ websocket
+ end
+
+ def alive?
+ websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
+ end
+
+ def transmit(data)
+ websocket.send data
+ end
+
+ private
+ attr_reader :websocket
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
new file mode 100644
index 0000000000..4777c3886b
--- /dev/null
+++ b/actioncable/lib/action_cable/engine.rb
@@ -0,0 +1,27 @@
+require 'rails/engine'
+require 'active_support/ordered_options'
+require 'action_cable/helpers/action_cable_helper'
+
+module ActionCable
+ class Engine < ::Rails::Engine
+ config.action_cable = ActiveSupport::OrderedOptions.new
+
+ config.to_prepare do
+ ApplicationController.helper ActionCable::Helpers::ActionCableHelper
+ end
+
+ initializer "action_cable.logger" do
+ ActiveSupport.on_load(:action_cable) { self.logger ||= ::Rails.logger }
+ end
+
+ initializer "action_cable.set_configs" do |app|
+ options = app.config.action_cable
+
+ options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?
+
+ ActiveSupport.on_load(:action_cable) do
+ options.each { |k,v| send("#{k}=", v) }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb
new file mode 100644
index 0000000000..b82751468a
--- /dev/null
+++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb
@@ -0,0 +1,29 @@
+module ActionCable
+ module Helpers
+ module ActionCableHelper
+ # Returns an "action-cable-url" meta tag with the value of the url specified in your
+ # configuration. Ensure this is above your javascript tag:
+ #
+ # <head>
+ # <%= action_cable_meta_tag %>
+ # <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %>
+ # </head>
+ #
+ # This is then used by ActionCable to determine the url of your websocket server.
+ # Your CoffeeScript can then connect to the server without needing to specify the
+ # url directly:
+ #
+ # #= require cable
+ # @App = {}
+ # App.cable = Cable.createConsumer()
+ #
+ # Make sure to specify the correct server location in each of your environments
+ # config file:
+ #
+ # config.action_cable.url = "ws://example.com:28080"
+ def action_cable_meta_tag
+ tag "meta", name: "action-cable-url", content: Rails.application.config.action_cable.url
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb
new file mode 100644
index 0000000000..618ba7357a
--- /dev/null
+++ b/actioncable/lib/action_cable/process/logging.rb
@@ -0,0 +1,12 @@
+require 'action_cable/server'
+require 'eventmachine'
+require 'celluloid'
+
+EM.error_handler do |e|
+ puts "Error raised inside the event loop: #{e.message}"
+ puts e.backtrace.join("\n")
+end
+
+Celluloid.logger = ActionCable.server.logger
+
+ActionCable.server.config.log_to_stdout if Rails.env.development? \ No newline at end of file
diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb
new file mode 100644
index 0000000000..1230d905ad
--- /dev/null
+++ b/actioncable/lib/action_cable/remote_connections.rb
@@ -0,0 +1,64 @@
+module ActionCable
+ # If you need to disconnect a given connection, you go through the RemoteConnections. You find the connections you're looking for by
+ # searching the identifier declared on the connection. Example:
+ #
+ # module ApplicationCable
+ # class Connection < ActionCable::Connection::Base
+ # identified_by :current_user
+ # ....
+ # end
+ # end
+ #
+ # ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect
+ #
+ # That will disconnect all the connections established for User.find(1) across all servers running on all machines (because it uses
+ # the internal channel that all these servers are subscribed to).
+ class RemoteConnections
+ attr_reader :server
+
+ def initialize(server)
+ @server = server
+ end
+
+ def where(identifier)
+ RemoteConnection.new(server, identifier)
+ end
+
+ private
+ # Represents a single remote connection found via ActionCable.server.remote_connections.where(*).
+ # Exists for the solely for the purpose of calling #disconnect on that connection.
+ class RemoteConnection
+ class InvalidIdentifiersError < StandardError; end
+
+ include Connection::Identification, Connection::InternalChannel
+
+ def initialize(server, ids)
+ @server = server
+ set_identifier_instance_vars(ids)
+ end
+
+ # Uses the internal channel to disconnect the connection.
+ def disconnect
+ server.broadcast internal_redis_channel, type: 'disconnect'
+ end
+
+ # Returns all the identifiers that were applied to this connection.
+ def identifiers
+ server.connection_identifiers
+ end
+
+ private
+ attr_reader :server
+
+ def set_identifier_instance_vars(ids)
+ raise InvalidIdentifiersError unless valid_identifiers?(ids)
+ ids.each { |k,v| instance_variable_set("@#{k}", v) }
+ end
+
+ def valid_identifiers?(ids)
+ keys = ids.keys
+ identifiers.all? { |id| keys.include?(id) }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb
new file mode 100644
index 0000000000..a2a89d5f1e
--- /dev/null
+++ b/actioncable/lib/action_cable/server.rb
@@ -0,0 +1,19 @@
+require 'eventmachine'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
+module ActionCable
+ module Server
+ extend ActiveSupport::Autoload
+
+ eager_autoload do
+ autoload :Base
+ autoload :Broadcasting
+ autoload :Connections
+ autoload :Configuration
+
+ autoload :Worker
+ autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management'
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
new file mode 100644
index 0000000000..f1585dc776
--- /dev/null
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -0,0 +1,74 @@
+require 'em-hiredis'
+
+module ActionCable
+ module Server
+ # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
+ # also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers.
+ #
+ # Also, this is the server instance used for broadcasting. See Broadcasting for details.
+ class Base
+ include ActionCable::Server::Broadcasting
+ include ActionCable::Server::Connections
+
+ cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new }
+
+ def self.logger; config.logger; end
+ delegate :logger, to: :config
+
+ def initialize
+ end
+
+ # Called by rack to setup the server.
+ def call(env)
+ setup_heartbeat_timer
+ config.connection_class.new(self, env).process
+ end
+
+ # Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections.
+ def disconnect(identifiers)
+ remote_connections.where(identifiers).disconnect
+ end
+
+ # Gateway to RemoteConnections. See that class for details.
+ def remote_connections
+ @remote_connections ||= RemoteConnections.new(self)
+ end
+
+ # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
+ def worker_pool
+ @worker_pool ||= ActionCable::Server::Worker.pool(size: config.worker_pool_size)
+ end
+
+ # Requires and returns an hash of all the channel class constants keyed by name.
+ def channel_classes
+ @channel_classes ||= begin
+ config.channel_paths.each { |channel_path| require channel_path }
+ config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ end
+ end
+
+ # The redis pubsub adapter used for all streams/broadcasting.
+ def pubsub
+ @pubsub ||= redis.pubsub
+ end
+
+ # The EventMachine Redis instance used by the pubsub adapter.
+ def redis
+ @redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ logger.info "[ActionCable] Redis reconnect failed."
+ # logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
+ # @connections.map &:close
+ end
+ end
+ end
+
+ # All the identifiers applied to the connection class associated with this server.
+ def connection_identifiers
+ config.connection_class.identifiers
+ end
+ end
+
+ ActiveSupport.run_load_hooks(:action_cable, Base.config)
+ end
+end
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
new file mode 100644
index 0000000000..6e0fbae387
--- /dev/null
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -0,0 +1,54 @@
+require 'redis'
+
+module ActionCable
+ module Server
+ # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
+ # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example:
+ #
+ # class WebNotificationsChannel < ApplicationCable::Channel
+ # def subscribed
+ # stream_from "web_notifications_#{current_user.id}"
+ # end
+ # end
+ #
+ # # Somewhere in your app this is called, perhaps from a NewCommentJob
+ # ActionCable.server.broadcast \
+ # "web_notifications_1", { title: 'New things!', body: 'All shit fit for print' }
+ #
+ # # Client-side coffescript which assumes you've already requested the right to send web notifications
+ # App.cable.subscriptions.create "WebNotificationsChannel",
+ # received: (data) ->
+ # new Notification data['title'], body: data['body']
+ module Broadcasting
+ # Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded.
+ def broadcast(broadcasting, message)
+ broadcaster_for(broadcasting).broadcast(message)
+ end
+
+ # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have a object that
+ # may need multiple spots to transmit to a specific broadcasting over and over.
+ def broadcaster_for(broadcasting)
+ Broadcaster.new(self, broadcasting)
+ end
+
+ # The redis instance used for broadcasting. Not intended for direct user use.
+ def broadcasting_redis
+ @broadcasting_redis ||= Redis.new(config.redis)
+ end
+
+ private
+ class Broadcaster
+ attr_reader :server, :broadcasting
+
+ def initialize(server, broadcasting)
+ @server, @broadcasting = server, broadcasting
+ end
+
+ def broadcast(message)
+ server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
+ server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message)
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
new file mode 100644
index 0000000000..f7fcee019b
--- /dev/null
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -0,0 +1,67 @@
+require 'active_support/core_ext/hash/indifferent_access'
+
+module ActionCable
+ module Server
+ # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points
+ # in a Rails config initializer.
+ class Configuration
+ attr_accessor :logger, :log_tags
+ attr_accessor :connection_class, :worker_pool_size
+ attr_accessor :redis_path, :channels_path
+ attr_accessor :disable_request_forgery_protection, :allowed_request_origins
+ attr_accessor :url
+
+ def initialize
+ @logger = Rails.logger
+ @log_tags = []
+
+ @connection_class = ApplicationCable::Connection
+ @worker_pool_size = 100
+
+ @redis_path = Rails.root.join('config/redis/cable.yml')
+ @channels_path = Rails.root.join('app/channels')
+
+ @disable_request_forgery_protection = false
+ end
+
+ def log_to_stdout
+ console = ActiveSupport::Logger.new($stdout)
+ console.formatter = @logger.formatter
+ console.level = @logger.level
+
+ @logger.extend(ActiveSupport::Logger.broadcast(console))
+ end
+
+ def channel_paths
+ @channels ||= Dir["#{channels_path}/**/*_channel.rb"]
+ end
+
+ def channel_class_names
+ @channel_class_names ||= channel_paths.collect do |channel_path|
+ Pathname.new(channel_path).basename.to_s.split('.').first.camelize
+ end
+ end
+
+ def redis
+ @redis ||= config_for(redis_path).with_indifferent_access
+ end
+
+ private
+ # FIXME: Extract this from Rails::Application in a way it can be used here.
+ def config_for(path)
+ if path.exist?
+ require "yaml"
+ require "erb"
+ (YAML.load(ERB.new(path.read).result) || {})[Rails.env] || {}
+ else
+ raise "Could not load configuration. No such file - #{path}"
+ end
+ rescue Psych::SyntaxError => e
+ raise "YAML syntax error occurred while parsing #{path}. " \
+ "Please note that YAML must be consistently indented using spaces. Tabs are not allowed. " \
+ "Error: #{e.message}"
+ end
+ end
+ end
+end
+
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
new file mode 100644
index 0000000000..47dcea8c20
--- /dev/null
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -0,0 +1,37 @@
+module ActionCable
+ module Server
+ # Collection class for all the connections that's been established on this specific server. Remember, usually you'll run many cable servers, so
+ # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that.
+ # As such, this is primarily for internal use.
+ module Connections
+ BEAT_INTERVAL = 3
+
+ def connections
+ @connections ||= []
+ end
+
+ def add_connection(connection)
+ connections << connection
+ end
+
+ def remove_connection(connection)
+ connections.delete connection
+ end
+
+ # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
+ # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
+ # disconnect.
+ def setup_heartbeat_timer
+ EM.next_tick do
+ @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
+ EM.next_tick { connections.map(&:beat) }
+ end
+ end
+ end
+
+ def open_connections_statistics
+ connections.map(&:statistics)
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
new file mode 100644
index 0000000000..e063b2a2e1
--- /dev/null
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -0,0 +1,42 @@
+require 'celluloid'
+require 'active_support/callbacks'
+
+module ActionCable
+ module Server
+ # Worker used by Server.send_async to do connection work in threads. Only for internal use.
+ class Worker
+ include ActiveSupport::Callbacks
+ include Celluloid
+
+ attr_reader :connection
+ define_callbacks :work
+ include ActiveRecordConnectionManagement
+
+ def invoke(receiver, method, *args)
+ @connection = receiver
+
+ run_callbacks :work do
+ receiver.send method, *args
+ end
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
+
+ receiver.handle_exception if receiver.respond_to?(:handle_exception)
+ end
+
+ def run_periodic_timer(channel, callback)
+ @connection = channel.connection
+
+ run_callbacks :work do
+ callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
+ end
+ end
+
+ private
+ def logger
+ ActionCable.server.logger
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
new file mode 100644
index 0000000000..ecece4e270
--- /dev/null
+++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
@@ -0,0 +1,22 @@
+module ActionCable
+ module Server
+ class Worker
+ # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections.
+ module ActiveRecordConnectionManagement
+ extend ActiveSupport::Concern
+
+ included do
+ if defined?(ActiveRecord::Base)
+ set_callback :work, :around, :with_database_connections
+ end
+ end
+
+ def with_database_connections
+ connection.logger.tag(ActiveRecord::Base.logger) { yield }
+ ensure
+ ActiveRecord::Base.clear_active_connections!
+ end
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/actioncable/lib/action_cable/version.rb b/actioncable/lib/action_cable/version.rb
new file mode 100644
index 0000000000..4947029dcc
--- /dev/null
+++ b/actioncable/lib/action_cable/version.rb
@@ -0,0 +1,3 @@
+module ActionCable
+ VERSION = '0.0.3'
+end \ No newline at end of file