Strategy Execution API
Detailed guide for executing YAML strategies through the BioMapper API.
Overview
BioMapper strategies are YAML-defined workflows that execute as background jobs. The API provides comprehensive job management including execution, monitoring, pausing, and checkpointing. Strategies are executed using the MinimalStrategyService with a shared execution context that flows through all actions.
Strategy Definition
Strategies can be executed in two ways:
Pre-defined Strategies: YAML files stored in src/biomapper/configs/strategies/
Inline Strategies: YAML content submitted directly in the API request
Strategy Structure
name: example_strategy
description: Example workflow for data processing
parameters:
  input_file: "${DATA_DIR}/input.csv"
  output_dir: "${OUTPUT_DIR}"
  threshold: 0.8
steps:
  - name: load_data
    action:
      type: LOAD_DATASET_IDENTIFIERS
      params:
        file_path: "${parameters.input_file}"
        identifier_column: "id"
        output_key: "raw_data"
  - name: process_data
    action:
      type: FILTER_DATASET
      params:
        input_key: "raw_data"
        threshold: "${parameters.threshold}"
        output_key: "filtered_data"
  - name: export_results
    action:
      type: EXPORT_DATASET_V2
      params:
        input_key: "filtered_data"
        output_file: "${parameters.output_dir}/results.csv"
Execution Workflow
Submit Strategy
response = client.post("/api/strategies/v2/execute", json={
    "strategy": "example_strategy",
    "parameters": {
        "input_file": "/data/mydata.csv",
        "threshold": 0.9
    },
    "options": {
        "checkpoint_enabled": False,
        "timeout_seconds": 3600
    }
})
job_id = response.json()["job_id"]
Job Creation
Unique job ID generated (UUID)
Job record created in SQLite database
Background task initiated
Immediate response with job ID
Strategy Loading
YAML file loaded from the src/biomapper/configs/strategies/ directory and its subdirectories
Parameters substituted using ParameterResolver (${parameters.key})
Environment variables resolved (${env.VAR} or ${VAR})
Default values supported (${parameters.key:-default})
Strategy validated for required fields (name, steps)
Action Execution
Actions executed sequentially by MinimalStrategyService
Each action receives shared execution context
Context contains: datasets, statistics, output_files, current_identifiers
Actions self-register via the @register_action decorator
Actions modify context in-place
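The registration and sequential execution described above can be illustrated with a small standalone sketch. It does not reproduce MinimalStrategyService or the real registry: ACTION_REGISTRY, run_steps, the action signature, and the two action types are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical stand-in for an action registry; BioMapper's real registry may differ.
ActionFunc = Callable[[Dict[str, Any], Dict[str, Any]], None]
ACTION_REGISTRY: Dict[str, ActionFunc] = {}


def register_action(name: str) -> Callable[[ActionFunc], ActionFunc]:
    """Decorator that stores the function under `name` at definition time."""
    def decorator(func: ActionFunc) -> ActionFunc:
        ACTION_REGISTRY[name] = func
        return func
    return decorator


@register_action("LOAD_NUMBERS")  # illustrative action type, not a BioMapper action
def load_numbers(params: Dict[str, Any], context: Dict[str, Any]) -> None:
    context["datasets"][params["output_key"]] = list(params["values"])


@register_action("KEEP_ABOVE")  # illustrative action type, not a BioMapper action
def keep_above(params: Dict[str, Any], context: Dict[str, Any]) -> None:
    rows = context["datasets"][params["input_key"]]
    kept = [row for row in rows if row >= params["threshold"]]
    context["datasets"][params["output_key"]] = kept
    context["statistics"]["kept"] = len(kept)


def run_steps(steps: List[Dict[str, Any]],
              context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Run steps in order, passing the same context dict to every action."""
    if context is None:
        context = {"datasets": {}, "statistics": {},
                   "output_files": [], "current_identifiers": []}
    for step in steps:
        action = ACTION_REGISTRY[step["action"]["type"]]
        action(step["action"]["params"], context)  # mutates context in place
    return context


steps = [
    {"name": "load", "action": {"type": "LOAD_NUMBERS", "params": {
        "values": [0.5, 0.85, 0.95], "output_key": "raw_data"}}},
    {"name": "filter", "action": {"type": "KEEP_ABOVE", "params": {
        "input_key": "raw_data", "threshold": 0.8, "output_key": "filtered_data"}}},
]
context = run_steps(steps)
print(context["datasets"]["filtered_data"])  # [0.85, 0.95]
```

Because every action receives the same dictionary, a later step can read any dataset that an earlier step stored under its output_key.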
Progress Tracking
# Poll for status
status = client.get(f"/api/jobs/{job_id}/status")
print(f"Progress: {status.json()['progress']}%")

# Or use SSE for real-time updates
for event in client.stream(f"/api/jobs/{job_id}/events"):
    print(f"Step: {event['current_step']}")
    print(f"Progress: {event['progress']}%")
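When polling, a client usually wants to repeat the status request until the job stops changing. The helper below is a minimal sketch of that loop. It assumes the status payload has a "status" field and that the terminal values are "completed", "failed" and "cancelled"; only "failed" appears in this guide's examples, so treat the other two as placeholders. wait_for_job and its parameters are hypothetical names.

```python
import time
from typing import Any, Callable, Dict

# Assumed terminal status values; adjust to what the API actually returns.
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_job(fetch_status: Callable[[], Dict[str, Any]],
                 interval: float = 2.0,
                 timeout: float = 3600.0,
                 sleep: Callable[[float], None] = time.sleep) -> Dict[str, Any]:
    """Call fetch_status() until it reports a terminal status or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status.get("status") in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("job did not finish before the timeout")
        sleep(interval)


# Demonstration with a fake status source instead of a live server
fake_responses = iter([
    {"status": "running", "progress": 10},
    {"status": "running", "progress": 60},
    {"status": "completed", "progress": 100},
])
final = wait_for_job(lambda: next(fake_responses), interval=0.0)
print(final["progress"])  # 100
```

Against a live server, fetch_status could be something like `lambda: client.get(f"/api/jobs/{job_id}/status").json()`, provided the response carries the assumed "status" field.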
Result Retrieval
results = client.get(f"/api/jobs/{job_id}/results")
data = results.json()

# Access outputs
datasets = data["results"]["datasets"]
statistics = data["results"]["statistics"]
files = data["results"]["output_files"]
Execution Context
The execution context is a shared dictionary passed between actions:
context = {
    "datasets": {
        "raw_data": [...],  # Named datasets
        "processed": [...],
        "normalized": [...]
    },
    "current_identifiers": [...],  # Active identifier set
    "statistics": {
        "total_records": 1000,
        "processing_time": 45.2,
        "action_metrics": {...}
    },
    "output_files": [
        "/results/output.csv",
        "/results/report.html"
    ],
    "metadata": {
        "strategy_name": "example_strategy",
        "start_time": "2024-08-13T10:00:00Z",
        "parameters": {...}
    }
}
Parameter Substitution
Parameters can be substituted in YAML strategies:
| Pattern | Description |
|---|---|
| ${parameters.key} | Strategy parameters passed at execution |
| ${env.VAR} | Environment variables |
| ${VAR} | Shorthand for environment variables |
|  | Metadata fields (less common) |
Example:
params:
  file_path: "${parameters.input_file}"
  output_dir: "${env.OUTPUT_DIR}"
  threshold: "${parameters.threshold:-0.8}"  # Default value
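To make the substitution rules concrete, here is a minimal sketch of a resolver for the patterns listed above. It is not the project's ParameterResolver: the function name resolve, its signature, and the choice to always return strings are assumptions made for illustration.

```python
import re
from typing import Any, Mapping

# Matches ${name} and ${name:-default}; the name may not contain ':' or '}'.
_PLACEHOLDER = re.compile(r"\$\{([^}:]+)(?::-([^}]*))?\}")


def resolve(text: str, parameters: Mapping[str, Any], env: Mapping[str, str]) -> str:
    """Replace placeholders in `text` using strategy parameters and environment values."""
    def lookup(match: "re.Match[str]") -> str:
        name, default = match.group(1), match.group(2)
        if name.startswith("parameters."):
            value = parameters.get(name[len("parameters."):])
        elif name.startswith("env."):
            value = env.get(name[len("env."):])
        else:
            value = env.get(name)  # ${VAR} shorthand for an environment variable
        if value is None:
            if default is None:
                raise KeyError(f"unresolved placeholder: {match.group(0)}")
            return default
        return str(value)

    return _PLACEHOLDER.sub(lookup, text)


params = {"input_file": "/data/mydata.csv"}
env = {"OUTPUT_DIR": "/results"}
print(resolve("${parameters.input_file}", params, env))       # /data/mydata.csv
print(resolve("${env.OUTPUT_DIR}/results.csv", params, env))  # /results/results.csv
print(resolve("${parameters.threshold:-0.8}", params, env))   # 0.8
```

In real use, env would typically be os.environ. A production resolver may also preserve value types (for example, keep 0.8 as a float), which this sketch does not attempt.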
Job Management
Job States
| State | Description |
|---|---|
|  | Job created but not started |
|  | Currently executing |
|  | Execution paused by user |
|  | Successfully finished |
| failed | Execution failed with error |
|  | Cancelled by user |
Job Control
Pause Execution:
client.post(f"/api/jobs/{job_id}/pause")
Resume Execution:
client.post(f"/api/jobs/{job_id}/resume")
Cancel Job:
client.post(f"/api/jobs/{job_id}/cancel")
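The job states and control operations above suggest a small state machine. The sketch below shows one plausible set of transitions. The state names other than "failed" and the transition table itself are assumptions inferred from the state descriptions, not taken from the BioMapper source.

```python
from enum import Enum
from typing import Dict, Set


class JobState(str, Enum):
    # Names are assumed from the state descriptions; verify against the API.
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Assumed transition table: pause/resume toggle RUNNING and PAUSED,
# cancel is allowed from any non-terminal state.
ALLOWED: Dict[JobState, Set[JobState]] = {
    JobState.PENDING: {JobState.RUNNING, JobState.CANCELLED},
    JobState.RUNNING: {JobState.PAUSED, JobState.COMPLETED,
                       JobState.FAILED, JobState.CANCELLED},
    JobState.PAUSED: {JobState.RUNNING, JobState.CANCELLED},
    JobState.COMPLETED: set(),
    JobState.FAILED: set(),
    JobState.CANCELLED: set(),
}


def transition(current: JobState, target: JobState) -> JobState:
    """Return `target` if the move is allowed, otherwise raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"cannot move from {current.value} to {target.value}")
    return target


state = JobState.PENDING
for target in (JobState.RUNNING, JobState.PAUSED, JobState.RUNNING, JobState.COMPLETED):
    state = transition(state, target)
print(state.value)  # completed
```

A client can use a table like this to disable controls that cannot succeed, for example hiding a pause button once a job has finished.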
Checkpointing
BioMapper supports checkpointing for long-running strategies:
Enable Checkpointing:
response = client.post("/api/strategies/v2/execute", json={
    "strategy": "long_running_strategy",
    "options": {
        "checkpoint_enabled": True,
        "checkpoint_frequency": 5  # Every 5 actions
    }
})
List Checkpoints:
checkpoints = client.get(f"/api/jobs/{job_id}/checkpoints")
Restore from Checkpoint:
client.post(f"/api/jobs/{job_id}/restore/{checkpoint_id}")
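The idea behind checkpoint_frequency and restoring can be sketched without the server: snapshot the context after every N actions, then resume from a snapshot at the recorded position. This is an illustration only. The checkpoint layout (next_index, context) and the function names are hypothetical and do not describe how the persistent execution engine stores its checkpoints.

```python
import copy
from typing import Any, Callable, Dict, List, Tuple

Action = Callable[[Dict[str, Any]], None]


def run_with_checkpoints(actions: List[Action], context: Dict[str, Any],
                         frequency: int, checkpoints: List[Dict[str, Any]],
                         start_index: int = 0) -> Dict[str, Any]:
    """Run actions from start_index, snapshotting after every `frequency` actions."""
    for index in range(start_index, len(actions)):
        actions[index](context)
        completed = index + 1
        if completed % frequency == 0:
            checkpoints.append({"next_index": completed,
                                "context": copy.deepcopy(context)})
    return context


def restore(checkpoint: Dict[str, Any]) -> Tuple[int, Dict[str, Any]]:
    """Return the resume position and an independent copy of the saved context."""
    return checkpoint["next_index"], copy.deepcopy(checkpoint["context"])


def make_step(number: int) -> Action:
    def step(context: Dict[str, Any]) -> None:
        context["log"].append(number)
    return step


actions = [make_step(n) for n in range(1, 6)]
checkpoints: List[Dict[str, Any]] = []

# First run stops after four actions (as if the job were interrupted).
run_with_checkpoints(actions[:4], {"log": []}, frequency=2, checkpoints=checkpoints)

# Resume from the latest checkpoint and finish the remaining action.
next_index, saved_context = restore(checkpoints[-1])
final = run_with_checkpoints(actions, saved_context, 2, checkpoints,
                             start_index=next_index)
print(final["log"])  # [1, 2, 3, 4, 5]
```

Deep-copying the context keeps each snapshot independent of later in-place modifications, which matters because actions mutate the shared context.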
Error Handling
Strategy Validation Errors
{
  "detail": "Strategy validation failed",
  "errors": [
    {
      "field": "steps[0].action.type",
      "message": "Unknown action type: INVALID_ACTION"
    }
  ]
}
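A validator that yields errors in this shape might look like the following sketch. It checks the required fields named earlier (name, steps) and flags unknown action types. The function validate_strategy, the known_actions argument, and the "Missing required field" wording are assumptions, not the API's actual implementation or messages.

```python
from typing import Any, Dict, List, Set


def validate_strategy(strategy: Dict[str, Any],
                      known_actions: Set[str]) -> List[Dict[str, str]]:
    """Return a list of {"field", "message"} errors; an empty list means valid."""
    errors: List[Dict[str, str]] = []
    if not strategy.get("name"):
        errors.append({"field": "name", "message": "Missing required field: name"})
    steps = strategy.get("steps")
    if not isinstance(steps, list) or not steps:
        errors.append({"field": "steps", "message": "Missing required field: steps"})
        return errors
    for index, step in enumerate(steps):
        action_type = (step.get("action") or {}).get("type")
        if action_type not in known_actions:
            errors.append({
                "field": f"steps[{index}].action.type",
                "message": f"Unknown action type: {action_type}",
            })
    return errors


strategy = {
    "name": "example_strategy",
    "steps": [{"name": "load_data", "action": {"type": "INVALID_ACTION"}}],
}
errors = validate_strategy(strategy, {"LOAD_DATASET_IDENTIFIERS", "FILTER_DATASET"})
print(errors)
```

Collecting all errors before returning, rather than stopping at the first one, lets a caller fix a strategy file in a single pass.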
Execution Errors
{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "failed",
  "error": {
    "step": "load_data",
    "action": "LOAD_DATASET_IDENTIFIERS",
    "message": "File not found: /data/missing.csv",
    "traceback": "..."
  }
}
Recovery Options
Partial results available even if later steps fail
Checkpoints allow resuming from last successful step
Failed jobs can be cloned with modified parameters
Performance Considerations
Memory Management
Large datasets processed in chunks (10,000 rows default)
Automatic garbage collection between actions
Context size monitoring to prevent memory overflow
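Chunked processing of the kind mentioned above can be sketched with a small generator. The 10,000-row default comes from this guide; the function name iter_chunks is hypothetical and this is not BioMapper's own chunking code.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_chunks(rows: Iterable[T], chunk_size: int = 10_000) -> Iterator[List[T]]:
    """Yield consecutive lists of at most chunk_size rows without loading everything."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


sizes = [len(chunk) for chunk in iter_chunks(range(25_000))]
print(sizes)  # [10000, 10000, 5000]
```

Only one chunk is held in memory at a time, so peak memory depends on chunk_size rather than on the total number of rows.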
Concurrency
Multiple strategies can execute simultaneously
Default limit: 10 concurrent executions
Job queue for excess requests
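A concurrency limit with queuing can be approximated in client or server code with asyncio.Semaphore. The sketch below illustrates the idea using the default limit of 10 stated above. It is not the BioMapper scheduler; run_limited and fake_strategy are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

MAX_CONCURRENT = 10  # default limit stated in this guide


async def run_limited(jobs: List[Callable[[], Awaitable[str]]],
                      limit: int = MAX_CONCURRENT) -> List[str]:
    """Run all jobs, allowing at most `limit` to execute at the same time."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(job: Callable[[], Awaitable[str]]) -> str:
        async with semaphore:  # excess jobs wait here, acting as the queue
            return await job()

    return list(await asyncio.gather(*(guarded(job) for job in jobs)))


async def demo() -> Tuple[int, int]:
    running = 0
    peak = 0

    async def fake_strategy() -> str:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stands in for real work
        running -= 1
        return "done"

    results = await run_limited([fake_strategy] * 25)
    return len(results), peak


count, peak = asyncio.run(demo())
print(count, peak)
```

All 25 jobs complete, but the observed peak never exceeds the limit because the remaining jobs wait on the semaphore until a slot frees up.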
Timeouts
response = client.post("/api/strategies/v2/execute", json={
    "strategy": "example_strategy",
    "options": {
        "timeout_seconds": 3600,  # 1 hour
        "action_timeout": 300     # 5 min per action
    }
})
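The two options express nested time budgets: one for the whole strategy and one for each action. A minimal sketch of that nesting with asyncio.wait_for follows. It assumes actions are coroutines that take the context, which may not match how BioMapper enforces its timeouts; run_with_timeouts is a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

AsyncAction = Callable[[Dict[str, Any]], Awaitable[None]]


async def run_with_timeouts(actions: List[AsyncAction], context: Dict[str, Any],
                            timeout_seconds: float,
                            action_timeout: float) -> Dict[str, Any]:
    """Apply a per-action limit inside an overall limit for the whole run."""
    async def run_all() -> None:
        for action in actions:
            # Per-action budget (action_timeout)
            await asyncio.wait_for(action(context), timeout=action_timeout)

    # Overall budget for the strategy (timeout_seconds)
    await asyncio.wait_for(run_all(), timeout=timeout_seconds)
    return context


async def fast(context: Dict[str, Any]) -> None:
    context["done"].append("fast")


async def slow(context: Dict[str, Any]) -> None:
    await asyncio.sleep(1.0)
    context["done"].append("slow")


async def demo() -> Dict[str, Any]:
    context: Dict[str, Any] = {"done": []}
    try:
        await run_with_timeouts([fast, slow], context,
                                timeout_seconds=5.0, action_timeout=0.05)
    except asyncio.TimeoutError:
        context["timed_out"] = True
    return context


outcome = asyncio.run(demo())
print(outcome)  # {'done': ['fast'], 'timed_out': True}
```

The slow action is cancelled when its own budget expires, so the overall limit only comes into play when many actions each stay under their individual limit.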
Monitoring and Logging
Execution Logs
logs = client.get(f"/api/jobs/{job_id}/logs")
for entry in logs.json()["logs"]:
    print(f"[{entry['level']}] {entry['message']}")
Metrics
metrics = client.get(f"/api/jobs/{job_id}/metrics")
print(f"CPU Usage: {metrics.json()['cpu_percent']}%")
print(f"Memory: {metrics.json()['memory_mb']} MB")
print(f"Execution Time: {metrics.json()['elapsed_seconds']}s")
Progress Events
Real-time progress via Server-Sent Events:
import json
import requests

# SSE endpoint for streaming updates
response = requests.get(
    f"http://localhost:8000/api/jobs/{job_id}/events",
    stream=True
)
for raw in response.iter_lines():
    line = raw.decode("utf-8")
    # SSE frames carry the JSON payload on "data:" lines; skip everything else
    if not line.startswith("data:"):
        continue
    event = json.loads(line[len("data:"):].strip())
    if event["type"] == "progress":
        print(f"Progress: {event['percentage']}%")
    elif event["type"] == "step_complete":
        print(f"Completed: {event['step_name']}")

# WebSocket endpoint also available:
# ws://localhost:8000/api/jobs/{job_id}/ws
Best Practices
Use Checkpointing for long-running strategies
Set Appropriate Timeouts to prevent hanging jobs
Monitor Memory Usage for large datasets
Handle Errors Gracefully with try/except in client code
Use Parameter Defaults in YAML for flexibility
Stream Progress for better user experience
Clean Up Old Jobs periodically to save disk space
Example: Complete Workflow
from biomapper.client import BiomapperClient
import asyncio

async def run_workflow():
    async with BiomapperClient() as client:
        # Submit strategy
        job = await client.execute_strategy(
            "protein_harmonization",
            parameters={
                "input_file": "/data/proteins.csv",
                "output_dir": "/results"
            },
            options={
                "checkpoint_enabled": True,
                "timeout_seconds": 3600
            }
        )

        # Monitor progress
        async for event in client.stream_progress(job.id):
            print(f"Progress: {event.percentage}%")
            if event.type == "error":
                print(f"Error: {event.message}")
                break

        # Get results
        result = await client.get_job_results(job.id)
        if result.success:
            print(f"Processed {len(result.datasets['output'])} records")
            print(f"Files created: {result.output_files}")
        return result

# Run the workflow
result = asyncio.run(run_workflow())
—
Verification Sources
Last verified: 2025-08-17
This documentation was verified against the following project resources:
/biomapper/src/biomapper/api/api/routes/strategies_v2_simple.py (V2 strategy execution endpoints and job handling)
/biomapper/src/biomapper/api/api/routes/jobs.py (Job management with persistence and checkpointing)
/biomapper/src/biomapper/api/services/persistent_execution_engine.py (Execution engine with checkpoint support)
/biomapper/src/biomapper/core/minimal_strategy_service.py (MinimalStrategyService implementation and YAML loading)
/biomapper/src/biomapper/actions/registry.py (Self-registering action system)
/biomapper/src/biomapper/core/standards/parameter_validator.py (Parameter validation and standardization)
/biomapper/src/biomapper/client/client_v2.py (Client-side progress tracking and SSE)
/biomapper/CLAUDE.md (Strategy execution patterns and architecture)