This page provides a detailed walkthrough of the Prebid Sales Agent source code. It covers the project directory structure, core modules, tool implementations, database layer, services, adapters, and the key architectural patterns that tie everything together.
The source code lives under the src/ directory, organized by architectural layer:
src/
├── core/ # Core application logic
│ ├── main.py # FastMCP server, route mounting, entry point
│ ├── schemas.py # Pydantic request/response models
│ ├── tool_context.py # ToolContext dependency injection
│ ├── config_loader.py # Environment variable loading, feature flags
│ ├── audit_logger.py # Operation logging, security event tracking
│ ├── database/ # Database layer
│ │ ├── models.py # SQLAlchemy ORM models
│ │ ├── database.py # Engine initialization, pool config
│ │ └── database_session.py # get_db_session() context manager
│ └── tools/ # MCP tool implementations
│ ├── capabilities.py # get_adcp_capabilities
│ ├── products.py # get_products
│ ├── creative_formats.py # list_creative_formats
│ ├── media_buy_create.py # create_media_buy
│ ├── media_buy_update.py # update_media_buy
│ ├── media_buy_delivery.py # get_media_buy_delivery
│ ├── media_buy_list.py # get_media_buys
│ ├── properties.py # list_authorized_properties
│ ├── performance.py # update_performance_index
│ └── creatives/ # Creative management tools
│ ├── sync_wrappers.py # sync_creatives
│ └── listing.py # list_creatives
├── services/ # Business logic services
│ ├── ai_product_service.py # Gemini AI product discovery
│ ├── targeting_capabilities.py # Targeting taxonomy management
│ └── gam_inventory_service.py # GAM inventory synchronization
├── adapters/ # Ad server adapters
│ ├── base.py # AdServerAdapter abstract base class
│ ├── google_ad_manager.py # GAM API integration
│ └── mock_ad_server.py # Mock adapter for testing
└── admin/ # Admin UI
├── app.py # Flask application factory
└── blueprints/ # Flask blueprints
├── tenants.py # Tenant management views
└── activity_stream.py # SSE activity feed
Supporting directories outside src/:
tests/
├── unit/ # Fast, no-dependency tests
├── integration/ # Requires PostgreSQL
└── e2e/ # Full Docker-based tests
alembic/ # Database migration scripts
docker/ # Dockerfiles and compose configs
.claude/ # Claude Code project context
src/core/main.py is the entry point for the entire application. It creates the FastMCP server instance, mounts the Admin UI as a sub-application, and registers all MCP tools.
from mcp.server.fastmcp import FastMCP
# Create the FastMCP server
mcp = FastMCP("salesagent")
# Mount the Flask admin UI
mcp.mount("/admin", admin_app)
# Tools are registered via @mcp.tool() decorators in the tools/ directory
# The import of tool modules triggers their registration
Key responsibilities:
FastMCP server instance/admin@mcp.tool() registrationsrc/core/schemas.py defines all request and response models as Pydantic BaseModel classes. Every piece of data entering or leaving the system passes through these schemas for validation.
from pydantic import BaseModel, Field
from enum import Enum
from typing import Optional
from datetime import datetime
class PricingModel(str, Enum):
cpm = "cpm"
vcpm = "vcpm"
cpc = "cpc"
cpcv = "cpcv"
cpv = "cpv"
cpp = "cpp"
flat_rate = "flat_rate"
class Product(BaseModel):
product_id: str
name: str
description: str
delivery_type: DeliveryType
format_ids: list[str]
pricing_options: list[PricingOption]
estimated_exposures: Optional[ExposureEstimate] = None
channels: list[str]
countries: list[str]
See the API Schema Reference for the complete model catalog.
src/core/tool_context.py defines the ToolContext class, which is the primary mechanism for dependency injection in MCP tools. Every tool that needs database access, adapter access, or tenant scoping receives a ToolContext instance.
class ToolContext:
tenant_id: int # Resolved from auth token
principal_id: int # The authenticated agent/user
db_session: Session # SQLAlchemy session (scoped to tenant)
adapter: AdServerAdapter # Tenant's configured ad server adapter
config: Config # Server configuration and feature flags
The get_tool_context dependency resolver:
ToolContext with all resolved dependenciesDepends()src/core/config_loader.py handles loading configuration from environment variables, resolving tenant-specific settings, and managing feature flags.
| Responsibility | Description |
|---|---|
| Environment loading | Reads .env file and environment variables |
| Tenant resolution | Maps hostnames and tokens to tenant configurations |
| Feature flags | Controls optional features (AI search, approval workflows) |
| Validation | Ensures required variables are set at startup |
src/core/audit_logger.py records all significant operations for compliance and debugging. Every media buy creation, creative upload, status change, and administrative action is logged with:
The src/core/tools/ directory contains one file per MCP tool (or group of related tools). Each tool is registered with the FastMCP server via the @mcp.tool() decorator.
All tools follow a consistent pattern:
from src.core.main import mcp
from src.core.tool_context import ToolContext, get_tool_context
from fastapi import Depends
@mcp.tool()
async def tool_name(
param1: str,
param2: int = 0,
ctx: ToolContext = Depends(get_tool_context),
) -> ToolResponse:
"""Tool description shown to AI agents."""
return await _tool_name_impl(param1, param2, ctx)
| File | Tool(s) | Category | Description |
|---|---|---|---|
capabilities.py |
get_adcp_capabilities |
Discovery | Protocol capability discovery and agent metadata |
products.py |
get_products |
Discovery | AI-powered product search with Gemini integration |
creative_formats.py |
list_creative_formats |
Discovery | Creative format specifications and constraints |
media_buy_create.py |
create_media_buy |
Execution | Campaign creation with targeting and budget |
media_buy_update.py |
update_media_buy |
Execution | Campaign modification (budget, dates, status) |
media_buy_delivery.py |
get_media_buy_delivery |
Performance | Delivery metrics from the ad server |
media_buy_list.py |
get_media_buys |
Execution | Query and list media buys with filters |
creatives/sync_wrappers.py |
sync_creatives |
Creative | Upload and sync creative assets with the ad server |
creatives/listing.py |
list_creatives |
Creative | Search and filter the creative library |
properties.py |
list_authorized_properties |
Governance | Publisher domain authorization |
performance.py |
update_performance_index |
Performance | Package-level performance data aggregation |
Tools use a with_error_logging wrapper that provides consistent error handling and audit logging:
@mcp.tool()
async def create_media_buy(
params: CreateMediaBuyRequest,
ctx: ToolContext = Depends(get_tool_context),
) -> CreateMediaBuyResponse:
"""Create a new media buy campaign."""
async with with_error_logging(ctx, "create_media_buy"):
# Business logic here
# Errors are caught, logged, and returned as structured error responses
...
This wrapper:
Some tools expose their core logic through raw_functions.py modules, which provide a plain Python interface without MCP decorators. These are used by the A2A (Agent-to-Agent) JSON-RPC handler to invoke the same business logic through a different protocol.
The database layer lives in src/core/database/ and uses SQLAlchemy 2.x with async support.
src/core/database/models.py defines SQLAlchemy declarative models for all database tables:
| Model | Table | Key Fields | Relationships |
|---|---|---|---|
Tenant |
tenants |
id, name, slug, ad_server_type, config |
Has many: principals, products, media_buys, audit_logs |
Principal |
principals |
id, tenant_id, email, name, token_hash, role |
Belongs to: tenant. Has many: media_buys |
Product |
products |
id, tenant_id, name, description, pricing, targeting |
Belongs to: tenant. Has many: media_buys |
MediaBuy |
media_buys |
id, tenant_id, principal_id, product_id, budget, status, flight_dates |
Belongs to: tenant, principal, product. Has many: creatives |
Creative |
creatives |
id, tenant_id, media_buy_id, format, content, approval_status |
Belongs to: tenant, media_buy |
AuditLog |
audit_logs |
id, tenant_id, principal_id, action, entity_type, entity_id, details, timestamp |
Belongs to: tenant, principal |
All models include tenant_id as a foreign key with a composite unique constraint to enforce data isolation.
src/core/database/database.py creates and configures the SQLAlchemy async engine with connection pooling:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
engine = create_async_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_recycle=3600,
)
src/core/database/database_session.py provides the get_db_session() context manager, which is the only sanctioned way to obtain a database session:
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_db_session():
async with AsyncSession(engine) as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
All database access must go through get_db_session(). Never create connections manually or use raw connection strings. This ensures proper connection pooling, transaction management, and cleanup.
The src/services/ directory contains business logic that is shared across multiple tools.
src/services/ai_product_service.py implements the AI-powered product discovery pipeline. When an AI agent calls get_products with a natural language brief:
When no Gemini API key is configured, the service falls back to keyword-based matching against product names and descriptions.
src/services/targeting_capabilities.py manages geographic and custom targeting taxonomies. It resolves targeting identifiers (country codes, DMA codes, custom key-values) into ad server-specific targeting criteria.
src/services/gam_inventory_service.py synchronizes inventory data between the Sales Agent’s local product catalog and GAM. It handles:
The src/adapters/ directory implements the adapter pattern for ad server integrations.
src/adapters/base.py defines the AdServerAdapter abstract base class. All adapters must implement these methods:
class AdServerAdapter(ABC):
@abstractmethod
async def create_order(self, media_buy: MediaBuy) -> str: ...
@abstractmethod
async def create_line_item(self, media_buy: MediaBuy, order_id: str) -> str: ...
@abstractmethod
async def upload_creative(self, creative: Creative) -> str: ...
@abstractmethod
async def get_delivery_report(self, order_id: str) -> DeliveryReport: ...
@abstractmethod
async def sync_creative(self, creative: Creative) -> str: ...
src/adapters/google_ad_manager.py implements the AdServerAdapter interface for Google Ad Manager. It uses the GAM API client library to translate AdCP operations into GAM API calls. See the Google Ad Manager Integration guide for configuration details.
src/adapters/mock_ad_server.py provides a mock implementation that returns simulated responses. It is used for:
The src/admin/ directory contains a Flask web application that provides the publisher dashboard.
src/admin/app.py creates the Flask application, configures session management, and mounts it as a sub-application of the FastMCP server at /admin.
| Blueprint | File | Routes | Description |
|---|---|---|---|
| Tenants | blueprints/tenants.py |
/admin/tenants/* |
Tenant management, product CRUD, principal management |
| Activity Stream | blueprints/activity_stream.py |
/admin/activity/* |
Real-time SSE feed of system events |
The Admin UI supports OAuth login, role-based access control, and a human-in-the-loop approval queue for media buys and creatives.
Every MCP tool combines the @mcp.tool() decorator for registration with the with_error_logging context manager for consistent error handling and audit logging:
@mcp.tool()
async def tool_name(
params: RequestModel,
ctx: ToolContext = Depends(get_tool_context),
) -> ResponseModel:
"""Tool description."""
async with with_error_logging(ctx, "tool_name"):
# Business logic
return ResponseModel(...)
All tenant-scoped operations receive a ToolContext via FastAPI-style dependency injection. This ensures every database query, adapter call, and audit log entry is automatically scoped to the correct tenant:
ctx: ToolContext = Depends(get_tool_context)
# ctx.tenant_id, ctx.principal_id, ctx.db, ctx.adapter are all available
Database access is always mediated by the get_db_session() context manager. This enforces connection pooling, transaction boundaries, and automatic rollback on errors.
Business logic never calls ad server APIs directly. Instead, it delegates to the adapter instance from ToolContext, which implements the AdServerAdapter interface. This makes the entire service layer ad-server-agnostic.
Tools expose their core logic through plain Python functions (without MCP decorators) in raw_functions.py modules. The A2A JSON-RPC handler calls these functions directly, ensuring both protocols execute identical business logic.
The application starts from src/core/main.py:
FastMCP("salesagent") creates the MCP server instance@mcp.tool() registration for all 14 tools/adminsrc/core/main.py
│
├── Creates FastMCP server
├── Imports src/core/tools/*.py → Registers 14 MCP tools
├── Mounts src/admin/app.py → Admin UI at /admin
├── Configures auth middleware → Token → ToolContext resolution
└── Starts Uvicorn → HTTP/SSE on :8080