Overview
The Retriever is the data entry point for every Gaussia evaluation. You subclass it and implement load_dataset() to return your conversation data in the Dataset format.
```python
from gaussia.core.retriever import Retriever
from gaussia.schemas.common import Dataset

class MyRetriever(Retriever):
    def load_dataset(self) -> list[Dataset]:
        # Load and return your data
        ...
```
The interface
```python
class Retriever(ABC):
    def __init__(self, **kwargs):
        self.kwargs = kwargs

    @property
    def iteration_level(self) -> IterationLevel:
        return IterationLevel.FULL_DATASET

    @abstractmethod
    def load_dataset(self) -> list[Dataset] | Iterator[Dataset] | Iterator[StreamedBatch]:
        ...
```
Any keyword arguments passed to Metric.run(MyRetriever, **kwargs) are forwarded to your retriever’s __init__.
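The forwarding is plain `**kwargs` passing. The following stand-in (not gaussia's actual implementation; `run` and the attribute names are illustrative) shows the flow:

```python
# Minimal stand-in for the kwargs-forwarding pattern described above.
# The real Metric.run and Retriever live in gaussia; only the shape of
# the hand-off is shown here.
class Retriever:
    def __init__(self, **kwargs):
        self.kwargs = kwargs

def run(retriever_cls, **kwargs):
    # Every keyword argument after the class is handed straight
    # to the retriever's __init__.
    return retriever_cls(**kwargs)

r = run(Retriever, db_url="postgresql://localhost/eval", limit=100)
print(r.kwargs["limit"])  # 100
```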
Iteration levels
Full dataset (default)
Loads the entire dataset into memory. Best for small to medium datasets.
```python
class FullRetriever(Retriever):
    def load_dataset(self) -> list[Dataset]:
        return [Dataset(...), Dataset(...)]
```
Stream sessions
Yields one Dataset (session) at a time. Ideal for large datasets or database-backed sources.
```python
from gaussia.schemas.common import Batch, Dataset, IterationLevel

class StreamRetriever(Retriever):
    @property
    def iteration_level(self) -> IterationLevel:
        return IterationLevel.STREAM_SESSIONS

    def load_dataset(self):
        for row in database.fetch_sessions():
            yield Dataset(
                session_id=row["id"],
                assistant_id=row["assistant"],
                context=row["context"],
                conversation=[Batch(**b) for b in row["batches"]],
            )
```
Stream batches
Yields individual QA pairs wrapped in StreamedBatch. Useful for real-time or event-driven evaluation.
```python
from gaussia.schemas.common import Batch, IterationLevel, SessionMetadata, StreamedBatch

class EventRetriever(Retriever):
    @property
    def iteration_level(self) -> IterationLevel:
        return IterationLevel.STREAM_BATCHES

    def load_dataset(self):
        for event in message_queue.consume():
            yield StreamedBatch(
                metadata=SessionMetadata(
                    session_id=event["session_id"],
                    assistant_id=event["assistant_id"],
                    context=event["context"],
                ),
                batch=Batch(**event["interaction"]),
            )
```
Passing configuration
Configuration flows from Metric.run() kwargs through to your retriever:
```python
# These kwargs are passed to MyRetriever.__init__
results = Metric.run(
    MyRetriever,
    model=model,
    db_url="postgresql://...",  # Forwarded to retriever
    limit=100,                  # Forwarded to retriever
)

class MyRetriever(Retriever):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.db_url = kwargs["db_url"]
        self.limit = kwargs.get("limit", 50)
```
A generator may only be returned from load_dataset() when iteration_level is STREAM_SESSIONS or STREAM_BATCHES. If load_dataset() returns a generator while iteration_level is left at the default FULL_DATASET, Gaussia raises a ValueError.
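One way such a check can be implemented (a sketch of the behavior described above, not gaussia's actual code; `validate_result` and the string levels are illustrative) is with `inspect.isgenerator`:

```python
import inspect

# Sketch of the validation described above: a generator result is only
# legal for the streaming iteration levels.
def validate_result(result, iteration_level: str):
    if inspect.isgenerator(result) and iteration_level == "FULL_DATASET":
        raise ValueError(
            "load_dataset() returned a generator, but iteration_level is "
            "FULL_DATASET; use STREAM_SESSIONS or STREAM_BATCHES instead."
        )
    return result

def sessions():
    yield {"session_id": "s1"}

validate_result([{"session_id": "s1"}], "FULL_DATASET")  # a list is always fine
validate_result(sessions(), "STREAM_SESSIONS")           # generator + streaming level
try:
    validate_result(sessions(), "FULL_DATASET")
except ValueError:
    pass  # raises, as described above
```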