PubSub
coredis includes a PubSub class that subscribes to channels and listens for new messages.
An instance can be created through the coredis.Redis.pubsub() or coredis.RedisCluster.pubsub() methods.
import coredis

r = coredis.Redis(...)
p = r.pubsub()
Once a PubSub instance is created, channels and patterns can be subscribed to.
await p.subscribe('my-first-channel', 'my-second-channel', ...)
await p.psubscribe('my-*', ...)
The PubSub instance is now subscribed to those channels/patterns. The
subscription confirmations can be seen by reading messages from the PubSub instance.
await p.get_message()
# {'pattern': None, 'type': 'subscribe', 'channel': 'my-second-channel', 'data': 1}
await p.get_message()
# {'pattern': None, 'type': 'subscribe', 'channel': 'my-first-channel', 'data': 2}
await p.get_message()
# {'pattern': None, 'type': 'psubscribe', 'channel': 'my-*', 'data': 3}
Every message read from a PubSub instance will be a typed dictionary defined as:
- class PubSubMessage
Bases: TypedDict
- type: str
One of the following:
- subscribe: Server response when a client subscribes to one or more channels
- unsubscribe: Server response when a client unsubscribes from one or more channels
- psubscribe: Server response when a client subscribes to one or more patterns
- punsubscribe: Server response when a client unsubscribes from one or more patterns
- ssubscribe: Server response when a client subscribes to one or more shard channels
- sunsubscribe: Server response when a client unsubscribes from one or more shard channels
- message: A message received from subscribing to a channel
- pmessage: A message received from subscribing to a pattern
- channel: StringT
The channel subscribed to or unsubscribed from, or the channel a message was published to
- pattern: StringT | None
The pattern that was subscribed to or unsubscribed from, or the pattern through which a received message was routed
- data: int | StringT
If type is one of {message, pmessage}, this is the actual published message.
If type is one of {subscribe, psubscribe, ssubscribe, unsubscribe, punsubscribe, sunsubscribe}, this will be an int corresponding to the number of channels and patterns that the connection is currently subscribed to.
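Because the meaning of data depends on type, consumer code usually branches on type first. The following is a minimal sketch of that branching. It uses a local stand-in TypedDict rather than importing the class from coredis, simplifies StringT to str, and the describe() helper is hypothetical.

```python
from typing import Optional, TypedDict, Union


class PubSubMessage(TypedDict):
    # Local stand-in mirroring the fields listed above; not imported from coredis.
    type: str
    channel: str
    pattern: Optional[str]
    data: Union[int, str]


CONFIRMATION_TYPES = {
    "subscribe", "unsubscribe",
    "psubscribe", "punsubscribe",
    "ssubscribe", "sunsubscribe",
}


def describe(message: PubSubMessage) -> str:
    """Return a one-line summary of a message (hypothetical helper)."""
    if message["type"] in CONFIRMATION_TYPES:
        # For confirmations, data is the current subscription count.
        return (
            f"{message['type']} {message['channel']}: "
            f"{message['data']} active subscription(s)"
        )
    if message["type"] == "pmessage":
        # Pattern deliveries carry both the concrete channel and the pattern.
        return f"{message['channel']} (via {message['pattern']}): {message['data']}"
    return f"{message['channel']}: {message['data']}"
```

Checking membership in a set of confirmation types keeps the payload branches (message, pmessage) free of integer handling.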
Let’s send a message now.
# the publish method returns the number of matching channel and pattern
# subscriptions. 'my-first-channel' matches both the 'my-first-channel'
# subscription and the 'my-*' pattern subscription, so this message will
# be delivered to 2 channels/patterns
await r.publish('my-first-channel', 'some data')
# 2
await p.get_message()
# {'channel': 'my-first-channel', 'data': 'some data', 'pattern': None, 'type': 'message'}
await p.get_message()
# {'channel': 'my-first-channel', 'data': 'some data', 'pattern': 'my-*', 'type': 'pmessage'}
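The count of 2 above can be reproduced offline with a rough model of how a single subscriber's channel and pattern subscriptions are matched. This is only a sketch: count_receivers() is a hypothetical helper, and fnmatchcase from the standard library merely approximates Redis glob matching for simple patterns such as 'my-*'.

```python
from fnmatch import fnmatchcase


def count_receivers(channel, channels, patterns):
    """Count the subscriptions a message published to `channel` would reach."""
    # Exact channel subscriptions match by name only.
    exact = sum(1 for name in channels if name == channel)
    # Pattern subscriptions match by glob; fnmatchcase is an approximation.
    by_pattern = sum(1 for pattern in patterns if fnmatchcase(channel, pattern))
    return exact + by_pattern


subscribed_channels = ["my-first-channel", "my-second-channel"]
subscribed_patterns = ["my-*"]
matches = count_receivers("my-first-channel", subscribed_channels, subscribed_patterns)
```

Here matches is 2: one for the exact channel subscription and one for the pattern subscription, which is why the same payload is read twice, once as a message and once as a pmessage.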
Unsubscribing works just like subscribing. If no arguments are passed to [p]unsubscribe, all channels or patterns will be unsubscribed from.
await p.unsubscribe()
await p.punsubscribe('my-*')
await p.get_message()
# {'channel': 'my-second-channel', 'data': 2, 'pattern': None, 'type': 'unsubscribe'}
await p.get_message()
# {'channel': 'my-first-channel', 'data': 1, 'pattern': None, 'type': 'unsubscribe'}
await p.get_message()
# {'channel': 'my-*', 'data': 0, 'pattern': None, 'type': 'punsubscribe'}
coredis also allows you to register callback functions to handle published messages. Message handlers take a single argument, the message, which is a dictionary just like the examples above. To subscribe to a channel or pattern with a message handler, pass the channel or pattern name as a keyword argument with its value being the callback function.
When a message is read on a channel or pattern with a message handler, the
message dictionary is created and passed to the message handler. In this case,
a None value is returned from get_message() since the message was already handled.
def my_handler(message):
    print('MY HANDLER: ', message['data'])
await p.subscribe(**{'my-channel': my_handler})
# read the subscribe confirmation message
await p.get_message()
# {'pattern': None, 'type': 'subscribe', 'channel': 'my-channel', 'data': 1L}
await r.publish('my-channel', 'awesome data')
# 1
# for the message handler to work, we need to tell the instance to read data.
# this can be done in several ways (read more below). we'll just use
# the familiar get_message() function for now
message = await p.get_message()
# 'MY HANDLER: awesome data'
# note here that the my_handler callback printed the string above.
# `message` is None because the message was handled by our handler.
print(message)
# None
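The routing rule described above (a handled message yields None, an unhandled one is returned) can be tried without a server using a toy model. ToyPubSub and its deliver() method are hypothetical stand-ins written for this illustration; they are not the coredis implementation.

```python
import asyncio
from collections import deque


class ToyPubSub:
    """Toy stand-in for illustration only; not the coredis implementation."""

    def __init__(self):
        self.handlers = {}      # channel name -> callback, or None
        self.pending = deque()  # messages waiting to be read

    def subscribe(self, *channels, **channel_handlers):
        for name in channels:
            self.handlers[name] = None
        self.handlers.update(channel_handlers)

    def deliver(self, channel, data):
        # Simulates a published message arriving from the server.
        self.pending.append(
            {"type": "message", "channel": channel, "pattern": None, "data": data}
        )

    async def get_message(self):
        if not self.pending:
            return None
        message = self.pending.popleft()
        handler = self.handlers.get(message["channel"])
        if handler is not None:
            handler(message)  # handled, so nothing is returned to the caller
            return None
        return message


async def demo():
    seen = []
    p = ToyPubSub()
    p.subscribe("plain-channel", **{"my-channel": seen.append})
    p.deliver("my-channel", "awesome data")
    p.deliver("plain-channel", "other data")
    first = await p.get_message()   # routed to the handler
    second = await p.get_message()  # no handler, returned directly
    return seen, first, second
```

Running asyncio.run(demo()) shows the first read returning None while the handler receives the message, and the second read returning the message dictionary itself.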
If your application is not interested in the subscribe/unsubscribe confirmation messages,
you can ignore them by setting ignore_subscribe_messages to True.
This will cause all subscribe/unsubscribe messages to be read, but they won’t
bubble up to your application.
p = r.pubsub(ignore_subscribe_messages=True)
await p.subscribe('my-channel')
await p.get_message() # hides the subscribe message and returns None
await r.publish('my-channel', 'my data')
# 1
await p.get_message()
# {'channel': 'my-channel', 'data': 'my data', 'pattern': None, 'type': 'message'}
There are two main strategies for reading messages.
The examples above have been using get_message().
If there’s data available to be read, the method will read it, format the message
and return it or pass it to a message handler. If there’s no data to be read, it
will return None after the configured timeout.
while True:
    message = await p.get_message()
    if message:
        ...  # do something with the message
    await asyncio.sleep(0.001)  # be nice to the system :)
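In practice a polling loop like the one above usually needs a way to stop. The sketch below shows one possible shape, using an asyncio.Event as the stop signal. The poll() helper and the fake message source are hypothetical; with a real PubSub instance you would pass its get_message method instead.

```python
import asyncio


async def poll(get_message, handle, stop, idle_sleep=0.001):
    """Call get_message() until `stop` is set, passing each message to handle()."""
    while not stop.is_set():
        message = await get_message()
        if message is not None:
            handle(message)
        else:
            await asyncio.sleep(idle_sleep)  # yield to other tasks while idle


async def demo():
    queue = [{"data": "a"}, {"data": "b"}]  # fake messages standing in for a PubSub
    received = []
    stop = asyncio.Event()

    async def fake_get_message():
        return queue.pop(0) if queue else None

    def handle(message):
        received.append(message["data"])
        if not queue:
            stop.set()  # nothing left to read, so end the loop

    # The timeout guards against the loop never being stopped.
    await asyncio.wait_for(poll(fake_get_message, handle, stop), timeout=1)
    return received
```

Sleeping only when no message was returned keeps the loop responsive under load while still yielding to other tasks when idle.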
The second option runs an event loop in a separate thread.
run_in_thread() creates a new thread that uses the event loop in the main thread.
The PubSubWorkerThread instance is returned to the caller of run_in_thread().
The caller can use the stop() method on the thread instance to shut down the
event loop and thread. Behind the scenes, this is simply a wrapper around
get_message() that runs in a separate thread and uses
asyncio.run_coroutine_threadsafe() to run coroutines.
Note: Since we’re running in a separate thread, there’s no way to handle
messages that aren’t automatically handled with registered message handlers.
Therefore, coredis prevents you from calling run_in_thread() if you’re
subscribed to patterns or channels that don’t have message handlers attached.
await p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()
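The general pattern of a worker thread driving coroutines on the main thread's event loop through asyncio.run_coroutine_threadsafe() can be sketched as follows. ToyWorkerThread is a hypothetical stand-in written for this illustration and does not reproduce coredis's PubSubWorkerThread; the fake coroutine plays the role of get_message() with a handler attached.

```python
import asyncio
import threading


class ToyWorkerThread(threading.Thread):
    """Illustrative stand-in; not coredis's PubSubWorkerThread."""

    def __init__(self, get_message, loop, sleep_time=0.001):
        super().__init__(daemon=True)
        self._get_message = get_message
        self._loop = loop
        self._sleep_time = sleep_time
        self._stopped = threading.Event()

    def run(self):
        while not self._stopped.is_set():
            # Schedule the coroutine on the main thread's loop, then block
            # this worker thread until it has finished.
            future = asyncio.run_coroutine_threadsafe(self._get_message(), self._loop)
            future.result()
            self._stopped.wait(self._sleep_time)

    def stop(self):
        self._stopped.set()


async def demo():
    handled = []
    pending = ["one", "two"]

    async def fake_get_message():
        # Stands in for get_message() dispatching to a registered handler.
        if pending:
            handled.append(pending.pop(0))

    loop = asyncio.get_running_loop()
    thread = ToyWorkerThread(fake_get_message, loop)
    thread.start()
    while pending:
        await asyncio.sleep(0.001)  # keep the loop free to run scheduled coroutines
    thread.stop()
    # Join from an executor so the event loop is never blocked while the
    # worker may still be waiting on a scheduled coroutine.
    await loop.run_in_executor(None, thread.join)
    return handled
```

Note that in this sketch the main loop must keep running while the worker is alive: blocking it (for example by calling thread.join() directly inside a coroutine) would leave the worker waiting on a coroutine that can never be scheduled.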
PubSub instances remember what channels and patterns they are subscribed to. In
the event of a disconnection such as a network error or timeout, the
PubSub instance will re-subscribe to all prior channels and patterns when
reconnecting. Messages that were published while the client was disconnected
cannot be delivered. When you’re finished with a PubSub object, call the
close() method to shut down the connection.
p = r.pubsub()
...
p.close()
The Pub/Sub introspection commands PUBSUB CHANNELS, PUBSUB NUMSUB and PUBSUB NUMPAT are also supported:
await r.pubsub_channels()
# ['foo', 'bar']
await r.pubsub_numsub('foo', 'bar')
# [('foo', 9001), ('bar', 42)]
await r.pubsub_numsub('baz')
# [('baz', 0)]
await r.pubsub_numpat()
# 1204
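The pairs returned by pubsub_numsub() are often easier to work with as a mapping. A small sketch, assuming the list-of-pairs shape shown in the example output above (the variable names are illustrative):

```python
# Shape taken from the example output above: a list of (channel, count) pairs.
numsub = [("foo", 9001), ("bar", 42), ("baz", 0)]

counts = dict(numsub)                  # channel name -> subscriber count
busiest = max(counts, key=counts.get)  # channel with the most subscribers
idle = [name for name, n in counts.items() if n == 0]  # channels nobody listens to
```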
Cluster Pub/Sub
The coredis.RedisCluster client exposes two ways of building a Pub/Sub application.
pubsub() returns an instance of coredis.commands.ClusterPubSub
which exposes identical functionality to the non-clustered client. This is possible
without worrying about sharding as the PUBLISH command in clustered redis results
in messages being broadcast to every node in the cluster.
On the consumer side of the equation, coredis simply picks a random node and consumes the messages from all subscribed topics.
This approach, though functional, offers limited opportunity for horizontal scaling, as all the nodes in the cluster will have to process the published messages for all channels.