SolverForge (Legacy)
Technical documentation for SolverForge Legacy — the pure Python constraint solver using the Timefold backend.
Every organization faces planning problems: providing products or services with a limited set of constrained resources (employees, assets, time, and money). SolverForge’s Planning AI optimizes these problems to do more business with fewer resources using Constraint Satisfaction Programming.
SolverForge is a lightweight, embeddable constraint satisfaction engine which optimizes planning problems. Example use cases include:
- Vehicle Routing - Optimize delivery routes for fleets of vehicles
- Employee Scheduling - Assign shifts to employees based on skills and availability
- School Timetabling - Schedule lessons to timeslots and rooms
- Meeting Scheduling - Find optimal times and rooms for meetings
- Bin Packing - Efficiently pack items into containers
- Task Assignment - Assign tasks to resources optimally

Python-First Design
SolverForge provides a Pythonic API using:
- Decorators for domain modeling (@planning_entity, @planning_solution)
- Type annotations with Annotated for constraint and property marking
- Dataclasses for clean, readable domain models
- Fluent constraint stream API for intuitive constraint definition
from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import (
planning_entity, planning_solution,
PlanningId, PlanningVariable, PlanningEntityCollectionProperty,
ProblemFactCollectionProperty, ValueRangeProvider, PlanningScore
)
from solverforge_legacy.solver.score import HardSoftScore
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)

@planning_solution
@dataclass
class Timetable:
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)
Requirements
- Python 3.10+ (3.11 or 3.12 recommended)
- JDK 17+ (for the optimization engine backend)
Next Steps
1 - Concepts
Understand the fundamental concepts of planning optimization and constraint solving.
Before diving into SolverForge, it’s helpful to understand the core concepts behind planning optimization. This section covers:
1.1 - What is Planning Optimization?
Introduction to planning optimization and constraint satisfaction.
Planning
The need to create plans generally arises from a desire to achieve a goal:
- Build a house
- Correctly staff a hospital shift
- Complete work at all customer locations
- Deliver packages efficiently
Achieving those goals involves organizing the available resources. To correctly staff a hospital you need enough qualified personnel in a variety of fields and specializations to cover the opening hours of the hospital.
Any plan to deploy resources, whether to staff a hospital shift or to assemble the building materials for a new house, is done under constraints.
Constraints could be:
- Physical laws - People can’t work two shifts in two separate locations at the same time, and you can’t mount a roof on a house that doesn’t exist
- Regulations - Employees need a certain number of hours between shifts or are only allowed to work a maximum number of hours per week
- Preferences - Certain employees prefer to work specific shift patterns
Feasible Plans
Any plan needs to consider all three elements—goals, resources, and constraints—in balance to be a feasible plan. A plan that fails to account for all the elements of the problem is an infeasible plan.
For instance, if a hospital staff roster covers all shifts, but assigns employees back-to-back shifts with no breaks for sleep or life outside work, it is not a valid plan.
Why Planning Problems are Hard
Planning problems become harder to solve as the number of resources and constraints increase. Creating an employee shift schedule for a small team of four employees is fairly straightforward. However, if each employee performs a specific function within the business and those functions need to be performed in a specific order, changes that affect one employee quickly cascade and affect everybody on the team.
As more employees and different work specializations are added, things become much more complicated.
Example: For a trivial field service routing problem with 4 vehicles and 8 visits, the number of possibilities that a brute force algorithm considers is 19,958,418.
What would take a team of planners many hours to schedule can be automatically scheduled by SolverForge in a fraction of the time.
Operations Research
Operations Research (OR) is a field of research focused on finding optimal (or near optimal) solutions to problems with techniques that improve decision-making.
Constraint satisfaction programming is a branch of Operations Research that aims to satisfy all the constraints of a problem.
Planning AI
Planning AI is a type of artificial intelligence designed specifically to handle complex planning and scheduling tasks, and to satisfy the constraints of planning problems. Instead of just automating simple, repetitive tasks, it helps you make better decisions by sorting through countless possibilities to find the best solutions—saving you time, reducing costs, and improving efficiency.
Why Planning AI is Different
Traditional methods of planning often involve manually sifting through options or relying on basic tools that can’t keep up with the complexity of real-world problems. Planning AI, on the other hand, uses advanced strategies to quickly focus on the most promising solutions, even when the situation is extremely complicated.
Planning AI also makes it possible to understand the final solution with a breakdown of:
- Which constraints have been violated
- Scores for individual constraints
- An overall score
This makes Planning AI incredibly valuable in industries where getting the right plan is crucial—whether that’s scheduling workers, routing deliveries, or managing resources in a factory.
Constraints and Scoring
Constraints can be considered hard, medium, or soft.
Hard Constraints
Hard constraints represent rules and limitations of the real world that any planning solution has to respect. For instance, there are only 24 hours in a day and people can only be in one place at a time. Hard constraints also include rules that must be adhered to, such as employee contracts and the order in which dependent tasks are completed.
Breaking hard constraints results in infeasible plans.
Medium Constraints
Medium constraints help manage plans when resources are limited (for instance, when there aren’t enough technicians to complete all the customer visits or there aren’t enough employees to work all the available shifts). Medium constraints incentivize SolverForge to assign as many entities (visits or shifts) as possible.
Soft Constraints
Soft constraints help optimize plans based on the business goals, for instance:
- Minimize travel time between customer visits
- Assign employees to their preferred shifts
- Keep teachers in the same room for consecutive lessons
Understanding Scores
To help determine the quality of the solution, plans are assigned a score with values for hard, medium, and soft constraints.
0hard/-257medium/-6119520soft
From this example score we can see:
- Zero hard constraints were broken (feasible!)
- Medium and soft scores have negative values (room for optimization)
Note: The scores do not show how many constraints were broken, but weighted values associated with those constraints.
Score Comparison
Because breaking hard constraints would result in an infeasible solution, a solution that breaks zero hard constraints and has a soft constraint score of -1,000,000 is better than a solution that breaks one hard constraint and has a soft constraint score of 0.
The weight of constraints can be tweaked to adjust their impact on the solution.
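A minimal, solver-independent sketch of that comparison, representing each score as a (hard, medium, soft) tuple so Python's lexicographic tuple ordering mirrors the level-by-level comparison:

# Scores compare level by level: hard first, then medium, then soft.
feasible_but_unoptimized = (0, 0, -1_000_000)   # 0hard/0medium/-1000000soft
infeasible = (-1, 0, 0)                         # -1hard/0medium/0soft

# The feasible solution is better, despite its much worse soft score.
assert feasible_but_unoptimized > infeasible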
1.2 - Problem Types
Common categories of planning and scheduling problems.
SolverForge can solve a wide variety of planning and scheduling problems. Here are some common categories:
Scheduling Problems
Assign activities to time slots and resources.
Employee Scheduling (Rostering)
Assign employees to shifts based on:
- Skills and qualifications
- Availability and preferences
- Labor regulations (max hours, rest periods)
- Fairness (balanced workload)
Examples: Hospital nurse scheduling, retail staff scheduling, call center scheduling
School Timetabling
Assign lessons to timeslots and rooms:
- Teachers can only teach one class at a time
- Rooms have limited capacity
- Student groups shouldn’t have conflicts
- Preference for consecutive lessons
Examples: University course scheduling, school class scheduling
Meeting Scheduling
Find optimal times for meetings:
- Required attendees must be available
- Rooms must be available and large enough
- Minimize conflicts with other meetings
- Consider timezone differences
Job Shop Scheduling
Schedule jobs on machines:
- Operations must follow a specific order
- Machines can only do one job at a time
- Minimize total completion time (makespan)
Examples: Manufacturing scheduling, print shop scheduling
Routing Problems
Plan routes and sequences for vehicles or resources.
Vehicle Routing Problem (VRP)
Plan delivery or service routes:
- Vehicle capacity constraints
- Time windows for deliveries
- Minimize total travel distance/time
- Multiple depots possible
Variants:
- CVRP - Capacitated VRP
- VRPTW - VRP with Time Windows
- PDPTW - Pickup and Delivery with Time Windows
Examples: Delivery route planning, field service scheduling, waste collection
Traveling Salesman Problem (TSP)
Visit all locations exactly once with minimum travel:
- Single vehicle
- Return to starting point
- Minimize total distance
Examples: Sales territory planning, circuit board drilling
Assignment Problems
Assign entities to resources or positions.
Task Assignment
Assign tasks to workers or machines:
- Match skills/capabilities
- Balance workload
- Meet deadlines
- Minimize cost
Examples: Project team assignment, warehouse task allocation
Bin Packing
Pack items into containers:
- Items have sizes/weights
- Containers have capacity limits
- Minimize number of containers used
Examples: Truck loading, cloud server allocation, cutting stock
Resource Allocation
Allocate limited resources to competing demands:
- Budget allocation
- Equipment assignment
- Space allocation
Complex Planning Problems
Real-world problems often combine multiple problem types:
Field Service Scheduling
Combines:
- Routing - Travel between customer locations
- Scheduling - Time windows and appointment slots
- Assignment - Match technician skills to job requirements
Project Planning
Combines:
- Task scheduling - Activities with durations and dependencies
- Resource assignment - Assign people/equipment to tasks
- Constraint satisfaction - Deadlines, budgets, availability
Problem Characteristics
When modeling your problem, consider these characteristics:
| Characteristic | Description | Example |
|---|---|---|
| Hard constraints | Must be satisfied | Legal requirements |
| Soft constraints | Should be optimized | Customer preferences |
| Planning entities | What gets assigned | Lessons, visits, shifts |
| Planning variables | The assignments | Timeslot, room, vehicle |
| Problem facts | Fixed data | Employees, rooms, skills |
Choosing the Right Model
When modeling your problem:
- Identify entities - What things need to be assigned or scheduled?
- Identify variables - What values are you assigning?
- Identify constraints - What rules must be followed?
- Define the score - How do you measure solution quality?
The Quickstarts section provides complete examples for common problem types.
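As a rough sketch of those four steps applied to the school timetabling example (plain dataclasses only; the solver annotations shown in later sections are omitted here):

from dataclasses import dataclass

@dataclass(frozen=True)
class Timeslot:      # problem fact
    day: str
    start: str

@dataclass(frozen=True)
class Room:          # problem fact
    name: str

@dataclass
class Lesson:        # 1. Entity: the thing being scheduled
    id: str
    teacher: str
    timeslot: Timeslot | None = None   # 2. Variable: value assigned by the solver
    room: Room | None = None           # 2. Variable: value assigned by the solver

# 3. Constraint: two lessons must not share a room in the same timeslot
def room_conflict(a: Lesson, b: Lesson) -> bool:
    return a.timeslot is not None and a.timeslot == b.timeslot and a.room == b.room

# 4. Score: count conflicts; closer to zero is better
def score(lessons: list[Lesson]) -> int:
    return -sum(
        room_conflict(a, b)
        for i, a in enumerate(lessons)
        for b in lessons[i + 1:]
    )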
1.3 - Terminology
Glossary of terms used in SolverForge documentation.
Core Concepts
Planning Problem
The input to the solver: a set of planning entities with uninitialized planning variables, plus all problem facts and constraints.
Planning Solution
The container class that holds all problem data (entities and facts) and the resulting score. Decorated with @planning_solution.
Planning Entity
A class whose instances are modified during solving. Planning entities contain planning variables. Decorated with @planning_entity.
Planning Variable
A property of a planning entity that the solver changes during optimization. Annotated with PlanningVariable.
Problem Fact
Immutable data that defines the problem but is not changed by the solver (e.g., rooms, timeslots, employees). Annotated with ProblemFactCollectionProperty or ProblemFactProperty.
Value Range
The set of possible values for a planning variable. Provided via ValueRangeProvider.
Scoring
Score
A measure of solution quality. Higher scores are better. Common types: SimpleScore, HardSoftScore, HardMediumSoftScore.
Hard Constraint
A constraint that must be satisfied for a solution to be feasible. Broken hard constraints make a solution invalid.
Soft Constraint
A constraint that should be optimized but isn’t required. Used for preferences and optimization goals.
Medium Constraint
A constraint between hard and soft, typically used for “assign as many as possible” scenarios.
Feasible Solution
A solution with no broken hard constraints (hard score of 0 or positive).
Optimal Solution
A feasible solution with the best possible soft score. May be impractical to find for large problems.
Constraint Stream
The fluent API for defining constraints. Starts with ConstraintFactory.for_each().
Algorithms
Construction Heuristic
An algorithm that builds an initial solution quickly by assigning values to all planning variables.
Local Search
An algorithm that improves an existing solution by making incremental changes (moves).
Move
A change to the solution, such as swapping two assignments or changing a single variable.
Step
One iteration of the optimization algorithm, consisting of selecting and applying a move.
Termination
The condition that stops the solver (time limit, score target, no improvement, etc.).
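For example, a time-based termination is configured on the solver; spent_limit appears in the tutorials later in this documentation, while unimproved_spent_limit is assumed here to mirror the engine's standard "no improvement" termination and should be checked against the API reference:

from solverforge_legacy.solver.config import TerminationConfig, Duration

termination_config = TerminationConfig(
    spent_limit=Duration(seconds=300),            # hard time budget of 5 minutes
    unimproved_spent_limit=Duration(seconds=30),  # stop early if no improvement (assumed option name)
)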
Advanced Concepts
Shadow Variable
A planning variable whose value is calculated from other variables, not directly assigned by the solver. Used for derived values like arrival times.
Inverse Shadow Variable
A shadow variable that maintains a reverse reference (e.g., a visit knowing which vehicle it belongs to).
Previous/Next Element Shadow Variable
Shadow variables that track the previous or next element in a list variable.
Cascading Update Shadow Variable
A shadow variable that triggers recalculation when upstream variables change.
List Variable
A planning variable that holds an ordered list of values (used for routing problems). Annotated with PlanningListVariable.
Pinning
Locking certain assignments so the solver cannot change them. Useful for preserving manual decisions or already-executed plans.
Problem Change
A modification to the problem while the solver is running (real-time planning).
Solver Components
Solver
The main component that performs optimization. Created via SolverFactory.
SolverFactory
Factory for creating Solver instances from configuration.
SolverConfig
Configuration object specifying solution class, entities, constraints, and termination.
SolverManager
Manages multiple concurrent solving jobs. Useful for web applications.
SolutionManager
Analyzes solutions: explains scores, identifies constraint violations.
ScoreDirector
Internal component that calculates scores efficiently. Used in problem changes.
Constraint Provider
A function decorated with @constraint_provider that returns a list of constraints.
Constraint Stream Operations
for_each / forEach
Start a constraint stream by iterating over all instances of a class.
for_each_unique_pair
Iterate over all unique pairs of instances (A,B where A != B, without duplicates like (B,A)).
filter
Remove items that don’t match a predicate.
join
Combine two streams by matching on joiners.
Joiner
A condition for matching items in joins (e.g., Joiners.equal(), Joiners.overlapping()).
group_by
Aggregate items by key with collectors.
Collector
Aggregation function (count, sum, min, max, toList, etc.).
penalize / reward
Apply score impact for matching items.
as_constraint
Finalize the constraint with a name.
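Tying several of these operations together, here is a sketch of a soft constraint against the Lesson class from the Hello World tutorial; ConstraintCollectors is assumed to be importable alongside the other score classes used in this documentation:

from solverforge_legacy.solver.score import (
    ConstraintFactory, Constraint, ConstraintCollectors, HardSoftScore
)

def teacher_overloaded(factory: ConstraintFactory) -> Constraint:
    # for_each selects lessons, group_by aggregates them per teacher with a
    # count collector, filter keeps overloaded teachers, penalize weights the
    # impact by the overload, and as_constraint names the constraint.
    return (
        factory.for_each(Lesson)
        .group_by(lambda lesson: lesson.teacher, ConstraintCollectors.count())
        .filter(lambda teacher, lesson_count: lesson_count > 6)
        .penalize(HardSoftScore.ONE_SOFT, lambda teacher, lesson_count: lesson_count - 6)
        .as_constraint("Teacher overloaded")
    )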
Score Analysis
Score Explanation
Breakdown of which constraints contributed to the score.
Constraint Match
A single instance of a constraint being triggered.
Indictment
List of constraint violations associated with a specific entity.
Justification
Explanation of why a constraint was triggered.
2 - Getting Started
Install SolverForge and solve your first planning problem.
Get up and running with SolverForge in minutes.
Quick Start
- Installation - Set up Python, JDK, and install SolverForge
- Hello World - Build a simple school timetabling solver (CLI)
- Hello World with FastAPI - Add a REST API to your solver
What You’ll Learn
In the Hello World tutorial, you’ll build a school timetabling application that:
- Assigns lessons to timeslots and rooms
- Avoids scheduling conflicts (same teacher, room, or student group at the same time)
- Optimizes for teacher preferences (room stability, consecutive lessons)
This introduces the core concepts you’ll use in any SolverForge application:
- Planning entities - The things being scheduled (lessons)
- Planning variables - The values being assigned (timeslot, room)
- Constraints - The rules that define a valid solution
- Solver configuration - How to run the optimization
2.1 - Installation
Set up Python, JDK, and install SolverForge.
Prerequisites
SolverForge requires:
- Python 3.10 or higher (3.11 or 3.12 recommended)
- JDK 17 or higher (for the optimization engine backend)
Check Python Version
python --version
# Python 3.11.0 or higher
If you need to install Python, visit python.org or use your system’s package manager.
Check JDK Version
java -version
# openjdk version "17.0.x" or higher
If you need to install a JDK:
- macOS: brew install openjdk@17
- Ubuntu/Debian: sudo apt install openjdk-17-jdk
- Fedora: sudo dnf install java-17-openjdk
- Windows: Download from Adoptium or Oracle
Make sure JAVA_HOME is set:
echo $JAVA_HOME
# Should output path to JDK installation
Install SolverForge
Using pip (Recommended)
pip install solverforge-legacy
In a Virtual Environment
# Create virtual environment
python -m venv .venv
# Activate it
source .venv/bin/activate # Linux/macOS
# or
.venv\Scripts\activate # Windows
# Install SolverForge
pip install solverforge-legacy
Verify Installation
python -c "from solverforge_legacy.solver import SolverFactory; print('SolverForge installed successfully!')"
Project Setup
For a new project, create a pyproject.toml:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "my-solver-project"
version = "1.0.0"
requires-python = ">=3.10"
dependencies = [
    "solverforge-legacy == 1.24.1",
    "pytest == 8.2.2",  # For testing
]
Then install your project in development mode:
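pip install -e .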
IDE Setup
VS Code
Install the Python extension and configure your interpreter to use the virtual environment.
PyCharm
- Open your project
- Go to Settings > Project > Python Interpreter
- Select the virtual environment interpreter
Troubleshooting
JVM Not Found
If you see errors about JVM not found:
- Verify Java is installed: java -version
- Set the JAVA_HOME environment variable
- Ensure JAVA_HOME/bin is in your PATH
Import Errors
If imports fail:
- Verify you’re in the correct virtual environment
- Re-install: pip install --force-reinstall solverforge-legacy
Memory Issues
For large problems, you may need to increase JVM memory. This is configured automatically, but you can adjust if needed.
Next Steps
Now that SolverForge is installed, follow the Hello World Tutorial to build your first solver.
2.2 - Hello World
Build a school timetabling solver from scratch.
In this tutorial, you’ll build a school timetabling application that assigns lessons to timeslots and rooms while avoiding conflicts.
The Problem
A school needs to schedule lessons:
- Each lesson has a subject, teacher, and student group
- Available timeslots (e.g., Monday 08:30, Monday 09:30, …)
- Available rooms (Room A, Room B, Room C)
Constraints:
- Hard: No room, teacher, or student group conflicts
- Soft: Teachers prefer the same room, consecutive lessons
Project Structure
Create the following files:
hello_world/
├── domain.py # Data model
├── constraints.py # Constraint definitions
├── main.py # Entry point
└── pyproject.toml # Dependencies
Step 1: Define the Domain Model
Create domain.py with the problem facts and planning entities:
from dataclasses import dataclass, field
from datetime import time
from typing import Annotated
from solverforge_legacy.solver.domain import (
planning_entity,
planning_solution,
PlanningId,
PlanningVariable,
PlanningEntityCollectionProperty,
ProblemFactCollectionProperty,
ValueRangeProvider,
PlanningScore,
)
from solverforge_legacy.solver.score import HardSoftScore
# Problem facts (immutable data)
@dataclass
class Timeslot:
    day_of_week: str
    start_time: time
    end_time: time

    def __str__(self):
        return f"{self.day_of_week} {self.start_time.strftime('%H:%M')}"

@dataclass
class Room:
    name: str

    def __str__(self):
        return self.name

# Planning entity (modified by the solver)
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    student_group: str
    # Planning variables - assigned by the solver
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)

# Planning solution (container for all data)
@planning_solution
@dataclass
class Timetable:
    id: str
    # Problem facts with value range providers
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    # Planning entities
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    # Score calculated by constraints
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)
Key Concepts
- @planning_entity marks Lesson as something the solver will modify
- PlanningVariable marks timeslot and room as values to assign
- @planning_solution marks Timetable as the container
- ValueRangeProvider tells the solver which values are available
Step 2: Define Constraints
Create constraints.py with the scoring rules:
from solverforge_legacy.solver.score import (
constraint_provider,
ConstraintFactory,
Constraint,
Joiners,
HardSoftScore,
)
from .domain import Lesson
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory) -> list[Constraint]:
    return [
        # Hard constraints
        room_conflict(constraint_factory),
        teacher_conflict(constraint_factory),
        student_group_conflict(constraint_factory),
        # Soft constraints
        teacher_room_stability(constraint_factory),
    ]

def room_conflict(constraint_factory: ConstraintFactory) -> Constraint:
    """A room can accommodate at most one lesson at the same time."""
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.timeslot),
            Joiners.equal(lambda lesson: lesson.room),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Room conflict")
    )

def teacher_conflict(constraint_factory: ConstraintFactory) -> Constraint:
    """A teacher can teach at most one lesson at the same time."""
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.timeslot),
            Joiners.equal(lambda lesson: lesson.teacher),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Teacher conflict")
    )

def student_group_conflict(constraint_factory: ConstraintFactory) -> Constraint:
    """A student group can attend at most one lesson at the same time."""
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.timeslot),
            Joiners.equal(lambda lesson: lesson.student_group),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Student group conflict")
    )

def teacher_room_stability(constraint_factory: ConstraintFactory) -> Constraint:
    """A teacher prefers to teach in a single room."""
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.teacher),
        )
        .filter(lambda lesson1, lesson2: lesson1.room != lesson2.room)
        .penalize(HardSoftScore.ONE_SOFT)
        .as_constraint("Teacher room stability")
    )
Constraint Stream Pattern
Each constraint follows this pattern:
- Select entities with for_each() or for_each_unique_pair()
- Filter to matching cases with Joiners or .filter()
- Penalize (or reward) with a score impact
- Name the constraint with as_constraint()
Step 3: Create the Application
Create main.py:
from datetime import time
from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
SolverConfig,
ScoreDirectorFactoryConfig,
TerminationConfig,
Duration,
)
from .domain import Timetable, Timeslot, Room, Lesson
from .constraints import define_constraints
def main():
    # Configure the solver
    solver_config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(seconds=5)
        ),
    )
    # Create the solver
    solver_factory = SolverFactory.create(solver_config)
    solver = solver_factory.build_solver()
    # Generate a problem
    problem = generate_demo_data()
    # Solve it!
    solution = solver.solve(problem)
    # Print the result
    print_timetable(solution)

def generate_demo_data() -> Timetable:
    """Create a small demo problem."""
    timeslots = [
        Timeslot("MONDAY", time(8, 30), time(9, 30)),
        Timeslot("MONDAY", time(9, 30), time(10, 30)),
        Timeslot("MONDAY", time(10, 30), time(11, 30)),
        Timeslot("TUESDAY", time(8, 30), time(9, 30)),
        Timeslot("TUESDAY", time(9, 30), time(10, 30)),
    ]
    rooms = [
        Room("Room A"),
        Room("Room B"),
        Room("Room C"),
    ]
    lessons = [
        Lesson("1", "Math", "A. Turing", "9th grade"),
        Lesson("2", "Physics", "M. Curie", "9th grade"),
        Lesson("3", "Chemistry", "M. Curie", "9th grade"),
        Lesson("4", "Biology", "C. Darwin", "9th grade"),
        Lesson("5", "History", "I. Jones", "9th grade"),
        Lesson("6", "Math", "A. Turing", "10th grade"),
        Lesson("7", "Physics", "M. Curie", "10th grade"),
        Lesson("8", "Geography", "C. Darwin", "10th grade"),
    ]
    return Timetable("demo", timeslots, rooms, lessons)

def print_timetable(timetable: Timetable) -> None:
    """Print the solution in a readable format."""
    print(f"\nScore: {timetable.score}\n")
    for lesson in timetable.lessons:
        print(f"{lesson.subject} ({lesson.teacher}, {lesson.student_group})")
        print(f"  -> {lesson.timeslot} in {lesson.room}")
        print()

if __name__ == "__main__":
    main()
Step 4: Run It
python -m hello_world.main
You should see output like:
Score: 0hard/-3soft
Math (A. Turing, 9th grade)
-> MONDAY 08:30 in Room A
Physics (M. Curie, 9th grade)
-> MONDAY 09:30 in Room B
Chemistry (M. Curie, 9th grade)
-> TUESDAY 08:30 in Room B
...
A score of 0hard means all hard constraints are satisfied (no conflicts). The negative soft score indicates room for optimization of preferences.
Understanding the Output
- 0hard = No conflicts (feasible solution!)
- -3soft = 3 soft constraint violations (teachers using multiple rooms)
The solver found a valid timetable where:
- No room has two lessons at the same time
- No teacher teaches two lessons at the same time
- No student group has two lessons at the same time
Next Steps
2.3 - Hello World with FastAPI
Add a REST API to your school timetabling solver.
This tutorial extends the Hello World example by adding a REST API using FastAPI. This is closer to how you’d deploy a solver in production.
Prerequisites
- Completed the Hello World tutorial
- FastAPI and Uvicorn installed: pip install fastapi uvicorn
Project Structure
Extend your project:
hello_world/
├── domain.py # Same as before
├── constraints.py # Same as before
├── main.py # CLI version (optional)
├── rest_api.py # NEW: FastAPI application
└── pyproject.toml # Add fastapi, uvicorn
Step 1: Update Dependencies
Add FastAPI to your pyproject.toml:
[project]
dependencies = [
    "solverforge-legacy == 1.24.1",
    "fastapi >= 0.100.0",
    "uvicorn >= 0.23.0",
    "pytest == 8.2.2",
]
Step 2: Create the REST API
Create rest_api.py:
import uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import time
from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
SolverConfig,
ScoreDirectorFactoryConfig,
TerminationConfig,
Duration,
)
from .domain import Timetable, Timeslot, Room, Lesson
from .constraints import define_constraints
# Pydantic models for API validation
class TimeslotDTO(BaseModel):
    day_of_week: str
    start_time: str
    end_time: str

    def to_domain(self) -> Timeslot:
        return Timeslot(
            self.day_of_week,
            time.fromisoformat(self.start_time),
            time.fromisoformat(self.end_time),
        )

class RoomDTO(BaseModel):
    name: str

    def to_domain(self) -> Room:
        return Room(self.name)

class LessonDTO(BaseModel):
    id: str
    subject: str
    teacher: str
    student_group: str
    timeslot: TimeslotDTO | None = None
    room: RoomDTO | None = None

    def to_domain(self, timeslots: list[Timeslot], rooms: list[Room]) -> Lesson:
        ts = None
        if self.timeslot:
            ts = next(
                (t for t in timeslots if t.day_of_week == self.timeslot.day_of_week
                 and t.start_time.isoformat() == self.timeslot.start_time),
                None
            )
        rm = None
        if self.room:
            rm = next((r for r in rooms if r.name == self.room.name), None)
        return Lesson(self.id, self.subject, self.teacher, self.student_group, ts, rm)

class TimetableDTO(BaseModel):
    id: str
    timeslots: list[TimeslotDTO]
    rooms: list[RoomDTO]
    lessons: list[LessonDTO]
    score: str | None = None

    def to_domain(self) -> Timetable:
        timeslots = [ts.to_domain() for ts in self.timeslots]
        rooms = [r.to_domain() for r in self.rooms]
        lessons = [l.to_domain(timeslots, rooms) for l in self.lessons]
        return Timetable(self.id, timeslots, rooms, lessons)

    @classmethod
    def from_domain(cls, timetable: Timetable) -> "TimetableDTO":
        return cls(
            id=timetable.id,
            timeslots=[
                TimeslotDTO(
                    day_of_week=ts.day_of_week,
                    start_time=ts.start_time.isoformat(),
                    end_time=ts.end_time.isoformat(),
                )
                for ts in timetable.timeslots
            ],
            rooms=[RoomDTO(name=r.name) for r in timetable.rooms],
            lessons=[
                LessonDTO(
                    id=l.id,
                    subject=l.subject,
                    teacher=l.teacher,
                    student_group=l.student_group,
                    timeslot=TimeslotDTO(
                        day_of_week=l.timeslot.day_of_week,
                        start_time=l.timeslot.start_time.isoformat(),
                        end_time=l.timeslot.end_time.isoformat(),
                    ) if l.timeslot else None,
                    room=RoomDTO(name=l.room.name) if l.room else None,
                )
                for l in timetable.lessons
            ],
            score=str(timetable.score) if timetable.score else None,
        )

# Global solver manager
solver_manager: SolverManager | None = None
solutions: dict[str, Timetable] = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize solver manager on startup."""
    global solver_manager
    solver_config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(seconds=30)
        ),
    )
    solver_factory = SolverFactory.create(solver_config)
    solver_manager = SolverManager.create(solver_factory)
    yield
    # Cleanup on shutdown
    if solver_manager:
        solver_manager.close()

app = FastAPI(
    title="School Timetabling API",
    description="Optimize school timetables using SolverForge",
    lifespan=lifespan,
)

@app.post("/timetables", response_model=str)
async def submit_problem(timetable_dto: TimetableDTO) -> str:
    """Submit a timetabling problem for solving."""
    job_id = str(uuid.uuid4())
    problem = timetable_dto.to_domain()

    def on_best_solution(solution: Timetable):
        solutions[job_id] = solution

    solver_manager.solve_and_listen(
        job_id,
        lambda _: problem,
        on_best_solution,
    )
    return job_id

@app.get("/timetables/{job_id}", response_model=TimetableDTO)
async def get_solution(job_id: str) -> TimetableDTO:
    """Get the current best solution for a job."""
    if job_id not in solutions:
        raise HTTPException(status_code=404, detail="Job not found")
    return TimetableDTO.from_domain(solutions[job_id])

@app.delete("/timetables/{job_id}")
async def stop_solving(job_id: str):
    """Stop solving a problem early."""
    solver_manager.terminate_early(job_id)
    return {"status": "terminated"}

@app.get("/demo-data", response_model=TimetableDTO)
async def get_demo_data() -> TimetableDTO:
    """Get demo problem data for testing."""
    timeslots = [
        Timeslot("MONDAY", time(8, 30), time(9, 30)),
        Timeslot("MONDAY", time(9, 30), time(10, 30)),
        Timeslot("TUESDAY", time(8, 30), time(9, 30)),
        Timeslot("TUESDAY", time(9, 30), time(10, 30)),
    ]
    rooms = [Room("Room A"), Room("Room B")]
    lessons = [
        Lesson("1", "Math", "A. Turing", "9th grade"),
        Lesson("2", "Physics", "M. Curie", "9th grade"),
        Lesson("3", "History", "I. Jones", "9th grade"),
        Lesson("4", "Math", "A. Turing", "10th grade"),
    ]
    return TimetableDTO.from_domain(Timetable("demo", timeslots, rooms, lessons))
Step 3: Run the API
uvicorn hello_world.rest_api:app --reload
The API is now running at http://localhost:8000.
Step 4: Test the API
Get Demo Data
curl http://localhost:8000/demo-data
Submit a Problem
# Get demo data and submit it for solving
curl http://localhost:8000/demo-data | curl -X POST \
-H "Content-Type: application/json" \
-d @- \
http://localhost:8000/timetables
This returns a job ID like "a1b2c3d4-...".
Check the Solution
curl http://localhost:8000/timetables/{job_id}
Stop Solving Early
curl -X DELETE http://localhost:8000/timetables/{job_id}
API Documentation
FastAPI automatically generates interactive API docs:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
Architecture Notes
SolverManager
SolverManager handles concurrent solving jobs:
- Each job runs in its own thread
- Multiple problems can be solved simultaneously
- Solutions are updated as the solver improves them
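To report progress to clients, a job's status can also be exposed. The sketch below (an addition to rest_api.py) assumes a get_solver_status method on SolverManager, mirroring the engine's standard API:

@app.get("/timetables/{job_id}/status")
async def get_status(job_id: str) -> dict:
    # Assumed API: returns a status such as SOLVING_ACTIVE or NOT_SOLVING
    status = solver_manager.get_solver_status(job_id)
    return {"job_id": job_id, "solver_status": str(status)}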
Pydantic Models
We use separate Pydantic DTOs for:
- API request/response validation
- JSON serialization
- Decoupling API schema from domain model
Production Considerations
For production deployments:
- Persistence: Store solutions in a database
- Scaling: Use a message queue for distributed solving
- Monitoring: Add logging and metrics
- Security: Add authentication and rate limiting
Next Steps
3 - Domain Modeling
Model your planning problem with entities, variables, and solutions.
Domain modeling is the foundation of any SolverForge application. You define your problem structure using Python dataclasses and type annotations.
Core Concepts
Model Structure
A typical SolverForge model consists of:
Planning Solution
├── Problem Facts (immutable data)
│ ├── Timeslots, Rooms, Employees, etc.
│ └── Value Range Providers
├── Planning Entities (mutable)
│ └── Planning Variables (assigned by solver)
└── Score (calculated from constraints)
Example
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    # Planning variables - assigned by the solver
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)

@planning_solution
@dataclass
class Timetable:
    # Problem facts - immutable
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    # Planning entities - contain variables to optimize
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    # Score - calculated by constraints
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)
3.1 - Planning Entities
Define planning entities that the solver will optimize.
A planning entity is a class whose instances the solver can change during optimization. Planning entities contain planning variables that get assigned values.
The @planning_entity Decorator
Mark a class as a planning entity with @planning_entity:
from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import planning_entity, PlanningId, PlanningVariable
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    student_group: str
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)
Planning ID
Every planning entity must have a unique identifier marked with PlanningId:
id: Annotated[str, PlanningId]
The ID is used for:
- Tracking entities during solving
- Cloning solutions
- Score explanation
The ID type can be str, int, or any hashable type.
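For example, an integer ID is equally valid:

id: Annotated[int, PlanningId]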
Genuine vs Shadow Entities
There are two types of planning entities:
Genuine Entities
A genuine planning entity has at least one genuine planning variable that the solver directly assigns:
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    # Genuine variable - solver assigns this
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
Shadow-Only Entities
A shadow-only entity has only shadow variables (calculated from other entities):
@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    # Shadow variable - calculated from vehicle's visit list
    vehicle: Annotated[Vehicle | None, InverseRelationShadowVariable(...)] = field(default=None)
Entity Properties
Immutable Properties
Properties without PlanningVariable annotations are immutable during solving:
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str        # Immutable
    teacher: str        # Immutable
    student_group: str  # Immutable
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)  # Mutable
The solver never changes subject, teacher, or student_group.
Default Values
Planning variables should have default values (typically None) for uninitialized state:
timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
Multiple Planning Variables
An entity can have multiple planning variables:
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)
Each variable is assigned independently by the solver.
Entity Collections in Solution
Planning entities are collected in the planning solution:
@planning_solution
@dataclass
class Timetable:
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
The solver iterates over this collection to find entities to optimize.
Nullable Variables
By default, planning variables must be assigned. For nullable variables (when some entities might be unassigned), see Planning Variables.
Best Practices
Do
- Use @dataclass for clean, simple entity definitions
- Give each entity a unique, stable ID
- Initialize planning variables to None
- Keep entities focused on a single concept
Don’t
- Put business logic in entities (use constraints instead)
- Make planning variables required in __init__
- Use mutable default arguments (use field(default_factory=...) instead)
Example: Shift Assignment
@planning_entity
@dataclass
class Shift:
    id: Annotated[str, PlanningId]
    start_time: datetime
    end_time: datetime
    required_skill: str
    # Assigned by solver
    employee: Annotated[Employee | None, PlanningVariable] = field(default=None)
Example: Vehicle Routing
@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    capacity: int
    home_location: Location
    # List of visits assigned to this vehicle
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)
Next Steps
3.2 - Planning Variables
Define what the solver assigns: simple variables and list variables.
A planning variable is a property of a planning entity that the solver assigns values to during optimization.
Simple Planning Variable
The most common type assigns a single value from a value range:
from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import planning_entity, PlanningId, PlanningVariable
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    # Simple planning variables
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)
How It Works
- The solver sees that timeslot needs a value
- It looks for a ValueRangeProvider for Timeslot in the solution
- It tries different values and evaluates the score
- It assigns the best value found within the time limit
Planning List Variable
For routing problems where order matters, use PlanningListVariable:
from solverforge_legacy.solver.domain import PlanningListVariable
@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    capacity: int
    home_location: Location
    # List variable - ordered sequence of visits
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)
How It Works
The solver:
- Assigns visits to vehicles
- Determines the order of visits within each vehicle’s route
- Uses moves like insert, swap, and 2-opt for optimization
When to Use List Variables
Use PlanningListVariable when:
- Order matters (routing, sequencing)
- Entities belong to groups (visits per vehicle, tasks per worker)
- Chain relationships exist (predecessor/successor patterns)
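After solving, the route order is simply the order of the list. A small sketch of reading it back, assuming a solution class with a vehicles collection:

for vehicle in solution.vehicles:
    route = " -> ".join(visit.id for visit in vehicle.visits)
    print(f"{vehicle.id}: depot -> {route} -> depot")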
Nullable Variables
By default, all planning variables must be assigned. For optional assignments:
@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    # This visit might not be assigned to any vehicle
    vehicle: Annotated[Vehicle | None, PlanningVariable(allows_unassigned=True)] = field(default=None)
Note: When using nullable variables, add medium constraints to penalize unassigned entities.
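A sketch of such a medium constraint is shown below; for_each_including_unassigned and HardMediumSoftScore.ONE_MEDIUM are assumptions about the API (they mirror the engine's standard names), so verify them against the reference before use:

from solverforge_legacy.solver.score import ConstraintFactory, Constraint, HardMediumSoftScore

def minimize_unassigned_visits(factory: ConstraintFactory) -> Constraint:
    # Regular streams skip entities whose variables are unassigned, so an
    # "including unassigned" selector (name assumed) is required here.
    return (
        factory.for_each_including_unassigned(Visit)
        .filter(lambda visit: visit.vehicle is None)
        .penalize(HardMediumSoftScore.ONE_MEDIUM)
        .as_constraint("Minimize unassigned visits")
    )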
Value Range Providers
Planning variables need a source of possible values. This is configured in the planning solution:
@planning_solution
@dataclass
class Timetable:
    # This list provides values for 'timeslot' variables
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    # This list provides values for 'room' variables
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)
The solver matches variables to value ranges by type:
- timeslot: Annotated[Timeslot | None, PlanningVariable] uses list[Timeslot]
- room: Annotated[Room | None, PlanningVariable] uses list[Room]
Variable Configuration Options
Explicit Value Range References
If automatic matching by type is not enough (for example, when several value ranges share a type), reference a value range provider explicitly:
# Restrict this variable to the "timeslots" value range
timeslot: Annotated[
    Timeslot | None,
    PlanningVariable(value_range_provider_refs=["timeslots"])
] = field(default=None)
Multiple Variables on One Entity
Entities can have multiple independent variables:
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    # Two independent variables
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)
Each variable is optimized independently—assigning timeslot doesn’t affect room.
Chained Variables (Alternative to List)
For simpler routing without list variables, you can use chained planning variables. However, PlanningListVariable is generally easier and more efficient.
Variable Listener Pattern
When one variable affects another, use shadow variables:
@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    # Calculated from vehicle's visit list
    vehicle: Annotated[Vehicle | None, InverseRelationShadowVariable(source_variable_name="visits")] = field(default=None)
    # Calculated from previous visit
    arrival_time: Annotated[datetime | None, CascadingUpdateShadowVariable(target_method_name="update_arrival_time")] = field(default=None)
See Shadow Variables for details.
Best Practices
Do
- Initialize variables to None or an empty list
- Use type hints with | None for nullable types
- Match value range types exactly
Don’t
- Mix list variables with simple variables for the same concept
- Use complex types as planning variables (use references instead)
- Forget to provide a value range
Common Patterns
Scheduling
timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
Assignment
employee: Annotated[Employee | None, PlanningVariable] = field(default=None)
Routing
visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)
Next Steps
3.3 - Planning Solutions
Define the container for problem data and solution score.
A planning solution is the container class that holds all problem data, planning entities, and the solution score.
The @planning_solution Decorator
from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import (
planning_solution,
ProblemFactCollectionProperty,
ProblemFactProperty,
PlanningEntityCollectionProperty,
ValueRangeProvider,
PlanningScore,
)
from solverforge_legacy.solver.score import HardSoftScore
@planning_solution
@dataclass
class Timetable:
    id: str
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)
Solution Components
A planning solution contains:
- Problem Facts - Immutable input data
- Planning Entities - Mutable entities with planning variables
- Score - Quality measure of the solution
Problem Facts
Problem facts are immutable data that define the problem:
Collection Property
For lists of facts:
timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty]
rooms: Annotated[list[Room], ProblemFactCollectionProperty]
employees: Annotated[list[Employee], ProblemFactCollectionProperty]
Single Property
For single facts:
config: Annotated[ScheduleConfig, ProblemFactProperty]
start_date: Annotated[date, ProblemFactProperty]
Value Range Providers
Value ranges provide possible values for planning variables. Combine with problem fact annotations:
# This list provides values for Timeslot planning variables
timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
# This list provides values for Room planning variables
rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
The solver automatically matches variables to value ranges by type:
- PlanningVariable of type Timeslot uses list[Timeslot]
- PlanningVariable of type Room uses list[Room]
Multiple Ranges for Same Type
If you have multiple value ranges of the same type, use explicit references:
@planning_solution
@dataclass
class Schedule:
    preferred_timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    backup_timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
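The corresponding planning variable then selects one of those ranges by name; the id argument on ValueRangeProvider is an assumption here, while value_range_provider_refs is the parameter shown in the Planning Variables section:

# In the solution class (id= parameter assumed):
preferred_timeslots: Annotated[
    list[Timeslot],
    ProblemFactCollectionProperty,
    ValueRangeProvider(id="preferred_timeslots"),
]

# On the planning entity:
timeslot: Annotated[
    Timeslot | None,
    PlanningVariable(value_range_provider_refs=["preferred_timeslots"]),
] = field(default=None)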
Planning Entities
Collect planning entities in the solution:
lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
For a single entity:
main_vehicle: Annotated[Vehicle, PlanningEntityProperty]
Score
Every solution needs a score field:
score: Annotated[HardSoftScore, PlanningScore] = field(default=None)
Common score types:
- SimpleScore - Single level
- HardSoftScore - Feasibility + optimization
- HardMediumSoftScore - Three levels
- BendableScore - Custom levels
The solver calculates and updates this field automatically.
Solution Identity
Include an identifier for tracking:
@planning_solution
@dataclass
class Timetable:
    id: str  # For tracking in SolverManager
    ...
Complete Example
from dataclasses import dataclass, field
from typing import Annotated
from datetime import date
from solverforge_legacy.solver.domain import (
planning_solution,
ProblemFactCollectionProperty,
ProblemFactProperty,
PlanningEntityCollectionProperty,
ValueRangeProvider,
PlanningScore,
)
from solverforge_legacy.solver.score import HardSoftScore
@planning_solution
@dataclass
class EmployeeSchedule:
    # Identity
    id: str
    # Problem facts
    schedule_start: Annotated[date, ProblemFactProperty]
    employees: Annotated[list[Employee], ProblemFactCollectionProperty, ValueRangeProvider]
    skills: Annotated[list[Skill], ProblemFactCollectionProperty]
    # Planning entities
    shifts: Annotated[list[Shift], PlanningEntityCollectionProperty]
    # Score
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)
Creating Problem Instances
Load or generate problem data:
def load_problem() -> Timetable:
    timeslots = [
        Timeslot("MONDAY", time(8, 30), time(9, 30)),
        Timeslot("MONDAY", time(9, 30), time(10, 30)),
        # ...
    ]
    rooms = [
        Room("Room A"),
        Room("Room B"),
    ]
    lessons = [
        Lesson("1", "Math", "A. Turing", "9th grade"),
        Lesson("2", "Physics", "M. Curie", "9th grade"),
        # ...
    ]
    return Timetable(
        id="problem-001",
        timeslots=timeslots,
        rooms=rooms,
        lessons=lessons,
        score=None,  # Solver will calculate this
    )
Accessing the Solved Solution
After solving, the solution contains assigned variables and score:
solution = solver.solve(problem)
print(f"Score: {solution.score}")
print(f"Is feasible: {solution.score.is_feasible}")
for lesson in solution.lessons:
    print(f"{lesson.subject}: {lesson.timeslot} in {lesson.room}")
Solution Cloning
The solver internally clones solutions to track the best solution. This happens automatically with @dataclass entities.
For custom classes, ensure proper cloning behavior or use @deep_planning_clone:
from solverforge_legacy.solver.domain import deep_planning_clone
@deep_planning_clone
class CustomClass:
    # This class will be deeply cloned during solving
    pass
Best Practices
Do
- Use @dataclass for solutions
- Initialize score to None
- Include all data needed for constraint evaluation
- Use descriptive field names
Don’t
- Include data not used in constraints (performance impact)
- Modify problem facts during solving
- Forget value range providers for planning variables
Next Steps
3.4 - Shadow Variables
Define calculated variables that update automatically.
A shadow variable is a planning variable whose value is calculated from other variables, not directly assigned by the solver. Shadow variables update automatically when their source variables change.
When to Use Shadow Variables
Use shadow variables for:
- Derived values - Arrival times calculated from routes
- Inverse relationships - A visit knowing which vehicle it belongs to
- Cascading calculations - End times derived from start times and durations
Shadow Variable Types
Inverse Relation Shadow Variable
Maintains a reverse reference when using list variables:
from solverforge_legacy.solver.domain import InverseRelationShadowVariable
@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    # Automatically set to the vehicle that contains this visit
    vehicle: Annotated[
        Vehicle | None,
        InverseRelationShadowVariable(source_variable_name="visits")
    ] = field(default=None)
When a visit is added to vehicle.visits, visit.vehicle is automatically set.
Previous Element Shadow Variable
Tracks the previous element in a list variable:
from solverforge_legacy.solver.domain import PreviousElementShadowVariable
@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    # The visit that comes before this one in the route
    previous_visit: Annotated[
        Visit | None,
        PreviousElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)
Next Element Shadow Variable
Tracks the next element in a list variable:
from solverforge_legacy.solver.domain import NextElementShadowVariable
@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    # The visit that comes after this one in the route
    next_visit: Annotated[
        Visit | None,
        NextElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)
Index Shadow Variable
Tracks the position in a list variable:
from solverforge_legacy.solver.domain import IndexShadowVariable
@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    # Position in the vehicle's visit list (0-based)
    index: Annotated[
        int | None,
        IndexShadowVariable(source_variable_name="visits")
    ] = field(default=None)
Cascading Update Shadow Variable
For custom calculations that depend on other variables:
from solverforge_legacy.solver.domain import CascadingUpdateShadowVariable
from datetime import datetime, timedelta
@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    service_duration: timedelta
    vehicle: Annotated[
        Vehicle | None,
        InverseRelationShadowVariable(source_variable_name="visits")
    ] = field(default=None)
    previous_visit: Annotated[
        Visit | None,
        PreviousElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)
    # Calculated arrival time
    arrival_time: Annotated[
        datetime | None,
        CascadingUpdateShadowVariable(target_method_name="update_arrival_time")
    ] = field(default=None)

    def update_arrival_time(self):
        """Called automatically when previous_visit or vehicle changes."""
        if self.vehicle is None:
            self.arrival_time = None
        elif self.previous_visit is None:
            # First visit: departure from depot
            travel_time = self.vehicle.depot.driving_time_to(self.location)
            self.arrival_time = self.vehicle.departure_time + travel_time
        else:
            # Subsequent visit: after previous visit's departure
            travel_time = self.previous_visit.location.driving_time_to(self.location)
            self.arrival_time = self.previous_visit.departure_time + travel_time

    @property
    def departure_time(self) -> datetime | None:
        """Time when service at this visit completes."""
        if self.arrival_time is None:
            return None
        return self.arrival_time + self.service_duration
Piggyback Shadow Variable
For variables that should be updated at the same time as another shadow variable:
from solverforge_legacy.solver.domain import PiggybackShadowVariable
@planning_entity
@dataclass
class Visit:
    arrival_time: Annotated[
        datetime | None,
        CascadingUpdateShadowVariable(target_method_name="update_times")
    ] = field(default=None)
    # Updated by the same method as arrival_time
    departure_time: Annotated[
        datetime | None,
        PiggybackShadowVariable(shadow_variable_name="arrival_time")
    ] = field(default=None)

    def update_times(self):
        # Update both arrival_time and departure_time
        if self.vehicle is None:
            self.arrival_time = None
            self.departure_time = None
        else:
            self.arrival_time = self.calculate_arrival()
            self.departure_time = self.arrival_time + self.service_duration
Complete Vehicle Routing Example
from dataclasses import dataclass, field
from typing import Annotated
from datetime import datetime, timedelta
from solverforge_legacy.solver.domain import (
planning_entity,
PlanningId,
PlanningListVariable,
InverseRelationShadowVariable,
PreviousElementShadowVariable,
NextElementShadowVariable,
CascadingUpdateShadowVariable,
)
@dataclass
class Location:
    latitude: float
    longitude: float

    def driving_time_to(self, other: "Location") -> timedelta:
        # Simplified: assume 1 second per km
        distance = ((self.latitude - other.latitude)**2 +
                    (self.longitude - other.longitude)**2) ** 0.5
        return timedelta(seconds=int(distance * 1000))

@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    depot: Location
    departure_time: datetime
    capacity: int
    visits: Annotated[list["Visit"], PlanningListVariable] = field(default_factory=list)

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    demand: int
    service_duration: timedelta
    ready_time: datetime  # Earliest arrival
    due_time: datetime    # Latest arrival
    # Shadow variables
    vehicle: Annotated[
        Vehicle | None,
        InverseRelationShadowVariable(source_variable_name="visits")
    ] = field(default=None)
    previous_visit: Annotated[
        "Visit | None",
        PreviousElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)
    next_visit: Annotated[
        "Visit | None",
        NextElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)
    arrival_time: Annotated[
        datetime | None,
        CascadingUpdateShadowVariable(target_method_name="update_arrival_time")
    ] = field(default=None)

    def update_arrival_time(self):
        if self.vehicle is None:
            self.arrival_time = None
            return
        if self.previous_visit is None:
            # First visit in route
            travel = self.vehicle.depot.driving_time_to(self.location)
            self.arrival_time = self.vehicle.departure_time + travel
        else:
            # After previous visit
            prev_departure = self.previous_visit.departure_time
            if prev_departure is None:
                self.arrival_time = None
                return
            travel = self.previous_visit.location.driving_time_to(self.location)
            self.arrival_time = prev_departure + travel

    @property
    def departure_time(self) -> datetime | None:
        if self.arrival_time is None:
            return None
        # Wait until ready_time if arriving early
        start = max(self.arrival_time, self.ready_time)
        return start + self.service_duration

    def is_late(self) -> bool:
        return self.arrival_time is not None and self.arrival_time > self.due_time
Shadow Variable Evaluation Order
Shadow variables are evaluated in dependency order:
- InverseRelationShadowVariable - First (depends only on the list variable)
- PreviousElementShadowVariable - Second
- NextElementShadowVariable - Second
- IndexShadowVariable - Second
- CascadingUpdateShadowVariable - After their dependencies
- PiggybackShadowVariable - With their shadow source
Using Shadow Variables in Constraints
Shadow variables can be used in constraints just like regular properties:
def arrival_after_due_time(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Visit)
.filter(lambda visit: visit.is_late())
.penalize(
HardSoftScore.ONE_SOFT,
lambda visit: int((visit.arrival_time - visit.due_time).total_seconds())
)
.as_constraint("Arrival after due time")
)
Best Practices
Do
- Use InverseRelationShadowVariable when entities need to know their container
- Use CascadingUpdateShadowVariable for calculated values like arrival times
- Keep update methods simple and fast
Don’t
- Create circular shadow variable dependencies
- Do expensive calculations in update methods
- Forget to handle None cases
Next Steps
3.5 - Pinning
Lock specific assignments to prevent the solver from changing them.
Pinning locks certain assignments so the solver cannot change them. This is useful for:
- Preserving manual decisions
- Locking in-progress or completed work
- Incremental planning with fixed history
PlanningPin Annotation
Mark an entity as pinned using the PlanningPin annotation:
from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import (
planning_entity,
PlanningId,
PlanningVariable,
PlanningPin,
)
@planning_entity
@dataclass
class Lesson:
id: Annotated[str, PlanningId]
subject: str
teacher: str
timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
room: Annotated[Room | None, PlanningVariable] = field(default=None)
# When True, solver won't change this lesson's assignments
pinned: Annotated[bool, PlanningPin] = field(default=False)
When pinned=True, the solver will not modify timeslot or room for this lesson.
Setting Pinned State
At Problem Creation
lessons = [
Lesson("1", "Math", "A. Turing", timeslot=monday_8am, room=room_a, pinned=True), # Fixed
Lesson("2", "Physics", "M. Curie", pinned=False), # Solver will assign
]
Based on Time
Pin lessons that are already in progress or past:
from datetime import datetime
def create_problem(lessons: list[Lesson], current_time: datetime) -> Timetable:
for lesson in lessons:
if lesson.timeslot and lesson.timeslot.start_time <= current_time:
lesson.pinned = True
return Timetable(...)
Based on User Decisions
def pin_manual_assignments(lesson: Lesson, is_manual: bool):
lesson.pinned = is_manual
PlanningPinToIndex for List Variables
For list variables (routing), you can pin elements up to a certain index:
from solverforge_legacy.solver.domain import PlanningPinToIndex
@planning_entity
@dataclass
class Vehicle:
id: Annotated[str, PlanningId]
visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)
# Elements at index 0, 1, ..., (pinned_index-1) are pinned
pinned_index: Annotated[int, PlanningPinToIndex] = field(default=0)
Example:
- pinned_index=0 - No visits are pinned (all can be reordered)
- pinned_index=3 - First 3 visits are locked in place
- pinned_index=len(visits) - All visits are pinned
Updating Pinned Index
def update_pinned_for_in_progress(vehicle: Vehicle, current_time: datetime):
"""Pin visits that have already started."""
pinned_count = 0
for visit in vehicle.visits:
if visit.arrival_time and visit.arrival_time <= current_time:
pinned_count += 1
else:
break # Stop at first unstarted visit
vehicle.pinned_index = pinned_count
Use Cases
Continuous Planning
In continuous planning, pin the past and near future:
def prepare_for_replanning(solution: Schedule, current_time: datetime, buffer: timedelta):
"""
Pin assignments that:
- Have already started (in the past)
- Are starting soon (within buffer time)
"""
publish_deadline = current_time + buffer
for shift in solution.shifts:
if shift.start_time < publish_deadline:
shift.pinned = True
else:
shift.pinned = False
Respecting User Decisions
def load_schedule_with_pins(raw_data) -> Schedule:
shifts = []
for data in raw_data:
shift = Shift(
id=data["id"],
employee=find_employee(data["employee_id"]),
pinned=data.get("manually_assigned", False)
)
shifts.append(shift)
return Schedule(shifts=shifts)
Incremental Solving
Pin everything except new entities:
def add_new_lessons(solution: Timetable, new_lessons: list[Lesson]) -> Timetable:
# Pin all existing lessons
for lesson in solution.lessons:
lesson.pinned = True
# Add new lessons (unpinned)
for lesson in new_lessons:
lesson.pinned = False
solution.lessons.append(lesson)
return solution
Behavior Notes
Pinned Entities Still Affect Score
Pinned entities participate in constraint evaluation:
# This constraint still fires if a pinned lesson conflicts with an unpinned one
def room_conflict(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each_unique_pair(Lesson, ...)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Room conflict")
)
Initialization
Pinned entities must have their planning variables already assigned:
# Correct: pinned entity has assigned values
Lesson("1", "Math", "Teacher", timeslot=slot, room=room, pinned=True)
# Incorrect: pinned entity without assignment (will cause issues)
Lesson("2", "Physics", "Teacher", timeslot=None, room=None, pinned=True)
Constraints with Pinning
You might want different constraint behavior for pinned vs unpinned:
def prefer_unpinned_over_pinned(factory: ConstraintFactory) -> Constraint:
"""If there's a conflict, prefer to move the unpinned lesson."""
return (
factory.for_each(Lesson)
.filter(lambda lesson: lesson.pinned)
.join(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
Joiners.filtering(lambda pinned, other: not other.pinned)
)
# Penalize the unpinned lesson in conflict
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Conflict with pinned lesson")
)
Best Practices
Do
- Pin entities that represent completed or in-progress work
- Use PlanningPinToIndex for routing problems
- Ensure pinned entities have valid assignments
Don’t
- Pin too many entities (solver has less freedom)
- Forget to unpin entities when requirements change
- Create infeasible problems by pinning conflicting entities
Next Steps
4 - Constraints
Define constraints using the fluent Constraint Streams API.
Constraints define the rules that make a solution valid and optimal. SolverForge uses a fluent Constraint Streams API that lets you express constraints declaratively.
Topics
Constraint Types
| Type | Purpose | Example |
|---|---|---|
| Hard | Must be satisfied for feasibility | No two lessons in the same room at the same time |
| Soft | Preferences to optimize | Teachers prefer consecutive lessons |
| Medium | Between hard and soft (optional) | Important but not mandatory constraints |
Example
from solverforge_legacy.solver.score import (
constraint_provider, ConstraintFactory, Constraint, Joiners, HardSoftScore
)
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory) -> list[Constraint]:
return [
room_conflict(constraint_factory),
teacher_conflict(constraint_factory),
teacher_room_stability(constraint_factory),
]
def room_conflict(constraint_factory: ConstraintFactory) -> Constraint:
# Hard constraint: No two lessons in the same room at the same time
return (
constraint_factory
.for_each_unique_pair(
Lesson,
Joiners.equal(lambda lesson: lesson.timeslot),
Joiners.equal(lambda lesson: lesson.room),
)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Room conflict")
)
def teacher_room_stability(constraint_factory: ConstraintFactory) -> Constraint:
# Soft constraint: Teachers prefer teaching in the same room
return (
constraint_factory
.for_each_unique_pair(
Lesson,
Joiners.equal(lambda lesson: lesson.teacher),
)
.filter(lambda lesson1, lesson2: lesson1.room != lesson2.room)
.penalize(HardSoftScore.ONE_SOFT)
.as_constraint("Teacher room stability")
)
4.1 - Constraint Streams
Build constraints using the fluent Constraint Streams API.
The Constraint Streams API is a fluent, declarative way to define constraints. It’s inspired by Java Streams and SQL, allowing you to express complex scoring logic concisely.
Basic Structure
Every constraint follows this pattern:
from solverforge_legacy.solver.score import (
constraint_provider, ConstraintFactory, Constraint, HardSoftScore
)
@constraint_provider
def define_constraints(factory: ConstraintFactory) -> list[Constraint]:
return [
my_constraint(factory),
]
def my_constraint(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(MyEntity) # 1. Select entities
.filter(lambda e: e.is_active) # 2. Filter matches
.penalize(HardSoftScore.ONE_HARD) # 3. Apply score impact
.as_constraint("My constraint") # 4. Name the constraint
)
Stream Types
Streams are typed by the number of entities they carry:
| Stream Type | Entities | Example Use |
|---|---|---|
| UniConstraintStream | 1 | Single entity constraints |
| BiConstraintStream | 2 | Pair constraints |
| TriConstraintStream | 3 | Triple constraints |
| QuadConstraintStream | 4 | Quad constraints |
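The arity of a stream changes as you join and group. A minimal sketch, assuming a ConstraintFactory named factory and the Lesson class, Joiners, and ConstraintCollectors imports used elsewhere in this section (the variable names are only illustrative):
# UniConstraintStream: each match carries one Lesson
uni_stream = factory.for_each(Lesson)
# BiConstraintStream: each match carries a pair of Lessons
bi_stream = factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda lesson: lesson.teacher),
)
# group_by with a key and one collector is also a BiConstraintStream: (teacher, count)
grouped = factory.for_each(Lesson).group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.count(),
)
# Adding a second collector yields a TriConstraintStream: (teacher, count, lesson_list)
tri_stream = factory.for_each(Lesson).group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.count(),
    ConstraintCollectors.to_list(lambda lesson: lesson),
)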
Starting a Stream
for_each()
Start with all instances of an entity class:
factory.for_each(Lesson)
# Stream of: Lesson1, Lesson2, Lesson3, ...
for_each_unique_pair()
Get all unique pairs (no duplicates, no self-pairs):
factory.for_each_unique_pair(Lesson)
# Stream of: (L1,L2), (L1,L3), (L2,L3), ...
# NOT: (L1,L1), (L2,L1), ...
With joiners for efficient filtering:
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
)
# Only pairs with same timeslot AND same room
for_each_including_unassigned()
Include entities with unassigned planning variables:
factory.for_each_including_unassigned(Lesson)
# Includes lessons where timeslot=None or room=None
Filtering
filter()
Remove non-matching items:
factory.for_each(Lesson)
.filter(lambda lesson: lesson.teacher == "A. Turing")
For bi-streams:
factory.for_each_unique_pair(Lesson)
.filter(lambda l1, l2: l1.room != l2.room)
Joining
join()
Combine streams:
factory.for_each(Lesson)
.join(Room)
# BiStream of (Lesson, Room) for all combinations
With joiners:
factory.for_each(Lesson)
.join(
Room,
Joiners.equal(lambda lesson: lesson.room, lambda room: room)
)
# BiStream of (Lesson, Room) where lesson.room == room
See Joiners for available joiner types.
if_exists() / if_not_exists()
Check for existence without creating pairs:
# Lessons that have at least one other lesson in the same room
factory.for_each(Lesson)
.if_exists(
Lesson,
Joiners.equal(lambda l: l.room),
Joiners.filtering(lambda l1, l2: l1.id != l2.id)
)
# Employees not assigned to any shift
factory.for_each(Employee)
.if_not_exists(
Shift,
Joiners.equal(lambda emp: emp, lambda shift: shift.employee)
)
Grouping
group_by()
Aggregate entities:
from solverforge_legacy.solver.score import ConstraintCollectors
# Count lessons per teacher
factory.for_each(Lesson)
.group_by(
lambda lesson: lesson.teacher,
ConstraintCollectors.count()
)
# BiStream of (teacher, count)
Multiple collectors:
# Get count and list of lessons per teacher
factory.for_each(Lesson)
.group_by(
lambda lesson: lesson.teacher,
ConstraintCollectors.count(),
ConstraintCollectors.to_list(lambda l: l)
)
# TriStream of (teacher, count, lesson_list)
See Collectors for available collector types.
Mapping
map()
Transform stream elements:
factory.for_each(Lesson)
.map(lambda lesson: lesson.teacher)
# UniStream of teachers (with duplicates)
expand()
Add derived values:
factory.for_each(Lesson)
.expand(lambda lesson: lesson.duration_minutes)
# BiStream of (Lesson, duration)
distinct()
Remove duplicates:
factory.for_each(Lesson)
.map(lambda lesson: lesson.teacher)
.distinct()
# UniStream of unique teachers
Scoring
penalize()
Apply negative score for matches:
# Hard constraint
.penalize(HardSoftScore.ONE_HARD)
# Soft constraint
.penalize(HardSoftScore.ONE_SOFT)
# Dynamic weight
.penalize(HardSoftScore.ONE_SOFT, lambda lesson: lesson.priority)
reward()
Apply positive score for matches:
# Reward preferred assignments
.reward(HardSoftScore.ONE_SOFT, lambda lesson: lesson.preference_score)
impact()
Apply positive or negative score based on value:
# Positive values reward, negative values penalize
.impact(HardSoftScore.ONE_SOFT, lambda l: l.score_impact)
Finalizing
as_constraint()
Name the constraint (required):
.as_constraint("Room conflict")
justify_with()
Add custom justification for score explanation:
.penalize(HardSoftScore.ONE_HARD)
.justify_with(lambda l1, l2, score: RoomConflictJustification(l1, l2, score))
.as_constraint("Room conflict")
indict_with()
Specify which entities to blame:
.penalize(HardSoftScore.ONE_HARD)
.indict_with(lambda l1, l2: [l1, l2])
.as_constraint("Room conflict")
Complete Examples
Room Conflict (Hard)
def room_conflict(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Room conflict")
)
Teacher Room Stability (Soft)
def teacher_room_stability(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.teacher)
)
.filter(lambda l1, l2: l1.room != l2.room)
.penalize(HardSoftScore.ONE_SOFT)
.as_constraint("Teacher room stability")
)
Balance Workload (Soft)
def balance_workload(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.count()
)
.filter(lambda employee, count: count > 5)
.penalize(
HardSoftScore.ONE_SOFT,
lambda employee, count: count - 5 # Penalize excess shifts
)
.as_constraint("Balance workload")
)
Best Practices
Do
- Use joiners in for_each_unique_pair() for efficiency
- Name constraints descriptively
- Break complex constraints into helper functions
Don’t
- Use filter() when a joiner would work (less efficient)
- Create overly complex single constraints (split them)
- Forget to call as_constraint()
Next Steps
4.2 - Joiners
Efficiently filter and match entities in constraint streams.
Joiners filter pairs of entities during joins and unique-pair operations. They’re more efficient than post-join filtering because they use indexing.
Basic Usage
from solverforge_legacy.solver.score import Joiners
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda lesson: lesson.timeslot),
Joiners.equal(lambda lesson: lesson.room),
)
Multiple joiners are combined with AND logic.
Available Joiners
equal()
Match when property values are equal:
# Same timeslot
Joiners.equal(lambda lesson: lesson.timeslot)
# In a join, specify both sides
factory.for_each(Lesson).join(
Room,
Joiners.equal(lambda lesson: lesson.room, lambda room: room)
)
less_than() / less_than_or_equal()
Match when first value is less than second:
# l1.priority < l2.priority
Joiners.less_than(lambda lesson: lesson.priority)
# l1.start_time <= l2.start_time
Joiners.less_than_or_equal(lambda lesson: lesson.start_time)
greater_than() / greater_than_or_equal()
Match when first value is greater than second:
# l1.priority > l2.priority
Joiners.greater_than(lambda lesson: lesson.priority)
# l1.end_time >= l2.end_time
Joiners.greater_than_or_equal(lambda lesson: lesson.end_time)
overlapping()
Match when ranges overlap:
# Time overlap: [start1, end1) overlaps [start2, end2)
Joiners.overlapping(
lambda l: l.start_time, # Start of range 1
lambda l: l.end_time, # End of range 1
lambda l: l.start_time, # Start of range 2
lambda l: l.end_time, # End of range 2
)
For a join between different types:
factory.for_each(Meeting).join(
Availability,
Joiners.overlapping(
lambda m: m.start_time,
lambda m: m.end_time,
lambda a: a.start_time,
lambda a: a.end_time,
)
)
filtering()
Custom filter function (less efficient, use as last resort):
# Custom logic that can't be expressed with other joiners
Joiners.filtering(lambda l1, l2: l1.is_compatible_with(l2))
Combining Joiners
Joiners are combined with AND:
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot), # Same timeslot AND
Joiners.equal(lambda l: l.room), # Same room
)
Index-Based Joiners (Preferred)
These joiners use internal indexes for O(1) or O(log n) lookup:
- equal() - Hash index
- less_than(), greater_than() - Tree index
- overlapping() - Interval tree
Filtering Joiner (Slower)
filtering() checks every pair, O(n²):
# Avoid when possible - checks all pairs
Joiners.filtering(lambda l1, l2: some_complex_check(l1, l2))
Optimization Tips
Good: Index joiners first, filtering last:
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot), # Index first
Joiners.filtering(lambda l1, l2: custom(l1, l2)) # Filter remaining
)
Bad: Only filtering (checks all pairs):
factory.for_each_unique_pair(
Lesson,
Joiners.filtering(lambda l1, l2: l1.timeslot == l2.timeslot and custom(l1, l2))
)
Examples
Time Conflict Detection
def time_conflict(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each_unique_pair(
Shift,
Joiners.equal(lambda s: s.employee),
Joiners.overlapping(
lambda s: s.start_time,
lambda s: s.end_time,
lambda s: s.start_time,
lambda s: s.end_time,
),
)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Employee time conflict")
)
Same Day Sequential
def same_day_sequential(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Lesson)
.join(
Lesson,
Joiners.equal(lambda l: l.teacher),
Joiners.equal(lambda l: l.timeslot.day_of_week),
Joiners.less_than(lambda l: l.timeslot.start_time),
Joiners.filtering(lambda l1, l2:
(l2.timeslot.start_time - l1.timeslot.end_time).seconds <= 1800
),
)
.reward(HardSoftScore.ONE_SOFT)
.as_constraint("Teacher consecutive lessons")
)
Resource Assignment
def resource_assignment(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Task)
.join(
Resource,
Joiners.equal(lambda t: t.required_skill, lambda r: r.skill),
Joiners.greater_than_or_equal(lambda t: t.priority, lambda r: r.min_priority),
)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Resource skill match")
)
Joiner vs Filter
| Use Joiner When | Use Filter When |
|---|---|
| Checking equality | Complex logic |
| Comparing values | Multiple conditions with OR |
| Range overlap | Calling methods on entities |
| Performance matters | Simple one-off checks |
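For instance, joiners always combine with AND, so a condition with an OR between two properties has to fall back to a filter (or a filtering joiner). A small sketch using the Lesson class from earlier:
# AND of equalities: expressible with indexed joiners (fast)
factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda lesson: lesson.timeslot),
    Joiners.equal(lambda lesson: lesson.room),
)
# OR between properties: needs a filter, which checks every pair
same_teacher_or_subject = (
    factory.for_each_unique_pair(Lesson)
    .filter(lambda l1, l2: l1.teacher == l2.teacher or l1.subject == l2.subject)
)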
Next Steps
4.3 - Collectors
Aggregate data in constraint streams using collectors.
Collectors aggregate data when grouping entities. They’re used with group_by() to compute counts, sums, lists, and other aggregations.
Basic Usage
from solverforge_legacy.solver.score import ConstraintCollectors
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee, # Group key
ConstraintCollectors.count() # Collector
)
# Result: BiStream of (employee, count)
Available Collectors
count()
Count items in each group:
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.count()
)
# (Employee, int)
count_distinct()
Count unique values:
factory.for_each(Lesson)
.group_by(
lambda lesson: lesson.teacher,
ConstraintCollectors.count_distinct(lambda l: l.room)
)
# (Teacher, number of distinct rooms)
sum()
Sum numeric values:
factory.for_each(Visit)
.group_by(
lambda visit: visit.vehicle,
ConstraintCollectors.sum(lambda v: v.demand)
)
# (Vehicle, total demand)
min() / max()
Find minimum or maximum:
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.min(lambda s: s.start_time)
)
# (Employee, earliest start time)
With comparator:
ConstraintCollectors.max(
lambda shift: shift,
key=lambda s: s.priority
)
# Returns the shift with highest priority
average()
Calculate average:
factory.for_each(Task)
.group_by(
lambda task: task.worker,
ConstraintCollectors.average(lambda t: t.duration)
)
# (Worker, average task duration)
to_list()
Collect into a list:
factory.for_each(Visit)
.group_by(
lambda visit: visit.vehicle,
ConstraintCollectors.to_list(lambda v: v)
)
# (Vehicle, list of visits)
to_set()
Collect into a set (unique values):
factory.for_each(Lesson)
.group_by(
lambda lesson: lesson.teacher,
ConstraintCollectors.to_set(lambda l: l.room)
)
# (Teacher, set of rooms)
to_sorted_set()
Collect into a sorted set:
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.to_sorted_set(lambda s: s.start_time)
)
# (Employee, sorted set of start times)
compose()
Combine multiple collectors:
ConstraintCollectors.compose(
ConstraintCollectors.count(),
ConstraintCollectors.sum(lambda s: s.hours),
lambda count, total_hours: (count, total_hours)
)
# Returns (count, sum) tuple
conditional()
Collect only matching items:
ConstraintCollectors.conditional(
lambda shift: shift.is_night,
ConstraintCollectors.count()
)
# Count only night shifts
Multiple Collectors
Use multiple collectors in one group_by:
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.count(),
ConstraintCollectors.sum(lambda s: s.hours),
ConstraintCollectors.min(lambda s: s.start_time),
)
# QuadStream: (Employee, count, total_hours, earliest_start)
Grouping Patterns
Count Per Category
def balance_shift_count(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.count()
)
.filter(lambda employee, count: count > 5)
.penalize(
HardSoftScore.ONE_SOFT,
lambda employee, count: (count - 5) ** 2
)
.as_constraint("Balance shift count")
)
Sum with Threshold
def vehicle_capacity(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Visit)
.group_by(
lambda visit: visit.vehicle,
ConstraintCollectors.sum(lambda v: v.demand)
)
.filter(lambda vehicle, total: total > vehicle.capacity)
.penalize(
HardSoftScore.ONE_HARD,
lambda vehicle, total: total - vehicle.capacity
)
.as_constraint("Vehicle capacity")
)
Load Distribution
def fair_distribution(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Task)
.group_by(
lambda task: task.worker,
ConstraintCollectors.count()
)
.group_by(
ConstraintCollectors.min(lambda worker, count: count),
ConstraintCollectors.max(lambda worker, count: count),
)
.filter(lambda min_count, max_count: max_count - min_count > 2)
.penalize(
HardSoftScore.ONE_SOFT,
lambda min_count, max_count: max_count - min_count
)
.as_constraint("Fair task distribution")
)
Consecutive Detection
def consecutive_shifts(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.to_sorted_set(lambda s: s.date)
)
.filter(lambda employee, dates: has_consecutive_days(dates, 6))
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Max consecutive days")
)
def has_consecutive_days(dates: set, max_consecutive: int) -> bool:
sorted_dates = sorted(dates)
consecutive = 1
for i in range(1, len(sorted_dates)):
if (sorted_dates[i] - sorted_dates[i-1]).days == 1:
consecutive += 1
if consecutive > max_consecutive:
return True
else:
consecutive = 1
return False
Prefer count() over to_list()
# Good: Efficient counting
ConstraintCollectors.count()
# Avoid: Creates list just to count
ConstraintCollectors.to_list(lambda x: x).map(len)
Use conditional() for Filtered Counts
# Good: Single pass
ConstraintCollectors.conditional(
lambda s: s.is_weekend,
ConstraintCollectors.count()
)
# Avoid: Filter then count
factory.for_each(Shift)
.filter(lambda s: s.is_weekend)
.group_by(...)
Minimize Data in Collectors
# Good: Collect only needed data
ConstraintCollectors.to_list(lambda s: s.start_time)
# Avoid: Collect entire objects
ConstraintCollectors.to_list(lambda s: s)
Next Steps
4.4 - Score Types
Choose the right score type for your constraints.
Score types determine how constraint violations and rewards are measured. Choose the type that matches your problem’s structure.
Available Score Types
| Score Type | Levels | Use Case |
|---|---|---|
| SimpleScore | 1 | Single optimization objective |
| HardSoftScore | 2 | Feasibility + optimization |
| HardMediumSoftScore | 3 | Hard + important + nice-to-have |
| BendableScore | N | Custom number of levels |
| *DecimalScore variants | - | Decimal precision |
SimpleScore
For single-objective optimization:
from solverforge_legacy.solver.score import SimpleScore
# In domain model
score: Annotated[SimpleScore, PlanningScore] = field(default=None)
# In constraints
.penalize(SimpleScore.ONE)
.reward(SimpleScore.of(10))
Use when: You only need to maximize or minimize one thing (e.g., total profit, total distance).
HardSoftScore
The most common type—separates feasibility from optimization:
from solverforge_legacy.solver.score import HardSoftScore
# In domain model
score: Annotated[HardSoftScore, PlanningScore] = field(default=None)
# In constraints
.penalize(HardSoftScore.ONE_HARD) # Broken constraint
.penalize(HardSoftScore.ONE_SOFT) # Suboptimal
.penalize(HardSoftScore.of_hard(5)) # Weighted hard
.penalize(HardSoftScore.of_soft(10)) # Weighted soft
Hard constraints:
- Must be satisfied for a feasible solution
- Score format: Xhard/Ysoft
- 0hard/*soft = feasible
Soft constraints:
- Preferences to optimize
- Better soft scores are preferred among feasible solutions
Use when: You have rules that must be followed AND preferences to optimize.
HardMediumSoftScore
Three levels of priority:
from solverforge_legacy.solver.score import HardMediumSoftScore
# In domain model
score: Annotated[HardMediumSoftScore, PlanningScore] = field(default=None)
# In constraints
.penalize(HardMediumSoftScore.ONE_HARD) # Must satisfy
.penalize(HardMediumSoftScore.ONE_MEDIUM) # Important preference
.penalize(HardMediumSoftScore.ONE_SOFT) # Nice to have
Use when:
- Medium = “Assign as many as possible”
- Medium = “Important but not mandatory”
- Medium = “Prefer over soft, but not as critical as hard”
Example: Meeting scheduling where:
- Hard: Required attendees must be available
- Medium: Preferred attendees should attend
- Soft: Room size preferences
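For example, the "preferred attendees" rule above could be expressed as a medium-level penalty. A sketch, assuming a hypothetical MeetingAssignment entity with a missing_preferred_count() helper:
def preferred_attendees_should_attend(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(MeetingAssignment)
        # missing_preferred_count() is a hypothetical helper that counts absent preferred attendees
        .filter(lambda m: m.missing_preferred_count() > 0)
        .penalize(
            HardMediumSoftScore.ONE_MEDIUM,
            lambda m: m.missing_preferred_count(),
        )
        .as_constraint("Preferred attendees should attend")
    )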
BendableScore
Custom number of hard and soft levels:
from solverforge_legacy.solver.score import BendableScore
# Configure levels (3 hard, 2 soft)
score: Annotated[BendableScore, PlanningScore] = field(default=None)
# In constraints
.penalize(BendableScore.of_hard(0, 1)) # First hard level
.penalize(BendableScore.of_hard(1, 1)) # Second hard level
.penalize(BendableScore.of_soft(0, 1)) # First soft level
Use when: You need more than 3 priority levels.
Decimal Score Variants
For precise calculations:
from solverforge_legacy.solver.score import HardSoftDecimalScore
score: Annotated[HardSoftDecimalScore, PlanningScore] = field(default=None)
# In constraints
from decimal import Decimal
.penalize(HardSoftDecimalScore.of_soft(Decimal("0.01")))
Available variants:
- SimpleDecimalScore
- HardSoftDecimalScore
- HardMediumSoftDecimalScore
- BendableDecimalScore
Use when: Integer scores aren’t precise enough (e.g., money, distances).
Score Constants
Common score values are predefined:
# SimpleScore
SimpleScore.ZERO
SimpleScore.ONE
SimpleScore.of(n)
# HardSoftScore
HardSoftScore.ZERO
HardSoftScore.ONE_HARD
HardSoftScore.ONE_SOFT
HardSoftScore.of_hard(n)
HardSoftScore.of_soft(n)
HardSoftScore.of(hard, soft)
# HardMediumSoftScore
HardMediumSoftScore.ZERO
HardMediumSoftScore.ONE_HARD
HardMediumSoftScore.ONE_MEDIUM
HardMediumSoftScore.ONE_SOFT
HardMediumSoftScore.of(hard, medium, soft)
Dynamic Weights
Apply weights based on entity properties:
def weighted_penalty(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Task)
.filter(lambda t: t.is_late())
.penalize(
HardSoftScore.ONE_SOFT,
lambda task: task.priority # High priority = bigger penalty
)
.as_constraint("Late task")
)
Score Comparison
Scores are compared level by level:
# Hard first, then soft
0hard/-100soft > -1hard/0soft (first is feasible)
-1hard/-50soft > -2hard/-10soft (first has better hard)
0hard/-50soft > 0hard/-100soft (same hard, better soft)
Score Properties
score = HardSoftScore.of(-2, -100)
score.is_feasible # False (hard < 0)
score.hard_score # -2
score.soft_score # -100
str(score) # "-2hard/-100soft"
HardSoftScore.parse("-2hard/-100soft") # Parse from string
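The level-by-level rule can also be written out explicitly. A small illustration (a plain helper function, not a library API), using the properties shown above:
def is_better(a: HardSoftScore, b: HardSoftScore) -> bool:
    # Compare the hard level first; the soft level only matters when hard is tied
    if a.hard_score != b.hard_score:
        return a.hard_score > b.hard_score
    return a.soft_score > b.soft_score
assert is_better(HardSoftScore.of(0, -100), HardSoftScore.of(-1, 0))   # feasible beats infeasible
assert is_better(HardSoftScore.of(0, -50), HardSoftScore.of(0, -100))  # same hard, better soft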
Choosing a Score Type
| Question | Recommendation |
|---|---|
| Need feasibility check? | Use HardSoftScore |
| Single objective only? | Use SimpleScore |
| “Assign as many as possible”? | Use HardMediumSoftScore |
| More than 3 priority levels? | Use BendableScore |
| Need decimal precision? | Use *DecimalScore variant |
Best Practices
Do
- Use HardSoftScore as the default choice
- Keep hard constraints truly hard (legal requirements, physical limits)
- Use consistent weight scales within each level
Don’t
- Use medium level for actual hard constraints
- Over-complicate with BendableScore when HardMediumSoftScore works
- Mix units in the same level (e.g., minutes and dollars)
Next Steps
4.5 - Score Analysis
Understand why a solution has its score.
Score analysis helps you understand why a solution received its score. This is essential for debugging constraints and explaining results to users.
SolutionManager
Use SolutionManager to analyze solutions:
from solverforge_legacy.solver import SolverFactory, SolutionManager
solver_factory = SolverFactory.create(solver_config)
solution_manager = SolutionManager.create(solver_factory)
# Analyze a solution
analysis = solution_manager.analyze(solution)
Score Explanation
Get a breakdown of constraint scores:
analysis = solution_manager.analyze(solution)
# Overall score
print(f"Score: {analysis.score}")
# Per-constraint breakdown
for constraint_analysis in analysis.constraint_analyses():
print(f"{constraint_analysis.constraint_name}: {constraint_analysis.score}")
print(f" Match count: {constraint_analysis.match_count}")
Example output:
Score: -2hard/-15soft
Room conflict: -2hard
Match count: 2
Teacher room stability: -10soft
Match count: 10
Teacher time efficiency: -5soft
Match count: 5
Constraint Matches
See exactly which entities triggered each constraint:
for constraint_analysis in analysis.constraint_analyses():
print(f"\n{constraint_analysis.constraint_name}:")
for match in constraint_analysis.matches():
print(f" Match: {match.justification}")
print(f" Score: {match.score}")
Indictments
Find which entities are causing problems:
# Get indictments (entities blamed for score impact)
for indictment in analysis.indictments():
print(f"\nEntity: {indictment.indicted_object}")
print(f"Total score impact: {indictment.score}")
for match in indictment.matches():
print(f" - {match.constraint_name}: {match.score}")
Example output:
Entity: Lesson(id=1, subject='Math')
Total score impact: -1hard/-3soft
- Room conflict: -1hard
- Teacher room stability: -3soft
Custom Justifications
Add explanations to your constraints:
@dataclass
class RoomConflictJustification:
lesson1: Lesson
lesson2: Lesson
timeslot: Timeslot
room: Room
def __str__(self):
return (f"{self.lesson1.subject} and {self.lesson2.subject} "
f"both scheduled in {self.room} at {self.timeslot}")
def room_conflict(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
)
.penalize(HardSoftScore.ONE_HARD)
.justify_with(lambda l1, l2, score: RoomConflictJustification(
l1, l2, l1.timeslot, l1.room
))
.as_constraint("Room conflict")
)
Debugging Constraints
Verify Score Calculation
# Calculate score without solving
score = solution_manager.update(solution)
print(f"Calculated score: {score}")
Find Missing Constraints
If a constraint isn’t firing when expected:
# Check if specific entities match
for constraint_analysis in analysis.constraint_analyses():
if constraint_analysis.constraint_name == "Room conflict":
if constraint_analysis.match_count == 0:
print("No room conflicts detected!")
# Check your joiners and filters
Verify Feasibility
if not solution.score.is_feasible:
print("Solution is infeasible!")
for ca in analysis.constraint_analyses():
if ca.score.hard_score < 0:
print(f"Hard constraint broken: {ca.constraint_name}")
for match in ca.matches():
print(f" {match.justification}")
Integration with FastAPI
Expose score analysis in your API:
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
solution = solutions.get(job_id)
if not solution:
raise HTTPException(404, "Job not found")
analysis = solution_manager.analyze(solution)
return {
"score": str(analysis.score),
"is_feasible": analysis.score.is_feasible,
"constraints": [
{
"name": ca.constraint_name,
"score": str(ca.score),
"match_count": ca.match_count,
}
for ca in analysis.constraint_analyses()
]
}
Best Practices
Do
- Use justify_with() for user-facing explanations
- Check score analysis when debugging constraints
- Expose score breakdown in your UI
Don’t
- Analyze every solution during solving (performance)
- Ignore indictments when troubleshooting
- Forget to handle infeasible solutions
Score Comparison
Compare two solutions:
def compare_solutions(old: Timetable, new: Timetable):
old_analysis = solution_manager.analyze(old)
new_analysis = solution_manager.analyze(new)
print(f"Score improved: {old.score} -> {new.score}")
old_constraints = {ca.constraint_name: ca for ca in old_analysis.constraint_analyses()}
new_constraints = {ca.constraint_name: ca for ca in new_analysis.constraint_analyses()}
for name in old_constraints:
old_ca = old_constraints[name]
new_ca = new_constraints.get(name)
if new_ca and old_ca.score != new_ca.score:
print(f" {name}: {old_ca.score} -> {new_ca.score}")
Next Steps
4.6 - Constraint Performance
Optimize constraint evaluation for faster solving.
Efficient constraint evaluation is critical for solver performance. Most solving time is spent calculating scores, so optimizing constraints has a direct impact on solution quality.
1. Use Joiners Instead of Filters
Joiners use indexes for O(1) lookups. Filters check every item.
# Good: Uses index
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot)
)
# Bad: Checks all pairs
factory.for_each_unique_pair(Lesson)
.filter(lambda l1, l2: l1.timeslot == l2.timeslot)
2. Put Selective Joiners First
More selective joiners reduce the search space faster:
# Good: timeslot has few values, filters early
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot), # Few timeslots
Joiners.equal(lambda l: l.teacher), # More teachers
)
# Less efficient: teacher might have many values
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.teacher), # Many teachers
Joiners.equal(lambda l: l.timeslot), # Then timeslot
)
3. Avoid Expensive Lambda Operations
# Good: Simple property access
Joiners.equal(lambda l: l.timeslot)
# Bad: Complex calculation in joiner
Joiners.equal(lambda l: calculate_complex_hash(l))
4. Use Cached Properties
from functools import cached_property
@planning_entity
@dataclass
class Lesson:
# Pre-calculate expensive values
@cached_property
def combined_key(self):
return (self.timeslot, self.room)
# Use cached property in constraint
Joiners.equal(lambda l: l.combined_key)
Common Optimizations
Replace for_each + filter with for_each_unique_pair
# Before: Inefficient
factory.for_each(Lesson)
.join(Lesson)
.filter(lambda l1, l2: l1.id != l2.id and l1.timeslot == l2.timeslot)
# After: Efficient
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot)
)
Use if_exists() Instead of Join + group_by
# Before: Creates pairs then groups
factory.for_each(Employee)
.join(Shift, Joiners.equal(lambda e: e, lambda s: s.employee))
.group_by(lambda e, s: e, ConstraintCollectors.count())
.filter(lambda e, count: count > 0)
# After: Just checks existence
factory.for_each(Employee)
.if_exists(Shift, Joiners.equal(lambda e: e, lambda s: s.employee))
Avoid Redundant Constraints
# Redundant: Two constraints that overlap
def constraint1(factory):
# Penalizes A and B in same room
...
def constraint2(factory):
# Penalizes A and B in same room and same timeslot
... # This overlaps with constraint1!
# Better: One specific constraint
def room_conflict(factory):
# Only penalizes same room AND same timeslot
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
)
Limit Collection Sizes in Collectors
# Bad: Collects everything
ConstraintCollectors.to_list(lambda s: s)
# Better: Collect only what's needed
ConstraintCollectors.to_list(lambda s: s.start_time)
# Best: Use aggregate if possible
ConstraintCollectors.count()
Incremental Score Calculation
SolverForge uses incremental score calculation—only recalculating affected constraints when a move is made. Help this work efficiently:
Keep Constraints Independent
# Good: Constraints don't share state
def room_conflict(factory):
return factory.for_each_unique_pair(...)
def teacher_conflict(factory):
return factory.for_each_unique_pair(...)
# Bad: Shared calculation affects both
shared_data = calculate_once() # Recalculated on every change!
Avoid Global State
# Bad: References external data
external_config = load_config()
def my_constraint(factory):
    return (
        factory.for_each(Lesson)
        .filter(lambda l: l.priority > external_config.threshold)  # External reference
    )
Benchmarking Constraints
Enable Debug Logging
import logging
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)
Time Individual Constraints
import time
def timed_constraint(factory):
start = time.time()
result = actual_constraint(factory)
print(f"Constraint built in {time.time() - start:.3f}s")
return result
Use the Benchmarker
For systematic comparison, use the Benchmarker (see Benchmarking).
Score Corruption Detection
Enable environment mode for debugging:
from solverforge_legacy.solver.config import EnvironmentMode
SolverConfig(
environment_mode=EnvironmentMode.FULL_ASSERT, # Detects score corruption
...
)
Modes:
- NON_REPRODUCIBLE - Fastest, no checks
- REPRODUCIBLE - Deterministic but no validation
- FAST_ASSERT - Quick validation checks
- FULL_ASSERT - Complete validation (slowest)
Use FULL_ASSERT during development, REPRODUCIBLE or NON_REPRODUCIBLE in production.
| Symptom | Likely Cause | Solution |
|---|---|---|
| Very slow start | Complex constraint building | Simplify or cache |
| Slow throughout | Filter instead of joiner | Use joiners |
| Memory issues | Large collections | Use aggregates |
| Score corruption | Incorrect incremental calc | Enable FULL_ASSERT |
Next Steps
4.7 - Testing Constraints
Test constraints in isolation for correctness.
Testing constraints ensures they behave correctly before integrating with the full solver. This catches bugs early and documents expected behavior.
Basic Constraint Testing
Test individual constraints with minimal data:
import pytest
from solverforge_legacy.solver import SolverFactory, SolutionManager
from solverforge_legacy.solver.score import HardSoftScore
from solverforge_legacy.solver.config import (
SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
from datetime import time
from my_app.domain import Timetable, Timeslot, Room, Lesson
from my_app.constraints import define_constraints
@pytest.fixture
def solution_manager():
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(seconds=1))
)
factory = SolverFactory.create(config)
return SolutionManager.create(factory)
def test_room_conflict(solution_manager):
"""Two lessons in the same room at the same time should penalize."""
timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
room = Room("Room A")
# Two lessons in same room and timeslot
lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room)
problem = Timetable(
id="test",
timeslots=[timeslot],
rooms=[room],
lessons=[lesson1, lesson2]
)
analysis = solution_manager.analyze(problem)
# Should have -1 hard for the room conflict
assert analysis.score.hard_score == -1
def test_no_room_conflict(solution_manager):
"""Two lessons in different rooms should not conflict."""
timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
room_a = Room("Room A")
room_b = Room("Room B")
lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room_a)
lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room_b)
problem = Timetable(
id="test",
timeslots=[timeslot],
rooms=[room_a, room_b],
lessons=[lesson1, lesson2]
)
analysis = solution_manager.analyze(problem)
# Should have no hard constraint violations
assert analysis.score.hard_score == 0
Testing Constraint Weight
Verify the magnitude of penalties:
def test_teacher_room_stability_weight(solution_manager):
"""Teacher using multiple rooms should incur soft penalty per extra room."""
timeslot1 = Timeslot("MONDAY", time(8, 30), time(9, 30))
timeslot2 = Timeslot("MONDAY", time(9, 30), time(10, 30))
room_a = Room("Room A")
room_b = Room("Room B")
# Same teacher, different rooms
lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot1, room_a)
lesson2 = Lesson("2", "Math", "Teacher A", "Group 2", timeslot2, room_b)
problem = Timetable(
id="test",
timeslots=[timeslot1, timeslot2],
rooms=[room_a, room_b],
lessons=[lesson1, lesson2]
)
analysis = solution_manager.analyze(problem)
# Should have soft penalty for room instability
assert analysis.score.soft_score < 0
# Verify specific constraint triggered
room_stability = next(
ca for ca in analysis.constraint_analyses()
if ca.constraint_name == "Teacher room stability"
)
assert room_stability.match_count == 1
Testing with Fixtures
Create reusable test fixtures:
@pytest.fixture
def timeslots():
return [
Timeslot("MONDAY", time(8, 30), time(9, 30)),
Timeslot("MONDAY", time(9, 30), time(10, 30)),
Timeslot("TUESDAY", time(8, 30), time(9, 30)),
]
@pytest.fixture
def rooms():
return [Room("A"), Room("B"), Room("C")]
@pytest.fixture
def empty_problem(timeslots, rooms):
return Timetable(
id="test",
timeslots=timeslots,
rooms=rooms,
lessons=[]
)
def test_empty_problem_is_feasible(solution_manager, empty_problem):
"""Empty problem should have zero score."""
analysis = solution_manager.analyze(empty_problem)
assert analysis.score == HardSoftScore.ZERO
Testing Edge Cases
Unassigned Variables
def test_unassigned_lesson(solution_manager):
"""Unassigned lessons should not cause conflicts."""
timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
room = Room("Room A")
# One assigned, one not
lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", None, None)
problem = Timetable(
id="test",
timeslots=[timeslot],
rooms=[room],
lessons=[lesson1, lesson2]
)
analysis = solution_manager.analyze(problem)
# Should not have room conflict (lesson2 is unassigned)
assert analysis.score.hard_score == 0
Multiple Violations
def test_multiple_conflicts(solution_manager):
"""Three lessons in same room/time should create multiple conflicts."""
timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
room = Room("Room A")
lesson1 = Lesson("1", "Math", "A", "G1", timeslot, room)
lesson2 = Lesson("2", "Physics", "B", "G2", timeslot, room)
lesson3 = Lesson("3", "Chemistry", "C", "G3", timeslot, room)
problem = Timetable(
id="test",
timeslots=[timeslot],
rooms=[room],
lessons=[lesson1, lesson2, lesson3]
)
analysis = solution_manager.analyze(problem)
# 3 lessons create 3 unique pairs: (1,2), (1,3), (2,3)
assert analysis.score.hard_score == -3
Feasibility Testing
Test that the solver can find a feasible solution:
def test_feasible_solution():
"""Solver should find a feasible solution for small problems."""
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(seconds=5))
)
factory = SolverFactory.create(config)
solver = factory.build_solver()
problem = generate_small_problem()
solution = solver.solve(problem)
assert solution.score.is_feasible, f"Solution infeasible: {solution.score}"
Parameterized Tests
Test multiple scenarios efficiently:
@pytest.mark.parametrize("num_lessons,expected_conflicts", [
(1, 0), # Single lesson: no conflicts
(2, 1), # Two lessons: one pair
(3, 3), # Three lessons: three pairs
(4, 6), # Four lessons: six pairs
])
def test_all_in_same_room_timeslot(solution_manager, num_lessons, expected_conflicts):
"""n lessons in same room/time should create n*(n-1)/2 conflicts."""
timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
room = Room("Room A")
lessons = [
Lesson(str(i), f"Subject{i}", f"Teacher{i}", "Group", timeslot, room)
for i in range(num_lessons)
]
problem = Timetable(
id="test",
timeslots=[timeslot],
rooms=[room],
lessons=lessons
)
analysis = solution_manager.analyze(problem)
assert analysis.score.hard_score == -expected_conflicts
Testing Justifications
def test_constraint_justification(solution_manager):
"""Constraint should provide meaningful justification."""
timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
room = Room("Room A")
lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room)
problem = Timetable(
id="test",
timeslots=[timeslot],
rooms=[room],
lessons=[lesson1, lesson2]
)
analysis = solution_manager.analyze(problem)
room_conflict_ca = next(
ca for ca in analysis.constraint_analyses()
if ca.constraint_name == "Room conflict"
)
match = next(iter(room_conflict_ca.matches()))
assert "Room A" in str(match.justification)
assert "MONDAY" in str(match.justification)
Best Practices
Do
- Test each constraint in isolation
- Test both positive and negative cases
- Test edge cases (empty, unassigned, maximum)
- Use descriptive test names
Don’t
- Skip constraint testing
- Only test happy paths
- Use production data sizes in unit tests
- Ignore constraint weights
Next Steps
5 - Solver
Configure and run the solver to find optimal solutions.
The solver is the engine that finds optimal solutions to your planning problems. This section covers how to configure and run it.
Topics
Quick Example
from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
# Configure the solver
solver_config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(
spent_limit=Duration(seconds=30)
)
)
# Create and run the solver
solver_factory = SolverFactory.create(solver_config)
solver = solver_factory.build_solver()
problem = load_problem() # Your problem data
solution = solver.solve(problem)
print(f"Best score: {solution.score}")
Termination
The solver needs to know when to stop. Common termination conditions:
| Condition | Description |
|---|---|
| spent_limit | Stop after a time limit (e.g., 30 seconds) |
| best_score_limit | Stop when a target score is reached |
| unimproved_spent_limit | Stop if no improvement for a duration |
| step_count_limit | Stop after a number of optimization steps |
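These conditions are set on TerminationConfig (covered in detail in the next section). A typical sketch combines a hard time limit with an unimproved cutoff:
from solverforge_legacy.solver.config import TerminationConfig, Duration
termination = TerminationConfig(
    spent_limit=Duration(minutes=5),              # never run longer than 5 minutes
    unimproved_spent_limit=Duration(seconds=30),  # stop earlier if the score stops improving
)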
5.1 - Solver Configuration
Configure the solver with SolverConfig and related classes.
Configure the solver using Python dataclasses. This defines what to solve, how to score, and when to stop.
SolverConfig
The main configuration class:
from solverforge_legacy.solver.config import (
SolverConfig,
ScoreDirectorFactoryConfig,
TerminationConfig,
Duration,
)
solver_config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(
spent_limit=Duration(seconds=30)
),
)
Required Fields
| Field | Description |
|---|---|
| solution_class | The @planning_solution class |
| entity_class_list | List of @planning_entity classes |
| score_director_factory_config | How to calculate scores |
Optional Fields
| Field | Description | Default |
|---|---|---|
| termination_config | When to stop | Never (manual termination) |
| environment_mode | Validation level | REPRODUCIBLE |
| random_seed | For reproducibility | Random |
ScoreDirectorFactoryConfig
Configures constraint evaluation:
ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
)
With Constraint Provider
from my_app.constraints import define_constraints
ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
)
TerminationConfig
Controls when the solver stops:
Time Limit
TerminationConfig(
spent_limit=Duration(seconds=30)
)
# Other duration units
Duration(minutes=5)
Duration(hours=1)
Duration(milliseconds=500)
Score Target
Stop when a target score is reached:
TerminationConfig(
best_score_limit="0hard/-10soft"
)
Step Limit
Stop after a number of steps:
TerminationConfig(
step_count_limit=10000
)
Unimproved Time
Stop if no improvement for a duration:
TerminationConfig(
unimproved_spent_limit=Duration(seconds=30)
)
Combining Conditions
Multiple conditions use OR logic:
TerminationConfig(
spent_limit=Duration(minutes=5),
best_score_limit="0hard/0soft", # OR achieves perfect
unimproved_spent_limit=Duration(seconds=60) # OR stuck
)
Environment Mode
Controls validation and reproducibility:
from solverforge_legacy.solver.config import EnvironmentMode
SolverConfig(
environment_mode=EnvironmentMode.REPRODUCIBLE,
...
)
| Mode | Description | Use Case |
|---|---|---|
| NON_REPRODUCIBLE | Fastest, no validation | Production |
| REPRODUCIBLE | Deterministic results | Default |
| FAST_ASSERT | Quick validation | Testing |
| FULL_ASSERT | Complete validation | Debugging |
Debugging Score Corruption
Use FULL_ASSERT to detect score calculation bugs:
SolverConfig(
environment_mode=EnvironmentMode.FULL_ASSERT,
...
)
This validates every score calculation but is slow.
Reproducibility
For reproducible results, set a random seed:
SolverConfig(
random_seed=42,
environment_mode=EnvironmentMode.REPRODUCIBLE,
...
)
Configuration Overrides
Override configuration when building a solver:
from solverforge_legacy.solver.config import SolverConfigOverride
solver_factory = SolverFactory.create(solver_config)
# Override termination for this solver instance
override = SolverConfigOverride(
termination_config=TerminationConfig(spent_limit=Duration(seconds=10))
)
solver = solver_factory.build_solver(override)
Complete Example
from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
SolverConfig,
ScoreDirectorFactoryConfig,
TerminationConfig,
Duration,
EnvironmentMode,
)
from my_app.domain import Timetable, Lesson
from my_app.constraints import define_constraints
def create_solver():
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(
spent_limit=Duration(minutes=5),
best_score_limit="0hard/0soft",
),
environment_mode=EnvironmentMode.REPRODUCIBLE,
random_seed=42,
)
factory = SolverFactory.create(config)
return factory.build_solver()
Configuration Best Practices
Development
SolverConfig(
environment_mode=EnvironmentMode.FULL_ASSERT,
termination_config=TerminationConfig(spent_limit=Duration(seconds=10)),
...
)
Testing
SolverConfig(
environment_mode=EnvironmentMode.REPRODUCIBLE,
random_seed=42, # Reproducible tests
termination_config=TerminationConfig(spent_limit=Duration(seconds=5)),
...
)
Production
SolverConfig(
environment_mode=EnvironmentMode.NON_REPRODUCIBLE,
termination_config=TerminationConfig(
spent_limit=Duration(minutes=5),
unimproved_spent_limit=Duration(minutes=1),
),
...
)
Next Steps
5.2 - Running the Solver
Execute the solver synchronously with Solver.solve().
The simplest way to solve a problem is with Solver.solve(), which blocks until termination.
Basic Usage
from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
# Configure
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)
# Create factory and solver
factory = SolverFactory.create(config)
solver = factory.build_solver()
# Load problem
problem = load_problem()
# Solve (blocks until done)
solution = solver.solve(problem)
# Use solution
print(f"Score: {solution.score}")
Event Listeners
Monitor progress with event listeners:
from solverforge_legacy.solver import BestSolutionChangedEvent
def on_best_solution_changed(event: BestSolutionChangedEvent):
print(f"New best score: {event.new_best_score}")
print(f"Time spent: {event.time_spent}")
solver.add_event_listener(on_best_solution_changed)
solution = solver.solve(problem)
BestSolutionChangedEvent Properties
| Property | Description |
|---|---|
| new_best_score | The new best score |
| new_best_solution | The new best solution |
| time_spent | Duration since solving started |
| is_new_best_solution_initialized | True if all variables are assigned |
Removing Listeners
solver.add_event_listener(listener)
# ... later ...
solver.remove_event_listener(listener)
Early Termination
Stop solving before the termination condition:
import threading
import time
def timeout_termination(solver, timeout_seconds):
"""Terminate after timeout."""
time.sleep(timeout_seconds)
solver.terminate_early()
# Start termination thread
thread = threading.Thread(target=timeout_termination, args=(solver, 60))
thread.start()
solution = solver.solve(problem)
Manual Termination
# From another thread
solver.terminate_early()
# Check if termination was requested
if solver.is_terminate_early():
print("Termination was requested")
Checking Solver State
# Is the solver currently running?
if solver.is_solving():
print("Solver is running")
# Was early termination requested?
if solver.is_terminate_early():
print("Termination requested")
Problem Changes (Real-Time)
Modify the problem while solving:
from solverforge_legacy.solver import ProblemChange
class AddLessonChange(ProblemChange[Timetable]):
def __init__(self, lesson: Lesson):
self.lesson = lesson
def do_change(self, working_solution: Timetable, score_director):
# Add to working solution
working_solution.lessons.append(self.lesson)
# Notify score director
score_director.after_entity_added(self.lesson)
# Add change while solving
new_lesson = Lesson("new", "Art", "S. Dali", "Group A")
solver.add_problem_change(AddLessonChange(new_lesson))
See Real-Time Planning for more details.
Solver Reuse
Don’t reuse a solver instance—create a new one for each solve:
# Correct: New solver each time
solver1 = factory.build_solver()
solution1 = solver1.solve(problem1)
solver2 = factory.build_solver()
solution2 = solver2.solve(problem2)
# Incorrect: Reusing solver
solver = factory.build_solver()
solution1 = solver.solve(problem1)
solution2 = solver.solve(problem2) # Don't do this!
Threading
Solver.solve() blocks the calling thread. For non-blocking operation, use:
Background thread:
thread = threading.Thread(target=lambda: solver.solve(problem))
thread.start()
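Since solve() only returns its result when the thread finishes, capture the final best solution yourself. A minimal sketch (the result dict is just an illustrative holder):
import threading
result: dict[str, Timetable] = {}
def run_solver():
    # solve() blocks inside this thread; store the final best solution when it returns
    result["solution"] = solver.solve(problem)
thread = threading.Thread(target=run_solver, daemon=True)
thread.start()
# ... later, from the main thread ...
solver.terminate_early()  # optional: ask the solver to stop early
thread.join()
print(result["solution"].score)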
SolverManager (recommended for production):
See SolverManager
Error Handling
try:
solution = solver.solve(problem)
except Exception as e:
print(f"Solving failed: {e}")
# Handle error (log, retry, etc.)
Complete Example
from solverforge_legacy.solver import SolverFactory, BestSolutionChangedEvent
from solverforge_legacy.solver.config import (
SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("solver")
def solve_timetable(problem: Timetable) -> Timetable:
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(
spent_limit=Duration(minutes=5),
unimproved_spent_limit=Duration(seconds=60),
),
)
factory = SolverFactory.create(config)
solver = factory.build_solver()
# Log progress
def on_progress(event: BestSolutionChangedEvent):
logger.info(f"Score: {event.new_best_score} at {event.time_spent}")
solver.add_event_listener(on_progress)
# Solve
logger.info("Starting solver...")
solution = solver.solve(problem)
logger.info(f"Solving finished. Final score: {solution.score}")
return solution
if __name__ == "__main__":
problem = load_problem()
solution = solve_timetable(problem)
save_solution(solution)
Next Steps
5.3 - SolverManager
Manage concurrent and asynchronous solving jobs.
SolverManager handles concurrent solving jobs, making it ideal for web applications and services.
Creating a SolverManager
from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(minutes=5)),
)
solver_factory = SolverFactory.create(config)
solver_manager = SolverManager.create(solver_factory)
Basic Solving
solve()
Non-blocking solve that returns a future:
import uuid
job_id = str(uuid.uuid4())
# Start solving (non-blocking)
future = solver_manager.solve(job_id, problem)
# ... do other work ...
# Get result (blocks until done)
solution = future.get_final_best_solution()
print(f"Score: {solution.score}")
solve_and_listen()
Solve with progress callbacks:
def on_best_solution_changed(solution: Timetable):
print(f"New best: {solution.score}")
# Update UI, save to database, etc.
def on_exception(error):
print(f"Solving failed: {error}")
solver_manager.solve_and_listen(
job_id,
problem_finder=lambda _: problem,
best_solution_consumer=on_best_solution_changed,
exception_handler=on_exception,
)
Managing Jobs
Check Job Status
status = solver_manager.get_solver_status(job_id)
# Returns: NOT_SOLVING, SOLVING_ACTIVE, SOLVING_ENDED
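A simple polling loop can then wait for a job to finish. This is only a sketch: the status values are the ones listed above, and the `.name` attribute is used the same way as in the FastAPI example below. In practice, prefer `solve_and_listen()` callbacks over polling.
import time
# Poll once per second until the job is no longer actively solving.
while solver_manager.get_solver_status(job_id).name == "SOLVING_ACTIVE":
    time.sleep(1)
final_solution = solver_manager.get_best_solution(job_id)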
Get Current Best Solution
solution = solver_manager.get_best_solution(job_id)
if solution:
print(f"Current best: {solution.score}")
Terminate Early
solver_manager.terminate_early(job_id)
FastAPI Integration
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import uuid
solver_manager: SolverManager | None = None
solutions: dict[str, Timetable] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
global solver_manager
config = SolverConfig(...)
factory = SolverFactory.create(config)
solver_manager = SolverManager.create(factory)
yield
solver_manager.close()
app = FastAPI(lifespan=lifespan)
@app.post("/solve")
async def start_solving(problem: TimetableRequest) -> str:
job_id = str(uuid.uuid4())
def on_best_solution(solution: Timetable):
solutions[job_id] = solution
solver_manager.solve_and_listen(
job_id,
problem_finder=lambda _: problem.to_domain(),
best_solution_consumer=on_best_solution,
)
return job_id
@app.get("/solution/{job_id}")
async def get_solution(job_id: str):
if job_id not in solutions:
raise HTTPException(404, "Job not found")
solution = solutions[job_id]
status = solver_manager.get_solver_status(job_id)
return {
"status": status.name,
"score": str(solution.score),
"solution": TimetableResponse.from_domain(solution),
}
@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
solver_manager.terminate_early(job_id)
return {"status": "terminating"}
Concurrent Jobs
SolverManager handles multiple jobs concurrently:
# Start multiple jobs
job1 = solver_manager.solve("job1", problem1)
job2 = solver_manager.solve("job2", problem2)
job3 = solver_manager.solve("job3", problem3)
# Each runs in its own thread
# Results available when ready
solution1 = job1.get_final_best_solution()
Resource Limits
By default, jobs run with no limit on concurrent execution. For resource management:
# Limit concurrent solvers
solver_manager = SolverManager.create(
solver_factory,
parallel_solver_count=4, # Max 4 concurrent jobs
)
Problem Changes During Solving
Add changes to running jobs:
from solverforge_legacy.solver import ProblemChange
class AddEntity(ProblemChange[Timetable]):
def __init__(self, entity):
self.entity = entity
def do_change(self, working_solution, score_director):
working_solution.lessons.append(self.entity)
score_director.after_entity_added(self.entity)
# Add change to running job
solver_manager.add_problem_change(job_id, AddEntity(new_lesson))
Cleanup
Always close the SolverManager when done:
# Using context manager
with SolverManager.create(factory) as manager:
# ... use manager ...
# Automatically closed
# Manual cleanup
try:
# ... use manager ...
finally:
solver_manager.close()
Error Handling
def on_exception(job_id: str, exception: Exception):
logger.error(f"Job {job_id} failed: {exception}")
# Clean up, notify user, etc.
solver_manager.solve_and_listen(
job_id,
problem_finder=lambda _: problem,
best_solution_consumer=on_solution,
exception_handler=on_exception,
)
Best Practices
Do
- Use `solve_and_listen()` for progress updates
- Store solutions externally (database, cache)
- Handle exceptions properly
- Close SolverManager on shutdown
Don’t
- Block the main thread waiting for results
- Store solutions only in memory (lose on restart)
- Forget to handle job cleanup
Next Steps
5.4 - SolutionManager
Analyze and explain solutions with SolutionManager.
SolutionManager provides utilities for analyzing solutions without running the solver.
Creating a SolutionManager
from solverforge_legacy.solver import SolverFactory, SolutionManager
solver_factory = SolverFactory.create(config)
solution_manager = SolutionManager.create(solver_factory)
Or from a SolverManager:
solver_manager = SolverManager.create(solver_factory)
solution_manager = SolutionManager.create(solver_manager)
Score Calculation
Calculate the score of a solution without solving:
# Update score in place
solution_manager.update(solution)
print(f"Score: {solution.score}")
This is useful for:
- Validating manually created solutions
- Comparing before/after changes
- Testing constraint configurations
Score Analysis
Get a detailed breakdown of the score:
analysis = solution_manager.analyze(solution)
print(f"Overall score: {analysis.score}")
# Per-constraint breakdown
for constraint in analysis.constraint_analyses():
print(f"\n{constraint.constraint_name}:")
print(f" Score: {constraint.score}")
print(f" Matches: {constraint.match_count}")
Constraint Matches
See exactly which entities triggered each constraint:
for constraint in analysis.constraint_analyses():
print(f"\n{constraint.constraint_name}:")
for match in constraint.matches():
print(f" - {match.justification}: {match.score}")
Indictments
Find which entities are responsible for score impacts:
for indictment in analysis.indictments():
print(f"\nEntity: {indictment.indicted_object}")
print(f" Total impact: {indictment.score}")
for match in indictment.matches():
print(f" - {match.constraint_name}: {match.score}")
Use Cases
Validate a Schedule
def validate_schedule(schedule: Schedule) -> list[str]:
"""Validate a manually created schedule."""
solution_manager.update(schedule)
if schedule.score.is_feasible:
return []
# Collect violations
violations = []
analysis = solution_manager.analyze(schedule)
for constraint in analysis.constraint_analyses():
if constraint.score.hard_score < 0:
for match in constraint.matches():
violations.append(str(match.justification))
return violations
Compare Solutions
def compare_solutions(old: Schedule, new: Schedule) -> dict:
"""Compare two solutions."""
old_analysis = solution_manager.analyze(old)
new_analysis = solution_manager.analyze(new)
return {
"old_score": str(old_analysis.score),
"new_score": str(new_analysis.score),
"improved": new_analysis.score > old_analysis.score,
"changes": get_constraint_changes(old_analysis, new_analysis),
}
def get_constraint_changes(old, new):
old_scores = {c.constraint_name: c.score for c in old.constraint_analyses()}
changes = []
for constraint in new.constraint_analyses():
old_score = old_scores.get(constraint.constraint_name)
if old_score != constraint.score:
changes.append({
"constraint": constraint.constraint_name,
"old": str(old_score),
"new": str(constraint.score),
})
return changes
Explain to Users
def explain_score(schedule: Schedule) -> dict:
"""Generate user-friendly score explanation."""
analysis = solution_manager.analyze(schedule)
hard_violations = []
soft_penalties = []
for constraint in analysis.constraint_analyses():
if constraint.score.hard_score < 0:
for match in constraint.matches():
hard_violations.append({
"rule": constraint.constraint_name,
"details": str(match.justification),
})
elif constraint.score.soft_score < 0:
soft_penalties.append({
"rule": constraint.constraint_name,
"impact": constraint.match_count,
})
return {
"is_valid": schedule.score.is_feasible,
"hard_violations": hard_violations,
"soft_penalties": soft_penalties,
"summary": generate_summary(analysis),
}
API Endpoint
from fastapi import FastAPI
@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
solution = solutions.get(job_id)
if not solution:
raise HTTPException(404)
analysis = solution_manager.analyze(solution)
return {
"score": str(analysis.score),
"is_feasible": analysis.score.is_feasible,
"constraints": [
{
"name": c.constraint_name,
"score": str(c.score),
"matches": c.match_count,
}
for c in analysis.constraint_analyses()
],
}
Debugging
Finding Score Corruption
def debug_score(solution):
"""Debug score calculation."""
# Calculate fresh
solution_manager.update(solution)
fresh_score = solution.score
# Analyze
analysis = solution_manager.analyze(solution)
analyzed_score = analysis.score
if fresh_score != analyzed_score:
print(f"Score mismatch: {fresh_score} vs {analyzed_score}")
# Check each constraint
total_hard = 0
total_soft = 0
for c in analysis.constraint_analyses():
total_hard += c.score.hard_score
total_soft += c.score.soft_score
print(f"{c.constraint_name}: {c.score}")
print(f"\nCalculated: {total_hard}hard/{total_soft}soft")
print(f"Reported: {analyzed_score}")
Finding Unexpected Matches
def find_unexpected_matches(solution, constraint_name):
"""Debug why a constraint is matching."""
analysis = solution_manager.analyze(solution)
for c in analysis.constraint_analyses():
if c.constraint_name == constraint_name:
print(f"\n{constraint_name} matches ({c.match_count}):")
for match in c.matches():
print(f" - {match.justification}")
return
print(f"Constraint '{constraint_name}' not found")
- `update()` is fast (incremental calculation)
- `analyze()` is slower (collects all match details)
- Cache analysis results if calling repeatedly
- Don’t analyze every solution during solving
Next Steps
5.5 - Benchmarking
Compare solver configurations and tune performance.
Benchmarking helps you compare different solver configurations and find the best settings for your problem.
Why Benchmark
- Compare algorithms: Find the best algorithm combination
- Tune parameters: Optimize termination times, moves, etc.
- Validate changes: Ensure improvements don’t regress
- Understand scaling: See how performance changes with problem size
Basic Benchmarking
Create a simple benchmark by running the solver multiple times:
import time
from statistics import mean, stdev
def benchmark_config(config: SolverConfig, problems: list, runs: int = 3):
"""Benchmark a solver configuration."""
results = []
for problem in problems:
problem_results = []
for run in range(runs):
factory = SolverFactory.create(config)
solver = factory.build_solver()
start = time.time()
solution = solver.solve(problem)
elapsed = time.time() - start
problem_results.append({
"score": solution.score,
"time": elapsed,
"feasible": solution.score.is_feasible,
})
results.append({
"problem": problem.id,
"avg_score": mean(r["score"].soft_score for r in problem_results),
"avg_time": mean(r["time"] for r in problem_results),
"feasibility_rate": sum(r["feasible"] for r in problem_results) / runs,
})
return results
Comparing Configurations
def compare_termination_times():
    """Compare different termination durations."""
    base_kwargs = dict(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
    )
    durations = [10, 30, 60, 120, 300]  # seconds
    problems = load_benchmark_problems()
    results = {}
    for duration in durations:
        # Build a fresh config per duration instead of copying attributes,
        # which would pass termination_config twice.
        config = SolverConfig(
            **base_kwargs,
            termination_config=TerminationConfig(
                spent_limit=Duration(seconds=duration)
            ),
        )
        results[duration] = benchmark_config(config, problems)
    return results
Benchmark Report
Generate a readable report:
def generate_report(results: dict):
"""Generate benchmark report."""
print("=" * 60)
print("BENCHMARK REPORT")
print("=" * 60)
for config_name, config_results in results.items():
print(f"\n{config_name}:")
print("-" * 40)
total_score = 0
total_time = 0
feasible_count = 0
for r in config_results:
print(f" {r['problem']}: score={r['avg_score']:.1f}, "
f"time={r['avg_time']:.1f}s, "
f"feasible={r['feasibility_rate']*100:.0f}%")
total_score += r["avg_score"]
total_time += r["avg_time"]
feasible_count += r["feasibility_rate"]
n = len(config_results)
print(f"\n Average: score={total_score/n:.1f}, "
f"time={total_time/n:.1f}s, "
f"feasible={feasible_count/n*100:.0f}%")
print("\n" + "=" * 60)
Problem Datasets
Create consistent benchmark datasets:
class BenchmarkDataset:
"""Collection of benchmark problems."""
@staticmethod
def small():
"""Small problems for quick testing."""
return [
generate_problem(lessons=20, rooms=3, timeslots=10),
generate_problem(lessons=30, rooms=4, timeslots=10),
]
@staticmethod
def medium():
"""Medium problems for standard benchmarks."""
return [
generate_problem(lessons=100, rooms=10, timeslots=25),
generate_problem(lessons=150, rooms=12, timeslots=25),
]
@staticmethod
def large():
"""Large problems for stress testing."""
return [
generate_problem(lessons=500, rooms=20, timeslots=50),
generate_problem(lessons=1000, rooms=30, timeslots=50),
]
Reproducible Benchmarks
For consistent results:
# EnvironmentMode is assumed to live in the config module, next to SolverConfig.
from solverforge_legacy.solver.config import EnvironmentMode

def reproducible_benchmark(base_kwargs: dict, problem, seed: int = 42):
    """Run a benchmark with a fixed random seed."""
    config = SolverConfig(
        **base_kwargs,  # base solver settings (solution class, entities, constraints)
        environment_mode=EnvironmentMode.REPRODUCIBLE,
        random_seed=seed,
    )
    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    return solver.solve(problem)
Metrics to Track
Primary Metrics
| Metric | Description |
|---|---|
| Best Score | Final solution quality |
| Time to Best | When best score was found |
| Feasibility Rate | % of runs finding feasible solution |
Secondary Metrics
| Metric | Description |
|---|---|
| Score Over Time | Score improvement curve |
| Steps per Second | Algorithm throughput |
| Memory Usage | Peak memory consumption |
Score Over Time
Track how score improves:
def benchmark_with_history(config: SolverConfig, problem):
"""Benchmark with score history."""
history = []
def on_progress(event):
history.append({
"time": event.time_spent.total_seconds(),
"score": event.new_best_score,
})
factory = SolverFactory.create(config)
solver = factory.build_solver()
solver.add_event_listener(on_progress)
solution = solver.solve(problem)
return {
"final_score": solution.score,
"history": history,
}
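The recorded history also gives you the "Time to Best" metric from the table above. A small helper, assuming the history structure returned by benchmark_with_history:
def time_to_best(history: list[dict]) -> float:
    """Seconds until the final best score first appeared in the history."""
    if not history:
        return 0.0
    best_score = history[-1]["score"]
    for point in history:
        if point["score"] == best_score:
            return point["time"]
    return history[-1]["time"]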
Visualization
Plot results with matplotlib:
import matplotlib.pyplot as plt
def plot_score_over_time(results: dict):
"""Plot score improvement over time."""
plt.figure(figsize=(10, 6))
for config_name, result in results.items():
times = [h["time"] for h in result["history"]]
scores = [h["score"].soft_score for h in result["history"]]
plt.plot(times, scores, label=config_name)
plt.xlabel("Time (seconds)")
plt.ylabel("Soft Score")
plt.title("Score Improvement Over Time")
plt.legend()
plt.grid(True)
plt.savefig("benchmark_results.png")
CI/CD Integration
Add benchmarks to your pipeline:
# test_benchmark.py
import time

import pytest
def test_minimum_score():
"""Ensure solver achieves minimum score."""
config = load_production_config()
problem = BenchmarkDataset.small()[0]
factory = SolverFactory.create(config)
solver = factory.build_solver()
solution = solver.solve(problem)
assert solution.score.is_feasible, "Solution should be feasible"
assert solution.score.soft_score >= -100, "Score should be >= -100"
def test_performance_regression():
"""Check for performance regression."""
config = load_production_config()
problem = BenchmarkDataset.medium()[0]
start = time.time()
factory = SolverFactory.create(config)
solver = factory.build_solver()
solution = solver.solve(problem)
elapsed = time.time() - start
assert solution.score.is_feasible
assert elapsed < 120, "Should complete within 2 minutes"
Best Practices
Do
- Use consistent problem datasets
- Run multiple times (3-5) for statistical significance
- Track both score and time
- Use reproducible mode for comparisons
Don’t
- Compare results from different machines
- Use production data for benchmarks (privacy)
- Optimize for benchmark problems only
- Ignore feasibility rate
Next Steps
6 - Optimization Algorithms
Understand the algorithms that power SolverForge’s optimization.
SolverForge uses a combination of algorithms to find high-quality solutions efficiently. Understanding these algorithms helps you tune solver performance.
Topics
Algorithm Phases
SolverForge typically runs algorithms in phases:
1. Construction Heuristic
└── Builds initial solution (fast, may be suboptimal)
2. Local Search
└── Iteratively improves solution (most time spent here)
3. (Optional) Exhaustive Search
└── Proves optimality (only feasible for small problems)
Construction Heuristics
Build an initial feasible solution quickly:
| Algorithm | Description |
|---|---|
| First Fit | Assign first available value |
| First Fit Decreasing | Assign largest/most constrained entities first |
| Cheapest Insertion | Insert at lowest cost position |
| Allocate from Pool | Allocate entities from a pool |
Local Search Algorithms
Iteratively improve the solution:
| Algorithm | Description |
|---|---|
| Hill Climbing | Accept only improving moves |
| Tabu Search | Track recent moves to avoid cycles |
| Simulated Annealing | Accept worse moves with decreasing probability |
| Late Acceptance | Accept if better than solution from N steps ago |
| Great Deluge | Accept if within rising threshold |
Default Behavior
By default, SolverForge uses:
- First Fit Decreasing construction heuristic
- Late Acceptance local search
This works well for most problems. Advanced users can customize the algorithm configuration for specific use cases.
6.1 - Construction Heuristics
Build an initial solution quickly with construction heuristics.
A construction heuristic builds an initial solution by assigning values to all planning variables. It runs fast but may not find an optimal solution—that’s the job of local search.
Why Construction Heuristics?
- Fast initialization: Quickly assigns all variables
- Warm start: Gives local search a good starting point
- Automatic termination: Stops when all variables are assigned
First Fit
Algorithm
First Fit cycles through planning entities in default order, assigning each to the best available value:
- Take the first unassigned entity
- Try each possible value
- Assign the value with the best score
- Repeat until all entities are assigned
Behavior
Entity 1 → Best value found → Assigned (never changed)
Entity 2 → Best value found → Assigned (never changed)
Entity 3 → Best value found → Assigned (never changed)
...
Limitations
- Order matters: Early assignments may block better solutions
- No backtracking: Once assigned, values don’t change
- May not find feasible solution if early choices are poor
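Putting the steps together, a rough pseudo-Python sketch of the loop (illustrative only; `assign()` and `score_with()` are placeholders, and the real engine evaluates moves incrementally rather than rescoring from scratch):
def first_fit(entities, value_range, score_with):
    """Illustrative sketch of First Fit, not the engine's implementation."""
    for entity in entities:  # default order, no sorting
        best_value = max(value_range, key=lambda value: score_with(entity, value))
        entity.assign(best_value)  # placeholder; assignments are never revisited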
First Fit Decreasing
Algorithm
Like First Fit, but sorts entities by difficulty first:
- Sort entities by difficulty (hardest first)
- Assign difficult entities first
- Easy entities fit in remaining slots
Why It Helps
Difficult entities (those with fewer valid options) are assigned first while there are more options available. Easy entities can usually fit anywhere.
Example
For school timetabling:
- Teachers with many constraints → assigned first
- Teachers with few constraints → assigned last
Default Behavior
SolverForge uses First Fit Decreasing by default. This works well for most problems without configuration.
How It Works Internally
Phase: Construction Heuristic
├── Sort entities by difficulty
├── For each unassigned entity:
│ ├── Try each value from value range
│ ├── Calculate score impact
│ └── Assign best value
└── Done when all entities assigned
Construction vs Local Search
| Aspect | Construction | Local Search |
|---|---|---|
| Purpose | Build initial solution | Improve existing solution |
| Speed | Very fast | Runs until termination |
| Quality | Decent | Optimal/near-optimal |
| Changes | Assigns unassigned only | Modifies assigned values |
When Construction Fails
If construction can’t find a feasible solution:
- Overconstrained problem: Not enough resources for all entities
- Tight constraints: Early assignments block later ones
- Poor entity ordering: Important entities assigned last
Solutions
- Use medium constraints for “assign as many as possible”
- Add nullable planning variables
- Let local search fix infeasibilities
Monitoring Construction
from solverforge_legacy.solver import BestSolutionChangedEvent
def on_progress(event: BestSolutionChangedEvent):
if not event.is_new_best_solution_initialized:
print("Construction phase...")
else:
print("Local search phase...")
solver.add_event_listener(on_progress)
Entity Ordering
Entities are processed in declaration order by default. For better results:
- Define difficult entities first in your entity list
- Or implement difficulty comparison
Value Ordering
Values are tried in order. Better default values lead to faster construction.
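A minimal sketch of the entity-ordering idea, assuming a hypothetical available_option_count() helper that counts how many values an entity could legally take:
# Put the most constrained lessons first so the construction heuristic
# assigns them while plenty of options remain.
# available_option_count() is a placeholder for your own domain logic.
problem.lessons.sort(key=lambda lesson: available_option_count(lesson))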
Next Steps
6.2 - Local Search
Improve solutions iteratively with local search algorithms.
Local search algorithms iteratively improve a solution by making small changes called “moves.” This is where the solver spends most of its time finding better solutions.
How It Works
Start with initial solution
Repeat until termination:
1. Generate possible moves
2. Evaluate each move's score impact
3. Select a move based on acceptance criteria
4. Apply the move
5. Update best solution if improved
Local Search Algorithms
Late Acceptance
Default algorithm. Accepts moves that improve on the solution from N steps ago.
- Balances exploration and exploitation
- Escapes local optima by accepting slightly worse moves
- Simple and effective for most problems
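A toy sketch of the acceptance rule on plain score values (purely illustrative; `initial_score` and `candidate_scores` are placeholders, and the real engine works on moves with incremental scoring):
history_length = 50                          # "N steps ago"
history = [initial_score] * history_length
current = initial_score
for step, candidate in enumerate(candidate_scores):
    # Accept if the candidate beats the current score or the score from N steps ago.
    if candidate >= current or candidate >= history[step % history_length]:
        current = candidate
    history[step % history_length] = current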
Hill Climbing
Only accepts moves that improve the score:
- Fast convergence
- Gets stuck in local optima
- Best for easy problems or quick iterations
Tabu Search
Maintains a list of recently made moves and forbids reversing them:
- Avoids cycles and revisiting solutions
- Explores more of the search space
- Memory overhead for tabu list
Simulated Annealing
Accepts worse moves with probability that decreases over time:
- “Temperature” controls acceptance probability
- High temperature = more exploration
- Low temperature = more exploitation
- Inspired by metallurgy annealing process
Great Deluge
Accepts moves above a rising “water level” threshold:
- Threshold increases over time
- Forces gradual improvement
- Similar to simulated annealing
Move Selection
Local search evaluates moves generated by move selectors:
Move Examples:
├── Change Move: lesson.room = Room B → Room C
├── Swap Move: lesson1.room ↔ lesson2.room
├── 2-Opt Move: Reverse segment in route
└── Custom Move: Domain-specific change
See Move Selectors for details.
Understanding the Search
Score Improvement Curve
Score
^
| ****
| * **
| * ***
| * ****
| * ********
|* ***************
+---------------------------------> Time
Construction Local Search
Rapid improvement early, then diminishing returns.
Local Optima
A local optimum is a solution where no single move improves the score, but better solutions exist:
Score
^
| *
| * * Global optimum
| * * ↓
| * * *
| * * * *
| * ↑ ** *
| * Local *
| optimum
+------------------------→ Solution Space
Algorithms like Late Acceptance and Tabu Search help escape local optima.
Termination
Local search runs until termination:
TerminationConfig(
spent_limit=Duration(minutes=5), # Time limit
best_score_limit="0hard/0soft", # Score target
unimproved_spent_limit=Duration(seconds=60), # Plateau detection
)
Choosing Termination Time
| Problem Size | Suggested Time |
|---|---|
| Small (< 100 entities) | 10-60 seconds |
| Medium (100-1000) | 1-10 minutes |
| Large (> 1000) | 10-60 minutes |
More time generally means better scores, with diminishing returns.
Monitoring Progress
def on_progress(event: BestSolutionChangedEvent):
print(f"Time: {event.time_spent}")
print(f"Score: {event.new_best_score}")
print(f"Initialized: {event.is_new_best_solution_initialized}")
solver.add_event_listener(on_progress)
Score Plateaus
When the score stops improving:
- Stuck in local optimum: Algorithm can’t find better moves
- Near optimal: Little room for improvement
- Constraint conflict: Hard constraints blocking progress
Detecting Plateaus
TerminationConfig(
unimproved_spent_limit=Duration(seconds=60) # Stop if no improvement
)
Algorithm Selection
| Algorithm | Best For |
|---|---|
| Late Acceptance | Default choice, most problems |
| Hill Climbing | Simple problems, quick checks |
| Tabu Search | Problems with many local optima |
| Simulated Annealing | Complex landscapes |
Start with the default (Late Acceptance) and only change if benchmarking shows improvement.
Improving Results
1. Let It Run Longer
More time usually means better scores.
2. Optimize Constraints
Slow constraints = fewer moves evaluated per second.
3. Use Appropriate Moves
Some moves work better for certain problems (see Move Selectors).
4. Benchmark
Test different algorithms and parameters on your specific problem.
Next Steps
6.3 - Exhaustive Search
Find optimal solutions with exhaustive search (for small problems).
Exhaustive search algorithms explore all possible solutions to find the optimal one. They guarantee the best solution but are only practical for small problems.
When to Use
Exhaustive search is only feasible when:
- Problem is very small (< 20 entities, few values)
- You need a guaranteed optimal solution
- You have time to wait for completion
For most problems, local search finds near-optimal solutions much faster.
Branch and Bound
The main exhaustive search algorithm. It systematically explores the solution space while pruning branches that can’t improve on the best solution found.
How It Works
Root (no assignments)
/ | \
Entity1=A Entity1=B Entity1=C
/ \ | |
E2=A E2=B E2=A ...
/ \ | |
E3=A ... X (pruned)
|
(Best?)
- Build a tree of partial solutions
- At each node, try assigning a value to the next entity
- Calculate a score bound for the branch
- If bound is worse than best known solution, prune the branch
- Continue until all branches are explored or pruned
Pruning
Pruning is key to performance:
Best so far: -5hard/0soft
Current partial: -3hard/?soft
→ Continue (might improve)
Current partial: -10hard/?soft
→ Prune (can't beat best)
Brute Force
Tries every possible combination without pruning:
- Guarantees optimal solution
- Extremely slow (exponential time)
- Only for very small problems or validation
Complexity
For N entities with M possible values each:
- Combinations: M^N
- Example: 10 entities × 10 values = 10^10 = 10 billion combinations
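The arithmetic is easy to verify:
def search_space_size(entity_count: int, value_count: int) -> int:
    """Number of possible assignments: value_count ** entity_count."""
    return value_count ** entity_count

print(search_space_size(10, 10))    # 10_000_000_000 (10 billion)
print(search_space_size(200, 25))   # far too large to enumerate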
Comparison
| Aspect | Branch and Bound | Brute Force |
|---|---|---|
| Optimality | Guaranteed | Guaranteed |
| Speed | Better (pruning) | Very slow |
| Memory | Higher | Lower |
| Use case | Small problems | Tiny problems |
Practical Limits
| Problem Size | Exhaustive Search Feasibility |
|---|---|
| < 10 entities | Possible (seconds to minutes) |
| 10-20 entities | Challenging (minutes to hours) |
| > 20 entities | Usually impractical |
When Local Search is Better
For most real problems, local search is the right choice:
| Problem | Entities | Exhaustive | Local Search |
|---|---|---|---|
| Small demo | 10 | 1 second | 1 second |
| School timetabling | 200 | Years | 30 seconds |
| Vehicle routing | 100 | Years | 1 minute |
Hybrid Approach
Use exhaustive search to validate local search:
def validate_optimality(problem):
"""
For small problems, verify local search finds optimal.
For testing only!
"""
# Run local search
local_solution = run_local_search(problem)
# Run exhaustive search (small problems only!)
optimal_solution = run_exhaustive(problem)
assert local_solution.score == optimal_solution.score
Best Practices
Do
- Use exhaustive search only for very small problems
- Use it to validate your constraint model on tiny examples
- Understand that it’s for special cases, not general use
Don’t
- Expect exhaustive search to scale
- Use it in production for real-world problems
- Wait for results on large problems (it won’t finish)
Next Steps
6.4 - Move Selectors
Reference for move types available in local search.
Move selectors generate the moves that local search evaluates. Different move types are effective for different problems.
Move Types
Change Move
Changes one planning variable to a different value:
Before: lesson.room = Room A
After: lesson.room = Room B
Best for: Assignment problems, scheduling
Swap Move
Swaps values between two entities:
Before: lesson1.room = Room A, lesson2.room = Room B
After: lesson1.room = Room B, lesson2.room = Room A
Best for: When both changes are needed for improvement
Pillar Change Move
Changes multiple entities with the same value simultaneously:
Before: [lesson1, lesson2, lesson3].room = Room A
After: [lesson1, lesson2, lesson3].room = Room B
Best for: Grouped entities that should move together
Pillar Swap Move
Swaps values between two groups of entities:
Before: [l1, l2].room = A, [l3, l4].room = B
After: [l1, l2].room = B, [l3, l4].room = A
Best for: Problems with entity groups
List Change Move (for List Variables)
Changes an element’s position in a list:
Before: vehicle.visits = [A, B, C, D]
Move: Move C from position 2 to position 0
After: vehicle.visits = [C, A, B, D]
Best for: Routing, sequencing
List Swap Move
Swaps two elements within or between lists:
Before: vehicle1.visits = [A, B], vehicle2.visits = [C, D]
Move: Swap B and C
After: vehicle1.visits = [A, C], vehicle2.visits = [B, D]
Best for: Rebalancing routes
2-Opt Move
Reverses a segment of a list:
Before: vehicle.visits = [A, B, C, D, E]
Move: Reverse [B, C, D]
After: vehicle.visits = [A, D, C, B, E]
Best for: Routing (reduces “crossing” paths)
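To make the effect concrete, a tiny standalone sketch of what the move does to a visit list (this is not the solver's move implementation):
def two_opt(visits: list, start: int, end: int) -> list:
    """Reverse the segment visits[start:end + 1]."""
    return visits[:start] + visits[start:end + 1][::-1] + visits[end + 1:]

print(two_opt(["A", "B", "C", "D", "E"], 1, 3))
# ['A', 'D', 'C', 'B', 'E']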
Sublist Change Move
Moves a subsequence to a different position:
Before: vehicle.visits = [A, B, C, D, E]
Move: Move [B, C] to end
After: vehicle.visits = [A, D, E, B, C]
Best for: Batch relocations
Sublist Swap Move
Swaps two subsequences:
Before: vehicle1.visits = [A, B, C], vehicle2.visits = [X, Y, Z]
Move: Swap [B, C] and [Y, Z]
After: vehicle1.visits = [A, Y, Z], vehicle2.visits = [X, B, C]
Best for: Inter-route optimization
Default Move Selectors
SolverForge automatically selects appropriate moves based on your variable types:
| Variable Type | Default Moves |
|---|---|
| `PlanningVariable` | Change, Swap |
| `PlanningListVariable` | List Change, List Swap, 2-Opt |
Move Selection Process
1. Selector generates candidate moves
2. Each move is evaluated (score calculated)
3. Acceptance criteria decides to apply or not
4. Repeat
Move Efficiency
Incremental Scoring
Moves are scored incrementally—only recalculating affected constraints:
Change lesson.room = A → B
Only recalculate:
├── Room conflict (for A and B)
├── Teacher room stability
└── (Other constraints unaffected)
This makes move evaluation fast.
Move Speed
Typical moves evaluated per second:
| Scenario | Moves/Second |
|---|---|
| Simple constraints | 10,000+ |
| Complex constraints | 1,000-10,000 |
| Very complex | 100-1,000 |
More moves = more exploration = better solutions (usually).
Filtering Moves
The solver automatically filters invalid moves:
- Moves that don’t change anything (same value)
- Moves that violate pinning
- Moves on uninitialized variables
Move Caching
To avoid regenerating the same moves:
- Construction moves are cached
- Local search moves are regenerated (solution changes)
Move selection affects:
- Diversity: Different move types explore different parts of the search space
- Speed: Some moves are faster to evaluate
- Effectiveness: Some moves are more likely to find improvements
Problem-Specific Guidance
Scheduling (Timetabling, Employee Scheduling)
- Change moves: Reassign timeslot, room, employee
- Swap moves: Exchange assignments
- Default selection works well
Routing (VRP)
- List moves: Reorder visits
- 2-Opt: Eliminate crossing paths
- Sublist moves: Move segments between vehicles
Assignment (Task Assignment, Bin Packing)
- Change moves: Reassign to different resource
- Swap moves: Exchange assignments
- Pillar moves: Move groups together
Troubleshooting
Slow Moves
If moves are slow:
- Check constraint complexity
- Optimize filtering (use joiners)
- Reduce problem size
Poor Improvement
If solutions don’t improve:
- Run longer
- Ensure moves can reach better solutions
- Check if stuck in local optimum
Next Steps
7 - Design Patterns
Common patterns for handling real-world planning scenarios.
Real-world planning problems often require more than basic optimization. This section covers patterns for common scenarios.
Topics
Real-Time Planning
Handle dynamic changes during solving:
from solverforge_legacy.solver import ProblemChange
class AddLessonChange(ProblemChange[Timetable]):
def __init__(self, lesson: Lesson):
self.lesson = lesson
def do_change(self, working_solution: Timetable, score_director):
# Add the new lesson to the working solution
working_solution.lessons.append(self.lesson)
score_director.after_entity_added(self.lesson)
# Apply change while solver is running
solver.add_problem_change(AddLessonChange(new_lesson))
Continuous Planning
For problems that span long time periods, use a rolling horizon:
- Plan Window - Only optimize a subset of the timeline
- Published Window - Lock decisions that are being executed
- Draft Window - Future decisions that can still change
When to Use These Patterns
| Scenario | Pattern |
|---|---|
| New orders arrive during planning | Real-Time Planning |
| Plan extends into the future | Continuous Planning |
| Daily/weekly batch optimization | Repeated Planning |
| Vehicle breakdowns, cancellations | Real-Time Planning |
| Rolling weekly schedules | Continuous Planning |
7.1 - Real-Time Planning
Handle changes while the solver is running.
Real-time planning allows you to modify the problem while the solver is running. This is essential for handling dynamic changes like new orders, cancellations, or resource changes.
Problem Changes
Use ProblemChange to modify the working solution:
from solverforge_legacy.solver import ProblemChange
class AddLessonChange(ProblemChange[Timetable]):
def __init__(self, lesson: Lesson):
self.lesson = lesson
def do_change(self, working_solution: Timetable, score_director):
# Add to solution
working_solution.lessons.append(self.lesson)
# Notify score director
score_director.after_entity_added(self.lesson)
Applying Changes
# With Solver
new_lesson = Lesson("new-1", "Art", "S. Dali", "Group A")
solver.add_problem_change(AddLessonChange(new_lesson))
# With SolverManager
solver_manager.add_problem_change(job_id, AddLessonChange(new_lesson))
Common Change Types
Add Entity
class AddVisitChange(ProblemChange[RoutePlan]):
def __init__(self, visit: Visit):
self.visit = visit
def do_change(self, solution: RoutePlan, score_director):
solution.visits.append(self.visit)
score_director.after_entity_added(self.visit)
Remove Entity
class RemoveVisitChange(ProblemChange[RoutePlan]):
def __init__(self, visit_id: str):
self.visit_id = visit_id
def do_change(self, solution: RoutePlan, score_director):
visit = next(v for v in solution.visits if v.id == self.visit_id)
# Remove from vehicle if assigned
if visit.vehicle:
score_director.before_list_variable_changed(
visit.vehicle, "visits", visit.vehicle.visits
)
visit.vehicle.visits.remove(visit)
score_director.after_list_variable_changed(
visit.vehicle, "visits", visit.vehicle.visits
)
# Remove from solution
score_director.before_entity_removed(visit)
solution.visits.remove(visit)
score_director.after_entity_removed(visit)
Modify Entity
class UpdateVisitDemandChange(ProblemChange[RoutePlan]):
def __init__(self, visit_id: str, new_demand: int):
self.visit_id = visit_id
self.new_demand = new_demand
def do_change(self, solution: RoutePlan, score_director):
visit = next(v for v in solution.visits if v.id == self.visit_id)
score_director.before_problem_property_changed(visit)
visit.demand = self.new_demand
score_director.after_problem_property_changed(visit)
Add Problem Fact
class AddVehicleChange(ProblemChange[RoutePlan]):
def __init__(self, vehicle: Vehicle):
self.vehicle = vehicle
def do_change(self, solution: RoutePlan, score_director):
solution.vehicles.append(self.vehicle)
score_director.after_problem_fact_added(self.vehicle)
Score Director Notifications
Always notify the score director of changes:
| Method | When to Use |
|---|---|
| `after_entity_added()` | Added planning entity |
| `before/after_entity_removed()` | Removing planning entity |
| `before/after_variable_changed()` | Changed planning variable |
| `before/after_list_variable_changed()` | Changed list variable |
| `before/after_problem_property_changed()` | Changed entity property |
| `after_problem_fact_added()` | Added problem fact |
| `before/after_problem_fact_removed()` | Removing problem fact |
Order Matters
For removals and changes, call before_* first:
score_director.before_entity_removed(entity)
# Actually remove
solution.entities.remove(entity)
score_director.after_entity_removed(entity)
Real-Time API Example
from fastapi import FastAPI
from solverforge_legacy.solver import SolverManager, ProblemChange
app = FastAPI()
solver_manager: SolverManager
@app.post("/visits")
async def add_visit(visit: VisitRequest, job_id: str):
"""Add a visit to an active solving job."""
new_visit = Visit(
id=str(uuid.uuid4()),
location=visit.location,
demand=visit.demand,
)
solver_manager.add_problem_change(
job_id,
AddVisitChange(new_visit)
)
return {"id": new_visit.id, "status": "added"}
@app.delete("/visits/{visit_id}")
async def remove_visit(visit_id: str, job_id: str):
"""Remove a visit from an active solving job."""
solver_manager.add_problem_change(
job_id,
RemoveVisitChange(visit_id)
)
return {"status": "removed"}
Change Ordering
Changes are applied in the order they’re submitted:
solver.add_problem_change(change1) # Applied first
solver.add_problem_change(change2) # Applied second
solver.add_problem_change(change3) # Applied third
Best Practices
Do
- Keep changes small and focused
- Notify score director of all modifications
- Use `before_*` methods for removals/changes
- Test changes in isolation
Don’t
- Make changes without notifying score director
- Modify multiple entities in one complex change
- Forget to handle entity relationships
Debugging Changes
class DebugChange(ProblemChange[Solution]):
def __init__(self, inner: ProblemChange):
self.inner = inner
def do_change(self, solution, score_director):
print(f"Before: {len(solution.entities)} entities")
self.inner.do_change(solution, score_director)
print(f"After: {len(solution.entities)} entities")
Next Steps
7.2 - Continuous Planning
Rolling horizon and replanning strategies.
Continuous planning handles problems that span long time periods by using a rolling planning window. Instead of planning everything at once, you plan a window and move it forward as time passes.
The Challenge
Planning a full year of shifts at once:
- Huge problem size
- Far-future plans become irrelevant
- Real-world changes invalidate long-term plans
Rolling Horizon
Plan only a window of time, then slide it forward:
Time ──────────────────────────────────────────►
Window 1: [====Plan====]
Window 2: [====Plan====]
Window 3: [====Plan====]
Implementation
from datetime import date, datetime, timedelta
def plan_window(start_date: date, window_days: int, problem: Schedule) -> Schedule:
"""Plan a time window."""
end_date = start_date + timedelta(days=window_days)
# Filter entities to window
window_shifts = [
s for s in problem.shifts
if start_date <= s.date < end_date
]
window_problem = Schedule(
employees=problem.employees,
shifts=window_shifts,
)
solver = create_solver()
return solver.solve(window_problem)
def continuous_plan(problem: Schedule, window_days: int = 14):
"""Run continuous planning with rolling windows."""
current_date = date.today()
end_date = max(s.date for s in problem.shifts)
while current_date < end_date:
solution = plan_window(current_date, window_days, problem)
save_solution(solution)
# Move window forward
current_date += timedelta(days=7) # Overlap
Published vs Draft
Divide the window into published (locked) and draft (changeable):
Time ──────────────────────────────────────────►
[Published][====Draft====]
(Locked) (Can change)
Implementation with Pinning
def prepare_window(problem: Schedule, publish_deadline: datetime):
"""Pin published shifts, leave draft unpinned."""
for shift in problem.shifts:
if shift.start_time < publish_deadline:
shift.pinned = True
else:
shift.pinned = False
return problem
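For this to work, the entity needs a pin field the solver recognizes. A minimal sketch, assuming SolverForge exposes a PlanningPin annotation alongside PlanningVariable (check the domain module of your version):
from dataclasses import dataclass, field
from datetime import date, datetime
from typing import Annotated

# Assumption: PlanningPin is available next to PlanningVariable.
from solverforge_legacy.solver.domain import planning_entity, PlanningVariable, PlanningPin

@dataclass
class Employee:
    name: str

@planning_entity
@dataclass
class Shift:
    id: str
    date: date
    start_time: datetime
    employee: Annotated[Employee | None, PlanningVariable] = field(default=None)
    # While pinned is True, the solver leaves this shift's assignment untouched.
    pinned: Annotated[bool, PlanningPin] = field(default=False)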
Replanning Triggers
Replan when:
- Time-based: Every hour, day, or week
- Event-based: New orders, cancellations, resource changes
- Threshold-based: When score degrades below threshold
Event-Based Replanning
def on_new_order(order: Order, active_job_id: str):
"""Trigger replanning when new order arrives."""
solver_manager.terminate_early(active_job_id)
updated_problem = load_current_state()
updated_problem.orders.append(order)
new_job_id = start_solving(updated_problem)
return new_job_id
Warm Starting
Start from the previous solution to preserve good assignments:
def warm_start_plan(previous: Schedule, new_shifts: list[Shift]) -> Schedule:
"""Start from previous solution, add new shifts."""
# Keep previous assignments (pinned or as starting point)
for shift in previous.shifts:
if shift.employee is not None:
shift.pinned = True # Or just leave assigned
# Add new unassigned shifts
for shift in new_shifts:
shift.employee = None
shift.pinned = False
previous.shifts.append(shift)
return solve(previous)
Time Windows
Sliding Window
Week 1: Plan days 1-14
Week 2: Plan days 8-21 (7-day overlap)
Week 3: Plan days 15-28
The overlap allows replanning of near-future assignments.
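A small helper that produces such overlapping windows (a sketch; the window and step lengths are illustrative):
from datetime import date, timedelta

def sliding_windows(start: date, horizon_days: int,
                    window_days: int = 14, step_days: int = 7):
    """Yield (window_start, window_end) pairs that overlap by window_days - step_days."""
    current = start
    horizon_end = start + timedelta(days=horizon_days)
    while current < horizon_end:
        yield current, current + timedelta(days=window_days)
        current += timedelta(days=step_days)

for window_start, window_end in sliding_windows(date.today(), horizon_days=28):
    print(window_start, "->", window_end)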
Growing Window
For finite problems, grow the window:
Day 1: Plan days 1-7
Day 2: Plan days 1-14
Day 3: Plan days 1-21
...until complete
Handling Conflicts
When replanning conflicts with executed work:
def merge_with_reality(planned: Schedule, actual: Schedule) -> Schedule:
"""Merge planned schedule with actual execution."""
for planned_shift in planned.shifts:
actual_shift = find_actual(actual, planned_shift.id)
if actual_shift and actual_shift.is_started:
# Can't change started shifts
planned_shift.employee = actual_shift.employee
planned_shift.pinned = True
return planned
Best Practices
Do
- Use overlapping windows for smoother transitions
- Pin executed/committed work
- Warm start from previous solutions
- Handle edge cases (window boundaries)
Don’t
- Plan too far ahead (changes will invalidate)
- Forget to merge with reality
- Ignore the transition between windows
Example: Weekly Scheduling
class WeeklyScheduler:
def __init__(self):
self.solver_manager = create_solver_manager()
def plan_next_week(self):
"""Run weekly planning cycle."""
# Load current state
current = load_current_schedule()
# Determine window
today = date.today()
window_start = today + timedelta(days=(7 - today.weekday())) # Next Monday
window_end = window_start + timedelta(days=14)
# Pin this week (being executed)
for shift in current.shifts:
if shift.date < window_start:
shift.pinned = True
elif shift.date < window_end:
shift.pinned = False # Can replan
else:
continue # Outside window
# Solve
solution = self.solve(current)
# Publish next week
publish_week(solution, window_start, window_start + timedelta(days=7))
return solution
Next Steps
7.3 - Repeated Planning
Batch optimization and periodic replanning.
Repeated planning runs the solver on a regular schedule, optimizing batches of work. Unlike continuous planning, each run is independent.
Use Cases
- Daily route optimization
- Weekly shift scheduling
- Periodic resource allocation
- Batch order assignment
Basic Pattern
from datetime import datetime
import schedule
import time
def daily_optimization():
"""Run optimization every day at 2 AM."""
# Load today's problem
problem = load_todays_problem()
# Solve
solver = create_solver()
solution = solver.solve(problem)
# Save results
save_solution(solution)
notify_stakeholders(solution)
# Schedule daily run
schedule.every().day.at("02:00").do(daily_optimization)
while True:
schedule.run_pending()
time.sleep(60)
Batch Processing
Process multiple independent problems:
def optimize_all_regions():
"""Optimize each region independently."""
regions = load_regions()
results = {}
for region in regions:
problem = load_region_problem(region)
solution = solve(problem)
results[region] = solution
save_solution(region, solution)
return results
Parallel Batch Processing
from concurrent.futures import ThreadPoolExecutor
def optimize_regions_parallel():
"""Optimize regions in parallel."""
regions = load_regions()
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(solve_region, region): region
for region in regions
}
results = {}
for future in futures:
region = futures[future]
results[region] = future.result()
return results
Time-Based Replanning
Fixed Schedule
# Every hour
schedule.every().hour.do(replan)
# Every day at specific time
schedule.every().day.at("06:00").do(replan)
# Every Monday
schedule.every().monday.at("00:00").do(weekly_plan)
Cron-Based
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# Run at 2 AM every day
scheduler.add_job(daily_optimization, 'cron', hour=2)
# Run every 30 minutes
scheduler.add_job(frequent_replan, 'cron', minute='*/30')
scheduler.start()
Handling Failures
def robust_optimization():
"""Optimization with retry and fallback."""
max_retries = 3
for attempt in range(max_retries):
try:
problem = load_problem()
solution = solve(problem)
save_solution(solution)
return solution
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(60) # Wait before retry
else:
# Use previous solution as fallback
return load_previous_solution()
Comparing Solutions
Track solution quality over time:
def track_solution_quality(solution: Schedule):
"""Log solution metrics for analysis."""
metrics = {
"timestamp": datetime.now().isoformat(),
"score": str(solution.score),
"feasible": solution.score.is_feasible,
"entity_count": len(solution.shifts),
"assigned_count": sum(1 for s in solution.shifts if s.employee),
}
log_metrics(metrics)
# Alert if quality degrades
if not solution.score.is_feasible:
send_alert("Infeasible solution generated!")
Incremental vs Fresh
Fresh Start
Each run starts from scratch:
def fresh_optimization():
problem = load_problem()
# All entities unassigned
for entity in problem.entities:
entity.planning_variable = None
return solve(problem)
Incremental (Warm Start)
Start from previous solution:
def incremental_optimization():
previous = load_previous_solution()
# Keep good assignments, clear bad ones
for entity in previous.entities:
if should_keep(entity):
entity.pinned = True
else:
entity.planning_variable = None
entity.pinned = False
return solve(previous)
Monitoring
class OptimizationMonitor:
def __init__(self):
self.runs = []
def record_run(self, solution, duration):
self.runs.append({
"time": datetime.now(),
"score": solution.score,
"duration": duration,
"feasible": solution.score.is_feasible,
})
def get_statistics(self):
if not self.runs:
return None
feasible_rate = sum(r["feasible"] for r in self.runs) / len(self.runs)
avg_duration = sum(r["duration"] for r in self.runs) / len(self.runs)
return {
"total_runs": len(self.runs),
"feasibility_rate": feasible_rate,
"avg_duration_seconds": avg_duration,
}
Best Practices
Do
- Log all runs for analysis
- Implement retry logic
- Monitor solution quality trends
- Use appropriate scheduling library
Don’t
- Run optimization during peak hours
- Ignore failures silently
- Forget to save results
- Overload with too frequent replanning
Next Steps
8 - Integration
Integrate SolverForge with web frameworks and other systems.
SolverForge integrates easily with Python web frameworks and data systems.
Topics
FastAPI Example
from fastapi import FastAPI
from solverforge_legacy.solver import SolverManager
app = FastAPI()
solver_manager = SolverManager.create(solver_factory)
@app.post("/solve")
async def solve(problem: Timetable) -> str:
job_id = str(uuid.uuid4())
solver_manager.solve_and_listen(
job_id,
lambda _: problem,
on_best_solution_changed
)
return job_id
@app.get("/solution/{job_id}")
async def get_solution(job_id: str) -> Timetable:
return solver_manager.get_best_solution(job_id)
@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
solver_manager.terminate_early(job_id)
Serialization
SolverForge domain objects are standard Python dataclasses, making them easy to serialize:
import json
from dataclasses import asdict
# Serialize to JSON
json_str = json.dumps(asdict(solution))
# With Pydantic for validation
from pydantic.dataclasses import dataclass as pydantic_dataclass
@pydantic_dataclass
class TimetableDTO:
timeslots: list[TimeslotDTO]
rooms: list[RoomDTO]
lessons: list[LessonDTO]
Database Integration
Use any Python ORM (SQLAlchemy, Django ORM, etc.) for persistence:
- Load data from database into domain objects
- Run the solver
- Save results back to database
The solver works with in-memory Python objects, so any data source that can produce those objects will work.
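As a sketch of that cycle (the load_* and save_assignments helpers are hypothetical placeholders for your own ORM queries, and solver_factory is assumed to be built as in the FastAPI example):
def run_scheduled_solve(session) -> None:
    # 1. Load database rows and map them to domain objects.
    timeslots = load_timeslots(session)
    rooms = load_rooms(session)
    lessons = load_lessons(session, timeslots, rooms)
    problem = Timetable("db", timeslots, rooms, lessons)

    # 2. Solve entirely in memory.
    solver = solver_factory.build_solver()
    solution = solver.solve(problem)

    # 3. Persist the assignments back to the database.
    save_assignments(session, solution.lessons)
    session.commit()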
8.1 - FastAPI Integration
Build REST APIs for your solver with FastAPI.
FastAPI is a modern Python web framework that works well with SolverForge. This guide shows common patterns for building solver APIs.
Basic Setup
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import uuid
from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
# Global state
solver_manager: SolverManager | None = None
solutions: dict[str, Timetable] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize solver on startup, cleanup on shutdown."""
global solver_manager
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(minutes=5)),
)
solver_factory = SolverFactory.create(config)
solver_manager = SolverManager.create(solver_factory)
yield
if solver_manager:
solver_manager.close()
app = FastAPI(
title="Solver API",
description="Planning optimization API",
lifespan=lifespan,
)
API Endpoints
Submit Problem
@app.post("/solve", response_model=str)
async def submit_problem(request: ProblemRequest) -> str:
"""Submit a problem for solving. Returns job ID."""
job_id = str(uuid.uuid4())
problem = request.to_domain()
def on_best_solution(solution):
solutions[job_id] = solution
solver_manager.solve_and_listen(
job_id,
problem_finder=lambda _: problem,
best_solution_consumer=on_best_solution,
)
return job_id
Get Solution
@app.get("/solution/{job_id}", response_model=SolutionResponse)
async def get_solution(job_id: str) -> SolutionResponse:
"""Get the current best solution."""
if job_id not in solutions:
raise HTTPException(404, "Job not found")
solution = solutions[job_id]
status = solver_manager.get_solver_status(job_id)
return SolutionResponse.from_domain(solution, status)
Stop Solving
@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
"""Stop solving early."""
solver_manager.terminate_early(job_id)
return {"status": "terminating"}
Get Score Analysis
@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
"""Get detailed score analysis."""
if job_id not in solutions:
raise HTTPException(404, "Job not found")
solution = solutions[job_id]
analysis = solution_manager.analyze(solution)
return {
"score": str(analysis.score),
"is_feasible": analysis.score.is_feasible,
"constraints": [
{
"name": c.constraint_name,
"score": str(c.score),
"matches": c.match_count,
}
for c in analysis.constraint_analyses()
],
}
Request/Response Models
Use Pydantic for validation:
from pydantic import BaseModel
from datetime import time
class TimeslotDTO(BaseModel):
day: str
start_time: str
end_time: str
def to_domain(self) -> Timeslot:
return Timeslot(
self.day,
time.fromisoformat(self.start_time),
time.fromisoformat(self.end_time),
)
@classmethod
def from_domain(cls, timeslot: Timeslot) -> "TimeslotDTO":
return cls(
day=timeslot.day,
start_time=timeslot.start_time.isoformat(),
end_time=timeslot.end_time.isoformat(),
)
class ProblemRequest(BaseModel):
timeslots: list[TimeslotDTO]
rooms: list[RoomDTO]
lessons: list[LessonDTO]
def to_domain(self) -> Timetable:
timeslots = [t.to_domain() for t in self.timeslots]
rooms = [r.to_domain() for r in self.rooms]
lessons = [l.to_domain(timeslots, rooms) for l in self.lessons]
return Timetable("api", timeslots, rooms, lessons)
class SolutionResponse(BaseModel):
status: str
score: str | None
is_feasible: bool | None
lessons: list[LessonDTO]
@classmethod
def from_domain(cls, solution: Timetable, status) -> "SolutionResponse":
return cls(
status=status.name,
score=str(solution.score) if solution.score else None,
is_feasible=solution.score.is_feasible if solution.score else None,
lessons=[LessonDTO.from_domain(l) for l in solution.lessons],
)
Real-Time Updates
Problem Changes
@app.post("/solve/{job_id}/lessons")
async def add_lesson(job_id: str, lesson: LessonDTO):
"""Add a lesson to an active job."""
new_lesson = lesson.to_domain()
solver_manager.add_problem_change(
job_id,
AddLessonChange(new_lesson)
)
return {"status": "added", "id": new_lesson.id}
WebSocket Updates
import asyncio
from datetime import datetime
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/{job_id}")
async def websocket_updates(websocket: WebSocket, job_id: str):
await websocket.accept()
async def send_update(solution):
await websocket.send_json({
"score": str(solution.score),
"timestamp": datetime.now().isoformat(),
})
# Register listener
# (Implementation depends on your event system)
try:
while True:
await asyncio.sleep(1)
if job_id in solutions:
await send_update(solutions[job_id])
except WebSocketDisconnect:
pass
Error Handling
from fastapi import HTTPException
from fastapi.responses import JSONResponse
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
return JSONResponse(
status_code=500,
content={"error": str(exc)},
)
@app.get("/solution/{job_id}")
async def get_solution(job_id: str):
if job_id not in solutions:
raise HTTPException(
status_code=404,
detail=f"Job {job_id} not found"
)
# ...
Testing
import time

from fastapi.testclient import TestClient
def test_submit_and_get():
client = TestClient(app)
# Submit problem
response = client.post("/solve", json=problem_data)
assert response.status_code == 200
job_id = response.json()
# Wait for solving
time.sleep(5)
# Get solution
response = client.get(f"/solution/{job_id}")
assert response.status_code == 200
assert response.json()["is_feasible"]
Deployment
Docker
FROM python:3.11-slim
# Install JDK
RUN apt-get update && apt-get install -y openjdk-17-jdk
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Running
# Development
uvicorn main:app --reload
# Production
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
Next Steps
8.2 - Serialization
JSON serialization with dataclasses and Pydantic.
SolverForge domain objects are Python dataclasses, making them easy to serialize to JSON for APIs and storage.
Basic JSON Serialization
Using dataclasses
from dataclasses import dataclass, asdict
import json
@dataclass
class Timeslot:
day: str
start_time: str
end_time: str
@dataclass
class Room:
name: str
# Serialize
timeslot = Timeslot("MONDAY", "08:30", "09:30")
json_str = json.dumps(asdict(timeslot))
# {"day": "MONDAY", "start_time": "08:30", "end_time": "09:30"}
# Deserialize
data = json.loads(json_str)
timeslot = Timeslot(**data)
Handling Complex Types
For types like datetime and time:
from dataclasses import dataclass
from datetime import time
import json
class TimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, time):
return obj.isoformat()
return super().default(obj)
def time_decoder(dct):
for key, value in dct.items():
if key.endswith('_time') and isinstance(value, str):
try:
dct[key] = time.fromisoformat(value)
except ValueError:
pass
return dct
# Serialize
json_str = json.dumps(asdict(obj), cls=TimeEncoder)
# Deserialize
data = json.loads(json_str, object_hook=time_decoder)
Pydantic Integration
Pydantic provides automatic validation and serialization:
DTO Pattern
Separate API models from domain models:
from pydantic import BaseModel
from datetime import time
# API model
class TimeslotDTO(BaseModel):
day: str
start_time: str
end_time: str
def to_domain(self) -> Timeslot:
return Timeslot(
self.day,
time.fromisoformat(self.start_time),
time.fromisoformat(self.end_time),
)
@classmethod
def from_domain(cls, timeslot: Timeslot) -> "TimeslotDTO":
return cls(
day=timeslot.day,
start_time=timeslot.start_time.isoformat(),
end_time=timeslot.end_time.isoformat(),
)
# Domain model (unchanged)
@dataclass
class Timeslot:
day: str
start_time: time
end_time: time
Full Example
from pydantic import BaseModel
from datetime import time
class LessonDTO(BaseModel):
id: str
subject: str
teacher: str
student_group: str
timeslot: TimeslotDTO | None = None
room: RoomDTO | None = None
def to_domain(self, timeslots: list[Timeslot], rooms: list[Room]) -> Lesson:
ts = None
if self.timeslot:
ts = find_timeslot(timeslots, self.timeslot)
rm = None
if self.room:
rm = find_room(rooms, self.room)
return Lesson(
id=self.id,
subject=self.subject,
teacher=self.teacher,
student_group=self.student_group,
timeslot=ts,
room=rm,
)
@classmethod
def from_domain(cls, lesson: Lesson) -> "LessonDTO":
return cls(
id=lesson.id,
subject=lesson.subject,
teacher=lesson.teacher,
student_group=lesson.student_group,
timeslot=TimeslotDTO.from_domain(lesson.timeslot) if lesson.timeslot else None,
room=RoomDTO.from_domain(lesson.room) if lesson.room else None,
)
class TimetableDTO(BaseModel):
id: str
timeslots: list[TimeslotDTO]
rooms: list[RoomDTO]
lessons: list[LessonDTO]
score: str | None = None
def to_domain(self) -> Timetable:
timeslots = [t.to_domain() for t in self.timeslots]
rooms = [r.to_domain() for r in self.rooms]
lessons = [l.to_domain(timeslots, rooms) for l in self.lessons]
return Timetable(self.id, timeslots, rooms, lessons)
@classmethod
def from_domain(cls, timetable: Timetable) -> "TimetableDTO":
return cls(
id=timetable.id,
timeslots=[TimeslotDTO.from_domain(t) for t in timetable.timeslots],
rooms=[RoomDTO.from_domain(r) for r in timetable.rooms],
lessons=[LessonDTO.from_domain(l) for l in timetable.lessons],
score=str(timetable.score) if timetable.score else None,
)
Reference Resolution
When deserializing, resolve references to shared objects:
def find_timeslot(timeslots: list[Timeslot], dto: TimeslotDTO) -> Timeslot:
"""Find matching timeslot by properties."""
for ts in timeslots:
if (ts.day == dto.day and
ts.start_time.isoformat() == dto.start_time):
return ts
raise ValueError(f"Timeslot not found: {dto}")
def find_room(rooms: list[Room], dto: RoomDTO) -> Room:
"""Find matching room by name."""
for room in rooms:
if room.name == dto.name:
return room
raise ValueError(f"Room not found: {dto}")
Score Serialization
from solverforge_legacy.solver.score import HardSoftScore
# To string
score_str = str(solution.score) # "-2hard/-15soft"
# From string
score = HardSoftScore.parse("-2hard/-15soft")
# To dict
score_dict = {
"hard": solution.score.hard_score,
"soft": solution.score.soft_score,
"feasible": solution.score.is_feasible,
}
Database Persistence
SQLAlchemy Example
from datetime import time
from sqlalchemy import Column, String, Integer, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base()
class TimeslotModel(Base):
__tablename__ = "timeslots"
id = Column(Integer, primary_key=True)
day = Column(String)
start_time = Column(String)
end_time = Column(String)
def to_domain(self) -> Timeslot:
return Timeslot(
self.day,
time.fromisoformat(self.start_time),
time.fromisoformat(self.end_time),
)
class LessonModel(Base):
__tablename__ = "lessons"
id = Column(String, primary_key=True)
subject = Column(String)
teacher = Column(String)
student_group = Column(String)
timeslot_id = Column(Integer, ForeignKey("timeslots.id"), nullable=True)
room_id = Column(Integer, ForeignKey("rooms.id"), nullable=True)
timeslot = relationship("TimeslotModel")
room = relationship("RoomModel")
Best Practices
Do
- Use DTOs for API boundaries
- Validate input with Pydantic
- Handle None values explicitly
- Use consistent naming conventions
Don’t
- Serialize domain objects directly (may expose internals)
- Forget to handle score serialization
- Ignore reference resolution
- Mix API and domain models
Next Steps
8.3 - Logging
Configure logging for debugging and monitoring.
Configure Python logging to monitor solver behavior and debug issues.
Basic Configuration
import logging
# Configure root logger
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Get logger for your app
logger = logging.getLogger("my_app")
Solver Logging
The solver uses the ai.timefold logger hierarchy:
# Enable solver debug logging
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)
# Or just specific components
logging.getLogger("ai.timefold.solver").setLevel(logging.DEBUG)
Log Levels
| Level | Use Case |
|---|---|
| DEBUG | Detailed solver internals |
| INFO | Progress updates, scores |
| WARNING | Potential issues |
| ERROR | Failures |
Progress Logging
Log solver progress with event listeners:
from solverforge_legacy.solver import BestSolutionChangedEvent
logger = logging.getLogger("solver")
def on_progress(event: BestSolutionChangedEvent):
logger.info(
f"Score: {event.new_best_score} | "
f"Time: {event.time_spent} | "
f"Initialized: {event.is_new_best_solution_initialized}"
)
solver.add_event_listener(on_progress)
File Logging
Write logs to a file:
import logging
# Create file handler
file_handler = logging.FileHandler("solver.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
))
# Add to logger
logging.getLogger().addHandler(file_handler)
Structured Logging
For production, use structured logging:
import json
import logging
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": self.formatTime(record),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if hasattr(record, "score"):
log_data["score"] = record.score
if hasattr(record, "job_id"):
log_data["job_id"] = record.job_id
return json.dumps(log_data)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.getLogger().addHandler(handler)
Logging with Context
def log_with_context(logger, job_id, message, **kwargs):
extra = {"job_id": job_id, **kwargs}
logger.info(message, extra=extra)
# Usage
log_with_context(logger, "job-123", "Solving started", entities=100)
FastAPI Logging
from fastapi import FastAPI, Request
import logging
import time
logger = logging.getLogger("api")
@app.middleware("http")
async def log_requests(request: Request, call_next):
start = time.time()
response = await call_next(request)
duration = time.time() - start
logger.info(
f"{request.method} {request.url.path} "
f"- {response.status_code} - {duration:.3f}s"
)
return response
Debugging Tips
Enable Verbose Logging
# Maximum verbosity
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)
Log Constraint Matches
def debug_constraints(solution):
logger = logging.getLogger("constraints")
analysis = solution_manager.analyze(solution)
for constraint in analysis.constraint_analyses():
logger.debug(
f"{constraint.constraint_name}: "
f"score={constraint.score}, matches={constraint.match_count}"
)
for match in constraint.matches():
logger.debug(f" - {match.justification}")
Log Configuration
def log_config(config: SolverConfig):
logger = logging.getLogger("config")
logger.info(f"Solution class: {config.solution_class}")
logger.info(f"Entity classes: {config.entity_class_list}")
logger.info(f"Termination: {config.termination_config}")
Production Recommendations
Log Aggregation
Send logs to a central system:
# Example with Python logging to stdout (for container orchestration)
import logging
import sys
logging.basicConfig(
level=logging.INFO,
format='%(message)s', # JSON formatted
stream=sys.stdout,
)
Metrics
Track key metrics:
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SolveMetrics:
job_id: str
start_time: datetime
end_time: datetime | None = None
final_score: str | None = None
is_feasible: bool | None = None
def log(self):
duration = (self.end_time - self.start_time).total_seconds() if self.end_time else 0
logger.info(
f"Job {self.job_id}: "
f"duration={duration:.1f}s, "
f"score={self.final_score}, "
f"feasible={self.is_feasible}"
)
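Usage might look like the following sketch (the job id and the synchronous solver call are illustrative):
metrics = SolveMetrics(job_id="job-123", start_time=datetime.now())
solution = solver.solve(problem)
metrics.end_time = datetime.now()
metrics.final_score = str(solution.score)
metrics.is_feasible = solution.score.is_feasible
metrics.log()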
Alerting
Alert on issues:
def check_solution_quality(solution, job_id):
if not solution.score.is_feasible:
logger.warning(f"Job {job_id} produced infeasible solution!")
send_alert(f"Infeasible solution for job {job_id}")
if solution.score.soft_score < -10000:
logger.warning(f"Job {job_id} has poor soft score: {solution.score}")
Next Steps
9 - Reference
API reference and frequently asked questions.
Quick reference guides and answers to common questions.
Topics
- API Summary - Quick reference for key classes and functions
- FAQ - Frequently asked questions
Key Imports
# Domain modeling
from solverforge_legacy.solver.domain import (
planning_entity,
planning_solution,
PlanningId,
PlanningVariable,
PlanningListVariable,
PlanningEntityCollectionProperty,
ProblemFactCollectionProperty,
ValueRangeProvider,
PlanningScore,
PlanningPin,
InverseRelationShadowVariable,
PreviousElementShadowVariable,
NextElementShadowVariable,
CascadingUpdateShadowVariable,
)
# Constraints
from solverforge_legacy.solver.score import (
constraint_provider,
ConstraintFactory,
Constraint,
Joiners,
ConstraintCollectors,
HardSoftScore,
HardMediumSoftScore,
SimpleScore,
)
# Solver
from solverforge_legacy.solver import (
SolverFactory,
SolverManager,
SolutionManager,
ProblemChange,
)
# Configuration
from solverforge_legacy.solver.config import (
SolverConfig,
ScoreDirectorFactoryConfig,
TerminationConfig,
Duration,
)
Score Types
| Score Type | Levels | Use Case |
|---|---|---|
| SimpleScore | 1 | Single optimization objective |
| HardSoftScore | 2 | Feasibility (hard) + optimization (soft) |
| HardMediumSoftScore | 3 | Hard + important preferences + nice-to-have |
| BendableScore | N | Custom number of levels |
| *DecimalScore | - | Decimal precision variants |
9.1 - API Summary
Quick reference for SolverForge Python API.
Quick reference for commonly used SolverForge APIs.
Domain Decorators
from solverforge_legacy.solver.domain import (
planning_entity,
planning_solution,
)
| Decorator | Purpose |
|---|---|
| @planning_entity | Mark a class as a planning entity |
| @planning_solution | Mark a class as the planning solution |
Type Annotations
from solverforge_legacy.solver.domain import (
PlanningId,
PlanningVariable,
PlanningListVariable,
PlanningEntityCollectionProperty,
ProblemFactCollectionProperty,
ValueRangeProvider,
PlanningScore,
PlanningPin,
PlanningPinToIndex,
)
| Annotation | Use With | Purpose |
|---|---|---|
| PlanningId | Entity field | Unique identifier |
| PlanningVariable | Entity field | Variable to optimize |
| PlanningListVariable | Entity field | Ordered list of entities |
| PlanningEntityCollectionProperty | Solution field | Collection of entities |
| ProblemFactCollectionProperty | Solution field | Immutable input data |
| ValueRangeProvider | Solution field | Possible values for variables |
| PlanningScore | Solution field | Where score is stored |
| PlanningPin | Entity field | Lock entity assignment |
| PlanningPinToIndex | Entity field | Lock list position |
Usage Pattern
from typing import Annotated
from dataclasses import dataclass, field
@planning_entity
@dataclass
class Lesson:
id: Annotated[str, PlanningId]
timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
Shadow Variable Annotations
from solverforge_legacy.solver.domain import (
InverseRelationShadowVariable,
PreviousElementShadowVariable,
NextElementShadowVariable,
CascadingUpdateShadowVariable,
)
| Annotation | Purpose |
|---|---|
| InverseRelationShadowVariable | Back-reference to list owner |
| PreviousElementShadowVariable | Previous element in list |
| NextElementShadowVariable | Next element in list |
| CascadingUpdateShadowVariable | Computed value that cascades |
Score Types
from solverforge_legacy.solver.score import (
SimpleScore,
HardSoftScore,
HardMediumSoftScore,
HardSoftDecimalScore,
)
| Type | Levels | Example |
|---|---|---|
| SimpleScore | 1 | -5 |
| HardSoftScore | 2 | -2hard/-15soft |
| HardMediumSoftScore | 3 | -1hard/-3medium/-10soft |
| HardSoftDecimalScore | 2 (decimal) | -2hard/-15.5soft |
Common Operations
score = HardSoftScore.of(-2, -15)
score.hard_score # -2
score.soft_score # -15
score.is_feasible # False (hard_score < 0)
# Constants
HardSoftScore.ZERO
HardSoftScore.ONE_HARD
HardSoftScore.ONE_SOFT
Constraint Streams
from solverforge_legacy.solver.score import (
constraint_provider,
ConstraintFactory,
Constraint,
Joiners,
ConstraintCollectors,
)
ConstraintFactory Methods
| Method | Purpose |
|---|---|
| for_each(Class) | Start stream with all instances |
| for_each_unique_pair(Class, *Joiners) | All unique pairs |
| for_each_including_unassigned(Class) | Include entities with null variables |
Stream Operations
| Method | Purpose |
|---|---|
| .filter(predicate) | Filter elements |
| .join(Class, *Joiners) | Join with another class |
| .if_exists(Class, *Joiners) | Keep if matching exists |
| .if_not_exists(Class, *Joiners) | Keep if no matching exists |
| .group_by(groupKey, collector) | Group and aggregate |
| .flatten_last(mapper) | Expand collection |
| .map(mapper) | Transform elements |
| .complement(Class, filler) | Add missing elements |
Terminal Operations
| Method | Purpose |
|---|---|
| .penalize(Score) | Add penalty |
| .penalize(Score, weigher) | Weighted penalty |
| .reward(Score) | Add reward |
| .reward(Score, weigher) | Weighted reward |
| .penalize_decimal(Score, weigher) | Decimal penalty |
| .as_constraint(name) | Name the constraint |
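Putting these pieces together, a typical constraint provider looks like the following sketch, which reuses the Lesson entity and the room-conflict rule from earlier sections:
from solverforge_legacy.solver.score import (
    constraint_provider, ConstraintFactory, Constraint, Joiners, HardSoftScore
)

@constraint_provider
def define_constraints(factory: ConstraintFactory) -> list[Constraint]:
    return [
        # Hard constraint: no two lessons share a room in the same timeslot
        factory.for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.timeslot),
            Joiners.equal(lambda lesson: lesson.room),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Room conflict"),
    ]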
Joiners
from solverforge_legacy.solver.score import Joiners
| Joiner | Purpose |
|---|---|
| Joiners.equal(extractor) | Match on equality |
| Joiners.equal(extractorA, extractorB) | Match properties |
| Joiners.less_than(extractorA, extractorB) | A < B |
| Joiners.less_than_or_equal(extractorA, extractorB) | A <= B |
| Joiners.greater_than(extractorA, extractorB) | A > B |
| Joiners.greater_than_or_equal(extractorA, extractorB) | A >= B |
| Joiners.overlapping(startA, endA, startB, endB) | Time overlap |
| Joiners.overlapping(startA, endA) | Same start/end extractors |
| Joiners.filtering(predicate) | Custom filter |
Collectors
from solverforge_legacy.solver.score import ConstraintCollectors
| Collector | Result |
|---|---|
| count() | Number of items |
| count_distinct(mapper) | Distinct count |
| sum(mapper) | Sum of values |
| min(mapper) | Minimum value |
| max(mapper) | Maximum value |
| average(mapper) | Average value |
| to_list(mapper) | Collect to list |
| to_set(mapper) | Collect to set |
| load_balance(keyMapper, loadMapper) | Fairness measure |
| compose(c1, c2, combiner) | Combine collectors |
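For example, group_by with a count collector can softly penalize overloaded teachers; this is a sketch, and the limit of 6 lessons is illustrative:
from solverforge_legacy.solver.score import (
    ConstraintFactory, Constraint, ConstraintCollectors, HardSoftScore
)

def teacher_overloaded(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Lesson)
        # Group lessons by teacher and count them
        .group_by(lambda lesson: lesson.teacher, ConstraintCollectors.count())
        # Keep only teachers with more than 6 lessons
        .filter(lambda teacher, count: count > 6)
        # One soft point per lesson over the limit
        .penalize(HardSoftScore.ONE_SOFT, lambda teacher, count: count - 6)
        .as_constraint("Teacher overloaded")
    )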
Solver Configuration
from solverforge_legacy.solver.config import (
SolverConfig,
ScoreDirectorFactoryConfig,
TerminationConfig,
Duration,
)
SolverConfig
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(
spent_limit=Duration(seconds=30)
),
)
TerminationConfig Options
| Property | Type | Purpose |
|---|---|---|
| spent_limit | Duration | Time limit |
| unimproved_spent_limit | Duration | Time without improvement |
| best_score_limit | str | Target score |
| best_score_feasible | bool | Stop when feasible |
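A sketch combining two of these options (the durations are illustrative):
from solverforge_legacy.solver.config import TerminationConfig, Duration

termination_config = TerminationConfig(
    spent_limit=Duration(minutes=5),              # never run longer than 5 minutes
    unimproved_spent_limit=Duration(seconds=30),  # stop early if the score stalls for 30 seconds
)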
Solver API
from solverforge_legacy.solver import (
SolverFactory,
SolverManager,
SolutionManager,
SolverStatus,
)
SolverFactory
solver_factory = SolverFactory.create(config)
solver = solver_factory.build_solver()
solution = solver.solve(problem)
SolverManager
solver_manager = SolverManager.create(solver_factory)
# Async solving
solver_manager.solve_and_listen(
problem_id,
problem_finder=lambda _: problem,
best_solution_consumer=on_best_solution,
)
# Control
solver_manager.terminate_early(problem_id)
status = solver_manager.get_solver_status(problem_id)
solver_manager.close()
SolverStatus
| Status | Meaning |
|---|---|
| NOT_SOLVING | Not started |
| SOLVING_ACTIVE | Currently solving |
| SOLVING_SCHEDULED | Queued |
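For example, polling a submitted job (a sketch reusing solver_manager and problem_id from above):
from solverforge_legacy.solver import SolverStatus

status = solver_manager.get_solver_status(problem_id)
if status == SolverStatus.SOLVING_ACTIVE:
    print("Still solving...")
elif status == SolverStatus.SOLVING_SCHEDULED:
    print("Queued, not started yet")
else:  # SolverStatus.NOT_SOLVING
    print("Finished (or never started)")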
SolutionManager
solution_manager = SolutionManager.create(solver_factory)
analysis = solution_manager.analyze(solution)
score = solution_manager.update(solution)
Duration
from solverforge_legacy.solver.config import Duration
Duration(seconds=30)
Duration(minutes=5)
Duration(hours=1)
Event Listeners
from solverforge_legacy.solver import BestSolutionChangedEvent
def on_best_solution(event: BestSolutionChangedEvent):
print(f"Score: {event.new_best_score}")
print(f"Time: {event.time_spent}")
Problem Changes
from solverforge_legacy.solver import ProblemChange
class AddLessonChange(ProblemChange):
def __init__(self, lesson: Lesson):
self.lesson = lesson
def do_change(self, solution: Timetable, problem_change_director):
problem_change_director.add_entity(
self.lesson,
lambda l: solution.lessons.append(l)
)
Import Summary
# Domain modeling
from solverforge_legacy.solver.domain import (
planning_entity,
planning_solution,
PlanningId,
PlanningVariable,
PlanningListVariable,
PlanningEntityCollectionProperty,
ProblemFactCollectionProperty,
ValueRangeProvider,
PlanningScore,
PlanningPin,
InverseRelationShadowVariable,
PreviousElementShadowVariable,
NextElementShadowVariable,
CascadingUpdateShadowVariable,
)
# Scores and constraints
from solverforge_legacy.solver.score import (
constraint_provider,
ConstraintFactory,
Constraint,
Joiners,
ConstraintCollectors,
HardSoftScore,
HardMediumSoftScore,
SimpleScore,
)
# Solver
from solverforge_legacy.solver import (
SolverFactory,
SolverManager,
SolutionManager,
SolverStatus,
BestSolutionChangedEvent,
ProblemChange,
)
# Configuration
from solverforge_legacy.solver.config import (
SolverConfig,
ScoreDirectorFactoryConfig,
TerminationConfig,
Duration,
)
9.2 - FAQ
Frequently asked questions about SolverForge.
General
What is SolverForge?
SolverForge is a Python constraint solver for planning and scheduling optimization problems. It uses constraint streams to define rules and metaheuristic algorithms to find high-quality solutions.
What problems can SolverForge solve?
SolverForge excels at:
- Employee scheduling: Shift assignment with skills, availability, fairness
- Vehicle routing: Route optimization with capacity, time windows
- School timetabling: Class scheduling with room and teacher constraints
- Meeting scheduling: Room booking with attendee conflicts
- Task assignment: Job shop, bin packing, resource allocation
How is SolverForge licensed?
SolverForge is open source software released under the Apache License 2.0. This allows commercial use, modification, and distribution.
Installation
What are the requirements?
- Python 3.10 or later
- JDK 17 or later (SolverForge uses the JVM for solving)
Why does SolverForge need Java?
SolverForge’s solving engine runs on the JVM for performance. The Python API communicates with the JVM transparently via JPype.
How do I install it?
pip install solverforge-legacy
Make sure JAVA_HOME is set or Java is on your PATH.
Modeling
What’s the difference between problem facts and planning entities?
| Type | Changes During Solving | Example |
|---|---|---|
| Problem facts | No (input data) | Rooms, Timeslots, Employees |
| Planning entities | Yes (variables assigned) | Lessons, Shifts, Visits |
When should I use PlanningVariable vs PlanningListVariable?
| Type | Use Case | Example |
|---|---|---|
| PlanningVariable | Assign one value | Lesson → Timeslot |
| PlanningListVariable | Ordered list of entities | Vehicle → list of Visits |
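As a sketch, the two declarations look like this; the Vehicle and Visit classes are illustrative, and Timeslot is the fact class from the timetabling example:
from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import (
    planning_entity, PlanningId, PlanningVariable, PlanningListVariable
)

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    # One value assigned per entity
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)

@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    # An ordered list of entities assigned to this entity
    visits: Annotated[list["Visit"], PlanningListVariable] = field(default_factory=list)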
Can I use Pydantic instead of dataclasses?
Yes. Both dataclasses and Pydantic models work. The quickstart examples show both patterns.
How do I pin (lock) an assignment?
Add PlanningPin to a boolean field:
@planning_entity
class Lesson:
pinned: Annotated[bool, PlanningPin] = False
Set pinned=True to prevent the solver from changing that entity’s variables.
Constraints
What’s the difference between hard and soft constraints?
| Type | Meaning | Example |
|---|---|---|
| Hard | Must not violate | No room conflicts |
| Soft | Prefer to satisfy | Teacher prefers certain room |
Hard constraints define feasibility. Soft constraints define quality.
Why use for_each_unique_pair instead of for_each + join?
for_each_unique_pair is more efficient and avoids counting conflicts twice:
# Good - each pair counted once
constraint_factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
)
# Less efficient - (A,B) and (B,A) both matched
constraint_factory.for_each(Lesson).join(Lesson, ...)
How do I debug a constraint?
- Use SolutionManager.analyze() to see the score breakdown:
analysis = solution_manager.analyze(solution)
for c in analysis.constraint_analyses():
print(f"{c.constraint_name}: {c.score}")
- Examine individual matches:
for match in constraint_analysis.matches():
print(f" {match.justification}")
Why is my score always infeasible?
Common causes:
- Not enough resources (rooms, timeslots, employees) for entities
- Conflicting hard constraints that can’t all be satisfied
- Uninitialized entities (variables still None)
Try:
- Increasing termination time
- Relaxing some hard constraints to soft
- Adding more resources
Solving
How long should I let the solver run?
Depends on problem size and complexity:
| Problem Size | Typical Time |
|---|---|
| Small (< 100 entities) | 10-60 seconds |
| Medium (100-1000 entities) | 1-10 minutes |
| Large (> 1000 entities) | 10+ minutes |
Use benchmarking to find the optimal time for your problem.
Why isn’t the score improving?
Possible causes:
- Stuck in local optimum (try different algorithm)
- All hard constraints satisfied (now optimizing soft)
- Constraints are too restrictive
Try:
- Simulated Annealing or Late Acceptance instead of Tabu Search
- Longer termination time
- Review constraint design
How do I stop solving early?
With Solver:
# External termination (from another thread)
solver.terminate_early()
With SolverManager:
solver_manager.terminate_early(problem_id)
Can I get progress updates during solving?
Yes, use SolverManager with a listener:
solver_manager.solve_and_listen(
problem_id,
problem_finder=lambda _: problem,
best_solution_consumer=lambda solution: print(f"Score: {solution.score}"),
)
How do I make constraints faster?
- Use Joiners instead of filter():
# Fast - indexing
Joiners.equal(lambda lesson: lesson.timeslot)
# Slower - Python filter
.filter(lambda l1, l2: l1.timeslot == l2.timeslot)
- Cache computed values in entity fields (see the sketch after this list)
- Avoid expensive operations in lambdas
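For example, a fixed per-entity value can be computed once instead of inside every constraint lambda; this is a sketch, and the Shift class is illustrative:
from dataclasses import dataclass, field
from datetime import date, datetime, time

@dataclass
class Shift:
    start_time: time
    end_time: time
    # Computed once here instead of recomputed in every constraint lambda
    duration_minutes: int = field(init=False)

    def __post_init__(self):
        start = datetime.combine(date.min, self.start_time)
        end = datetime.combine(date.min, self.end_time)
        self.duration_minutes = int((end - start).total_seconds() // 60)

# In a constraint stream, use the cached field directly:
# .penalize(HardSoftScore.ONE_SOFT, lambda shift: shift.duration_minutes)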
How do I scale to larger problems?
- Increase termination time
- Use more efficient constraints
- Consider partitioning large problems
- Use PlanningListVariable for routing problems
Should I use multiple threads?
The solver is single-threaded by design for score calculation consistency. Use SolverManager to solve multiple problems concurrently.
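A sketch of solving several independent problems concurrently with one SolverManager (the problems dict and store() callback are illustrative placeholders):
solver_manager = SolverManager.create(solver_factory)
for problem_id, problem in problems.items():
    solver_manager.solve_and_listen(
        problem_id,
        # Default arguments bind the current loop values into each lambda
        problem_finder=lambda _, p=problem: p,
        best_solution_consumer=lambda solution, pid=problem_id: store(pid, solution),
    )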
Integration
Can I use SolverForge with FastAPI?
Yes! See the FastAPI Integration guide. Key pattern:
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
global solver_manager
solver_manager = SolverManager.create(solver_factory)
yield
solver_manager.close()
How do I serialize solutions to JSON?
Use Pydantic models or dataclasses.asdict():
# With dataclasses
import json
from dataclasses import asdict
json.dumps(asdict(solution))
# With Pydantic
solution.model_dump_json()
See Serialization for handling references.
Can I save and load solutions?
Yes, serialize to JSON and deserialize back:
# Save
with open("solution.json", "w") as f:
json.dump(solution_to_dict(solution), f)
# Load
with open("solution.json") as f:
problem = dict_to_solution(json.load(f))
Troubleshooting
“No JVM shared library file (libjvm.so) found”
Java isn’t installed or JAVA_HOME isn’t set:
# Check Java
java -version
# Set JAVA_HOME (example for Linux)
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
“Score corruption detected”
A constraint is producing inconsistent scores. Common causes:
- Non-deterministic lambdas
- External state changes
- Incorrect shadow variable updates
Enable debug logging for the solver (for example, logging.getLogger("ai.timefold").setLevel(logging.DEBUG)) to see details.
“OutOfMemoryError” in the JVM
Increase JVM heap:
export JAVA_TOOL_OPTIONS="-Xmx4g"
More Help