Solver

Configure and run the solver to find optimal solutions.

The solver is the engine that finds optimal solutions to your planning problems. This section covers how to configure and run it.


Quick Example

from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)

# Configure the solver
solver_config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(
        spent_limit=Duration(seconds=30)
    )
)

# Create and run the solver
solver_factory = SolverFactory.create(solver_config)
solver = solver_factory.build_solver()

problem = load_problem()  # Your problem data
solution = solver.solve(problem)

print(f"Best score: {solution.score}")

Termination

The solver needs to know when to stop. Common termination conditions:

Condition                 Description
spent_limit               Stop after a time limit (e.g., 30 seconds)
best_score_limit          Stop when a target score is reached
unimproved_spent_limit    Stop if no improvement for a duration
step_count_limit          Stop after a number of optimization steps
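
These conditions can be combined in a single TerminationConfig; the solver stops as soon as any one of them is met (see Solver Configuration below). For example:

TerminationConfig(
    spent_limit=Duration(minutes=5),              # hard cap on wall-clock time
    unimproved_spent_limit=Duration(seconds=30),  # or give up once progress stalls
)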

1 - Solver Configuration

Configure the solver with SolverConfig and related classes.

Configure the solver using Python dataclasses. This defines what to solve, how to score, and when to stop.

SolverConfig

The main configuration class:

from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
)

solver_config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(
        spent_limit=Duration(seconds=30)
    ),
)

Required Fields

Field                            Description
solution_class                   The @planning_solution class
entity_class_list                List of @planning_entity classes
score_director_factory_config    How to calculate scores

Optional Fields

Field                 Description            Default
termination_config    When to stop           Never (manual termination)
environment_mode      Validation level       REPRODUCIBLE
random_seed           For reproducibility    Random

ScoreDirectorFactoryConfig

Configures constraint evaluation:

ScoreDirectorFactoryConfig(
    constraint_provider_function=define_constraints
)

With Constraint Provider

from my_app.constraints import define_constraints

ScoreDirectorFactoryConfig(
    constraint_provider_function=define_constraints
)

TerminationConfig

Controls when the solver stops:

Time Limit

TerminationConfig(
    spent_limit=Duration(seconds=30)
)

# Other duration units
Duration(minutes=5)
Duration(hours=1)
Duration(milliseconds=500)

Score Target

Stop when a target score is reached:

TerminationConfig(
    best_score_limit="0hard/-10soft"
)

Step Limit

Stop after a number of steps:

TerminationConfig(
    step_count_limit=10000
)

Unimproved Time

Stop if no improvement for a duration:

TerminationConfig(
    unimproved_spent_limit=Duration(seconds=30)
)

Combining Conditions

Multiple conditions use OR logic:

TerminationConfig(
    spent_limit=Duration(minutes=5),
    best_score_limit="0hard/0soft",  # OR achieves perfect
    unimproved_spent_limit=Duration(seconds=60)  # OR stuck
)

Environment Mode

Controls validation and reproducibility:

from solverforge_legacy.solver.config import EnvironmentMode

SolverConfig(
    environment_mode=EnvironmentMode.REPRODUCIBLE,
    ...
)

Mode                Description               Use Case
NON_REPRODUCIBLE    Fastest, no validation    Production
REPRODUCIBLE        Deterministic results     Default
FAST_ASSERT         Quick validation          Testing
FULL_ASSERT         Complete validation       Debugging

Debugging Score Corruption

Use FULL_ASSERT to detect score calculation bugs:

SolverConfig(
    environment_mode=EnvironmentMode.FULL_ASSERT,
    ...
)

This validates every score calculation but is slow.

Reproducibility

For reproducible results, set a random seed:

SolverConfig(
    random_seed=42,
    environment_mode=EnvironmentMode.REPRODUCIBLE,
    ...
)

Configuration Overrides

Override configuration when building a solver:

from solverforge_legacy.solver.config import SolverConfigOverride

solver_factory = SolverFactory.create(solver_config)

# Override termination for this solver instance
override = SolverConfigOverride(
    termination_config=TerminationConfig(spent_limit=Duration(seconds=10))
)
solver = solver_factory.build_solver(override)

Complete Example

from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
    EnvironmentMode,
)

from my_app.domain import Timetable, Lesson
from my_app.constraints import define_constraints


def create_solver():
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(minutes=5),
            best_score_limit="0hard/0soft",
        ),
        environment_mode=EnvironmentMode.REPRODUCIBLE,
        random_seed=42,
    )

    factory = SolverFactory.create(config)
    return factory.build_solver()

Configuration Best Practices

Development

SolverConfig(
    environment_mode=EnvironmentMode.FULL_ASSERT,
    termination_config=TerminationConfig(spent_limit=Duration(seconds=10)),
    ...
)

Testing

SolverConfig(
    environment_mode=EnvironmentMode.REPRODUCIBLE,
    random_seed=42,  # Reproducible tests
    termination_config=TerminationConfig(spent_limit=Duration(seconds=5)),
    ...
)

Production

SolverConfig(
    environment_mode=EnvironmentMode.NON_REPRODUCIBLE,
    termination_config=TerminationConfig(
        spent_limit=Duration(minutes=5),
        unimproved_spent_limit=Duration(minutes=1),
    ),
    ...
)


2 - Running the Solver

Execute the solver synchronously with Solver.solve().

The simplest way to solve a problem is with Solver.solve(), which blocks until termination.

Basic Usage

from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)

# Configure
config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)

# Create factory and solver
factory = SolverFactory.create(config)
solver = factory.build_solver()

# Load problem
problem = load_problem()

# Solve (blocks until done)
solution = solver.solve(problem)

# Use solution
print(f"Score: {solution.score}")

Event Listeners

Monitor progress with event listeners:

from solverforge_legacy.solver import BestSolutionChangedEvent

def on_best_solution_changed(event: BestSolutionChangedEvent):
    print(f"New best score: {event.new_best_score}")
    print(f"Time spent: {event.time_spent}")

solver.add_event_listener(on_best_solution_changed)
solution = solver.solve(problem)

BestSolutionChangedEvent Properties

Property                            Description
new_best_score                      The new best score
new_best_solution                   The new best solution
time_spent                          Duration since solving started
is_new_best_solution_initialized    True if all variables are assigned
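
For example, a listener can skip persisting partially initialized solutions by checking is_new_best_solution_initialized. A minimal sketch (persist_solution is a placeholder for your own storage code):

def on_best_solution_changed(event: BestSolutionChangedEvent):
    if not event.is_new_best_solution_initialized:
        return  # ignore intermediate solutions that still have unassigned variables
    print(f"{event.time_spent}: new best {event.new_best_score}")
    persist_solution(event.new_best_solution)  # placeholder for your own storage code

solver.add_event_listener(on_best_solution_changed)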

Removing Listeners

solver.add_event_listener(listener)
# ... later ...
solver.remove_event_listener(listener)

Early Termination

Stop solving before the configured termination condition is met:

import threading
import time

def timeout_termination(solver, timeout_seconds):
    """Terminate after timeout."""
    time.sleep(timeout_seconds)
    solver.terminate_early()

# Start termination thread
thread = threading.Thread(target=timeout_termination, args=(solver, 60))
thread.start()

solution = solver.solve(problem)

Manual Termination

# From another thread
solver.terminate_early()

# Check if termination was requested
if solver.is_terminate_early():
    print("Termination was requested")

Checking Solver State

# Is the solver currently running?
if solver.is_solving():
    print("Solver is running")

# Was early termination requested?
if solver.is_terminate_early():
    print("Termination requested")

Problem Changes (Real-Time)

Modify the problem while solving:

from solverforge_legacy.solver import ProblemChange

class AddLessonChange(ProblemChange[Timetable]):
    def __init__(self, lesson: Lesson):
        self.lesson = lesson

    def do_change(self, working_solution: Timetable, score_director):
        # Add to working solution
        working_solution.lessons.append(self.lesson)
        # Notify score director
        score_director.after_entity_added(self.lesson)

# Add change while solving
new_lesson = Lesson("new", "Art", "S. Dali", "Group A")
solver.add_problem_change(AddLessonChange(new_lesson))

See Real-Time Planning for more details.

Solver Reuse

Don’t reuse a solver instance—create a new one for each solve:

# Correct: New solver each time
solver1 = factory.build_solver()
solution1 = solver1.solve(problem1)

solver2 = factory.build_solver()
solution2 = solver2.solve(problem2)

# Incorrect: Reusing solver
solver = factory.build_solver()
solution1 = solver.solve(problem1)
solution2 = solver.solve(problem2)  # Don't do this!

Threading

Solver.solve() blocks the calling thread. For non-blocking operation, use:

  1. Background thread:

    thread = threading.Thread(target=lambda: solver.solve(problem))
    thread.start()
    
  2. SolverManager (recommended for production): See SolverManager
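
A slightly fuller sketch of the background-thread approach (option 1 above), capturing the result in a shared container so the main thread stays responsive and can still terminate early:

import threading

result = {}

def run_solver():
    result["solution"] = solver.solve(problem)

worker = threading.Thread(target=run_solver, daemon=True)
worker.start()

# ... do other work; call solver.terminate_early() here if needed ...

worker.join()
print(f"Score: {result['solution'].score}")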

Error Handling

try:
    solution = solver.solve(problem)
except Exception as e:
    print(f"Solving failed: {e}")
    # Handle error (log, retry, etc.)

Complete Example

from solverforge_legacy.solver import SolverFactory, BestSolutionChangedEvent
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("solver")


def solve_timetable(problem: Timetable) -> Timetable:
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(minutes=5),
            unimproved_spent_limit=Duration(seconds=60),
        ),
    )

    factory = SolverFactory.create(config)
    solver = factory.build_solver()

    # Log progress
    def on_progress(event: BestSolutionChangedEvent):
        logger.info(f"Score: {event.new_best_score} at {event.time_spent}")

    solver.add_event_listener(on_progress)

    # Solve
    logger.info("Starting solver...")
    solution = solver.solve(problem)
    logger.info(f"Solving finished. Final score: {solution.score}")

    return solution


if __name__ == "__main__":
    problem = load_problem()
    solution = solve_timetable(problem)
    save_solution(solution)


3 - SolverManager

Manage concurrent and asynchronous solving jobs.

SolverManager handles concurrent solving jobs, making it ideal for web applications and services.

Creating a SolverManager

from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)

config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(spent_limit=Duration(minutes=5)),
)

solver_factory = SolverFactory.create(config)
solver_manager = SolverManager.create(solver_factory)

Basic Solving

solve()

Non-blocking solve that returns a future:

import uuid

job_id = str(uuid.uuid4())

# Start solving (non-blocking)
future = solver_manager.solve(job_id, problem)

# ... do other work ...

# Get result (blocks until done)
solution = future.get_final_best_solution()
print(f"Score: {solution.score}")

solve_and_listen()

Solve with progress callbacks:

def on_best_solution_changed(solution: Timetable):
    print(f"New best: {solution.score}")
    # Update UI, save to database, etc.

def on_exception(error):
    print(f"Solving failed: {error}")

solver_manager.solve_and_listen(
    job_id,
    problem_finder=lambda _: problem,
    best_solution_consumer=on_best_solution_changed,
    exception_handler=on_exception,
)

Managing Jobs

Check Job Status

status = solver_manager.get_solver_status(job_id)
# Returns: NOT_SOLVING, SOLVING_ACTIVE, SOLVING_ENDED
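
For example, a caller can poll until the job ends (a minimal sketch; the status is compared by name, matching the values listed above):

import time

while solver_manager.get_solver_status(job_id).name != "SOLVING_ENDED":
    time.sleep(1)

solution = solver_manager.get_best_solution(job_id)
print(f"Finished with score: {solution.score}")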

Get Current Best Solution

solution = solver_manager.get_best_solution(job_id)
if solution:
    print(f"Current best: {solution.score}")

Terminate Early

solver_manager.terminate_early(job_id)

FastAPI Integration

from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import uuid

solver_manager: SolverManager | None = None
solutions: dict[str, Timetable] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    global solver_manager

    config = SolverConfig(...)
    factory = SolverFactory.create(config)
    solver_manager = SolverManager.create(factory)

    yield

    solver_manager.close()


app = FastAPI(lifespan=lifespan)


@app.post("/solve")
async def start_solving(problem: TimetableRequest) -> str:
    job_id = str(uuid.uuid4())

    def on_best_solution(solution: Timetable):
        solutions[job_id] = solution

    solver_manager.solve_and_listen(
        job_id,
        problem_finder=lambda _: problem.to_domain(),
        best_solution_consumer=on_best_solution,
    )

    return job_id


@app.get("/solution/{job_id}")
async def get_solution(job_id: str):
    if job_id not in solutions:
        raise HTTPException(404, "Job not found")

    solution = solutions[job_id]
    status = solver_manager.get_solver_status(job_id)

    return {
        "status": status.name,
        "score": str(solution.score),
        "solution": TimetableResponse.from_domain(solution),
    }


@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
    solver_manager.terminate_early(job_id)
    return {"status": "terminating"}

Concurrent Jobs

SolverManager handles multiple jobs concurrently:

# Start multiple jobs
job1 = solver_manager.solve("job1", problem1)
job2 = solver_manager.solve("job2", problem2)
job3 = solver_manager.solve("job3", problem3)

# Each runs in its own thread
# Results available when ready
solution1 = job1.get_final_best_solution()

Resource Limits

By default, jobs run with no limit on concurrent execution. For resource management:

# Limit concurrent solvers
solver_manager = SolverManager.create(
    solver_factory,
    parallel_solver_count=4,  # Max 4 concurrent jobs
)

Problem Changes During Solving

Add changes to running jobs:

from solverforge_legacy.solver import ProblemChange

class AddEntity(ProblemChange[Timetable]):
    def __init__(self, entity):
        self.entity = entity

    def do_change(self, working_solution, score_director):
        working_solution.lessons.append(self.entity)
        score_director.after_entity_added(self.entity)

# Add change to running job
solver_manager.add_problem_change(job_id, AddEntity(new_lesson))

Cleanup

Always close the SolverManager when done:

# Using a context manager
with SolverManager.create(factory) as manager:
    ...  # use manager
# Automatically closed when the block exits

# Manual cleanup
try:
    ...  # use solver_manager
finally:
    solver_manager.close()

Error Handling

def on_exception(job_id: str, exception: Exception):
    logger.error(f"Job {job_id} failed: {exception}")
    # Clean up, notify user, etc.

solver_manager.solve_and_listen(
    job_id,
    problem_finder=lambda _: problem,
    best_solution_consumer=on_solution,
    exception_handler=on_exception,
)

Best Practices

Do

  • Use solve_and_listen() for progress updates
  • Store solutions externally (database, cache)
  • Handle exceptions properly
  • Close SolverManager on shutdown

Don’t

  • Block the main thread waiting for results
  • Store solutions only in memory (lose on restart)
  • Forget to handle job cleanup


4 - SolutionManager

Analyze and explain solutions with SolutionManager.

SolutionManager provides utilities for analyzing solutions without running the solver.

Creating a SolutionManager

from solverforge_legacy.solver import SolverFactory, SolutionManager

solver_factory = SolverFactory.create(config)
solution_manager = SolutionManager.create(solver_factory)

Or from a SolverManager:

solver_manager = SolverManager.create(solver_factory)
solution_manager = SolutionManager.create(solver_manager)

Score Calculation

Calculate the score of a solution without solving:

# Update score in place
solution_manager.update(solution)
print(f"Score: {solution.score}")

This is useful for:

  • Validating manually created solutions
  • Comparing before/after changes (see the sketch below)
  • Testing constraint configurations
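
For example, a quick before/after check with update() (a minimal sketch; it assumes a timetabling solution whose lessons expose a mutable room attribute, a hypothetical domain detail):

solution_manager.update(schedule)
score_before = schedule.score

schedule.lessons[0].room = some_other_room  # placeholder what-if change

solution_manager.update(schedule)
print(f"Before: {score_before}, after: {schedule.score}")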

Score Analysis

Get a detailed breakdown of the score:

analysis = solution_manager.analyze(solution)

print(f"Overall score: {analysis.score}")

# Per-constraint breakdown
for constraint in analysis.constraint_analyses():
    print(f"\n{constraint.constraint_name}:")
    print(f"  Score: {constraint.score}")
    print(f"  Matches: {constraint.match_count}")

Constraint Matches

See exactly which entities triggered each constraint:

for constraint in analysis.constraint_analyses():
    print(f"\n{constraint.constraint_name}:")
    for match in constraint.matches():
        print(f"  - {match.justification}: {match.score}")

Indictments

Find which entities are responsible for score impacts:

for indictment in analysis.indictments():
    print(f"\nEntity: {indictment.indicted_object}")
    print(f"  Total impact: {indictment.score}")
    for match in indictment.matches():
        print(f"  - {match.constraint_name}: {match.score}")

Use Cases

Validate User Input

def validate_schedule(schedule: Schedule) -> list[str]:
    """Validate a manually created schedule."""
    solution_manager.update(schedule)

    if schedule.score.is_feasible:
        return []

    # Collect violations
    violations = []
    analysis = solution_manager.analyze(schedule)

    for constraint in analysis.constraint_analyses():
        if constraint.score.hard_score < 0:
            for match in constraint.matches():
                violations.append(str(match.justification))

    return violations

Compare Solutions

def compare_solutions(old: Schedule, new: Schedule) -> dict:
    """Compare two solutions."""
    old_analysis = solution_manager.analyze(old)
    new_analysis = solution_manager.analyze(new)

    return {
        "old_score": str(old_analysis.score),
        "new_score": str(new_analysis.score),
        "improved": new_analysis.score > old_analysis.score,
        "changes": get_constraint_changes(old_analysis, new_analysis),
    }


def get_constraint_changes(old, new):
    old_scores = {c.constraint_name: c.score for c in old.constraint_analyses()}
    changes = []

    for constraint in new.constraint_analyses():
        old_score = old_scores.get(constraint.constraint_name)
        if old_score != constraint.score:
            changes.append({
                "constraint": constraint.constraint_name,
                "old": str(old_score),
                "new": str(constraint.score),
            })

    return changes

Explain to Users

def explain_score(schedule: Schedule) -> dict:
    """Generate user-friendly score explanation."""
    analysis = solution_manager.analyze(schedule)

    hard_violations = []
    soft_penalties = []

    for constraint in analysis.constraint_analyses():
        if constraint.score.hard_score < 0:
            for match in constraint.matches():
                hard_violations.append({
                    "rule": constraint.constraint_name,
                    "details": str(match.justification),
                })
        elif constraint.score.soft_score < 0:
            soft_penalties.append({
                "rule": constraint.constraint_name,
                "impact": constraint.match_count,
            })

    return {
        "is_valid": schedule.score.is_feasible,
        "hard_violations": hard_violations,
        "soft_penalties": soft_penalties,
        "summary": generate_summary(analysis),
    }

API Endpoint

from fastapi import FastAPI

@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
    solution = solutions.get(job_id)
    if not solution:
        raise HTTPException(404)

    analysis = solution_manager.analyze(solution)

    return {
        "score": str(analysis.score),
        "is_feasible": analysis.score.is_feasible,
        "constraints": [
            {
                "name": c.constraint_name,
                "score": str(c.score),
                "matches": c.match_count,
            }
            for c in analysis.constraint_analyses()
        ],
    }

Debugging

Finding Score Corruption

def debug_score(solution):
    """Debug score calculation."""
    # Calculate fresh
    solution_manager.update(solution)
    fresh_score = solution.score

    # Analyze
    analysis = solution_manager.analyze(solution)
    analyzed_score = analysis.score

    if fresh_score != analyzed_score:
        print(f"Score mismatch: {fresh_score} vs {analyzed_score}")

    # Check each constraint
    total_hard = 0
    total_soft = 0
    for c in analysis.constraint_analyses():
        total_hard += c.score.hard_score
        total_soft += c.score.soft_score
        print(f"{c.constraint_name}: {c.score}")

    print(f"\nCalculated: {total_hard}hard/{total_soft}soft")
    print(f"Reported: {analyzed_score}")

Finding Unexpected Matches

def find_unexpected_matches(solution, constraint_name):
    """Debug why a constraint is matching."""
    analysis = solution_manager.analyze(solution)

    for c in analysis.constraint_analyses():
        if c.constraint_name == constraint_name:
            print(f"\n{constraint_name} matches ({c.match_count}):")
            for match in c.matches():
                print(f"  - {match.justification}")
            return

    print(f"Constraint '{constraint_name}' not found")

Performance Notes

  • update() is fast (incremental calculation)
  • analyze() is slower (collects all match details)
  • Cache analysis results if calling repeatedly (see the sketch below)
  • Don’t analyze every solution during solving
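
A minimal caching sketch, keyed by job id and invalidated when the score changes (the cache structure is an assumption, not part of the library):

_analysis_cache: dict[str, tuple] = {}

def cached_analysis(job_id: str, solution):
    cached = _analysis_cache.get(job_id)
    if cached is not None and cached[0] == solution.score:
        return cached[1]  # reuse the analysis while the score is unchanged
    analysis = solution_manager.analyze(solution)
    _analysis_cache[job_id] = (solution.score, analysis)
    return analysis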


5 - Benchmarking

Compare solver configurations and tune performance.

Benchmarking helps you compare different solver configurations and find the best settings for your problem.

Why Benchmark

  • Compare algorithms: Find the best algorithm combination
  • Tune parameters: Optimize termination times, moves, etc.
  • Validate changes: Ensure improvements don’t regress
  • Understand scaling: See how performance changes with problem size

Basic Benchmarking

Create a simple benchmark by running the solver multiple times:

import time
from statistics import mean, stdev

def benchmark_config(config: SolverConfig, problems: list, runs: int = 3):
    """Benchmark a solver configuration."""
    results = []

    for problem in problems:
        problem_results = []
        for run in range(runs):
            factory = SolverFactory.create(config)
            solver = factory.build_solver()

            start = time.time()
            solution = solver.solve(problem)
            elapsed = time.time() - start

            problem_results.append({
                "score": solution.score,
                "time": elapsed,
                "feasible": solution.score.is_feasible,
            })

        results.append({
            "problem": problem.id,
            "avg_score": mean(r["score"].soft_score for r in problem_results),
            "avg_time": mean(r["time"] for r in problem_results),
            "feasibility_rate": sum(r["feasible"] for r in problem_results) / runs,
        })

    return results

Comparing Configurations

import dataclasses

def compare_termination_times():
    """Compare different termination durations."""
    base_config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
    )

    durations = [10, 30, 60, 120, 300]  # seconds
    problems = load_benchmark_problems()

    results = {}
    for duration in durations:
        # SolverConfig is a dataclass (see Solver Configuration), so replace() copies
        # the base settings and swaps in only the termination config
        config = dataclasses.replace(
            base_config,
            termination_config=TerminationConfig(
                spent_limit=Duration(seconds=duration)
            ),
        )
        results[duration] = benchmark_config(config, problems)

    return results

Benchmark Report

Generate a readable report:

def generate_report(results: dict):
    """Generate benchmark report."""
    print("=" * 60)
    print("BENCHMARK REPORT")
    print("=" * 60)

    for config_name, config_results in results.items():
        print(f"\n{config_name}:")
        print("-" * 40)

        total_score = 0
        total_time = 0
        feasible_count = 0

        for r in config_results:
            print(f"  {r['problem']}: score={r['avg_score']:.1f}, "
                  f"time={r['avg_time']:.1f}s, "
                  f"feasible={r['feasibility_rate']*100:.0f}%")
            total_score += r["avg_score"]
            total_time += r["avg_time"]
            feasible_count += r["feasibility_rate"]

        n = len(config_results)
        print(f"\n  Average: score={total_score/n:.1f}, "
              f"time={total_time/n:.1f}s, "
              f"feasible={feasible_count/n*100:.0f}%")

    print("\n" + "=" * 60)

Problem Datasets

Create consistent benchmark datasets:

class BenchmarkDataset:
    """Collection of benchmark problems."""

    @staticmethod
    def small():
        """Small problems for quick testing."""
        return [
            generate_problem(lessons=20, rooms=3, timeslots=10),
            generate_problem(lessons=30, rooms=4, timeslots=10),
        ]

    @staticmethod
    def medium():
        """Medium problems for standard benchmarks."""
        return [
            generate_problem(lessons=100, rooms=10, timeslots=25),
            generate_problem(lessons=150, rooms=12, timeslots=25),
        ]

    @staticmethod
    def large():
        """Large problems for stress testing."""
        return [
            generate_problem(lessons=500, rooms=20, timeslots=50),
            generate_problem(lessons=1000, rooms=30, timeslots=50),
        ]

Reproducible Benchmarks

For consistent results:

import dataclasses

def reproducible_benchmark(config: SolverConfig, problem, seed: int = 42):
    """Run benchmark with a fixed seed."""
    # SolverConfig is a dataclass (see Solver Configuration), so replace() keeps
    # the existing settings and overrides only the mode and seed
    config = dataclasses.replace(
        config,
        environment_mode=EnvironmentMode.REPRODUCIBLE,
        random_seed=seed,
    )

    factory = SolverFactory.create(config)
    solver = factory.build_solver()

    return solver.solve(problem)

Metrics to Track

Primary Metrics

Metric              Description
Best Score          Final solution quality
Time to Best        When best score was found
Feasibility Rate    % of runs finding feasible solution

Secondary Metrics

Metric              Description
Score Over Time     Score improvement curve
Steps per Second    Algorithm throughput
Memory Usage        Peak memory consumption

Score Over Time

Track how score improves:

def benchmark_with_history(config: SolverConfig, problem):
    """Benchmark with score history."""
    history = []

    def on_progress(event):
        history.append({
            "time": event.time_spent.total_seconds(),
            "score": event.new_best_score,
        })

    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    solver.add_event_listener(on_progress)

    solution = solver.solve(problem)

    return {
        "final_score": solution.score,
        "history": history,
    }
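
Because best-solution events fire only when the score improves, the "Time to Best" metric from the table above can be read straight off the recorded history:

def time_to_best(result: dict) -> float:
    """Seconds until the final best score was first reached (0.0 if no events fired)."""
    history = result["history"]
    return history[-1]["time"] if history else 0.0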

Visualization

Plot results with matplotlib:

import matplotlib.pyplot as plt

def plot_score_over_time(results: dict):
    """Plot score improvement over time."""
    plt.figure(figsize=(10, 6))

    for config_name, result in results.items():
        times = [h["time"] for h in result["history"]]
        scores = [h["score"].soft_score for h in result["history"]]
        plt.plot(times, scores, label=config_name)

    plt.xlabel("Time (seconds)")
    plt.ylabel("Soft Score")
    plt.title("Score Improvement Over Time")
    plt.legend()
    plt.grid(True)
    plt.savefig("benchmark_results.png")

CI/CD Integration

Add benchmarks to your pipeline:

# test_benchmark.py
import time

import pytest

def test_minimum_score():
    """Ensure solver achieves minimum score."""
    config = load_production_config()
    problem = BenchmarkDataset.small()[0]

    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    solution = solver.solve(problem)

    assert solution.score.is_feasible, "Solution should be feasible"
    assert solution.score.soft_score >= -100, "Score should be >= -100"


def test_performance_regression():
    """Check for performance regression."""
    config = load_production_config()
    problem = BenchmarkDataset.medium()[0]

    start = time.time()
    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    solution = solver.solve(problem)
    elapsed = time.time() - start

    assert solution.score.is_feasible
    assert elapsed < 120, "Should complete within 2 minutes"

Best Practices

Do

  • Use consistent problem datasets
  • Run multiple times (3-5) for statistical significance
  • Track both score and time
  • Use reproducible mode for comparisons

Don’t

  • Compare results from different machines
  • Use production data for benchmarks (privacy)
  • Optimize for benchmark problems only
  • Ignore feasibility rate
