From 7b79ae0335b67377636cf2ba7be70a4119ca90cd Mon Sep 17 00:00:00 2001
From: Jon Moss <me@jonathanmoss.me>
Date: Thu, 7 Jan 2016 11:28:52 -0500
Subject: Add Postgres adapter

---
 actioncable/actioncable.gemspec                    |   1 +
 actioncable/lib/action_cable/storage_adapter.rb    |   1 +
 .../lib/action_cable/storage_adapter/postgres.rb   | 109 +++++++++++++++++++++
 3 files changed, 111 insertions(+)
 create mode 100644 actioncable/lib/action_cable/storage_adapter/postgres.rb

(limited to 'actioncable')

diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
index a04fc932aa..2ab8b4785a 100644
--- a/actioncable/actioncable.gemspec
+++ b/actioncable/actioncable.gemspec
@@ -28,4 +28,5 @@ Gem::Specification.new do |s|
 
   s.add_development_dependency 'puma'
   s.add_development_dependency 'mocha'
+  s.add_development_dependency 'pg'
 end
diff --git a/actioncable/lib/action_cable/storage_adapter.rb b/actioncable/lib/action_cable/storage_adapter.rb
index 991270d2b3..f1c395eb3a 100644
--- a/actioncable/lib/action_cable/storage_adapter.rb
+++ b/actioncable/lib/action_cable/storage_adapter.rb
@@ -1,6 +1,7 @@
 module ActionCable
   module StorageAdapter
     autoload :Base, 'action_cable/storage_adapter/base'
+    autoload :Postgres, 'action_cable/storage_adapter/postgres'
     autoload :Redis, 'action_cable/storage_adapter/redis'
   end
 end
diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb
new file mode 100644
index 0000000000..67bc2cd77a
--- /dev/null
+++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb
@@ -0,0 +1,109 @@
+require 'thread'
+
+module ActionCable
+  module StorageAdapter
+    class Postgres < Base
+      # The storage instance used for broadcasting. Not intended for direct user use.
+      def broadcast
+        @broadcast ||= PostgresWrapper.new
+      end
+
+      def pubsub
+        PostgresWrapper.new
+      end
+
+      class Listener
+        include Singleton
+
+        attr_accessor :subscribers
+
+        def initialize
+          @subscribers = Hash.new {|h,k| h[k] = [] }
+          @sync = Mutex.new
+          @queue = Queue.new
+
+          Thread.new do
+            Thread.current.abort_on_exception = true
+            listen
+          end
+        end
+
+        def listen
+          ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+            pg_conn = ar_conn.raw_connection
+
+            loop do
+              until @queue.empty?
+                value = @queue.pop(true)
+                if value.first == :listen
+                  pg_conn.exec("LISTEN #{value[1]}")
+                elsif value.first == :unlisten
+                  pg_conn.exec("UNLISTEN #{value[1]}")
+                end
+              end
+
+              pg_conn.wait_for_notify(1) do |chan, pid, message|
+                @subscribers[chan].each do |callback|
+                  callback.call(message)
+                end
+              end
+            end
+          end
+        end
+
+        def subscribe_to(channel, callback)
+          @sync.synchronize do
+            if @subscribers[channel].empty?
+              @queue.push([:listen, channel])
+            end
+
+            @subscribers[channel] << callback
+          end
+        end
+
+        def unsubscribe_to(channel, callback = nil)
+          @sync.synchronize do
+            if callback
+              @subscribers[channel].delete(callback)
+            else
+              @subscribers.delete(channel)
+            end
+
+            if @subscribers[channel].empty?
+              @queue.push([:unlisten, channel])
+            end
+          end
+        end
+      end
+
+      class PostgresWrapper
+        def publish(channel, message)
+          ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+            pg_conn = ar_conn.raw_connection
+
+            unless pg_conn.is_a?(PG::Connection)
+              raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+            end
+
+            pg_conn.exec("NOTIFY #{channel}, '#{message}'")
+          end
+        end
+
+        def subscribe(channel, &callback)
+          Listener.instance.subscribe_to(channel, callback)
+          # Needed for channel/streams.rb#L79
+          ::EM::DefaultDeferrable.new
+        end
+
+        def unsubscribe(channel)
+          Listener.instance.unsubscribe_to(channel)
+        end
+
+        def unsubscribe_proc(channel, block)
+          Listener.instance.unsubscribe_to(channel, block)
+        end
+      end
+
+    end
+  end
+end
-- 
cgit v1.2.3