Solver
Configure and run the solver to find optimal solutions.
The solver is the engine that finds optimal solutions to your planning problems. This section covers how to configure and run it.
Quick Example
from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)

# Configure the solver
solver_config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(
        spent_limit=Duration(seconds=30)
    )
)

# Create and run the solver
solver_factory = SolverFactory.create(solver_config)
solver = solver_factory.build_solver()

problem = load_problem()  # Your problem data
solution = solver.solve(problem)

print(f"Best score: {solution.score}")
Termination
The solver needs to know when to stop. Common termination conditions:
| Condition | Description |
|---|---|
| `spent_limit` | Stop after a time limit (e.g., 30 seconds) |
| `best_score_limit` | Stop when a target score is reached |
| `unimproved_spent_limit` | Stop if no improvement for a duration |
| `step_count_limit` | Stop after a number of optimization steps |
1 - Solver Configuration
Configure the solver with SolverConfig and related classes.
Configure the solver using Python dataclasses. This defines what to solve, how to score, and when to stop.
SolverConfig
The main configuration class:
from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
)

solver_config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(
        spent_limit=Duration(seconds=30)
    ),
)
Required Fields
| Field | Description |
|---|---|
| `solution_class` | The `@planning_solution` class |
| `entity_class_list` | List of `@planning_entity` classes |
| `score_director_factory_config` | How to calculate scores |
Optional Fields
| Field | Description | Default |
|---|---|---|
| `termination_config` | When to stop | Never (manual termination) |
| `environment_mode` | Validation level | `REPRODUCIBLE` |
| `random_seed` | For reproducibility | Random |
ScoreDirectorFactoryConfig
Configures constraint evaluation:
ScoreDirectorFactoryConfig(
    constraint_provider_function=define_constraints
)
With Constraint Provider
from my_app.constraints import define_constraints
ScoreDirectorFactoryConfig(
    constraint_provider_function=define_constraints
)
TerminationConfig
Controls when the solver stops:
Time Limit
TerminationConfig(
    spent_limit=Duration(seconds=30)
)
# Other duration units
Duration(minutes=5)
Duration(hours=1)
Duration(milliseconds=500)
Score Target
Stop when a target score is reached:
TerminationConfig(
    best_score_limit="0hard/-10soft"
)
Step Limit
Stop after a number of steps:
TerminationConfig(
    step_count_limit=10000
)
Unimproved Time
Stop if no improvement for a duration:
TerminationConfig(
    unimproved_spent_limit=Duration(seconds=30)
)
Combining Conditions
Multiple conditions use OR logic:
TerminationConfig(
    spent_limit=Duration(minutes=5),
    best_score_limit="0hard/0soft",              # OR: stop once the perfect score is reached
    unimproved_spent_limit=Duration(seconds=60)  # OR: stop if progress stalls for 60 seconds
)
Environment Mode
Controls validation and reproducibility:
from solverforge_legacy.solver.config import EnvironmentMode
SolverConfig(
    environment_mode=EnvironmentMode.REPRODUCIBLE,
    ...
)
| Mode | Description | Use Case |
|---|---|---|
| `NON_REPRODUCIBLE` | Fastest, no validation | Production |
| `REPRODUCIBLE` | Deterministic results | Default |
| `FAST_ASSERT` | Quick validation | Testing |
| `FULL_ASSERT` | Complete validation | Debugging |
Debugging Score Corruption
Use FULL_ASSERT to detect score calculation bugs:
SolverConfig(
    environment_mode=EnvironmentMode.FULL_ASSERT,
    ...
)
This validates every score calculation but is slow.
Reproducibility
For reproducible results, set a random seed:
SolverConfig(
    random_seed=42,
    environment_mode=EnvironmentMode.REPRODUCIBLE,
    ...
)
Configuration Overrides
Override configuration when building a solver:
from solverforge_legacy.solver.config import SolverConfigOverride
solver_factory = SolverFactory.create(solver_config)
# Override termination for this solver instance
override = SolverConfigOverride(
    termination_config=TerminationConfig(spent_limit=Duration(seconds=10))
)
solver = solver_factory.build_solver(override)
Complete Example
from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
    EnvironmentMode,
)
from my_app.domain import Timetable, Lesson
from my_app.constraints import define_constraints

def create_solver():
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(minutes=5),
            best_score_limit="0hard/0soft",
        ),
        environment_mode=EnvironmentMode.REPRODUCIBLE,
        random_seed=42,
    )
    factory = SolverFactory.create(config)
    return factory.build_solver()
Configuration Best Practices
Development
SolverConfig(
    environment_mode=EnvironmentMode.FULL_ASSERT,
    termination_config=TerminationConfig(spent_limit=Duration(seconds=10)),
    ...
)
Testing
SolverConfig(
    environment_mode=EnvironmentMode.REPRODUCIBLE,
    random_seed=42,  # Reproducible tests
    termination_config=TerminationConfig(spent_limit=Duration(seconds=5)),
    ...
)
Production
SolverConfig(
    environment_mode=EnvironmentMode.NON_REPRODUCIBLE,
    termination_config=TerminationConfig(
        spent_limit=Duration(minutes=5),
        unimproved_spent_limit=Duration(minutes=1),
    ),
    ...
)
2 - Running the Solver
Execute the solver synchronously with Solver.solve().
The simplest way to solve a problem is with Solver.solve(), which blocks until termination.
Basic Usage
from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)

# Configure
config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)
# Create factory and solver
factory = SolverFactory.create(config)
solver = factory.build_solver()
# Load problem
problem = load_problem()
# Solve (blocks until done)
solution = solver.solve(problem)
# Use solution
print(f"Score: {solution.score}")
Event Listeners
Monitor progress with event listeners:
from solverforge_legacy.solver import BestSolutionChangedEvent
def on_best_solution_changed(event: BestSolutionChangedEvent):
    print(f"New best score: {event.new_best_score}")
    print(f"Time spent: {event.time_spent}")

solver.add_event_listener(on_best_solution_changed)
solution = solver.solve(problem)
BestSolutionChangedEvent Properties
| Property | Description |
|---|---|
| `new_best_score` | The new best score |
| `new_best_solution` | The new best solution |
| `time_spent` | Duration since solving started |
| `is_new_best_solution_initialized` | `True` if all variables are assigned |
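For example, a listener can skip intermediate best solutions that still have unassigned planning variables. A minimal sketch, assuming the property names in the table above:

def on_initialized_solution(event: BestSolutionChangedEvent):
    # Ignore partial solutions where some planning variables are still unassigned
    if not event.is_new_best_solution_initialized:
        return
    print(f"Initialized best solution: {event.new_best_score}")

solver.add_event_listener(on_initialized_solution)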
Removing Listeners
solver.add_event_listener(listener)
# ... later ...
solver.remove_event_listener(listener)
Early Termination
Stop solving before the termination condition:
import threading
import time

def timeout_termination(solver, timeout_seconds):
    """Terminate after timeout."""
    time.sleep(timeout_seconds)
    solver.terminate_early()

# Start termination thread
thread = threading.Thread(target=timeout_termination, args=(solver, 60))
thread.start()

solution = solver.solve(problem)
Manual Termination
# From another thread
solver.terminate_early()

# Check if termination was requested
if solver.is_terminate_early():
    print("Termination was requested")
Checking Solver State
# Is the solver currently running?
if solver.is_solving():
    print("Solver is running")

# Was early termination requested?
if solver.is_terminate_early():
    print("Termination requested")
Problem Changes (Real-Time)
Modify the problem while solving:
from solverforge_legacy.solver import ProblemChange
class AddLessonChange(ProblemChange[Timetable]):
    def __init__(self, lesson: Lesson):
        self.lesson = lesson

    def do_change(self, working_solution: Timetable, score_director):
        # Add to working solution
        working_solution.lessons.append(self.lesson)
        # Notify score director
        score_director.after_entity_added(self.lesson)

# Add change while solving
new_lesson = Lesson("new", "Art", "S. Dali", "Group A")
solver.add_problem_change(AddLessonChange(new_lesson))
See Real-Time Planning for more details.
Solver Reuse
Don’t reuse a solver instance—create a new one for each solve:
# Correct: New solver each time
solver1 = factory.build_solver()
solution1 = solver1.solve(problem1)
solver2 = factory.build_solver()
solution2 = solver2.solve(problem2)
# Incorrect: Reusing solver
solver = factory.build_solver()
solution1 = solver.solve(problem1)
solution2 = solver.solve(problem2) # Don't do this!
Threading
Solver.solve() blocks the calling thread. For non-blocking operation, use:
Background thread:
result = {}
thread = threading.Thread(target=lambda: result.update(solution=solver.solve(problem)))
thread.start()
# Later: thread.join(); solution = result["solution"]
SolverManager (recommended for production):
See SolverManager
Error Handling
try:
    solution = solver.solve(problem)
except Exception as e:
    print(f"Solving failed: {e}")
    # Handle error (log, retry, etc.)
Complete Example
from solverforge_legacy.solver import SolverFactory, BestSolutionChangedEvent
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("solver")

def solve_timetable(problem: Timetable) -> Timetable:
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(minutes=5),
            unimproved_spent_limit=Duration(seconds=60),
        ),
    )
    factory = SolverFactory.create(config)
    solver = factory.build_solver()

    # Log progress
    def on_progress(event: BestSolutionChangedEvent):
        logger.info(f"Score: {event.new_best_score} at {event.time_spent}")

    solver.add_event_listener(on_progress)

    # Solve
    logger.info("Starting solver...")
    solution = solver.solve(problem)
    logger.info(f"Solving finished. Final score: {solution.score}")
    return solution

if __name__ == "__main__":
    problem = load_problem()
    solution = solve_timetable(problem)
    save_solution(solution)
3 - SolverManager
Manage concurrent and asynchronous solving jobs.
SolverManager handles concurrent solving jobs, making it ideal for web applications and services.
Creating a SolverManager
from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)

config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(spent_limit=Duration(minutes=5)),
)
solver_factory = SolverFactory.create(config)
solver_manager = SolverManager.create(solver_factory)
Basic Solving
solve()
Non-blocking solve that returns a future:
import uuid
job_id = str(uuid.uuid4())
# Start solving (non-blocking)
future = solver_manager.solve(job_id, problem)
# ... do other work ...
# Get result (blocks until done)
solution = future.get_final_best_solution()
print(f"Score: {solution.score}")
solve_and_listen()
Solve with progress callbacks:
def on_best_solution_changed(solution: Timetable):
    print(f"New best: {solution.score}")
    # Update UI, save to database, etc.

def on_exception(error):
    print(f"Solving failed: {error}")

solver_manager.solve_and_listen(
    job_id,
    problem_finder=lambda _: problem,
    best_solution_consumer=on_best_solution_changed,
    exception_handler=on_exception,
)
Managing Jobs
Check Job Status
status = solver_manager.get_solver_status(job_id)
# Returns: NOT_SOLVING, SOLVING_ACTIVE, SOLVING_ENDED
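A simple polling loop can wait for a job to finish. A sketch, assuming the status values above and that `get_solver_status()` returns an enum-like object with a `name` attribute:

import time

def wait_for_job(job_id: str, poll_seconds: float = 1.0) -> None:
    # Poll until the job is no longer actively solving
    while solver_manager.get_solver_status(job_id).name == "SOLVING_ACTIVE":
        time.sleep(poll_seconds)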
Get Current Best Solution
solution = solver_manager.get_best_solution(job_id)
if solution:
    print(f"Current best: {solution.score}")
Terminate Early
solver_manager.terminate_early(job_id)
FastAPI Integration
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import uuid
solver_manager: SolverManager | None = None
solutions: dict[str, Timetable] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    global solver_manager
    config = SolverConfig(...)
    factory = SolverFactory.create(config)
    solver_manager = SolverManager.create(factory)
    yield
    solver_manager.close()

app = FastAPI(lifespan=lifespan)

@app.post("/solve")
async def start_solving(problem: TimetableRequest) -> str:
    job_id = str(uuid.uuid4())

    def on_best_solution(solution: Timetable):
        solutions[job_id] = solution

    solver_manager.solve_and_listen(
        job_id,
        problem_finder=lambda _: problem.to_domain(),
        best_solution_consumer=on_best_solution,
    )
    return job_id

@app.get("/solution/{job_id}")
async def get_solution(job_id: str):
    if job_id not in solutions:
        raise HTTPException(404, "Job not found")
    solution = solutions[job_id]
    status = solver_manager.get_solver_status(job_id)
    return {
        "status": status.name,
        "score": str(solution.score),
        "solution": TimetableResponse.from_domain(solution),
    }

@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
    solver_manager.terminate_early(job_id)
    return {"status": "terminating"}
Concurrent Jobs
SolverManager handles multiple jobs concurrently:
# Start multiple jobs
job1 = solver_manager.solve("job1", problem1)
job2 = solver_manager.solve("job2", problem2)
job3 = solver_manager.solve("job3", problem3)
# Each runs in its own thread
# Results available when ready
solution1 = job1.get_final_best_solution()
Resource Limits
By default, jobs run with no limit on concurrent execution. For resource management:
# Limit concurrent solvers
solver_manager = SolverManager.create(
    solver_factory,
    parallel_solver_count=4,  # Max 4 concurrent jobs
)
Problem Changes During Solving
Add changes to running jobs:
from solverforge_legacy.solver import ProblemChange
class AddEntity(ProblemChange[Timetable]):
    def __init__(self, entity):
        self.entity = entity

    def do_change(self, working_solution, score_director):
        working_solution.lessons.append(self.entity)
        score_director.after_entity_added(self.entity)

# Add change to running job
solver_manager.add_problem_change(job_id, AddEntity(new_lesson))
Cleanup
Always close the SolverManager when done:
# Using a context manager
with SolverManager.create(factory) as manager:
    ...  # use the manager
# Automatically closed on exit

# Manual cleanup
try:
    ...  # use the manager
finally:
    solver_manager.close()
Error Handling
def on_exception(job_id: str, exception: Exception):
    logger.error(f"Job {job_id} failed: {exception}")
    # Clean up, notify user, etc.

solver_manager.solve_and_listen(
    job_id,
    problem_finder=lambda _: problem,
    best_solution_consumer=on_solution,
    exception_handler=on_exception,
)
Best Practices
Do
- Use `solve_and_listen()` for progress updates
- Store solutions externally (database, cache); see the sketch after this list
- Handle exceptions properly
- Close the SolverManager on shutdown
Don’t
- Block the main thread waiting for results
- Store solutions only in memory (lose on restart)
- Forget to handle job cleanup
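To make the external-storage advice concrete, the best-solution consumer can persist every new best solution as it arrives. A minimal sketch, assuming a hypothetical `save_to_store()` helper; pickling to disk here is only a stand-in for your database or cache client:

import pickle
from pathlib import Path

def save_to_store(job_id: str, solution: Timetable) -> None:
    # Hypothetical persistence helper: swap in your database or cache client
    out_dir = Path("solutions")
    out_dir.mkdir(exist_ok=True)
    (out_dir / f"{job_id}.pickle").write_bytes(pickle.dumps(solution))

solver_manager.solve_and_listen(
    job_id,
    problem_finder=lambda _: problem,
    best_solution_consumer=lambda solution: save_to_store(job_id, solution),
)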
4 - SolutionManager
Analyze and explain solutions with SolutionManager.
SolutionManager provides utilities for analyzing solutions without running the solver.
Creating a SolutionManager
from solverforge_legacy.solver import SolverFactory, SolutionManager
solver_factory = SolverFactory.create(config)
solution_manager = SolutionManager.create(solver_factory)
Or from a SolverManager:
solver_manager = SolverManager.create(solver_factory)
solution_manager = SolutionManager.create(solver_manager)
Score Calculation
Calculate the score of a solution without solving:
# Update score in place
solution_manager.update(solution)
print(f"Score: {solution.score}")
This is useful for:
- Validating manually created solutions
- Comparing before/after changes
- Testing constraint configurations (see the sketch after this list)
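For the last point, a hedged sketch of a constraint test that scores a hand-built solution without solving; `build_conflicting_timetable()` is a hypothetical helper that creates a known hard-constraint violation:

def test_room_conflict_is_penalized():
    # Hypothetical helper that assigns two lessons to the same room and timeslot
    timetable = build_conflicting_timetable()
    solution_manager.update(timetable)
    # The deliberate conflict should produce a negative hard score
    assert timetable.score.hard_score < 0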
Score Analysis
Get a detailed breakdown of the score:
analysis = solution_manager.analyze(solution)
print(f"Overall score: {analysis.score}")

# Per-constraint breakdown
for constraint in analysis.constraint_analyses():
    print(f"\n{constraint.constraint_name}:")
    print(f"  Score: {constraint.score}")
    print(f"  Matches: {constraint.match_count}")
Constraint Matches
See exactly which entities triggered each constraint:
for constraint in analysis.constraint_analyses():
    print(f"\n{constraint.constraint_name}:")
    for match in constraint.matches():
        print(f"  - {match.justification}: {match.score}")
Indictments
Find which entities are responsible for score impacts:
for indictment in analysis.indictments():
    print(f"\nEntity: {indictment.indicted_object}")
    print(f"  Total impact: {indictment.score}")
    for match in indictment.matches():
        print(f"  - {match.constraint_name}: {match.score}")
Use Cases
def validate_schedule(schedule: Schedule) -> list[str]:
    """Validate a manually created schedule."""
    solution_manager.update(schedule)
    if schedule.score.is_feasible:
        return []

    # Collect violations
    violations = []
    analysis = solution_manager.analyze(schedule)
    for constraint in analysis.constraint_analyses():
        if constraint.score.hard_score < 0:
            for match in constraint.matches():
                violations.append(str(match.justification))
    return violations
Compare Solutions
def compare_solutions(old: Schedule, new: Schedule) -> dict:
    """Compare two solutions."""
    old_analysis = solution_manager.analyze(old)
    new_analysis = solution_manager.analyze(new)
    return {
        "old_score": str(old_analysis.score),
        "new_score": str(new_analysis.score),
        "improved": new_analysis.score > old_analysis.score,
        "changes": get_constraint_changes(old_analysis, new_analysis),
    }

def get_constraint_changes(old, new):
    old_scores = {c.constraint_name: c.score for c in old.constraint_analyses()}
    changes = []
    for constraint in new.constraint_analyses():
        old_score = old_scores.get(constraint.constraint_name)
        if old_score != constraint.score:
            changes.append({
                "constraint": constraint.constraint_name,
                "old": str(old_score),
                "new": str(constraint.score),
            })
    return changes
Explain to Users
def explain_score(schedule: Schedule) -> dict:
    """Generate user-friendly score explanation."""
    analysis = solution_manager.analyze(schedule)

    hard_violations = []
    soft_penalties = []
    for constraint in analysis.constraint_analyses():
        if constraint.score.hard_score < 0:
            for match in constraint.matches():
                hard_violations.append({
                    "rule": constraint.constraint_name,
                    "details": str(match.justification),
                })
        elif constraint.score.soft_score < 0:
            soft_penalties.append({
                "rule": constraint.constraint_name,
                "impact": constraint.match_count,
            })

    return {
        "is_valid": schedule.score.is_feasible,
        "hard_violations": hard_violations,
        "soft_penalties": soft_penalties,
        "summary": generate_summary(analysis),
    }
API Endpoint
from fastapi import FastAPI, HTTPException

@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
    solution = solutions.get(job_id)
    if not solution:
        raise HTTPException(404)

    analysis = solution_manager.analyze(solution)
    return {
        "score": str(analysis.score),
        "is_feasible": analysis.score.is_feasible,
        "constraints": [
            {
                "name": c.constraint_name,
                "score": str(c.score),
                "matches": c.match_count,
            }
            for c in analysis.constraint_analyses()
        ],
    }
Debugging
Finding Score Corruption
def debug_score(solution):
    """Debug score calculation."""
    # Calculate fresh
    solution_manager.update(solution)
    fresh_score = solution.score

    # Analyze
    analysis = solution_manager.analyze(solution)
    analyzed_score = analysis.score

    if fresh_score != analyzed_score:
        print(f"Score mismatch: {fresh_score} vs {analyzed_score}")

    # Check each constraint
    total_hard = 0
    total_soft = 0
    for c in analysis.constraint_analyses():
        total_hard += c.score.hard_score
        total_soft += c.score.soft_score
        print(f"{c.constraint_name}: {c.score}")

    print(f"\nCalculated: {total_hard}hard/{total_soft}soft")
    print(f"Reported: {analyzed_score}")
Finding Unexpected Matches
def find_unexpected_matches(solution, constraint_name):
    """Debug why a constraint is matching."""
    analysis = solution_manager.analyze(solution)
    for c in analysis.constraint_analyses():
        if c.constraint_name == constraint_name:
            print(f"\n{constraint_name} matches ({c.match_count}):")
            for match in c.matches():
                print(f"  - {match.justification}")
            return
    print(f"Constraint '{constraint_name}' not found")
Performance

- `update()` is fast (incremental calculation)
- `analyze()` is slower (collects all match details)
- Cache analysis results if calling repeatedly (see the sketch after this list)
- Don’t analyze every solution during solving
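A minimal caching sketch for repeated analysis requests, assuming solutions are keyed by job ID and the cache is invalidated whenever a new best solution arrives (`analysis_cache` is a hypothetical in-memory dict):

analysis_cache: dict[str, object] = {}

def get_cached_analysis(job_id: str, solution):
    # Reuse the previous analysis for this job if it is still current
    if job_id not in analysis_cache:
        analysis_cache[job_id] = solution_manager.analyze(solution)
    return analysis_cache[job_id]

def invalidate_analysis(job_id: str):
    # Call this from the best-solution consumer when the solution changes
    analysis_cache.pop(job_id, None)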
5 - Benchmarking
Compare solver configurations and tune performance.
Benchmarking helps you compare different solver configurations and find the best settings for your problem.
Why Benchmark
- Compare algorithms: Find the best algorithm combination
- Tune parameters: Optimize termination times, moves, etc.
- Validate changes: Ensure improvements don’t regress
- Understand scaling: See how performance changes with problem size
Basic Benchmarking
Create a simple benchmark by running the solver multiple times:
import time
from statistics import mean, stdev

def benchmark_config(config: SolverConfig, problems: list, runs: int = 3):
    """Benchmark a solver configuration."""
    results = []
    for problem in problems:
        problem_results = []
        for run in range(runs):
            factory = SolverFactory.create(config)
            solver = factory.build_solver()

            start = time.time()
            solution = solver.solve(problem)
            elapsed = time.time() - start

            problem_results.append({
                "score": solution.score,
                "time": elapsed,
                "feasible": solution.score.is_feasible,
            })

        results.append({
            "problem": problem.id,
            "avg_score": mean(r["score"].soft_score for r in problem_results),
            "avg_time": mean(r["time"] for r in problem_results),
            "feasibility_rate": sum(r["feasible"] for r in problem_results) / runs,
        })
    return results
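Usage is then a matter of pairing a configuration with a problem set; `create_solver_config()` is a hypothetical helper that builds your `SolverConfig`, and `BenchmarkDataset.small()` is defined under Problem Datasets below:

config = create_solver_config()
results = benchmark_config(config, BenchmarkDataset.small(), runs=3)
for row in results:
    print(f"{row['problem']}: avg score {row['avg_score']:.1f} "
          f"in {row['avg_time']:.1f}s (feasible {row['feasibility_rate']:.0%})")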
Comparing Configurations
import dataclasses

def compare_termination_times():
    """Compare different termination durations."""
    base_config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
    )

    durations = [10, 30, 60, 120, 300]  # seconds
    problems = load_benchmark_problems()
    results = {}
    for duration in durations:
        # SolverConfig is a dataclass, so derive a copy with a different termination
        config = dataclasses.replace(
            base_config,
            termination_config=TerminationConfig(
                spent_limit=Duration(seconds=duration)
            ),
        )
        results[duration] = benchmark_config(config, problems)
    return results
Benchmark Report
Generate a readable report:
def generate_report(results: dict):
    """Generate benchmark report."""
    print("=" * 60)
    print("BENCHMARK REPORT")
    print("=" * 60)

    for config_name, config_results in results.items():
        print(f"\n{config_name}:")
        print("-" * 40)

        total_score = 0
        total_time = 0
        feasible_count = 0
        for r in config_results:
            print(f"  {r['problem']}: score={r['avg_score']:.1f}, "
                  f"time={r['avg_time']:.1f}s, "
                  f"feasible={r['feasibility_rate']*100:.0f}%")
            total_score += r["avg_score"]
            total_time += r["avg_time"]
            feasible_count += r["feasibility_rate"]

        n = len(config_results)
        print(f"\n  Average: score={total_score/n:.1f}, "
              f"time={total_time/n:.1f}s, "
              f"feasible={feasible_count/n*100:.0f}%")

    print("\n" + "=" * 60)
Problem Datasets
Create consistent benchmark datasets:
class BenchmarkDataset:
    """Collection of benchmark problems."""

    @staticmethod
    def small():
        """Small problems for quick testing."""
        return [
            generate_problem(lessons=20, rooms=3, timeslots=10),
            generate_problem(lessons=30, rooms=4, timeslots=10),
        ]

    @staticmethod
    def medium():
        """Medium problems for standard benchmarks."""
        return [
            generate_problem(lessons=100, rooms=10, timeslots=25),
            generate_problem(lessons=150, rooms=12, timeslots=25),
        ]

    @staticmethod
    def large():
        """Large problems for stress testing."""
        return [
            generate_problem(lessons=500, rooms=20, timeslots=50),
            generate_problem(lessons=1000, rooms=30, timeslots=50),
        ]
Reproducible Benchmarks
For consistent results:
import dataclasses

def reproducible_benchmark(config: SolverConfig, problem, seed: int = 42):
    """Run benchmark with fixed seed."""
    # Derive a copy of the config with reproducible settings
    config = dataclasses.replace(
        config,
        environment_mode=EnvironmentMode.REPRODUCIBLE,
        random_seed=seed,
    )
    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    return solver.solve(problem)
Metrics to Track
Primary Metrics
| Metric | Description |
|---|---|
| Best Score | Final solution quality |
| Time to Best | When best score was found |
| Feasibility Rate | % of runs finding feasible solution |
Secondary Metrics
| Metric | Description |
|---|---|
| Score Over Time | Score improvement curve |
| Steps per Second | Algorithm throughput |
| Memory Usage | Peak memory consumption |
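The solver does not report memory usage itself; one way to approximate peak memory on the Python side is the standard `tracemalloc` module (a sketch; it only tracks Python-level allocations, not native memory):

import tracemalloc

def solve_with_memory_tracking(solver, problem):
    # Track peak Python-level allocations during a single solve
    tracemalloc.start()
    solution = solver.solve(problem)
    _, peak_bytes = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return solution, peak_bytes / (1024 * 1024)  # peak usage in MiB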
Score Over Time
Track how score improves:
def benchmark_with_history(config: SolverConfig, problem):
    """Benchmark with score history."""
    history = []

    def on_progress(event):
        history.append({
            "time": event.time_spent.total_seconds(),
            "score": event.new_best_score,
        })

    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    solver.add_event_listener(on_progress)

    solution = solver.solve(problem)
    return {
        "final_score": solution.score,
        "history": history,
    }
Visualization
Plot results with matplotlib:
import matplotlib.pyplot as plt

def plot_score_over_time(results: dict):
    """Plot score improvement over time."""
    plt.figure(figsize=(10, 6))
    for config_name, result in results.items():
        times = [h["time"] for h in result["history"]]
        scores = [h["score"].soft_score for h in result["history"]]
        plt.plot(times, scores, label=config_name)

    plt.xlabel("Time (seconds)")
    plt.ylabel("Soft Score")
    plt.title("Score Improvement Over Time")
    plt.legend()
    plt.grid(True)
    plt.savefig("benchmark_results.png")
CI/CD Integration
Add benchmarks to your pipeline:
# test_benchmark.py
import time

import pytest

def test_minimum_score():
    """Ensure solver achieves minimum score."""
    config = load_production_config()
    problem = BenchmarkDataset.small()[0]

    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    solution = solver.solve(problem)

    assert solution.score.is_feasible, "Solution should be feasible"
    assert solution.score.soft_score >= -100, "Score should be >= -100"

def test_performance_regression():
    """Check for performance regression."""
    config = load_production_config()
    problem = BenchmarkDataset.medium()[0]

    start = time.time()
    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    solution = solver.solve(problem)
    elapsed = time.time() - start

    assert solution.score.is_feasible
    assert elapsed < 120, "Should complete within 2 minutes"
Best Practices
Do
- Use consistent problem datasets
- Run multiple times (3-5) for statistical significance
- Track both score and time
- Use reproducible mode for comparisons
Don’t
- Compare results from different machines
- Use production data for benchmarks (privacy)
- Optimize for benchmark problems only
- Ignore feasibility rate