Start streaming from the named broadcasting pubsub queue.
Optionally, you can pass a callback that’ll be used instead of
the default of just transmitting the updates straight to the subscriber.
Pass coder: ActiveSupport::JSON to decode
messages as JSON before passing them to the
callback. Defaults to coder: nil, which does no decoding and passes
raw messages through unchanged.
# File actioncable/lib/action_cable/channel/streams.rb, line 76
# Begins streaming from the pubsub broadcasting named by +broadcasting+.
#
# broadcasting - name of the broadcasting; coerced with String().
# callback     - optional callable for incoming messages. A block may be
#                given instead; the positional callback wins if both are given.
# coder        - forwarded untouched to the stream handler builder (nil by default).
#
# Returns whatever the server event loop's +post+ returns.
def stream_from(broadcasting, callback = nil, coder: nil, &block)
  broadcasting = String(broadcasting)
  user_handler = callback || block

  # Hold back the subscription confirmation: it is only sent from the
  # success callback handed to pubsub#subscribe further down.
  defer_subscription_confirmation!

  # The builder wraps the user's callback (which may be nil) together with
  # the coder; the resulting handler is recorded alongside its broadcasting.
  stream_handler = worker_pool_stream_handler(broadcasting, user_handler, coder: coder)
  streams << [broadcasting, stream_handler]

  # The actual subscribe call is posted to the server's event loop rather
  # than being made inline.
  connection.server.event_loop.post do
    on_subscribed = -> do
      ensure_confirmation_sent
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end

    pubsub.subscribe(broadcasting, stream_handler, on_subscribed)
  end
end