Context Pipeline
The agent's context assembly is structured as a priority-ordered chain of ContextProvider transformations that run on every LLM call. Three built-in providers ship out of the box; you can register custom providers to inject cross-cutting context (your CRM account state, today's on-call rotation, a feature-flag snapshot) at any priority slot without forking the agent package.
This page documents the extension API. It is aimed at developers integrating ZenSearch into a larger Go application — if you only deploy ZenSearch as-is via Docker, you don't need this page.
Built-in providers
NewOrchestrator automatically registers three providers:
| Priority | Name | Purpose |
|---|---|---|
| 10 | ObservationContextProvider | Injects the observational-memory prefix when the conversation has stored observations or a reflection. Persists to state.Messages so subsequent iterations see the prefix without re-injecting |
| 20 | CompactionContextProvider | Runs deterministic message compaction. Skipped when observations are present (the two compression layers are mutually exclusive) |
| 30 | TruncationContextProvider | Final message-count and token-budget truncation. Pure view producer — does NOT write back to state.Messages so subsequent iterations still see the full compacted history |
Lower priority numbers run earlier. Built-in slots have deliberate gaps so custom providers can slot in at 5, 15, 25, or 100 without renumbering.
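To make the ordering concrete, the sketch below sorts a handful of provider slots with Go's `sort.SliceStable`, which is one way to get "lower runs first, registration order on ties". The `slot` type and the `runOrder` and `demoOrder` helpers are hypothetical stand-ins for illustration, not part of the agent package, and the two custom names are invented. Only the built-in names and priorities come from the table above.

```go
package main

import (
	"fmt"
	"sort"
)

// slot is a hypothetical stand-in for a registered provider: just the
// two fields that matter for ordering.
type slot struct {
	name     string
	priority int
}

// runOrder returns provider names in the order they would run:
// ascending priority, with registration order preserved on ties.
func runOrder(registered []slot) []string {
	sorted := make([]slot, len(registered))
	copy(sorted, registered)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].priority < sorted[j].priority
	})
	names := make([]string, len(sorted))
	for i, s := range sorted {
		names[i] = s.name
	}
	return names
}

// demoOrder registers the three built-ins first, then two custom slots
// that use the gaps at 5 and 15.
func demoOrder() []string {
	return runOrder([]slot{
		{"ObservationContextProvider", 10},
		{"CompactionContextProvider", 20},
		{"TruncationContextProvider", 30},
		{"crm_snapshot", 5},
		{"oncall_rotation", 15},
	})
}

func main() {
	for _, name := range demoOrder() {
		fmt.Println(name)
	}
}
```

Although the custom slots are registered last, `crm_snapshot` runs before the observation provider and `oncall_rotation` runs between observation and compaction.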
ContextProvider interface
package agent

type ContextProvider interface {
	// Name is a short stable identifier used in logs and metrics.
	Name() string

	// Priority controls ordering: lower runs earlier.
	Priority() int

	// EstimateTokens is advisory; the pipeline does not currently
	// call it automatically. Return 0 if estimation isn't meaningful.
	EstimateTokens(ctx context.Context, state *AgentState) int

	// Apply transforms the message list. Provider may also mutate
	// state for sidecar data (e.g. recording that the prefix was
	// injected at iteration N). Returning nil is treated as
	// "no change". Errors are logged and the provider is skipped;
	// subsequent providers still run.
	Apply(ctx context.Context, state *AgentState, messages []ChatMessage, budget int) ([]ChatMessage, error)
}
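As a minimal illustration of the interface, the sketch below implements the smallest useful provider: it appends a fixed system note, or returns `nil` when it has nothing to add. To stay compilable without the ZenSearch module it redeclares `ChatMessage`, `AgentState`, and `ContextProvider` locally; those local declarations are assumptions that mirror this page, and `staticNoteProvider` and `applyNote` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins for agent.ChatMessage and agent.AgentState so the sketch
// compiles on its own. The real types have more fields.
type ChatMessage struct {
	Role    string
	Content string
}

type AgentState struct{}

// ContextProvider repeats the interface documented above.
type ContextProvider interface {
	Name() string
	Priority() int
	EstimateTokens(ctx context.Context, state *AgentState) int
	Apply(ctx context.Context, state *AgentState, messages []ChatMessage, budget int) ([]ChatMessage, error)
}

// staticNoteProvider is a hypothetical provider that appends one note.
type staticNoteProvider struct {
	note string
}

// Compile-time check that the type satisfies the interface.
var _ ContextProvider = (*staticNoteProvider)(nil)

func (*staticNoteProvider) Name() string  { return "static_note" }
func (*staticNoteProvider) Priority() int { return 15 }

func (*staticNoteProvider) EstimateTokens(context.Context, *AgentState) int { return 0 }

func (p *staticNoteProvider) Apply(_ context.Context, _ *AgentState, messages []ChatMessage, _ int) ([]ChatMessage, error) {
	if p.note == "" {
		return nil, nil // nothing to add: nil signals "no change"
	}
	// Copy before appending so the caller's backing array is not reused.
	out := make([]ChatMessage, 0, len(messages)+1)
	out = append(out, messages...)
	out = append(out, ChatMessage{Role: "system", Content: p.note})
	return out, nil
}

// applyNote runs the provider once against a one-message history.
func applyNote(note string) []ChatMessage {
	p := &staticNoteProvider{note: note}
	history := []ChatMessage{{Role: "user", Content: "hi"}}
	out, _ := p.Apply(context.Background(), &AgentState{}, history, 0)
	return out
}

func main() {
	fmt.Println(len(applyNote("Deploy freeze until Friday.")))
	fmt.Println(applyNote("") == nil)
}
```

The `var _ ContextProvider = ...` line is a common Go idiom: it costs nothing at run time and turns a missing method into a compile error rather than a registration-time surprise.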
Registering a custom provider
There are two equivalent paths. Pick whichever fits your wiring style.
Declarative — via OrchestratorConfig.ExtraContextProviders
import "github.com/zenousai/zensearch/pkg/agent"

cfg := &agent.OrchestratorConfig{
	Enabled: true,
	// ... other config ...
	ExtraContextProviders: []agent.ContextProvider{
		myCRMSnapshotProvider,
		myOnCallRotationProvider,
	},
}
orch := agent.NewOrchestrator(cfg, registry, chatClient, logger)
NewOrchestrator registers the three built-in providers first, then loops over ExtraContextProviders and registers each one. The pipeline's stable priority sort places them wherever their Priority() value slots in.
Imperative — via Orchestrator.ContextPipeline().Register()
orch := agent.NewOrchestrator(cfg, registry, chatClient, logger)
orch.ContextPipeline().Register(myCRMSnapshotProvider)
orch.ContextPipeline().Register(myOnCallRotationProvider)
This path is useful when the providers depend on services that are constructed after the orchestrator (e.g. a provider that needs a Postgres handle wired up later in startup).
Worked example: CRM account snapshot provider
A provider that fetches the user's current CRM account record and prepends it as a system message before every LLM call.
package myapp

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/zenousai/zensearch/pkg/agent"
)

type CRMSnapshotProvider struct {
	crm CRMClient
}

func (*CRMSnapshotProvider) Name() string  { return "crm_snapshot" }
func (*CRMSnapshotProvider) Priority() int { return 5 } // Before observations (10)

func (*CRMSnapshotProvider) EstimateTokens(ctx context.Context, state *agent.AgentState) int {
	return 200 // Rough: the snapshot is a small JSON blob
}

func (p *CRMSnapshotProvider) Apply(
	ctx context.Context,
	state *agent.AgentState,
	messages []agent.ChatMessage,
	budget int,
) ([]agent.ChatMessage, error) {
	account, err := p.crm.GetAccountForUser(ctx, state.UserID)
	if err != nil {
		// The pipeline logs the error and skips this provider; later
		// providers still run. The returned slice is ignored on this
		// path, so passing messages back is a defensive convention.
		return messages, fmt.Errorf("crm snapshot: %w", err)
	}
	if account == nil {
		return messages, nil
	}
	snapshot, err := json.Marshal(account)
	if err != nil {
		// Surface marshal failures instead of injecting an empty snapshot.
		return messages, fmt.Errorf("crm snapshot: marshal account: %w", err)
	}
	prefix := agent.ChatMessage{
		Role:    "system",
		Content: fmt.Sprintf("Current CRM account context:\n%s", snapshot),
	}
	// Insert after the real system prompt (index 0) so the LLM still
	// sees the agent's instructions first.
	out := make([]agent.ChatMessage, 0, len(messages)+1)
	if len(messages) > 0 && messages[0].Role == "system" {
		out = append(out, messages[0], prefix)
		out = append(out, messages[1:]...)
	} else {
		out = append(out, prefix)
		out = append(out, messages...)
	}
	return out, nil
}
Wire it up:
orch := agent.NewOrchestrator(cfg, registry, chatClient, logger)
orch.ContextPipeline().Register(&CRMSnapshotProvider{crm: crmClient})
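The worked example refers to a `CRMClient` type without defining it. One possible shape is sketched below, together with an in-memory fake that is convenient for unit tests. Everything here is an assumption made for illustration: the `Account` fields, the `string` user ID, the `fakeCRM` type, and the `snapshotFor` helper are hypothetical and should be adapted to your real CRM integration.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Account is a hypothetical CRM record; the fields are illustrative only.
type Account struct {
	ID   string `json:"id"`
	Name string `json:"name"`
	Plan string `json:"plan"`
}

// CRMClient is an assumed shape for the dependency used by
// CRMSnapshotProvider. A nil *Account with a nil error means "no account".
type CRMClient interface {
	GetAccountForUser(ctx context.Context, userID string) (*Account, error)
}

// fakeCRM is an in-memory CRMClient for tests and local development.
type fakeCRM struct {
	byUser map[string]*Account
}

func (f *fakeCRM) GetAccountForUser(ctx context.Context, userID string) (*Account, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // honour cancellation like a real client would
	}
	return f.byUser[userID], nil // a missing key yields a nil *Account
}

// snapshotFor renders the message content a snapshot provider would
// inject, or "" when the user has no account.
func snapshotFor(crm CRMClient, userID string) (string, error) {
	account, err := crm.GetAccountForUser(context.Background(), userID)
	if err != nil {
		return "", err
	}
	if account == nil {
		return "", nil
	}
	blob, err := json.Marshal(account)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("Current CRM account context:\n%s", blob), nil
}

func main() {
	crm := &fakeCRM{byUser: map[string]*Account{
		"u-1": {ID: "acct-42", Name: "Acme", Plan: "enterprise"},
	}}
	content, err := snapshotFor(crm, "u-1")
	fmt.Println(content, err)
}
```

Keeping the dependency behind a small interface lets a test exercise the provider's "no account" and error branches without a network call.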
Provider semantics
A few invariants worth knowing when writing a custom provider:
Failures are isolated. If Apply returns an error, the pipeline logs a warning and skips that provider; subsequent providers run against the output of the last provider that succeeded. The returned slice is ignored on the error path, so (messages, err) and (nil, err) are functionally equivalent today. Returning (messages, err) is still recommended as a defensive convention, so that a future pipeline change that starts honoring partial output does not surprise existing providers.
Returning nil means "no change". A provider that decides it has nothing to add can return (nil, nil) and the pipeline treats it as a no-op. This keeps skip branches in custom providers compact.
Context cancellation propagates. The pipeline checks ctx.Done() between providers, so a cancelled request stops the chain before the next provider starts; the check does not interrupt a provider that is already running. Long-running providers (e.g. one that calls a slow external service) should therefore respect the context themselves.
Persistence is the provider's job, not the pipeline's. Pipeline.Assemble returns the final message list but does NOT write it back to state.Messages. If your provider's transformation should persist across LLMNode iterations (so subsequent runs see the modified state), the provider must write state.Messages = next itself before returning. The built-in observation and compaction providers do this; the truncation provider does not (truncation is a transient view).
Run order is by Priority(). Insertion order is preserved within a single priority value (stable sort), but two providers with priorities 10 and 5 will always run 5 first regardless of registration order.
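The invariants above can be summarised as a loop. The sketch below is an illustrative reimplementation under stated assumptions, not the actual `Pipeline.Assemble`: `assemble`, `provider`, `funcProvider`, and `demoAssemble` are hypothetical names, and `ChatMessage` and `AgentState` are pared-down local stand-ins for the agent package's types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sort"
)

// ChatMessage and AgentState are pared-down stand-ins for the agent
// package's types, kept only so this sketch compiles on its own.
type ChatMessage struct {
	Role    string
	Content string
}

type AgentState struct {
	Messages []ChatMessage
}

// provider mirrors the parts of ContextProvider the loop needs.
type provider interface {
	Name() string
	Priority() int
	Apply(ctx context.Context, state *AgentState, messages []ChatMessage, budget int) ([]ChatMessage, error)
}

// assemble is an illustrative loop, not the real Pipeline.Assemble.
func assemble(ctx context.Context, providers []provider, state *AgentState, budget int) []ChatMessage {
	ordered := append([]provider(nil), providers...)
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[i].Priority() < ordered[j].Priority()
	})
	current := state.Messages
	for _, p := range ordered {
		if ctx.Err() != nil {
			break // cancelled: stop before starting the next provider
		}
		next, err := p.Apply(ctx, state, current, budget)
		if err != nil {
			log.Printf("context provider %s skipped: %v", p.Name(), err)
			continue // the returned slice is ignored on the error path
		}
		if next == nil {
			continue // nil means "no change"
		}
		current = next
	}
	return current // never written back to state.Messages here
}

// funcProvider adapts a closure to the provider interface for the demo.
type funcProvider struct {
	name     string
	priority int
	apply    func(state *AgentState, in []ChatMessage) ([]ChatMessage, error)
}

func (f funcProvider) Name() string  { return f.name }
func (f funcProvider) Priority() int { return f.priority }
func (f funcProvider) Apply(_ context.Context, s *AgentState, in []ChatMessage, _ int) ([]ChatMessage, error) {
	return f.apply(s, in)
}

// demoAssemble runs four providers and reports the assembled view plus
// how many messages ended up persisted on the state.
func demoAssemble() (view []ChatMessage, persisted int) {
	state := &AgentState{Messages: []ChatMessage{{Role: "user", Content: "hi"}}}
	providers := []provider{
		funcProvider{"failing", 5, func(_ *AgentState, in []ChatMessage) ([]ChatMessage, error) {
			return in, errors.New("backend down")
		}},
		funcProvider{"noop", 10, func(_ *AgentState, _ []ChatMessage) ([]ChatMessage, error) {
			return nil, nil
		}},
		funcProvider{"persisting", 20, func(s *AgentState, in []ChatMessage) ([]ChatMessage, error) {
			next := append([]ChatMessage{{Role: "system", Content: "note"}}, in...)
			s.Messages = next // the provider persists its own change
			return next, nil
		}},
		funcProvider{"transient", 30, func(_ *AgentState, in []ChatMessage) ([]ChatMessage, error) {
			next := append([]ChatMessage(nil), in...)
			return append(next, ChatMessage{Role: "system", Content: "view only"}), nil
		}},
	}
	view = assemble(context.Background(), providers, state, 0)
	return view, len(state.Messages)
}

func main() {
	view, persisted := demoAssemble()
	fmt.Println(len(view), persisted)
}
```

In this run the failing and no-op providers leave the list untouched, the persisting provider's note survives on the state, and the transient provider's message appears only in the returned view.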
Where the pipeline runs
The pipeline is invoked from two places during an agent execution:
- LLMNode: every iteration of the agent's tool-calling loop, immediately before the chat client call
- SynthesizeNode: once at the end of execution, before the final answer is generated
Custom providers run in both places, so a CRM snapshot provider would refresh the account state on every iteration AND on the final synthesis pass.
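Because the pipeline runs on every loop iteration, a provider that calls an external service pays that cost each time. If slightly stale data is acceptable, one option is a small per-key TTL cache in front of the fetch. The sketch below is a hypothetical helper, not something the agent package provides; `snapshotCache`, `get`, and `demoCache` are invented names, and the 30-second TTL is an arbitrary choice for the demo.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cacheEntry holds one cached value and its expiry time.
type cacheEntry struct {
	value   string
	expires time.Time
}

// snapshotCache memoises one fetched value per key for a fixed TTL.
type snapshotCache struct {
	mu      sync.Mutex
	ttl     time.Duration
	now     func() time.Time // injectable clock, handy in tests
	entries map[string]cacheEntry
}

func newSnapshotCache(ttl time.Duration, now func() time.Time) *snapshotCache {
	return &snapshotCache{ttl: ttl, now: now, entries: make(map[string]cacheEntry)}
}

// get returns the cached value for key, calling fetch only on a miss or
// after the entry has expired. Fetch errors are not cached. The lock is
// held during fetch, which keeps the sketch simple but serialises callers.
func (c *snapshotCache) get(key string, fetch func() (string, error)) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.entries[key]; ok && c.now().Before(e.expires) {
		return e.value, nil
	}
	value, err := fetch()
	if err != nil {
		return "", err
	}
	c.entries[key] = cacheEntry{value: value, expires: c.now().Add(c.ttl)}
	return value, nil
}

// demoCache simulates three loop iterations followed by a synthesis
// pass that happens after the TTL has elapsed, and counts real fetches.
func demoCache() (fetches int) {
	clock := time.Unix(0, 0)
	cache := newSnapshotCache(30*time.Second, func() time.Time { return clock })
	fetch := func() (string, error) {
		fetches++
		return "snapshot", nil
	}
	for i := 0; i < 3; i++ {
		cache.get("u-1", fetch) // iterations within the TTL share one fetch
	}
	clock = clock.Add(time.Minute) // TTL elapsed before the final pass
	cache.get("u-1", fetch)
	return fetches
}

func main() {
	fmt.Println("fetches:", demoCache())
}
```

A provider's Apply could call `get` keyed by user ID instead of hitting the CRM directly. Whether caching is appropriate depends on how fresh the injected context needs to be on the final synthesis pass.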
See also
- AI Agents — the user-facing description of compaction, observational memory, and procedural memory built on this pipeline. The "Context Compaction" and "Observational Memory" subsections list every env var that toggles a built-in provider