Overview

The Retriever is the data entry point for every Gaussia evaluation. You subclass it and implement load_dataset() to return your conversation data in the Dataset format.
from gaussia.core.retriever import Retriever
from gaussia.schemas.common import Dataset

class MyRetriever(Retriever):
    def load_dataset(self) -> list[Dataset]:
        # Load and return your data
        ...

The interface

class Retriever(ABC):
    def __init__(self, **kwargs):
        self.kwargs = kwargs

    @property
    def iteration_level(self) -> IterationLevel:
        return IterationLevel.FULL_DATASET

    @abstractmethod
    def load_dataset(self) -> list[Dataset] | Iterator[Dataset] | Iterator[StreamedBatch]:
        ...
Any keyword arguments passed to Metric.run(MyRetriever, **kwargs) are forwarded to your retriever’s __init__.
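To make the forwarding concrete, here is a minimal, self-contained sketch of the mechanism. The `run` helper and both classes below are illustrative stand-ins, not Gaussia's actual implementation — the point is only that kwargs given to the entry point reach the retriever's `__init__` untouched:

```python
from abc import ABC, abstractmethod

class Retriever(ABC):
    """Stand-in mirroring the interface above."""
    def __init__(self, **kwargs):
        self.kwargs = kwargs

    @abstractmethod
    def load_dataset(self):
        ...

def run(retriever_cls, **kwargs):
    """Illustrative driver: instantiates the retriever, forwarding all kwargs."""
    retriever = retriever_cls(**kwargs)   # forwarding happens here
    return list(retriever.load_dataset())

class EchoRetriever(Retriever):
    def load_dataset(self):
        yield self.kwargs  # expose the forwarded kwargs for inspection
```

Calling `run(EchoRetriever, db_url="...", limit=10)` returns a list whose single element is exactly the kwargs dict the retriever received.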

Iteration levels

Full dataset (default)

Loads the entire dataset into memory. Best for small to medium datasets.
class FullRetriever(Retriever):
    def load_dataset(self) -> list[Dataset]:
        return [Dataset(...), Dataset(...)]
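Filling in the ellipses, a full-dataset retriever might look like the following. The field values are invented for illustration; the `Dataset` field names (session_id, assistant_id, context, conversation) come from the schema used elsewhere on this page, while the `Batch` fields are assumptions — check your actual Batch schema. The dataclasses are stand-ins so the sketch runs outside Gaussia; in real code, import the schema classes instead:

```python
from dataclasses import dataclass

# Minimal stand-ins for gaussia.schemas.common.Batch and Dataset.
@dataclass
class Batch:
    question: str   # assumed field names -- check your Batch schema
    answer: str

@dataclass
class Dataset:
    session_id: str
    assistant_id: str
    context: str
    conversation: list

class FullRetriever:
    """Loads every session into memory at once (FULL_DATASET)."""
    def load_dataset(self) -> list:
        return [
            Dataset(
                session_id="s-001",
                assistant_id="support-bot",
                context="billing enquiry",
                conversation=[Batch(question="Why was I charged twice?",
                                    answer="The duplicate charge was refunded.")],
            ),
            Dataset(
                session_id="s-002",
                assistant_id="support-bot",
                context="password reset",
                conversation=[Batch(question="How do I reset my password?",
                                    answer="Use the 'Forgot password' link.")],
            ),
        ]
```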

Stream sessions

Yields one Dataset (session) at a time. Ideal for large datasets or database-backed sources.
from gaussia.schemas.common import Batch, Dataset, IterationLevel

class StreamRetriever(Retriever):
    @property
    def iteration_level(self) -> IterationLevel:
        return IterationLevel.STREAM_SESSIONS

    def load_dataset(self):
        for row in database.fetch_sessions():
            yield Dataset(
                session_id=row["id"],
                assistant_id=row["assistant"],
                context=row["context"],
                conversation=[Batch(**b) for b in row["batches"]],
            )
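One benefit worth calling out: because load_dataset() is a generator here, rows are fetched only as the evaluation consumes them, so memory stays bounded. A small self-contained sketch (with a fake database standing in for database.fetch_sessions(), and the Dataset wrapper omitted) demonstrates this laziness:

```python
class FakeDatabase:
    """Stand-in for the database above; counts how many rows were fetched."""
    def __init__(self, rows):
        self.rows = rows
        self.fetched = 0

    def fetch_sessions(self):
        for row in self.rows:
            self.fetched += 1
            yield row

db = FakeDatabase([{"id": "s-1"}, {"id": "s-2"}, {"id": "s-3"}])

def load_dataset():
    # Same shape as StreamRetriever.load_dataset, minus the Dataset wrapper.
    for row in db.fetch_sessions():
        yield row["id"]

stream = load_dataset()
assert db.fetched == 0      # nothing is fetched until iteration starts
first = next(stream)        # exactly one row is fetched per step
```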

Stream batches

Yields individual QA pairs wrapped in StreamedBatch. Useful for real-time or event-driven evaluation.
from gaussia.schemas.common import Batch, IterationLevel, SessionMetadata, StreamedBatch

class EventRetriever(Retriever):
    @property
    def iteration_level(self) -> IterationLevel:
        return IterationLevel.STREAM_BATCHES

    def load_dataset(self):
        for event in message_queue.consume():
            yield StreamedBatch(
                metadata=SessionMetadata(
                    session_id=event["session_id"],
                    assistant_id=event["assistant_id"],
                    context=event["context"],
                ),
                batch=Batch(**event["interaction"]),
            )
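Because each StreamedBatch carries its own session metadata, a downstream consumer can regroup batches by session even though they arrive one at a time. A minimal sketch of that regrouping, with plain dataclasses standing in for the Gaussia schema classes so it runs on its own:

```python
from collections import defaultdict
from dataclasses import dataclass

# Stand-ins for the Gaussia schema classes.
@dataclass
class SessionMetadata:
    session_id: str
    assistant_id: str
    context: str

@dataclass
class StreamedBatch:
    metadata: SessionMetadata
    batch: dict

def group_by_session(streamed):
    """Collect batches under their originating session id."""
    sessions = defaultdict(list)
    for sb in streamed:
        sessions[sb.metadata.session_id].append(sb.batch)
    return dict(sessions)

meta = SessionMetadata("s-1", "support-bot", "billing")
events = [StreamedBatch(meta, {"q": "hi"}), StreamedBatch(meta, {"q": "bye"})]
grouped = group_by_session(events)
```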

Passing configuration

Configuration flows from Metric.run() kwargs through to your retriever:
# These kwargs are passed to MyRetriever.__init__
results = Metric.run(
    MyRetriever,
    model=model,
    db_url="postgresql://...",  # Forwarded to retriever
    limit=100,                  # Forwarded to retriever
)

class MyRetriever(Retriever):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.db_url = kwargs["db_url"]
        self.limit = kwargs.get("limit", 50)
If load_dataset() returns a generator, the iteration level must be STREAM_SESSIONS or STREAM_BATCHES: Gaussia raises a ValueError when a generator is returned while iteration_level is FULL_DATASET.
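The check Gaussia performs is presumably along these lines — an illustrative sketch, not the actual source, with a plain string in place of the IterationLevel enum:

```python
import inspect

def validate(result, level):
    """Reject a lazily-produced result when the level promises a full dataset."""
    if level == "FULL_DATASET" and inspect.isgenerator(result):
        raise ValueError(
            "load_dataset() returned a generator but iteration_level is "
            "FULL_DATASET; use STREAM_SESSIONS or STREAM_BATCHES instead."
        )
    return result

def lazy():
    yield 1

# A concrete list passes; a generator under FULL_DATASET raises ValueError.
assert validate([1, 2], "FULL_DATASET") == [1, 2]
try:
    validate(lazy(), "FULL_DATASET")
    raised = False
except ValueError:
    raised = True
```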