1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
module ActionCable
class Connection
PING_INTERVAL = 3
attr_reader :env, :server
delegate :worker_pool, :pubsub, :logger, to: :server
def initialize(server, env)
@server = server
@env = env
@accept_messages = false
@pending_messages = []
end
def process
if Faye::WebSocket.websocket?(@env)
@subscriptions = {}
@websocket = Faye::WebSocket.new(@env)
@websocket.on(:open) do |event|
broadcast_ping_timestamp
@ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp }
worker_pool.async.invoke(self, :initialize_client)
end
@websocket.on(:message) do |event|
message = event.data
if message.is_a?(String)
if @accept_messages
worker_pool.async.invoke(self, :received_data, message)
else
@pending_messages << message
end
end
end
@websocket.on(:close) do |event|
worker_pool.async.invoke(self, :cleanup_subscriptions)
worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect)
EventMachine.cancel_timer(@ping_timer) if @ping_timer
end
@websocket.rack_response
else
invalid_request
end
end
def received_data(data)
return unless websocket_alive?
data = ActiveSupport::JSON.decode data
case data['action']
when 'subscribe'
subscribe_channel(data)
when 'unsubscribe'
unsubscribe_channel(data)
when 'message'
process_message(data)
end
end
def cleanup_subscriptions
@subscriptions.each do |id, channel|
channel.unsubscribe
end
end
def broadcast(data)
logger.info "Sending data: #{data}"
@websocket.send data
end
def handle_exception
logger.error "[ActionCable] Closing connection"
@websocket.close
end
private
def initialize_client
connect if respond_to?(:connect)
@accept_messages = true
worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty?
end
def broadcast_ping_timestamp
broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json)
end
def subscribe_channel(data)
id_key = data['identifier']
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] }
if subscription_klass
logger.info "Subscribing to channel: #{id_key}"
@subscriptions[id_key] = subscription_klass.new(self, id_key, id_options)
else
logger.error "Unable to subscribe to channel: #{id_key}"
end
end
def process_message(message)
if @subscriptions[message['identifier']]
@subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data'])
else
logger.error "Unable to process message: #{message}"
end
end
def unsubscribe_channel(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
@subscriptions[data['identifier']].unsubscribe
@subscriptions.delete(data['identifier'])
end
def invalid_request
[404, {'Content-Type' => 'text/plain'}, ['Page not found']]
end
def websocket_alive?
@websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN
end
end
end
|