orangeqs.juice.database#

Database functionality based on InfluxDB2.

OrangeQS Juice integrates with an InfluxDB2 database for storing time series data. This module provides the Point base class, which can be subclassed to define schemas for specific measurements.

See Database for an interactive guide on how to use the database.

Package Contents#

Classes#

Point

Base class for time-series data points used to store and query data from InfluxDB.

API#

class orangeqs.juice.database.Point(/, **data: Any)#

Bases: pydantic.BaseModel

Base class for time-series data points used to store and query data from InfluxDB.

Subclassing this class defines a schema for a specific measurement. This schema class can be used to store and query data from InfluxDB. The schema is based on a pydantic.BaseModel, so all pydantic features are available.

This base class follows two additional conventions:

  • By default, all fields are stored as InfluxDB fields. To mark a field as a tag, annotate the type using the string "tag", e.g. field: typing.Annotated[str, "tag"].

  • Every subclass must define a measurement class variable, which specifies the InfluxDB measurement name, e.g. measurement: typing.ClassVar[str] = "some_name".

Also see schema design best practices in the InfluxDB documentation, especially regarding the use of tags vs. fields.

Examples#

This example shows:

  • How to define a schema for a specific measurement.

  • How to store a record.

  • How to query it from the database.

from typing import Annotated, ClassVar
from datetime import datetime
from orangeqs.juice.database import Point

# Define a schema
class SomeEvent(Point):
    measurement: ClassVar[str] = "some_event"

    event_id: Annotated[str, "tag"]
    value: float

# Writing the event to InfluxDB
event = SomeEvent(event_id="event123", value=42.0)
event.write(write_api, bucket="my_bucket")

# Querying events from InfluxDB
events = SomeEvent.query(
    query_api,
    bucket="my_bucket",
    filters={"event_id": "event123"},
    limit=10
)
measurement: ClassVar[str]#

None

Name of the InfluxDB measurement. Must be unique and defined in every subclass.

time: datetime.datetime#

‘Field(…)’

The timestamp of the data point. Defaults to the current time in UTC.

to_influxdb2() influxdb_client.Point#

Convert the Point instance to an InfluxDB Point.

Returns#

  • (influxdb_client.Point): The corresponding InfluxDB Point object.

write(write_api: influxdb_client.WriteApi, bucket: str, **kwargs: Any) None#

Write the Point instance to InfluxDB.

Parameters#

  • write_api (influxdb_client.WriteApi): The InfluxDB WriteApi instance.

  • bucket (str): The target bucket in InfluxDB.

  • **kwargs (Any): Additional keyword arguments to pass to the WriteApi.write() method.

async write_async(write_api: influxdb_client.client.write_api_async.WriteApiAsync, bucket: str, **kwargs: Any) None#

Write the Point instance to InfluxDB.

Parameters#

  • write_api (influxdb_client.WriteApiAsync): The InfluxDB WriteApiAsync instance.

  • bucket (str): The target bucket in InfluxDB.

  • **kwargs (Any): Additional keyword arguments to pass to the WriteApiAsync.write() method.

classmethod query(query_api: influxdb_client.QueryApi, bucket: str, output: Literal[list, dataframe] = 'list', filters: orangeqs.juice.database._point._FilterType | None = None, limit: int = 100, start: str = '-1h', stop: str = 'now()', sort: Literal[asc, desc, None] = 'desc') list[typing_extensions.Self] | pandas.DataFrame#

Query the database for all points of this type based on filters.

Parameters#

  • query_api (influxdb_client.QueryApi): The InfluxDB QueryApi instance.

  • bucket (str): The target bucket in InfluxDB.

  • output (“list” or “dataframe”, optional): The output format: “list” returns a list of instances of Self, “dataframe” returns a pandas DataFrame (default is “list”).

  • filters (dict[str, str | int | float | bool | list[str | int | float | bool]]): A dictionary of filters to apply. The keys correspond to field or tag names, and the values can be a single value or a list of values for “OR” filtering. All filters are combined with “AND”. Defaults to None, which means no filtering. For example, the filter {"name": ["a", "b"], "color": "blue"} corresponds to (name == "a" or name == "b") and (color == "blue").

  • limit (int, optional): The maximum number of records per series key to return. Defaults to 100. A series key is a unique combination of tag values. See series in the InfluxDB documentation for more information.

  • start (str, optional): The start time for the query in InfluxDB time format. Defaults to “-1h” (last hour).

  • stop (str, optional): The stop time for the query in InfluxDB time format. Defaults to “now()”.

  • sort (“asc”, “desc” or None, optional): The sort order for the results based on time. If None, no sorting is applied. Defaults to “desc”.

Returns#

  • (list[Self] | pd.DataFrame): A collection of Self instances or a DataFrame if output="dataframe".

async classmethod query_async(query_api: influxdb_client.client.query_api_async.QueryApiAsync, bucket: str, output: Literal[list, dataframe] = 'list', filters: orangeqs.juice.database._point._FilterType | None = None, limit: int = 100, start: str = '-1h', stop: str = 'now()', sort: Literal[asc, desc, None] = 'desc') list[typing_extensions.Self] | pandas.DataFrame#

Query the database for all points of this type based on filters.

Parameters#

  • query_api (influxdb_client.QueryApiAsync): The InfluxDB QueryApiAsync instance.

  • bucket (str): The target bucket in InfluxDB.

  • output (“list” or “dataframe”, optional): The output format: “list” returns a list of instances of Self, “dataframe” returns a pandas DataFrame (default is “list”).

  • filters (dict[str, str | int | float | bool | list[str | int | float | bool]]): A dictionary of filters to apply. The keys correspond to field or tag names, and the values can be a single value or a list of values for “OR” filtering. All filters are combined with “AND”. Defaults to None, which means no filtering. For example, the filter {"name": ["a", "b"], "color": "blue"} corresponds to (name == "a" or name == "b") and (color == "blue").

  • limit (int, optional): The maximum number of records per series key to return. Defaults to 100. A series key is a unique combination of tag values. See series in the InfluxDB documentation for more information.

  • start (str, optional): The start time for the query in InfluxDB time format. Defaults to “-1h” (last hour).

  • stop (str, optional): The stop time for the query in InfluxDB time format. Defaults to “now()”.

  • sort (“asc”, “desc” or None, optional): The sort order for the results based on time. If None, no sorting is applied. Defaults to “desc”.

Returns#

  • (list[Self] | pd.DataFrame): A collection of Self instances or a DataFrame if output="dataframe".