Plugin Authoring
Recotem discovers DataSource plugins via Python entry points. A plugin is any installed package that registers in the recotem.datasources group.
The examples/plugins/echo-source/ directory in this repository is a minimal, runnable reference implementation.
Plugin contract
A plugin must provide a class with four class-level declarations (type_name, a nested Config model, extras_required, no_expand_fields) and one required method (fetch); __init__ and the optional probe are described below.
```python
from __future__ import annotations

import random
from typing import ClassVar

import pandas as pd
from pydantic import BaseModel, Field

from recotem.datasource.base import DataSourceError, FetchContext


class EchoSource:
    """Returns a synthetic DataFrame — useful for testing and CI."""

    # 1. type_name: discriminator value matched against the recipe YAML
    #    `source.type` field. Must be a non-empty string and unique across
    #    all installed plugins. By convention use a short lower-case slug.
    type_name: ClassVar[str] = "echo"

    # 2. Config: pydantic BaseModel describing the recipe sub-fields for this
    #    source. All fields appear under `source:` in the YAML alongside the
    #    `type:` discriminator. The loader passes the entire `source:` mapping
    #    (including `type`) to `Config.model_validate(...)`, so either declare
    #    `type` as a field on Config (the builtin convention — see below) or
    #    rely on pydantic's default `extra="ignore"` to drop it. Combining
    #    `extra="forbid"` with no `type` field will fail recipe load with an
    #    "unexpected key" error.
    class Config(BaseModel):
        n_users: int = Field(default=10, ge=1)
        n_items: int = Field(default=20, ge=1)
        n_rows: int = Field(default=100, ge=1)
        seed: int = Field(default=42)

    # 3. extras_required: pip extras to suggest when optional dependencies
    #    are missing. Leave empty if the plugin has no optional deps.
    extras_required: ClassVar[list[str]] = []

    # 4. no_expand_fields: frozenset of field names inside the source config
    #    whose string values must NEVER receive ${RECOTEM_RECIPE_*} env-var
    #    expansion. List any fields that carry raw SQL, query parameters, or
    #    other content where ${} should be treated as literals.
    #    Use frozenset() (empty) when no fields need protection beyond the
    #    global baseline (query, query_parameters) that is always guarded.
    #    This attribute is REQUIRED — validate_plugin_contract enforces its
    #    presence and its type (frozenset). A missing or wrong-type attribute
    #    raises DataSourceError at plugin discovery with a pointer to this doc.
    no_expand_fields: ClassVar[frozenset[str]] = frozenset()

    def __init__(self, config: "EchoSource.Config") -> None:
        self._config = config

    def fetch(self, ctx: FetchContext) -> pd.DataFrame:
        """Return a DataFrame whose columns include those named in the recipe
        `schema` block (user_column, item_column, optional time_column).

        Returns a DataFrame with columns: user_id (str), item_id (str),
        timestamp (int epoch seconds).
        """
        cfg = self._config
        max_possible = cfg.n_users * cfg.n_items
        if cfg.n_rows > max_possible:
            raise DataSourceError(
                f"EchoSource: n_rows ({cfg.n_rows}) exceeds n_users * n_items "
                f"({max_possible}). Reduce n_rows or increase n_users/n_items."
            )
        rng = random.Random(cfg.seed)
        users = [f"user_{i}" for i in range(cfg.n_users)]
        items = [f"item_{j}" for j in range(cfg.n_items)]
        all_pairs = [(u, v) for u in users for v in items]
        sampled = rng.sample(all_pairs, cfg.n_rows)
        base_ts = 1_700_000_000
        rows = [
            {"user_id": u, "item_id": v, "timestamp": base_ts + idx}
            for idx, (u, v) in enumerate(sampled)
        ]
        return pd.DataFrame(rows, columns=["user_id", "item_id", "timestamp"])

    def probe(self) -> None:
        """Optional. Called by recotem validate to test connectivity.

        Should be cheap — never load full data. Raise DataSourceError on
        failure. Return value is ignored by recotem (Protocol declares -> None).
        """
        cfg = self._config
        max_possible = cfg.n_users * cfg.n_items
        if cfg.n_rows > max_possible:
            raise DataSourceError(
                f"EchoSource: n_rows ({cfg.n_rows}) exceeds n_users * n_items "
                f"({max_possible})."
            )
        # discarded by recotem validate — kept here for illustration only
        return {"status": "ok", "rows_to_emit": cfg.n_rows, "items": cfg.n_items}  # type: ignore[return-value]
```

Rules
- `type_name` is the discriminator value. It appears as `source.type: echo` in the recipe. The registry validates that it is a non-empty string and unique across all loaded plugins; duplicate `type_name` values cause both `recotem train` and `recotem serve` to fail at startup with a `DataSourceError` (exit code 3) listing the conflicting fully-qualified class names.
- `Config` is a pydantic `BaseModel`. Fields are validated at recipe load. Use pydantic validators for constraints. Required fields without defaults cause a `RecipeError` when missing from the recipe.
- `extras_required` is purely documentation. The registry only validates that it is a `list[str]`; recotem never auto-installs or auto-checks these extras. Surface a helpful message yourself in `__init__` (see Deferred imports) — the value of the attribute is what you cite there.
- `no_expand_fields` is required and must be a `frozenset[str]`. It names every field in the source `Config` whose string values must never receive `${RECOTEM_RECIPE_*}` environment-variable expansion. `validate_plugin_contract` checks that this attribute is present and is a `frozenset`; a missing or wrong-type declaration raises `DataSourceError` at plugin-discovery time with a pointer to this doc.
  - For most plugins, declare `no_expand_fields: ClassVar[frozenset[str]] = frozenset()` — the global baseline (`query`, `query_parameters`) is already guarded unconditionally by the recipe loader.
  - For plugins with SQL or parameterised-query fields, list them explicitly: `no_expand_fields: ClassVar[frozenset[str]] = frozenset({"sql", "bind_params"})`. This provides defence-in-depth and documents the security intent for future maintainers.
- `fetch(ctx)` must return a `pandas.DataFrame`. The DataFrame must contain at least the columns referenced in `recipe.schema` (`user_column`, `item_column`, and optionally `time_column`). The training pipeline accesses those columns by name immediately after fetch — a missing column surfaces as a `KeyError` and exits the train run.
- `fetch()` must raise `DataSourceError` for any external or transient failure (auth errors, network errors, query errors, empty results). `DataSourceError` is mapped to exit code 3. Any other exception surfaces as exit code 1. Wrap third-party exceptions explicitly:

```python
def fetch(self, ctx: FetchContext) -> pd.DataFrame:
    try:
        return self._do_fetch()
    except SomeLibraryError as exc:
        raise DataSourceError(str(exc)) from exc
```

- Deferred imports. Do not import optional dependencies at module top-level. Defer to `__init__` or `fetch()`:

```python
def __init__(self, config: "MySource.Config") -> None:
    try:
        import my_optional_dep  # noqa: F401
    except ImportError as exc:
        raise DataSourceError(
            "MySource requires 'recotem[myextra]'. "
            "Install with: pip install 'recotem[myextra]'"
        ) from exc
    self.config = config
```

This ensures missing extras produce a clear `DataSourceError` mentioning the required extra by name, rather than an `ImportError` with exit code 1.
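The attribute checks described above can be approximated as follows. This is an illustrative re-implementation, not recotem's actual `validate_plugin_contract` (the real one also enforces registry-wide `type_name` uniqueness, omitted here):

```python
class DataSourceError(Exception):
    """Stand-in for recotem.datasource.base.DataSourceError (illustrative)."""


def validate_contract(cls: type) -> None:
    # type_name: non-empty string (uniqueness is a cross-registry check,
    # so it cannot be verified per class and is omitted here).
    tn = getattr(cls, "type_name", None)
    if not isinstance(tn, str) or not tn:
        raise DataSourceError(f"{cls.__qualname__}: type_name must be a non-empty str")
    # extras_required: list[str], documentation only.
    extras = getattr(cls, "extras_required", None)
    if not (isinstance(extras, list) and all(isinstance(e, str) for e in extras)):
        raise DataSourceError(f"{cls.__qualname__}: extras_required must be list[str]")
    # no_expand_fields: REQUIRED and must be a frozenset (a plain set fails).
    if not isinstance(getattr(cls, "no_expand_fields", None), frozenset):
        raise DataSourceError(f"{cls.__qualname__}: no_expand_fields must be a frozenset")
```

Note that a plain `set` fails the last check: the contract demands the immutable `frozenset` specifically.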
Package structure
The reference plugin under examples/plugins/echo-source/ uses this layout:
```
recotem-echo-source/
├── pyproject.toml
└── src/
    └── recotem_echo/
        ├── __init__.py   # re-exports EchoSource so "recotem_echo:EchoSource" resolves
        └── source.py     # EchoSource class definition
```

A flatter `recotem_echo/__init__.py` containing the class directly also works — what matters is that the entry-point string `<module>:<class>` resolves.
pyproject.toml:
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "recotem-echo-source"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = ["recotem>=2.0,<3", "pandas>=2.2,<4"]

[project.entry-points."recotem.datasources"]
echo = "recotem_echo:EchoSource"

[tool.hatch.build.targets.wheel]
packages = ["src/recotem_echo"]
```

The entry-point key (`echo`) is the name reported in registry log/error messages but is not used as the discriminator — Recotem uses the loaded class's `type_name` attribute. By convention, keep them the same.
Install and use
```
uv pip install -e examples/plugins/echo-source/
```

Verify discovery by running `recotem validate` against a recipe that uses the plugin — the loader resolves `source.type` through the entry-point registry and will report `Unknown DataSource type 'echo'` if the plugin is not installed in the same environment as recotem.
recotem schema includes plugin configs
recotem schema builds the JSON Schema at runtime by constructing a discriminated union of every registered DataSource Config class (including plugin-provided ones) and substituting it into the Recipe model. Plugin Config schemas do appear in the output — this is what makes IDE autocompletion work for source.* fields. The union is assembled via build_source_config_union() at invocation time, so the plugin must be installed in the same Python environment as recotem.
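A pydantic v2 sketch of the discriminated-union idea — the real `build_source_config_union()` assembles the union from the registry at runtime, and the model names here are illustrative:

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter


class EchoConfig(BaseModel):
    type: Literal["echo"]
    n_users: int = 10


class CsvConfig(BaseModel):
    type: Literal["csv"]
    path: str


# `type` acts as the discriminator, exactly like `source.type` in a recipe:
# pydantic picks the matching Config model before validating its fields.
SourceConfig = Annotated[Union[EchoConfig, CsvConfig], Field(discriminator="type")]

cfg = TypeAdapter(SourceConfig).validate_python({"type": "echo", "n_users": 5})
assert isinstance(cfg, EchoConfig)
```

`TypeAdapter(SourceConfig).json_schema()` then yields the `oneOf`-style schema that `recotem schema` embeds for `source.*` autocompletion.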
Recipe:
```yaml
name: echo_test

source:
  type: echo
  n_users: 50
  n_items: 100
  n_rows: 500
  seed: 42                 # optional; omit to use the default seed

schema:
  user_column: user_id
  item_column: item_id
  time_column: timestamp   # EchoSource emits integer epoch-second timestamps

training:
  algorithms: [TopPop]
  metric: ndcg
  cutoff: 10
  n_trials: 1

output:
  path: ./artifacts/echo_test.recotem
```

Train:

```
recotem train recipe.yaml
```

FetchContext
FetchContext carries metadata that fetch() can optionally use:
```python
@dataclass
class FetchContext:
    recipe_name: str  # the recipe's `name` field
    run_id: str       # unique ID for this training run (UUID)
    extra: dict[str, Any] = field(default_factory=dict)  # reserved for future use
```

Most plugins ignore `ctx`. It is useful for logging and for idempotency keys when fetching from write-heavy sources.
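For example, a deterministic idempotency key can be derived from the context. The `idempotency_key` helper is illustrative, not part of recotem (the `FetchContext` mirror below exists only to make the sketch self-contained):

```python
import hashlib
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FetchContext:  # self-contained mirror of the dataclass above
    recipe_name: str
    run_id: str
    extra: dict[str, Any] = field(default_factory=dict)


def idempotency_key(ctx: FetchContext) -> str:
    # Stable per-run key: the same recipe + run_id always hashes to the
    # same value, so a retried fetch can be deduplicated downstream.
    return hashlib.sha256(f"{ctx.recipe_name}:{ctx.run_id}".encode()).hexdigest()[:16]


key = idempotency_key(FetchContext(recipe_name="echo_test", run_id="abc"))
assert len(key) == 16
```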
Constraints on fetch()
- Synchronous, returning a single `pandas.DataFrame`. Generators, `Iterator[DataFrame]`, and `async def` are not supported — the training pipeline calls `fetch(ctx)` directly and reads `.columns` immediately.
- Whole-DataFrame in memory. Recotem trains on the full result set (irspack constructs a sparse matrix from it). For larger-than-memory sources, do the chunking and aggregation inside `fetch()` and return a pre-aggregated DataFrame (e.g. counts of `(user, item)` pairs).
- Credentials never come via `FetchContext.extra` (it is reserved). Read them from environment variables (preferred — works with K8s Secrets, systemd `EnvironmentFile`, Docker `--env-file`) or from recipe-declared `Config` fields (but never accept secrets in YAML — reference an env var via `${RECOTEM_RECIPE_*}` instead).
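The pre-aggregation pattern from the second constraint, sketched with illustrative column names (the `chunks` iterable would come from your source's paging or streaming API):

```python
import pandas as pd


def aggregate_chunks(chunks) -> pd.DataFrame:
    # Reduce a stream of raw interaction chunks to (user, item) counts, so the
    # DataFrame handed to recotem is bounded by the number of distinct pairs
    # rather than the number of raw events.
    counts = None
    for chunk in chunks:
        part = chunk.groupby(["user_id", "item_id"]).size()
        counts = part if counts is None else counts.add(part, fill_value=0)
    return counts.rename("count").reset_index()
```

Each chunk is collapsed before the next one is read, so peak memory is one chunk plus the running counts.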
Item metadata loading
If your plugin's recipe uses item_metadata, the metadata is loaded by recotem.metadata.loader.load_item_metadata. Failures surface as MetadataError (not DataSourceError) so they are distinguishable from source-fetch failures. The exception carries a .cause attribute indicating the failure origin:
| `.cause` | Meaning |
|---|---|
| `"http_fetch"` | HTTP/HTTPS fetch failed (SSRF guard, byte cap, sha256 mismatch). `__cause__` is `HttpFetchError`. |
| `"parse"` | File could not be parsed as the declared type (CSV/Parquet). |
| `"field_missing"` | A required field is absent and `on_field_missing="error"`. |
| `"io"` | Local or object-store read failed. |
| `"unknown"` | Catch-all for unexpected failures. |
The loader accepts an optional recipe_name= keyword argument. When provided, the recipe name is threaded into HTTP fetcher log context so that redirect and byte-cap log events (e.g. metadata_source_redirect) are correlated with the recipe that triggered the load. This is set automatically by the watcher; you only need it when calling load_item_metadata directly (e.g. in tests).
Compatibility
The plugin contract is part of the recotem 2.x public surface. Pin recotem>=2.0,<3 in your plugin's pyproject.toml — the type_name / Config / fetch(ctx) shape is stable within a major version. The probe() hook may gain optional parameters in a future minor release; use **kwargs: Any if you want to be future-proof.
The entry-point key in [project.entry-points."recotem.datasources"] is informational only (used in error messages); the discriminator is the class's type_name. If two installed plugins both declare type_name = "csv", both recotem train and recotem serve exit 3 at startup with both fully-qualified class names — uninstall one or rename its type_name.
Validation in recotem validate
recotem validate recipes/my_recipe.yaml instantiates the source class (which exercises the __init__ deferred-import / extras check) but does not call fetch(). If the source defines an optional probe() method, recotem validate calls it for a lightweight connectivity / auth check:
```python
def probe(self) -> None:
    """Optional. Called by recotem validate to test connectivity.

    Should be cheap (LIMIT 1, dry-run, fs.exists, ...) — never load full data.
    Raise DataSourceError on failure; the return value is ignored
    (the Protocol declares -> None).
    """
    ...
```

When `probe()` is defined, `recotem validate` reports `DataSource: probe OK (<type_name>)`; when it is not, it reports `DataSource: extras OK (<type_name>, no probe defined)`. The builtin `CSVSource` / `ParquetSource` use fsspec `exists()`, and `BigQuerySource` uses a dry-run query job.
Testing
Test fetch() directly without the CLI:
```python
from recotem_echo import EchoSource
from recotem.datasource.base import FetchContext

source = EchoSource(EchoSource.Config(n_users=20, n_items=50, n_rows=200))
ctx = FetchContext(recipe_name="test", run_id="abc")
df = source.fetch(ctx)

assert {"user_id", "item_id", "timestamp"}.issubset(df.columns)
assert len(df) == 200
```

Use `recotem.recipe.load_recipe` in integration tests to confirm the full YAML → Recipe → DataSource path. `recipe.source` is an instance of the plugin's `Config` model:
```python
from recotem.recipe import load_recipe
from recotem_echo import EchoSource

recipe = load_recipe("tests/fixtures/echo_recipe.yaml")
assert isinstance(recipe.source, EchoSource.Config)
```

Plugin trust
DANGER
Third-party DataSource plugins run with full process privileges. A malicious plugin can read env vars including RECOTEM_SIGNING_KEYS and RECOTEM_API_KEYS. Pin plugin versions, hash-pin via your lock file, and review source code before deploying. See Security — Plugin trust.
