orangeqs.juice.messaging.protocol#
Protocol interfaces for messaging between Juice services.
This module defines generic interfaces that should be implemented on top of a base protocol, e.g. ZMQ. All interfaces in this module are agnostic to both the underlying protocol and Juice itself: the module contains no references to either.
Both the subscriber and publisher are expected to connect to a central proxy such that one can publish and subscribe to messages from a single source.
As of now this module only contains interfaces for pub/sub messaging. In the future it will also include standard interfaces for request/reply.
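The central-proxy topology described above can be pictured with a small in-memory stand-in. This is only a sketch: `InMemoryProxy`, `connect` and `publish` are hypothetical names that do not appear in this module, and a real implementation would forward messages over the base protocol (e.g. ZMQ) rather than through Python callbacks.

```python
from typing import Callable, List


class InMemoryProxy:
    """Hypothetical stand-in for the central proxy (not part of Juice)."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[str], None]] = []

    def connect(self, on_message: Callable[[str], None]) -> None:
        # A subscriber registers once with the proxy, not with each publisher.
        self._subscribers.append(on_message)

    def publish(self, message: str) -> None:
        # A publisher hands the message to the proxy, which fans it out.
        for on_message in self._subscribers:
            on_message(message)


proxy = InMemoryProxy()
received_a: List[str] = []
received_b: List[str] = []
proxy.connect(received_a.append)
proxy.connect(received_b.append)

# Two independent publishers only need to know the proxy.
proxy.publish("from publisher 1")
proxy.publish("from publisher 2")
```

Because every client talks only to the proxy, publishers and subscribers can be added or removed without knowing about each other.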
Module Contents#
Classes#
Event | Message representing an event that is published to Juice services.
SubscriberAsync | Asynchronous client for subscribing to events from other Juice services.
SubscriberBlocking | Synchronous blocking client for subscribing to events from other Juice services.
PublisherAsync | Asynchronous client for publishing events to other Juice services.
PublisherBlocking | Synchronous blocking client for publishing events to other Juice services.
Subscriber | Asynchronous client for subscribing to events from other Juice services.
Publisher | Asynchronous client for publishing events to other Juice services.
Data#
API#
- class orangeqs.juice.messaging.protocol.Event(/, **data: Any)#
Bases: orangeqs.juice.messaging.message.Message, orangeqs.juice.database.Point
Message representing an event that is published to Juice services.
All events sent over pub/sub must inherit from this class. See Message for more information. This class also inherits from Point, which allows these events to be stored in the database.
Examples#
For example, one can add an event that represents a kernel status changing.
    class KernelStatusChanged(Event):
        def topic(self) -> str:
            # Use the service name as the topic so subscribers can filter on it.
            return self.service

        service: str
        status: str
This event includes two fields (service and status). Additionally, it marks that the topic of the event should be the service name. Now, subscribers can use a topic filter to only subscribe to kernel status updates of a specific service. If the subscriber does not add a topic filter it will receive all kernel status updates by default.
Also see topic().
- topic() -> str | None#
Topic of the message.
Can be used by listeners to filter events of the same type. Can be implemented by subclasses to provide fine control on subscriptions. Defaults to no topic.
See the docstring of Event for more information on how to use topics.
Returns#
(str | None): The topic of this event, or None if the event has no topic.
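The default and the override can be compared side by side in a self-contained sketch. The `Event` base class below is a hypothetical stand-in built on dataclasses so the snippet runs without Juice installed; `Heartbeat` is likewise an illustrative name, and only `topic()` and `KernelStatusChanged` mirror what is described above.

```python
from __future__ import annotations

from dataclasses import dataclass


class Event:
    """Stand-in for the Juice Event base class (assumption, not the real one)."""

    def topic(self) -> str | None:
        # Default behaviour described above: no topic.
        return None


@dataclass
class Heartbeat(Event):
    # Does not override topic(), so it is published without a topic.
    service: str


@dataclass
class KernelStatusChanged(Event):
    service: str
    status: str

    def topic(self) -> str:
        # Subscribers can filter on the service name.
        return self.service


heartbeat_topic = Heartbeat(service="kernel-1").topic()
status_topic = KernelStatusChanged(service="kernel-1", status="busy").topic()
```

With this layout a subscriber interested in a single service can filter `KernelStatusChanged` by topic, while `Heartbeat` events can only be subscribed to as a whole.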
- class orangeqs.juice.messaging.protocol.SubscriberAsync(uri: str, *, queue: asyncio.Queue[orangeqs.juice.messaging.protocol.EventType] | None = None)#
Bases: typing.Protocol[orangeqs.juice.messaging.protocol.EventType]
Asynchronous client for subscribing to events from other Juice services.
The subscriber will not receive any events until you subscribe to events. See SubscriberAsync.subscribe() for subscribing to events.
See PublisherAsync or PublisherBlocking for publishing events.
Parameters#
uri (str): URI to connect the subscriber to. Format is implementation dependent.
queue (asyncio.Queue, optional): Optional queue new messages will be put in. Will instantiate a new queue if not provided.
Examples#
Instantiate a subscriber and print new events as they are received.
    # SubscriberAsync stands for a concrete implementation of this protocol.
    subscriber = SubscriberAsync("<uri>")
    subscriber.subscribe(DummyEvent, topic="something")
    while True:
        # Wait until the next matching event arrives.
        event = await subscriber.get()
        assert type(event) is DummyEvent
        assert event.topic().startswith("something")
        print(f"Received {type(event)} with data {event}")
- abstract property queue: asyncio.Queue[orangeqs.juice.messaging.protocol.EventType]#
Queue in which messages received by listen() are placed.
Deprecated, use the get() method to retrieve messages instead.
- subscribe(event_type: type[orangeqs.juice.messaging.protocol.EventType], *, topic: str | None = None) -> None#
Subscribe to an event.
It is possible to subscribe to multiple event types and/or topics. The subscriber allows filtering by event type and an optional topic. The topic filter checks if the topic of the event starts with the topic filter. This means that a topic filter with value "a.b" will match events with topic "a.b" and "a.b.c", but not "a.c" or "a.bb". Note that each topic filter always targets a specific event type, thus they are not shared between different event types.
Parameters#
event_type (type[EventType]): Type of event to subscribe to. Will subscribe only to one specific event type, so not types that inherit this type.
topic (str, optional): Optional topic to filter events. See above for explanation. If not provided will subscribe to all events of specified type.
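The matching rule can be sketched as a small predicate. Note that a plain `str.startswith` check would also accept "a.bb" for the filter "a.b", so this sketch assumes matching on whole dot-separated segments, as the examples above suggest. `topic_matches` and its `sep` parameter are hypothetical and not part of this module, and the handling of events without a topic is an assumption.

```python
from __future__ import annotations


def topic_matches(topic_filter: str | None, topic: str | None, sep: str = ".") -> bool:
    """Return True if ``topic`` passes ``topic_filter`` (hypothetical helper)."""
    if topic_filter is None:
        # No filter: every event of the subscribed type is delivered.
        return True
    if topic is None:
        # Assumption: an event without a topic never passes an explicit filter.
        return False
    # Accept the filter itself, or a topic that continues with a new segment.
    return topic == topic_filter or topic.startswith(topic_filter + sep)


matches = {t: topic_matches("a.b", t) for t in ["a.b", "a.b.c", "a.c", "a.bb"]}
```

Here `matches` records which of the four example topics pass the filter "a.b"; an actual implementation may apply the filter on the transport level instead of in Python.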
- stop() -> None#
Stop listening for new events.
Stops the listen() coroutine.
Deprecated, use the get() method to retrieve messages instead.
- async listen() -> None#
Coroutine to listen for new events and append them to the queue.
Runs continuously until stop() is called. This coroutine should be continuously running to receive new messages and append them to the queue.
Deprecated, use the get() method to retrieve messages instead.
- async get(timeout: float | None = None) -> orangeqs.juice.messaging.protocol.EventType#
Get the next event from the subscriber.
Waits asynchronously until a new event is received.
Parameters#
timeout (float, optional): Optional timeout in seconds to wait for a new event. If not provided, will wait indefinitely.
Raises#
(TimeoutError): If a timeout is provided and no event is received within the timeout period.
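The timeout contract can be illustrated with a stand-in backed by an `asyncio.Queue`. `QueueSubscriber` and its `feed()` method are hypothetical and exist only to make the sketch runnable; how a real implementation receives events depends on the base protocol.

```python
from __future__ import annotations

import asyncio


class QueueSubscriber:
    """Hypothetical stand-in mimicking the get() contract described above."""

    def __init__(self) -> None:
        self._queue = asyncio.Queue()

    def feed(self, event: str) -> None:
        # Simulates an event arriving from the transport.
        self._queue.put_nowait(event)

    async def get(self, timeout: float | None = None) -> str:
        try:
            # timeout=None makes wait_for wait indefinitely.
            return await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            # Normalise to the built-in TimeoutError on all Python versions.
            raise TimeoutError("no event received within the timeout") from None


async def main() -> list[str]:
    subscriber = QueueSubscriber()
    subscriber.feed("first")
    results = [await subscriber.get(timeout=0.1)]
    try:
        await subscriber.get(timeout=0.05)  # queue is empty now
    except TimeoutError:
        results.append("timed out")
    return results


results = asyncio.run(main())
```

Callers that poll with a timeout can use the `TimeoutError` branch to perform periodic work, such as checking a shutdown flag, before calling `get()` again.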
- class orangeqs.juice.messaging.protocol.SubscriberBlocking(uri: str, /)#
Bases: typing.Protocol[orangeqs.juice.messaging.protocol.EventType]
Synchronous blocking client for subscribing to events from other Juice services.
The subscriber will not receive any events until you subscribe to events. See subscribe() for subscribing to events.
See PublisherBlocking or PublisherAsync for publishing events.
Parameters#
uri (str): URI to connect the subscriber to. Format is implementation dependent.
Examples#
Instantiate a subscriber and wait for new events.
    # SubscriberBlocking stands for a concrete implementation of this protocol.
    subscriber = SubscriberBlocking("<uri>")
    subscriber.subscribe(DummyEvent, topic="something")
    subscriber.get()  # Blocking call to get the next event
- subscribe(event_type: type[orangeqs.juice.messaging.protocol.EventType], /, *, topic: str | None = None) -> None#
Subscribe to an event.
It is possible to subscribe to multiple event types and/or topics. The subscriber allows filtering by event type and an optional topic. The topic filter checks if the topic of the event starts with the topic filter. This means that a topic filter with value "a.b" will match events with topic "a.b" and "a.b.c", but not "a.c" or "a.bb". Note that each topic filter always targets a specific event type, thus they are not shared between different event types.
Parameters#
event_type (type[EventType]): Type of event to subscribe to. Will subscribe only to one specific event type, so not types that inherit this type.
topic (str, optional): Optional topic to filter events. See above for explanation. If not provided will subscribe to all events of specified type.
- get(timeout: float | None = None) -> orangeqs.juice.messaging.protocol.EventType#
Get the next event from the subscriber.
This is a blocking call that waits until a new event is received.
Parameters#
timeout (float, optional): Optional timeout in seconds to wait for a new event. If not provided, will wait indefinitely.
Raises#
(TimeoutError): If a timeout is provided and no event is received within the timeout period.
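The blocking variant of the same contract can be sketched with the standard-library `queue.Queue`. `BlockingQueueSubscriber` and `feed()` are hypothetical names used only for illustration; they are not part of this module.

```python
from __future__ import annotations

import queue


class BlockingQueueSubscriber:
    """Hypothetical stand-in mimicking the blocking get() contract."""

    def __init__(self) -> None:
        self._queue = queue.Queue()

    def feed(self, event: str) -> None:
        # Simulates an event arriving from the transport.
        self._queue.put(event)

    def get(self, timeout: float | None = None) -> str:
        try:
            # Blocks the calling thread; timeout=None blocks indefinitely.
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("no event received within the timeout") from None


subscriber = BlockingQueueSubscriber()
subscriber.feed("first")
first = subscriber.get(timeout=0.1)

try:
    subscriber.get(timeout=0.05)  # nothing left in the queue
    timed_out = False
except TimeoutError:
    timed_out = True
```

Since `get()` blocks the calling thread, a timeout is the simplest way to keep a polling loop responsive in synchronous code.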
- class orangeqs.juice.messaging.protocol.PublisherAsync(uri: str)#
Bases: typing.Protocol
Asynchronous client for publishing events to other Juice services.
See SubscriberAsync or SubscriberBlocking for subscribing to events.
Parameters#
uri (str): URI to connect the publisher to. Format is implementation dependent.
Examples#
Instantiate a publisher and publish an event.
    publisher = PublisherAsync("<uri>")
    await publisher.publish(DummyEvent(data="data"))
- async publish(event: orangeqs.juice.messaging.protocol.Event) -> None#
Publish a message to the system.
Parameters#
event (Event): Event to be published.
- class orangeqs.juice.messaging.protocol.PublisherBlocking(uri: str)#
Bases: typing.Protocol
Synchronous blocking client for publishing events to other Juice services.
See SubscriberBlocking or SubscriberAsync for subscribing to events.
Parameters#
uri (str): URI to connect the publisher to. Format is implementation dependent.
Examples#
Instantiate a publisher and publish an event.
    publisher = PublisherBlocking("<uri>")
    publisher.publish(DummyEvent(data="data"))
- publish(event: orangeqs.juice.messaging.protocol.Event) -> None#
Publish a message to the system.
Parameters#
event (Event): Event to be published.
- class orangeqs.juice.messaging.protocol.Subscriber(uri: str, *, queue: asyncio.Queue[orangeqs.juice.messaging.protocol.EventType] | None = None)#
Bases: orangeqs.juice.messaging.protocol.SubscriberAsync[orangeqs.juice.messaging.protocol.EventType], typing.Protocol
Asynchronous client for subscribing to events from other Juice services.
The subscriber will not receive any events until you subscribe to events. See SubscriberAsync.subscribe() for subscribing to events.
See PublisherAsync or PublisherBlocking for publishing events.
Parameters#
uri (str): URI to connect the subscriber to. Format is implementation dependent.
queue (asyncio.Queue, optional): Optional queue new messages will be put in. Will instantiate a new queue if not provided.
Examples#
Instantiate a subscriber and print new events as they are received.
    # SubscriberAsync stands for a concrete implementation of this protocol.
    subscriber = SubscriberAsync("<uri>")
    subscriber.subscribe(DummyEvent, topic="something")
    while True:
        # Wait until the next matching event arrives.
        event = await subscriber.get()
        assert type(event) is DummyEvent
        assert event.topic().startswith("something")
        print(f"Received {type(event)} with data {event}")
- class orangeqs.juice.messaging.protocol.Publisher(uri: str)#
Bases: orangeqs.juice.messaging.protocol.PublisherAsync, typing.Protocol
Asynchronous client for publishing events to other Juice services.
See SubscriberAsync or SubscriberBlocking for subscribing to events.
Parameters#
uri (str): URI to connect the publisher to. Format is implementation dependent.
Examples#
Instantiate a publisher and publish an event.
    publisher = PublisherAsync("<uri>")
    await publisher.publish(DummyEvent(data="data"))