aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Moss <me@jonathanmoss.me>2016-01-07 11:28:52 -0500
committerJon Moss <me@jonathanmoss.me>2016-01-18 18:59:00 -0500
commit7b79ae0335b67377636cf2ba7be70a4119ca90cd (patch)
treef96b714284055bec5c48798b3dfca745d6516558
parent439154250ccd75e9d392a1cbb6f0105ea857e6f5 (diff)
downloadrails-7b79ae0335b67377636cf2ba7be70a4119ca90cd.tar.gz
rails-7b79ae0335b67377636cf2ba7be70a4119ca90cd.tar.bz2
rails-7b79ae0335b67377636cf2ba7be70a4119ca90cd.zip
Add Postgres adapter
-rw-r--r--actioncable/actioncable.gemspec1
-rw-r--r--actioncable/lib/action_cable/storage_adapter.rb1
-rw-r--r--actioncable/lib/action_cable/storage_adapter/postgres.rb109
3 files changed, 111 insertions, 0 deletions
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
index a04fc932aa..2ab8b4785a 100644
--- a/actioncable/actioncable.gemspec
+++ b/actioncable/actioncable.gemspec
@@ -28,4 +28,5 @@ Gem::Specification.new do |s|
s.add_development_dependency 'puma'
s.add_development_dependency 'mocha'
+ s.add_development_dependency 'pg'
end
diff --git a/actioncable/lib/action_cable/storage_adapter.rb b/actioncable/lib/action_cable/storage_adapter.rb
index 991270d2b3..f1c395eb3a 100644
--- a/actioncable/lib/action_cable/storage_adapter.rb
+++ b/actioncable/lib/action_cable/storage_adapter.rb
@@ -1,6 +1,7 @@
module ActionCable
module StorageAdapter
autoload :Base, 'action_cable/storage_adapter/base'
+ autoload :Postgres, 'action_cable/storage_adapter/postgres'
autoload :Redis, 'action_cable/storage_adapter/redis'
end
end
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb
new file mode 100644
index 0000000000..67bc2cd77a
--- /dev/null
+++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb
@@ -0,0 +1,109 @@
+require 'thread'
+
+module ActionCable
+ module StorageAdapter
+ class Postgres < Base
+ # The storage instance used for broadcasting. Not intended for direct user use.
+ def broadcast
+ @broadcast ||= PostgresWrapper.new
+ end
+
+ def pubsub
+ PostgresWrapper.new
+ end
+
+ class Listener
+ include Singleton
+
+ attr_accessor :subscribers
+
+ def initialize
+ @subscribers = Hash.new {|h,k| h[k] = [] }
+ @sync = Mutex.new
+ @queue = Queue.new
+
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ listen
+ end
+ end
+
+ def listen
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+
+ loop do
+ until @queue.empty?
+ value = @queue.pop(true)
+ if value.first == :listen
+ pg_conn.exec("LISTEN #{value[1]}")
+ elsif value.first == :unlisten
+ pg_conn.exec("UNLISTEN #{value[1]}")
+ end
+ end
+
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ @subscribers[chan].each do |callback|
+ callback.call(message)
+ end
+ end
+ end
+ end
+ end
+
+ def subscribe_to(channel, callback)
+ @sync.synchronize do
+ if @subscribers[channel].empty?
+ @queue.push([:listen, channel])
+ end
+
+ @subscribers[channel] << callback
+ end
+ end
+
+ def unsubscribe_to(channel, callback = nil)
+ @sync.synchronize do
+ if callback
+ @subscribers[channel].delete(callback)
+ else
+ @subscribers.delete(channel)
+ end
+
+ if @subscribers[channel].empty?
+ @queue.push([:unlisten, channel])
+ end
+ end
+ end
+ end
+
+ class PostgresWrapper
+ def publish(channel, message)
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
+
+ pg_conn.exec("NOTIFY #{channel}, '#{message}'")
+ end
+ end
+
+ def subscribe(channel, &callback)
+ Listener.instance.subscribe_to(channel, callback)
+ # Needed for channel/streams.rb#L79
+ ::EM::DefaultDeferrable.new
+ end
+
+ def unsubscribe(channel)
+ Listener.instance.unsubscribe_to(channel)
+ end
+
+ def unsubscribe_proc(channel, block)
+ Listener.instance.unsubscribe_to(channel, block)
+ end
+ end
+
+ end
+ end
+end