orangeqs.juice.client.pubsub
Helpers to instantiate publisher and subscribers from Juice configuration.
Module Contents
Functions
publish_event | Publish an event to pubsub and database.
publisher_async | Create an async publisher instance from Juice configuration.
publisher_blocking | Create a synchronous publisher instance from Juice configuration.
subscriber_async | Create an async subscriber instance from Juice configuration.
subscriber_blocking | Create a synchronous subscriber instance from Juice configuration.
API
- async orangeqs.juice.client.pubsub.publish_event(event: orangeqs.juice.messaging.protocol.Event, publisher: orangeqs.juice.messaging.protocol.PublisherAsync, write_api: influxdb_client.WriteApi | influxdb_client.client.write_api_async.WriteApiAsync, bucket: str) -> None
Publish an event to pubsub and database.
This convenience function publishes an event and also writes it to the database. In the future, the database write will be deprecated here and handled internally by the Juice Task Manager. See #60 for more information.
Parameters
event (Event): The event to publish.
publisher (PublisherAsync): The publisher to use for publishing the event.
write_api (WriteApi | WriteApiAsync): The InfluxDB write API to use for writing the event.
bucket (str): The InfluxDB bucket to write the event to.
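The `write_api` parameter accepts either a synchronous or an asynchronous InfluxDB write API. The sketch below shows one way a function could publish first and then persist through either kind. It is not the Juice implementation: `HelloEvent`, `RecordingPublisher`, `RecordingWriteApi`, `RecordingWriteApiAsync` and `publish_event_sketch` are hypothetical stand-ins so the example runs without a broker or database, and passing the event object directly as the record is an assumption.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any


@dataclass
class HelloEvent:
    """Stand-in for an Event subclass (hypothetical)."""
    message: str


@dataclass
class RecordingPublisher:
    """Stand-in for PublisherAsync: records events instead of sending them."""
    published: list = field(default_factory=list)

    async def publish(self, event: Any) -> None:
        self.published.append(event)


@dataclass
class RecordingWriteApi:
    """Stand-in for a synchronous write API: records each write call."""
    writes: list = field(default_factory=list)

    def write(self, bucket: str, record: Any) -> None:
        self.writes.append((bucket, record))


@dataclass
class RecordingWriteApiAsync:
    """Stand-in for an async write API: same call shape, but a coroutine."""
    writes: list = field(default_factory=list)

    async def write(self, bucket: str, record: Any) -> bool:
        self.writes.append((bucket, record))
        return True


async def publish_event_sketch(event, publisher, write_api, bucket: str) -> None:
    """Publish the event, then persist it via a sync or async write API."""
    await publisher.publish(event)
    result = write_api.write(bucket=bucket, record=event)
    # An async write API returns an awaitable; a sync one has already written.
    if inspect.isawaitable(result):
        await result


async def demo() -> tuple:
    publisher = RecordingPublisher()
    sync_api = RecordingWriteApi()
    async_api = RecordingWriteApiAsync()
    event = HelloEvent(message="Hello, World!")
    await publish_event_sketch(event, publisher, sync_api, bucket="events")
    await publish_event_sketch(event, publisher, async_api, bucket="events")
    return publisher, sync_api, async_api
```

Checking whether the result is awaitable keeps one code path for both write API types. A synchronous write still blocks the event loop while it runs, which may matter for slow database connections.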
- orangeqs.juice.client.pubsub.publisher_async(connection_info: orangeqs.juice.schemas.task_manager.TaskManagerConnectionInfo | None = None) -> orangeqs.juice.messaging.protocol.PublisherAsync
Create an async publisher instance from Juice configuration.
Parameters
connection_info (TaskManagerConnectionInfo, optional): Connection info used to determine the publisher URI. If not provided, it is loaded from disk.
Returns
(PublisherAsync): Publisher to publish events.
Examples
Instantiate a publisher and publish an event:
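    from orangeqs.juice.messaging import Event
    from orangeqs.juice.client.pubsub import publisher_async


    class HelloEvent(Event):
        message: str


    # Top-level `await` works in an async REPL or notebook; in a script,
    # run these lines inside a coroutine (for example via asyncio.run()).
    publisher = publisher_async()
    await publisher.publish(HelloEvent(message="Hello, World!"))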
- orangeqs.juice.client.pubsub.publisher_blocking(connection_info: orangeqs.juice.schemas.task_manager.TaskManagerConnectionInfo | None = None) -> orangeqs.juice.messaging.protocol.PublisherBlocking
Create a synchronous publisher instance from Juice configuration.
Parameters
connection_info (TaskManagerConnectionInfo, optional): Connection info used to determine the publisher URI. If not provided, it is loaded from disk.
Returns
(PublisherBlocking): Publisher to publish events.
Examples
Instantiate a publisher and publish an event:
    from orangeqs.juice.messaging import Event
    from orangeqs.juice.client.pubsub import publisher_blocking


    class HelloEvent(Event):
        message: str


    publisher = publisher_blocking()
    publisher.publish(HelloEvent(message="Hello, World!"))
- orangeqs.juice.client.pubsub.subscriber_async(connection_info: orangeqs.juice.schemas.task_manager.TaskManagerConnectionInfo | None = None, *, queue: asyncio.Queue[orangeqs.juice.messaging.protocol.EventType] | None = None) -> orangeqs.juice.messaging.protocol.SubscriberAsync[orangeqs.juice.messaging.protocol.EventType]
Create an async subscriber instance from Juice configuration.
Parameters
connection_info (TaskManagerConnectionInfo, optional): Connection info used to determine the subscriber URI. Defaults to None, in which case it is presumably loaded from disk, as for the publishers.
queue (asyncio.Queue, optional): Keyword-only. Queue that new messages are put into. A new queue is instantiated if not provided.
Returns
(SubscriberAsync): Subscriber to subscribe to events.
Examples
Instantiate a subscriber and listen for events:
    import asyncio
    from orangeqs.juice.messaging import Event
    from orangeqs.juice.client.pubsub import subscriber_async


    class HelloEvent(Event):
        message: str


    # Top-level `await` works in an async REPL or notebook; in a script,
    # run these lines inside a coroutine (for example via asyncio.run()).
    subscriber = subscriber_async()
    subscriber.subscribe(HelloEvent)
    while True:
        event = await subscriber.get()
        print(f"Received: {event.model_dump()}")
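The `queue` parameter described above suggests that several subscribers could feed one consumer by sharing a queue. The sketch below illustrates that idea with a hypothetical `StandInSubscriber` class and a hypothetical `deliver` method that simulates incoming messages. It is not the Juice `SubscriberAsync`, and whether the real subscriber behaves the same way with a shared queue is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class HelloEvent:
    """Stand-in for an Event subclass (hypothetical)."""
    message: str


class StandInSubscriber:
    """Hypothetical stand-in for an async subscriber; not the Juice class."""

    def __init__(self, queue: Optional[asyncio.Queue] = None) -> None:
        # Mirror the documented behaviour: reuse the caller's queue if one
        # is given, otherwise create a fresh one.
        self.queue = queue if queue is not None else asyncio.Queue()

    async def deliver(self, event) -> None:
        """Simulate an incoming message (a real subscriber reads the network)."""
        await self.queue.put(event)

    async def get(self):
        """Wait for the next event on this subscriber's queue."""
        return await self.queue.get()


async def demo() -> list:
    shared = asyncio.Queue()
    first = StandInSubscriber(queue=shared)
    second = StandInSubscriber(queue=shared)
    await first.deliver(HelloEvent(message="from first"))
    await second.deliver(HelloEvent(message="from second"))
    # One consumer drains events from both subscribers in arrival order.
    return [(await shared.get()).message for _ in range(2)]
```

With a shared queue the consumer no longer knows which subscriber produced an event, so this pattern fits best when the event type itself carries enough context.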
- orangeqs.juice.client.pubsub.subscriber_blocking(connection_info: orangeqs.juice.schemas.task_manager.TaskManagerConnectionInfo | None = None, /) -> orangeqs.juice.messaging.protocol.SubscriberBlocking[orangeqs.juice.messaging.protocol.Event]
Create a synchronous subscriber instance from Juice configuration.
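Parameters
connection_info (TaskManagerConnectionInfo, optional): Positional-only. Connection info used to determine the subscriber URI. Defaults to None, in which case it is presumably loaded from disk, as for the publishers.
Returns
(SubscriberBlocking): Subscriber to subscribe to events.
Examples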
Instantiate a subscriber and get the next event:
    from orangeqs.juice.messaging import Event
    from orangeqs.juice.client.pubsub import subscriber_blocking


    class HelloEvent(Event):
        message: str


    subscriber = subscriber_blocking()
    subscriber.subscribe(HelloEvent)
    # Blocks until the next HelloEvent arrives.
    event = subscriber.get()
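Because a blocking `get()` stalls the calling thread, one common pattern is to consume events on a worker thread. The sketch below shows that pattern with a hypothetical `StandInBlockingSubscriber` backed by `queue.Queue`. It is not the Juice `SubscriberBlocking`; the `deliver` method is invented for the simulation, and the real `get()` may differ (for example in timeout handling).

```python
import queue
import threading
from dataclasses import dataclass


@dataclass
class HelloEvent:
    """Stand-in for an Event subclass (hypothetical)."""
    message: str


class StandInBlockingSubscriber:
    """Hypothetical stand-in for a blocking subscriber; not the Juice class."""

    def __init__(self) -> None:
        self._queue = queue.Queue()

    def deliver(self, event) -> None:
        """Simulate an incoming message (a real subscriber reads the network)."""
        self._queue.put(event)

    def get(self):
        # Blocks the calling thread until an event is available.
        return self._queue.get()


def consume(subscriber, count: int, received: list) -> None:
    """Collect `count` events, blocking between them."""
    for _ in range(count):
        received.append(subscriber.get())


def demo() -> list:
    subscriber = StandInBlockingSubscriber()
    received = []
    # The worker blocks on get() while the main thread stays free.
    worker = threading.Thread(target=consume, args=(subscriber, 2, received))
    worker.start()
    subscriber.deliver(HelloEvent(message="one"))
    subscriber.deliver(HelloEvent(message="two"))
    worker.join(timeout=5)
    return [event.message for event in received]
```

Bounding the loop with a fixed count keeps the worker from waiting forever; a long-running consumer would need some other shutdown signal.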