diff options
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter/postgresql.rb')
-rw-r--r-- | actioncable/lib/action_cable/subscription_adapter/postgresql.rb | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 78f8aeb599..66c7852f6e 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -5,6 +5,11 @@ require 'thread' module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: + def initialize(*) + super + @listener = nil + end + def broadcast(channel, payload) with_connection do |pg_conn| pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'") @@ -37,14 +42,15 @@ module ActionCable private def listener - @listener ||= Listener.new(self) + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @queue = Queue.new @thread = Thread.new do @@ -63,7 +69,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - ::EM.next_tick(&callback) if callback + @event_loop.post(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +99,7 @@ module ActionCable end def invoke_callback(*) - ::EM.next_tick { super } + @event_loop.post { super } end end end |