From 6aeaed4c1a370084e82c6712a32422a58dac8b8c Mon Sep 17 00:00:00 2001 From: Jon Moss <me@jonathanmoss.me> Date: Fri, 15 Jan 2016 19:03:05 -0500 Subject: All Redis deps are now optional, Postgres --> PostgreSQL adapter --- actioncable/lib/action_cable/storage_adapter.rb | 2 +- .../lib/action_cable/storage_adapter/postgres.rb | 95 ------------------- .../lib/action_cable/storage_adapter/postgresql.rb | 101 +++++++++++++++++++++ .../lib/action_cable/storage_adapter/redis.rb | 8 +- 4 files changed, 108 insertions(+), 98 deletions(-) delete mode 100644 actioncable/lib/action_cable/storage_adapter/postgres.rb create mode 100644 actioncable/lib/action_cable/storage_adapter/postgresql.rb (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/storage_adapter.rb b/actioncable/lib/action_cable/storage_adapter.rb index f1c395eb3a..a4fe12c770 100644 --- a/actioncable/lib/action_cable/storage_adapter.rb +++ b/actioncable/lib/action_cable/storage_adapter.rb @@ -1,7 +1,7 @@ module ActionCable module StorageAdapter autoload :Base, 'action_cable/storage_adapter/base' - autoload :Postgres, 'action_cable/storage_adapter/postgres' + autoload :PostgreSQL, 'action_cable/storage_adapter/postgresql' autoload :Redis, 'action_cable/storage_adapter/redis' end end diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb deleted file mode 100644 index 5d874533be..0000000000 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ /dev/null @@ -1,95 +0,0 @@ -require 'thread' - -module ActionCable - module StorageAdapter - class Postgres < Base - # The storage instance used for broadcasting. Not intended for direct user use. - def broadcast(channel, payload) - with_connection do |pg_conn| - pg_conn.exec("NOTIFY #{channel}, '#{payload}'") - end - end - - def subscribe(channel, callback, success_callback = nil) - listener.subscribe_to(channel, callback, success_callback) - end - - def unsubscribe(channel, callback) - listener.unsubscribe_to(channel, callback) - end - - def with_connection(&block) # :nodoc: - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection - - unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' - end - - yield pg_conn - end - end - - private - def listener - @listener ||= Listener.new(self) - end - - class Listener - def initialize(adapter) - @adapter = adapter - @subscribers = Hash.new { |h,k| h[k] = [] } - @sync = Mutex.new - @queue = Queue.new - - Thread.new do - Thread.current.abort_on_exception = true - listen - end - end - - def listen - @adapter.with_connection do |pg_conn| - loop do - until @queue.empty? - value = @queue.pop(true) - if value.first == :listen - pg_conn.exec("LISTEN #{value[1]}") - ::EM.next_tick(&value[2]) if value[2] - elsif value.first == :unlisten - pg_conn.exec("UNLISTEN #{value[1]}") - end - - pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } - end - end - end - end - end - end - - def subscribe_to(channel, callback, success_callback) - @sync.synchronize do - if @subscribers[channel].empty? - @queue.push([:listen, channel, success_callback]) - end - - @subscribers[channel] << callback - end - end - - def unsubscribe_to(channel, callback) - @sync.synchronize do - @subscribers[channel].delete(callback) - - if @subscribers[channel].empty? - @queue.push([:unlisten, channel]) - end - end - end - end - end - end -end diff --git a/actioncable/lib/action_cable/storage_adapter/postgresql.rb b/actioncable/lib/action_cable/storage_adapter/postgresql.rb new file mode 100644 index 0000000000..1d8460e2ea --- /dev/null +++ b/actioncable/lib/action_cable/storage_adapter/postgresql.rb @@ -0,0 +1,101 @@ +require 'thread' + +begin + require 'pg' +rescue Gem::LoadError => e + raise Gem::LoadError, "You are trying to use the PostgreSQL ActionCable adapter, but do not have the proper gems installed. Add `gem 'pg'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." +end + +module ActionCable + module StorageAdapter + class PostgreSQL < Base + # The storage instance used for broadcasting. Not intended for direct user use. + def broadcast(channel, payload) + with_connection do |pg_conn| + pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + end + end + + def subscribe(channel, callback, success_callback = nil) + listener.subscribe_to(channel, callback, success_callback) + end + + def unsubscribe(channel, callback) + listener.unsubscribe_to(channel, callback) + end + + def with_connection(&block) # :nodoc: + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection + + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end + + yield pg_conn + end + end + + private + def listener + @listener ||= Listener.new(self) + end + + class Listener + def initialize(adapter) + @adapter = adapter + @subscribers = Hash.new { |h,k| h[k] = [] } + @sync = Mutex.new + @queue = Queue.new + + Thread.new do + Thread.current.abort_on_exception = true + listen + end + end + + def listen + @adapter.with_connection do |pg_conn| + loop do + until @queue.empty? + value = @queue.pop(true) + if value.first == :listen + pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] + elsif value.first == :unlisten + pg_conn.exec("UNLISTEN #{value[1]}") + end + + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + ::EM.next_tick { callback.call(message) } + end + end + end + end + end + end + + def subscribe_to(channel, callback, success_callback) + @sync.synchronize do + if @subscribers[channel].empty? + @queue.push([:listen, channel, success_callback]) + end + + @subscribers[channel] << callback + end + end + + def unsubscribe_to(channel, callback) + @sync.synchronize do + @subscribers[channel].delete(callback) + + if @subscribers[channel].empty? + @queue.push([:unlisten, channel]) + end + end + end + end + end + end +end diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb index 3e0ede057a..62a3971ec7 100644 --- a/actioncable/lib/action_cable/storage_adapter/redis.rb +++ b/actioncable/lib/action_cable/storage_adapter/redis.rb @@ -1,5 +1,9 @@ -require 'em-hiredis' -require 'redis' +begin + require 'em-hiredis' + require 'redis' +rescue Gem::LoadError => e + raise Gem::LoadError, "You are trying to use the Redis ActionCable adapter, but do not have the proper gems installed. Add `gem 'em-hiredis'` and `gem 'redis'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." +end module ActionCable module StorageAdapter -- cgit v1.2.3