orangeqs.juice.database#
Database functionality based on InfluxDB2.
OrangeQS Juice integrates with an InfluxDB2
database for storing time series data.
This module provides the Point base class, which can be subclassed to define
schemas for specific measurements.
See Database for an interactive guide on how to use the database.
Package Contents#
Classes#
Base class for time-series data points used to store and query data from InfluxDB. |
API#
- class orangeqs.juice.database.Point(/, **data: Any)#
Bases:
pydantic.BaseModelBase class for time-series data points used to store and query data from InfluxDB.
Subclassing this class defines a schema for a specific measurement. This schema class can be used to store and query data from InfluxDB. The schema is based on a
pydantic.BaseModel, so all pydantic features are available.This base class follows two additional conventions:
By default, all fields are stored as InfluxDB fields. To mark a field as a tag, annotate the type using the string
"tag", e.g.field: typing.Annotated[str, "tag"].Every subclass must define a
measurementclass variable, which specifies the InfluxDB measurement name, e.g.measurement: typing.ClassVar[str] = "some_name".
Also see schema design best practices in the InfluxDB documentation, especially regarding the use of tags vs. fields.
Examples#
This example shows:
How to define a schema for a specific measurement.
How to store a record.
How to query it from the database.
from typing import Annotated, ClassVar from datetime import datetime from orangeqs.juice.database import Point # Define a schema class SomeEvent(Point): measurement: ClassVar[str] = "some_event" event_id: Annotated[str, "tag"] value: float # Writing the event to InfluxDB event = SomeEvent(event_id="event123", value=42.0) event.write(write_api, bucket="my_bucket") # Querying events from InfluxDB events = SomeEvent.query( query_api, bucket="my_bucket", filters={"event_id": "event123"}, limit=10 )
- measurement: ClassVar[str]#
None
Name of the InfluxDB measurement. Must be unique and defined in every subclass.
- time: datetime.datetime#
‘Field(…)’
The timestamp of the data point. Defaults to the current time in UTC.
- to_influxdb2() influxdb_client.Point#
Convert the Point instance to an InfluxDB Point.
Returns#
(influxdb_client.Point): The corresponding InfluxDB Point object.
- write(write_api: influxdb_client.WriteApi, bucket: str, **kwargs: Any) None#
Write the Point instance to InfluxDB.
Parameters#
write_api (influxdb_client.WriteApi): The InfluxDB WriteApi instance.
bucket (str): The target bucket in InfluxDB.
**kwargs (Any): Additional keyword arguments to pass to the
WriteApi.write()method.
- async write_async(write_api: influxdb_client.client.write_api_async.WriteApiAsync, bucket: str, **kwargs: Any) None#
Write the Point instance to InfluxDB.
Parameters#
write_api (influxdb_client.WriteApiAsync): The InfluxDB WriteApiAsync instance.
bucket (str): The target bucket in InfluxDB.
**kwargs (Any): Additional keyword arguments to pass to the
WriteApiAsync.write()method.
- classmethod query(query_api: influxdb_client.QueryApi, bucket: str, output: Literal[list, dataframe] = 'list', filters: orangeqs.juice.database._point._FilterType | None = None, limit: int = 100, start: str = '-1h', stop: str = 'now()', sort: Literal[asc, desc, None] = 'desc') list[typing_extensions.Self] | pandas.DataFrame#
Query the database for all points of this type based on filters.
Parameters#
query_api (influxdb_client.QueryApi): The InfluxDB QueryApi instance.
bucket (str): The target bucket in InfluxDB.
output (“list” or “dataframe”, optional): The output format: “list” returns a list of instances of
Self, “dataframe” returns a pandas DataFrame (default is “list”).filters (dict[str, str | int | float | bool | list[str | int | float | bool]]): A dictionary of filters to apply. The keys correspond to field or tag names, and the values can be a single value or a list of values for “OR” filtering. All filters are combined with “AND”. Defaults to None, which means no filtering. For example, the filter
{"name": ["a", "b"], "color": "blue"}corresponds to(name == "a" or name == "b") and (color == "blue").limit (int, optional): The maximum number of records per series key to return. Defaults to 100. A series key is a unique combination of tag values. See series in the InfluxDB documentation for more information.
start (str, optional): The start time for the query in InfluxDB time format. Defaults to “-1h” (last hour).
stop (str, optional): The stop time for the query in InfluxDB time format. Defaults to “now()”.
sort (“asc”, “desc” or None, optional): The sort order for the results based on time. If None, no sorting is applied. Defaults to “desc”.
Returns#
(list[Self] | pd.DataFrame): A collection of
Selfinstances or a DataFrame ifoutput="dataframe".
- async classmethod query_async(query_api: influxdb_client.client.query_api_async.QueryApiAsync, bucket: str, output: Literal[list, dataframe] = 'list', filters: orangeqs.juice.database._point._FilterType | None = None, limit: int = 100, start: str = '-1h', stop: str = 'now()', sort: Literal[asc, desc, None] = 'desc') list[typing_extensions.Self] | pandas.DataFrame#
Query the database for all points of this type based on filters.
Parameters#
query_api (influxdb_client.QueryApiAsync): The InfluxDB QueryApiAsync instance.
bucket (str): The target bucket in InfluxDB.
output (“list” or “dataframe”, optional): The output format: “list” returns a list of instances of
Self, “dataframe” returns a pandas DataFrame (default is “list”).filters (dict[str, str | int | float | bool | list[str | int | float | bool]]): A dictionary of filters to apply. The keys correspond to field or tag names, and the values can be a single value or a list of values for “OR” filtering. All filters are combined with “AND”. Defaults to None, which means no filtering. For example, the filter
{"name": ["a", "b"], "color": "blue"}corresponds to(name == "a" or name == "b") and (color == "blue").limit (int, optional): The maximum number of records per series key to return. Defaults to 100. A series key is a unique combination of tag values. See series in the InfluxDB documentation for more information.
start (str, optional): The start time for the query in InfluxDB time format. Defaults to “-1h” (last hour).
stop (str, optional): The stop time for the query in InfluxDB time format. Defaults to “now()”.
sort (“asc”, “desc” or None, optional): The sort order for the results based on time. If None, no sorting is applied. Defaults to “desc”.
Returns#
(list[Self] | pd.DataFrame): A collection of
Selfinstances or a DataFrame ifoutput="dataframe".