This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

SolverForge (Legacy)

Technical documentation for SolverForge Legacy — the pure Python constraint solver using the Timefold backend.

Every organization faces planning problems: providing products or services with a limited set of constrained resources (employees, assets, time, and money). SolverForge’s Planning AI optimizes these problems to do more business with fewer resources using Constraint Satisfaction Programming.

SolverForge is a lightweight, embeddable constraint satisfaction engine which optimizes planning problems. Example use cases include:

  • Vehicle Routing - Optimize delivery routes for fleets of vehicles
  • Employee Scheduling - Assign shifts to employees based on skills and availability
  • School Timetabling - Schedule lessons to timeslots and rooms
  • Meeting Scheduling - Find optimal times and rooms for meetings
  • Bin Packing - Efficiently pack items into containers
  • Task Assignment - Assign tasks to resources optimally

Use Case Overview

Python-First Design

SolverForge provides a Pythonic API using:

  • Decorators for domain modeling (@planning_entity, @planning_solution)
  • Type annotations with Annotated for constraint and property marking
  • Dataclasses for clean, readable domain models
  • Fluent constraint stream API for intuitive constraint definition
from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import (
    planning_entity, planning_solution,
    PlanningId, PlanningVariable, PlanningEntityCollectionProperty,
    ProblemFactCollectionProperty, ValueRangeProvider, PlanningScore
)
from solverforge_legacy.solver.score import HardSoftScore

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)

@planning_solution
@dataclass
class Timetable:
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)

Requirements

  • Python 3.10+ (3.11 or 3.12 recommended)
  • JDK 17+ (for the optimization engine backend)

Next Steps

1 - Concepts

Understand the fundamental concepts of planning optimization and constraint solving.

Before diving into SolverForge, it’s helpful to understand the core concepts behind planning optimization. This section covers:

1.1 - What is Planning Optimization?

Introduction to planning optimization and constraint satisfaction.

Planning

The need to create plans generally arises from a desire to achieve a goal:

  • Build a house
  • Correctly staff a hospital shift
  • Complete work at all customer locations
  • Deliver packages efficiently

Achieving those goals involves organizing the available resources. To correctly staff a hospital you need enough qualified personnel in a variety of fields and specializations to cover the opening hours of the hospital.

Any plan to deploy resources, whether to staff a hospital shift or to assemble the building materials for a new house, is done under constraints.

Constraints could be:

  • Physical laws - People can’t work two shifts in two separate locations at the same time, and you can’t mount a roof on a house that doesn’t exist
  • Regulations - Employees need a certain number of hours between shifts or are only allowed to work a maximum number of hours per week
  • Preferences - Certain employees prefer to work specific shift patterns

Feasible Plans

Any plan needs to consider all three elements—goals, resources, and constraints—in balance to be a feasible plan. A plan that fails to account for all the elements of the problem is an infeasible plan.

For instance, if a hospital staff roster covers all shifts, but assigns employees back-to-back shifts with no breaks for sleep or life outside work, it is not a valid plan.

Why Planning Problems are Hard

Planning problems become harder to solve as the number of resources and constraints increase. Creating an employee shift schedule for a small team of four employees is fairly straightforward. However, if each employee performs a specific function within the business and those functions need to be performed in a specific order, changes that affect one employee quickly cascade and affect everybody on the team.

As more employees and different work specializations are added, things become much more complicated.

Example: For a trivial field service routing problem with 4 vehicles and 8 visits, the number of possibilities that a brute force algorithm considers is 19,958,418.

What would take a team of planners many hours to schedule can be automatically scheduled by SolverForge in a fraction of the time.

Operations Research

Operations Research (OR) is a field of research focused on finding optimal (or near optimal) solutions to problems with techniques that improve decision-making.

Constraint satisfaction programming is part of Operations Research that aims to satisfy all the constraints of a problem.

Planning AI

Planning AI is a type of artificial intelligence designed specifically to handle complex planning and scheduling tasks, and to satisfy the constraints of planning problems. Instead of just automating simple, repetitive tasks, it helps you make better decisions by sorting through countless possibilities to find the best solutions—saving you time, reducing costs, and improving efficiency.

Why Planning AI is Different

Traditional methods of planning often involve manually sifting through options or relying on basic tools that can’t keep up with the complexity of real-world problems. Planning AI, on the other hand, uses advanced strategies to quickly focus on the most promising solutions, even when the situation is extremely complicated.

Planning AI also makes it possible to understand the final solution with a breakdown of:

  • Which constraints have been violated
  • Scores for individual constraints
  • An overall score

This makes Planning AI incredibly valuable in industries where getting the right plan is crucial—whether that’s scheduling workers, routing deliveries, or managing resources in a factory.

Constraints and Scoring

Constraints can be considered hard, medium, or soft.

Hard Constraints

Hard constraints represent rules and limitations of the real world that any planning solution has to respect. For instance, there are only 24 hours in a day and people can only be in one place at a time. Hard constraints also include rules that must be adhered to, such as employee contracts and the order in which dependent tasks are completed.

Breaking hard constraints results in infeasible plans.

Medium Constraints

Medium constraints help manage plans when resources are limited (for instance, when there aren’t enough technicians to complete all the customer visits or there aren’t enough employees to work all the available shifts). Medium constraints incentivize SolverForge to assign as many entities (visits or shifts) as possible.

Soft Constraints

Soft constraints help optimize plans based on the business goals, for instance:

  • Minimize travel time between customer visits
  • Assign employees to their preferred shifts
  • Keep teachers in the same room for consecutive lessons

Understanding Scores

To help determine the quality of the solution, plans are assigned a score with values for hard, medium, and soft constraints.

0hard/-257medium/-6119520soft

From this example score we can see:

  • Zero hard constraints were broken (feasible!)
  • Medium and soft scores have negative values (room for optimization)

Note: The scores do not show how many constraints were broken, but weighted values associated with those constraints.

Score Comparison

Because breaking hard constraints would result in an infeasible solution, a solution that breaks zero hard constraints and has a soft constraint score of -1,000,000 is better than a solution that breaks one hard constraint and has a soft constraint score of 0.

The weight of constraints can be tweaked to adjust their impact on the solution.

1.2 - Problem Types

Common categories of planning and scheduling problems.

SolverForge can solve a wide variety of planning and scheduling problems. Here are some common categories:

Scheduling Problems

Assign activities to time slots and resources.

Employee Scheduling (Rostering)

Assign employees to shifts based on:

  • Skills and qualifications
  • Availability and preferences
  • Labor regulations (max hours, rest periods)
  • Fairness (balanced workload)

Examples: Hospital nurse scheduling, retail staff scheduling, call center scheduling

School Timetabling

Assign lessons to timeslots and rooms:

  • Teachers can only teach one class at a time
  • Rooms have limited capacity
  • Student groups shouldn’t have conflicts
  • Preference for consecutive lessons

Examples: University course scheduling, school class scheduling

Meeting Scheduling

Find optimal times for meetings:

  • Required attendees must be available
  • Rooms must be available and large enough
  • Minimize conflicts with other meetings
  • Consider timezone differences

Job Shop Scheduling

Schedule jobs on machines:

  • Operations must follow a specific order
  • Machines can only do one job at a time
  • Minimize total completion time (makespan)

Examples: Manufacturing scheduling, print shop scheduling

Routing Problems

Plan routes and sequences for vehicles or resources.

Vehicle Routing Problem (VRP)

Plan delivery or service routes:

  • Vehicle capacity constraints
  • Time windows for deliveries
  • Minimize total travel distance/time
  • Multiple depots possible

Variants:

  • CVRP - Capacitated VRP
  • VRPTW - VRP with Time Windows
  • PDPTW - Pickup and Delivery with Time Windows

Examples: Delivery route planning, field service scheduling, waste collection

Traveling Salesman Problem (TSP)

Visit all locations exactly once with minimum travel:

  • Single vehicle
  • Return to starting point
  • Minimize total distance

Examples: Sales territory planning, circuit board drilling

Assignment Problems

Assign entities to resources or positions.

Task Assignment

Assign tasks to workers or machines:

  • Match skills/capabilities
  • Balance workload
  • Meet deadlines
  • Minimize cost

Examples: Project team assignment, warehouse task allocation

Bin Packing

Pack items into containers:

  • Items have sizes/weights
  • Containers have capacity limits
  • Minimize number of containers used

Examples: Truck loading, cloud server allocation, cutting stock

Resource Allocation

Allocate limited resources to competing demands:

  • Budget allocation
  • Equipment assignment
  • Space allocation

Complex Planning Problems

Real-world problems often combine multiple problem types:

Field Service Scheduling

Combines:

  • Routing - Travel between customer locations
  • Scheduling - Time windows and appointment slots
  • Assignment - Match technician skills to job requirements

Project Planning

Combines:

  • Task scheduling - Activities with durations and dependencies
  • Resource assignment - Assign people/equipment to tasks
  • Constraint satisfaction - Deadlines, budgets, availability

Problem Characteristics

When modeling your problem, consider these characteristics:

CharacteristicDescriptionExample
Hard constraintsMust be satisfiedLegal requirements
Soft constraintsShould be optimizedCustomer preferences
Planning entitiesWhat gets assignedLessons, visits, shifts
Planning variablesThe assignmentsTimeslot, room, vehicle
Problem factsFixed dataEmployees, rooms, skills

Choosing the Right Model

When modeling your problem:

  1. Identify entities - What things need to be assigned or scheduled?
  2. Identify variables - What values are you assigning?
  3. Identify constraints - What rules must be followed?
  4. Define the score - How do you measure solution quality?

The Quickstarts section provides complete examples for common problem types.

1.3 - Terminology

Glossary of terms used in SolverForge documentation.

Core Concepts

Planning Problem

The input to the solver: a set of planning entities with uninitialized planning variables, plus all problem facts and constraints.

Planning Solution

The container class that holds all problem data (entities and facts) and the resulting score. Decorated with @planning_solution.

Planning Entity

A class whose instances are modified during solving. Planning entities contain planning variables. Decorated with @planning_entity.

Planning Variable

A property of a planning entity that the solver changes during optimization. Annotated with PlanningVariable.

Problem Fact

Immutable data that defines the problem but is not changed by the solver (e.g., rooms, timeslots, employees). Annotated with ProblemFactCollectionProperty or ProblemFactProperty.

Value Range

The set of possible values for a planning variable. Provided via ValueRangeProvider.

Scoring

Score

A measure of solution quality. Higher scores are better. Common types: SimpleScore, HardSoftScore, HardMediumSoftScore.

Hard Constraint

A constraint that must be satisfied for a solution to be feasible. Broken hard constraints make a solution invalid.

Soft Constraint

A constraint that should be optimized but isn’t required. Used for preferences and optimization goals.

Medium Constraint

A constraint between hard and soft, typically used for “assign as many as possible” scenarios.

Feasible Solution

A solution with no broken hard constraints (hard score of 0 or positive).

Optimal Solution

A feasible solution with the best possible soft score. May be impractical to find for large problems.

Constraint Stream

The fluent API for defining constraints. Starts with ConstraintFactory.for_each().

Algorithms

Construction Heuristic

An algorithm that builds an initial solution quickly by assigning values to all planning variables.

An algorithm that improves an existing solution by making incremental changes (moves).

Move

A change to the solution, such as swapping two assignments or changing a single variable.

Step

One iteration of the optimization algorithm, consisting of selecting and applying a move.

Termination

The condition that stops the solver (time limit, score target, no improvement, etc.).

Advanced Concepts

Shadow Variable

A planning variable whose value is calculated from other variables, not directly assigned by the solver. Used for derived values like arrival times.

Inverse Shadow Variable

A shadow variable that maintains a reverse reference (e.g., a visit knowing which vehicle it belongs to).

Previous/Next Element Shadow Variable

Shadow variables that track the previous or next element in a list variable.

Cascading Update Shadow Variable

A shadow variable that triggers recalculation when upstream variables change.

List Variable

A planning variable that holds an ordered list of values (used for routing problems). Annotated with PlanningListVariable.

Pinning

Locking certain assignments so the solver cannot change them. Useful for preserving manual decisions or already-executed plans.

Problem Change

A modification to the problem while the solver is running (real-time planning).

Solver Components

Solver

The main component that performs optimization. Created via SolverFactory.

SolverFactory

Factory for creating Solver instances from configuration.

SolverConfig

Configuration object specifying solution class, entities, constraints, and termination.

SolverManager

Manages multiple concurrent solving jobs. Useful for web applications.

SolutionManager

Analyzes solutions: explains scores, identifies constraint violations.

ScoreDirector

Internal component that calculates scores efficiently. Used in problem changes.

Constraint Provider

A function decorated with @constraint_provider that returns a list of constraints.

Constraint Stream Operations

for_each / forEach

Start a constraint stream by iterating over all instances of a class.

for_each_unique_pair

Iterate over all unique pairs of instances (A,B where A != B, without duplicates like (B,A)).

filter

Remove items that don’t match a predicate.

join

Combine two streams by matching on joiners.

Joiner

A condition for matching items in joins (e.g., Joiners.equal(), Joiners.overlapping()).

group_by

Aggregate items by key with collectors.

Collector

Aggregation function (count, sum, min, max, toList, etc.).

penalize / reward

Apply score impact for matching items.

as_constraint

Finalize the constraint with a name.

Score Analysis

Score Explanation

Breakdown of which constraints contributed to the score.

Constraint Match

A single instance of a constraint being triggered.

Indictment

List of constraint violations associated with a specific entity.

Justification

Explanation of why a constraint was triggered.

2 - Getting Started

Install SolverForge and solve your first planning problem.

Get up and running with SolverForge in minutes.

Quick Start

  1. Installation - Set up Python, JDK, and install SolverForge
  2. Hello World - Build a simple school timetabling solver (CLI)
  3. Hello World with FastAPI - Add a REST API to your solver

What You’ll Learn

In the Hello World tutorial, you’ll build a school timetabling application that:

  • Assigns lessons to timeslots and rooms
  • Avoids scheduling conflicts (same teacher, room, or student group at the same time)
  • Optimizes for teacher preferences (room stability, consecutive lessons)

This introduces the core concepts you’ll use in any SolverForge application:

  • Planning entities - The things being scheduled (lessons)
  • Planning variables - The values being assigned (timeslot, room)
  • Constraints - The rules that define a valid solution
  • Solver configuration - How to run the optimization

2.1 - Installation

Set up Python, JDK, and install SolverForge.

Prerequisites

SolverForge requires:

  • Python 3.10 or higher (3.11 or 3.12 recommended)
  • JDK 17 or higher (for the optimization engine backend)

Check Python Version

python --version
# Python 3.11.0 or higher

If you need to install Python, visit python.org or use your system’s package manager.

Check JDK Version

java -version
# openjdk version "17.0.x" or higher

If you need to install a JDK:

  • macOS: brew install openjdk@17
  • Ubuntu/Debian: sudo apt install openjdk-17-jdk
  • Fedora: sudo dnf install java-17-openjdk
  • Windows: Download from Adoptium or Oracle

Make sure JAVA_HOME is set:

echo $JAVA_HOME
# Should output path to JDK installation

Install SolverForge

pip install solverforge-legacy

In a Virtual Environment

# Create virtual environment
python -m venv .venv

# Activate it
source .venv/bin/activate  # Linux/macOS
# or
.venv\Scripts\activate     # Windows

# Install SolverForge
pip install solverforge-legacy

Verify Installation

python -c "from solverforge_legacy.solver import SolverFactory; print('SolverForge installed successfully!')"

Project Setup

For a new project, create a pyproject.toml:

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "my-solver-project"
version = "1.0.0"
requires-python = ">=3.10"
dependencies = [
    "solverforge-legacy == 1.24.1",
    "pytest == 8.2.2",  # For testing
]

Then install your project in development mode:

pip install -e .

IDE Setup

VS Code

Install the Python extension and configure your interpreter to use the virtual environment.

PyCharm

  1. Open your project
  2. Go to Settings > Project > Python Interpreter
  3. Select the virtual environment interpreter

Troubleshooting

JVM Not Found

If you see errors about JVM not found:

  1. Verify Java is installed: java -version
  2. Set JAVA_HOME environment variable
  3. Ensure JAVA_HOME/bin is in your PATH

Import Errors

If imports fail:

  1. Verify you’re in the correct virtual environment
  2. Re-install: pip install --force-reinstall solverforge-legacy

Memory Issues

For large problems, you may need to increase JVM memory. This is configured automatically, but you can adjust if needed.

Next Steps

Now that SolverForge is installed, follow the Hello World Tutorial to build your first solver.

2.2 - Hello World

Build a school timetabling solver from scratch.

In this tutorial, you’ll build a school timetabling application that assigns lessons to timeslots and rooms while avoiding conflicts.

The Problem

A school needs to schedule lessons:

  • Each lesson has a subject, teacher, and student group
  • Available timeslots (e.g., Monday 08:30, Monday 09:30, …)
  • Available rooms (Room A, Room B, Room C)

Constraints:

  • Hard: No room, teacher, or student group conflicts
  • Soft: Teachers prefer the same room, consecutive lessons

Project Structure

Create the following files:

hello_world/
├── domain.py       # Data model
├── constraints.py  # Constraint definitions
├── main.py         # Entry point
└── pyproject.toml  # Dependencies

Step 1: Define the Domain Model

Create domain.py with the problem facts and planning entities:

from dataclasses import dataclass, field
from datetime import time
from typing import Annotated

from solverforge_legacy.solver.domain import (
    planning_entity,
    planning_solution,
    PlanningId,
    PlanningVariable,
    PlanningEntityCollectionProperty,
    ProblemFactCollectionProperty,
    ValueRangeProvider,
    PlanningScore,
)
from solverforge_legacy.solver.score import HardSoftScore


# Problem facts (immutable data)
@dataclass
class Timeslot:
    day_of_week: str
    start_time: time
    end_time: time

    def __str__(self):
        return f"{self.day_of_week} {self.start_time.strftime('%H:%M')}"


@dataclass
class Room:
    name: str

    def __str__(self):
        return self.name


# Planning entity (modified by the solver)
@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    student_group: str
    # Planning variables - assigned by the solver
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)


# Planning solution (container for all data)
@planning_solution
@dataclass
class Timetable:
    id: str
    # Problem facts with value range providers
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    # Planning entities
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    # Score calculated by constraints
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)

Key Concepts

  • @planning_entity marks Lesson as something the solver will modify
  • PlanningVariable marks timeslot and room as values to assign
  • @planning_solution marks Timetable as the container
  • ValueRangeProvider tells the solver which values are available

Step 2: Define Constraints

Create constraints.py with the scoring rules:

from solverforge_legacy.solver.score import (
    constraint_provider,
    ConstraintFactory,
    Constraint,
    Joiners,
    HardSoftScore,
)

from .domain import Lesson


@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory) -> list[Constraint]:
    return [
        # Hard constraints
        room_conflict(constraint_factory),
        teacher_conflict(constraint_factory),
        student_group_conflict(constraint_factory),
        # Soft constraints
        teacher_room_stability(constraint_factory),
    ]


def room_conflict(constraint_factory: ConstraintFactory) -> Constraint:
    """A room can accommodate at most one lesson at the same time."""
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.timeslot),
            Joiners.equal(lambda lesson: lesson.room),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Room conflict")
    )


def teacher_conflict(constraint_factory: ConstraintFactory) -> Constraint:
    """A teacher can teach at most one lesson at the same time."""
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.timeslot),
            Joiners.equal(lambda lesson: lesson.teacher),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Teacher conflict")
    )


def student_group_conflict(constraint_factory: ConstraintFactory) -> Constraint:
    """A student group can attend at most one lesson at the same time."""
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.timeslot),
            Joiners.equal(lambda lesson: lesson.student_group),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Student group conflict")
    )


def teacher_room_stability(constraint_factory: ConstraintFactory) -> Constraint:
    """A teacher prefers to teach in a single room."""
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.teacher),
        )
        .filter(lambda lesson1, lesson2: lesson1.room != lesson2.room)
        .penalize(HardSoftScore.ONE_SOFT)
        .as_constraint("Teacher room stability")
    )

Constraint Stream Pattern

Each constraint follows this pattern:

  1. Select entities with for_each() or for_each_unique_pair()
  2. Filter to matching cases with Joiners or .filter()
  3. Penalize (or reward) with a score impact
  4. Name the constraint with as_constraint()

Step 3: Configure and Run the Solver

Create main.py:

from datetime import time

from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
)

from .domain import Timetable, Timeslot, Room, Lesson
from .constraints import define_constraints


def main():
    # Configure the solver
    solver_config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(seconds=5)
        ),
    )

    # Create the solver
    solver_factory = SolverFactory.create(solver_config)
    solver = solver_factory.build_solver()

    # Generate a problem
    problem = generate_demo_data()

    # Solve it!
    solution = solver.solve(problem)

    # Print the result
    print_timetable(solution)


def generate_demo_data() -> Timetable:
    """Create a small demo problem."""
    timeslots = [
        Timeslot("MONDAY", time(8, 30), time(9, 30)),
        Timeslot("MONDAY", time(9, 30), time(10, 30)),
        Timeslot("MONDAY", time(10, 30), time(11, 30)),
        Timeslot("TUESDAY", time(8, 30), time(9, 30)),
        Timeslot("TUESDAY", time(9, 30), time(10, 30)),
    ]

    rooms = [
        Room("Room A"),
        Room("Room B"),
        Room("Room C"),
    ]

    lessons = [
        Lesson("1", "Math", "A. Turing", "9th grade"),
        Lesson("2", "Physics", "M. Curie", "9th grade"),
        Lesson("3", "Chemistry", "M. Curie", "9th grade"),
        Lesson("4", "Biology", "C. Darwin", "9th grade"),
        Lesson("5", "History", "I. Jones", "9th grade"),
        Lesson("6", "Math", "A. Turing", "10th grade"),
        Lesson("7", "Physics", "M. Curie", "10th grade"),
        Lesson("8", "Geography", "C. Darwin", "10th grade"),
    ]

    return Timetable("demo", timeslots, rooms, lessons)


def print_timetable(timetable: Timetable) -> None:
    """Print the solution in a readable format."""
    print(f"\nScore: {timetable.score}\n")

    for lesson in timetable.lessons:
        print(f"{lesson.subject} ({lesson.teacher}, {lesson.student_group})")
        print(f"  -> {lesson.timeslot} in {lesson.room}")
        print()


if __name__ == "__main__":
    main()

Step 4: Run It

python -m hello_world.main

You should see output like:

Score: 0hard/-3soft

Math (A. Turing, 9th grade)
  -> MONDAY 08:30 in Room A

Physics (M. Curie, 9th grade)
  -> MONDAY 09:30 in Room B

Chemistry (M. Curie, 9th grade)
  -> TUESDAY 08:30 in Room B

...

A score of 0hard means all hard constraints are satisfied (no conflicts). The negative soft score indicates room for optimization of preferences.

Understanding the Output

  • 0hard = No conflicts (feasible solution!)
  • -3soft = 3 soft constraint violations (teachers using multiple rooms)

The solver found a valid timetable where:

  • No room has two lessons at the same time
  • No teacher teaches two lessons at the same time
  • No student group has two lessons at the same time

Next Steps

2.3 - Hello World with FastAPI

Add a REST API to your school timetabling solver.

This tutorial extends the Hello World example by adding a REST API using FastAPI. This is closer to how you’d deploy a solver in production.

Prerequisites

  • Completed the Hello World tutorial
  • FastAPI and Uvicorn installed:
pip install fastapi uvicorn

Project Structure

Extend your project:

hello_world/
├── domain.py           # Same as before
├── constraints.py      # Same as before
├── main.py             # CLI version (optional)
├── rest_api.py         # NEW: FastAPI application
└── pyproject.toml      # Add fastapi, uvicorn

Step 1: Update Dependencies

Add FastAPI to your pyproject.toml:

[project]
dependencies = [
    "solverforge-legacy == 1.24.1",
    "fastapi >= 0.100.0",
    "uvicorn >= 0.23.0",
    "pytest == 8.2.2",
]

Step 2: Create the REST API

Create rest_api.py:

import uuid
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import time

from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
)

from .domain import Timetable, Timeslot, Room, Lesson
from .constraints import define_constraints


# Pydantic models for API validation
class TimeslotDTO(BaseModel):
    day_of_week: str
    start_time: str
    end_time: str

    def to_domain(self) -> Timeslot:
        return Timeslot(
            self.day_of_week,
            time.fromisoformat(self.start_time),
            time.fromisoformat(self.end_time),
        )


class RoomDTO(BaseModel):
    name: str

    def to_domain(self) -> Room:
        return Room(self.name)


class LessonDTO(BaseModel):
    id: str
    subject: str
    teacher: str
    student_group: str
    timeslot: TimeslotDTO | None = None
    room: RoomDTO | None = None

    def to_domain(self, timeslots: list[Timeslot], rooms: list[Room]) -> Lesson:
        ts = None
        if self.timeslot:
            ts = next(
                (t for t in timeslots if t.day_of_week == self.timeslot.day_of_week
                 and t.start_time.isoformat() == self.timeslot.start_time),
                None
            )
        rm = None
        if self.room:
            rm = next((r for r in rooms if r.name == self.room.name), None)

        return Lesson(self.id, self.subject, self.teacher, self.student_group, ts, rm)


class TimetableDTO(BaseModel):
    id: str
    timeslots: list[TimeslotDTO]
    rooms: list[RoomDTO]
    lessons: list[LessonDTO]
    score: str | None = None

    def to_domain(self) -> Timetable:
        timeslots = [ts.to_domain() for ts in self.timeslots]
        rooms = [r.to_domain() for r in self.rooms]
        lessons = [l.to_domain(timeslots, rooms) for l in self.lessons]
        return Timetable(self.id, timeslots, rooms, lessons)

    @classmethod
    def from_domain(cls, timetable: Timetable) -> "TimetableDTO":
        return cls(
            id=timetable.id,
            timeslots=[
                TimeslotDTO(
                    day_of_week=ts.day_of_week,
                    start_time=ts.start_time.isoformat(),
                    end_time=ts.end_time.isoformat(),
                )
                for ts in timetable.timeslots
            ],
            rooms=[RoomDTO(name=r.name) for r in timetable.rooms],
            lessons=[
                LessonDTO(
                    id=l.id,
                    subject=l.subject,
                    teacher=l.teacher,
                    student_group=l.student_group,
                    timeslot=TimeslotDTO(
                        day_of_week=l.timeslot.day_of_week,
                        start_time=l.timeslot.start_time.isoformat(),
                        end_time=l.timeslot.end_time.isoformat(),
                    ) if l.timeslot else None,
                    room=RoomDTO(name=l.room.name) if l.room else None,
                )
                for l in timetable.lessons
            ],
            score=str(timetable.score) if timetable.score else None,
        )


# Global solver manager
solver_manager: SolverManager | None = None
solutions: dict[str, Timetable] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize solver manager on startup."""
    global solver_manager

    solver_config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(seconds=30)
        ),
    )

    solver_factory = SolverFactory.create(solver_config)
    solver_manager = SolverManager.create(solver_factory)

    yield

    # Cleanup on shutdown
    if solver_manager:
        solver_manager.close()


app = FastAPI(
    title="School Timetabling API",
    description="Optimize school timetables using SolverForge",
    lifespan=lifespan,
)


@app.post("/timetables", response_model=str)
async def submit_problem(timetable_dto: TimetableDTO) -> str:
    """Submit a timetabling problem for solving."""
    job_id = str(uuid.uuid4())
    problem = timetable_dto.to_domain()

    def on_best_solution(solution: Timetable):
        solutions[job_id] = solution

    solver_manager.solve_and_listen(
        job_id,
        lambda _: problem,
        on_best_solution,
    )

    return job_id


@app.get("/timetables/{job_id}", response_model=TimetableDTO)
async def get_solution(job_id: str) -> TimetableDTO:
    """Get the current best solution for a job."""
    if job_id not in solutions:
        raise HTTPException(status_code=404, detail="Job not found")

    return TimetableDTO.from_domain(solutions[job_id])


@app.delete("/timetables/{job_id}")
async def stop_solving(job_id: str):
    """Stop solving a problem early."""
    solver_manager.terminate_early(job_id)
    return {"status": "terminated"}


@app.get("/demo-data", response_model=TimetableDTO)
async def get_demo_data() -> TimetableDTO:
    """Get demo problem data for testing."""
    timeslots = [
        Timeslot("MONDAY", time(8, 30), time(9, 30)),
        Timeslot("MONDAY", time(9, 30), time(10, 30)),
        Timeslot("TUESDAY", time(8, 30), time(9, 30)),
        Timeslot("TUESDAY", time(9, 30), time(10, 30)),
    ]
    rooms = [Room("Room A"), Room("Room B")]
    lessons = [
        Lesson("1", "Math", "A. Turing", "9th grade"),
        Lesson("2", "Physics", "M. Curie", "9th grade"),
        Lesson("3", "History", "I. Jones", "9th grade"),
        Lesson("4", "Math", "A. Turing", "10th grade"),
    ]

    return TimetableDTO.from_domain(Timetable("demo", timeslots, rooms, lessons))

Step 3: Run the API

uvicorn hello_world.rest_api:app --reload

The API is now running at http://localhost:8000.

Step 4: Test the API

Get Demo Data

curl http://localhost:8000/demo-data

Submit a Problem

# Get demo data and submit it for solving
curl http://localhost:8000/demo-data | curl -X POST \
  -H "Content-Type: application/json" \
  -d @- \
  http://localhost:8000/timetables

This returns a job ID like "a1b2c3d4-...".

Check the Solution

curl http://localhost:8000/timetables/{job_id}

Stop Solving Early

curl -X DELETE http://localhost:8000/timetables/{job_id}

API Documentation

FastAPI automatically generates interactive API docs:

  • Swagger UI: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc

Architecture Notes

SolverManager

SolverManager handles concurrent solving jobs:

  • Each job runs in its own thread
  • Multiple problems can be solved simultaneously
  • Solutions are updated as the solver improves them

Pydantic Models

We use separate Pydantic DTOs for:

  • API request/response validation
  • JSON serialization
  • Decoupling API schema from domain model

Production Considerations

For production deployments:

  1. Persistence: Store solutions in a database
  2. Scaling: Use a message queue for distributed solving
  3. Monitoring: Add logging and metrics
  4. Security: Add authentication and rate limiting

Next Steps

3 - Domain Modeling

Model your planning problem with entities, variables, and solutions.

Domain modeling is the foundation of any SolverForge application. You define your problem structure using Python dataclasses and type annotations.

Core Concepts

Model Structure

A typical SolverForge model consists of:

Planning Solution
├── Problem Facts (immutable data)
│   ├── Timeslots, Rooms, Employees, etc.
│   └── Value Range Providers
├── Planning Entities (mutable)
│   └── Planning Variables (assigned by solver)
└── Score (calculated from constraints)

Example

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    # Planning variables - assigned by the solver
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)

@planning_solution
@dataclass
class Timetable:
    # Problem facts - immutable
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    # Planning entities - contain variables to optimize
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    # Score - calculated by constraints
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)

3.1 - Planning Entities

Define planning entities that the solver will optimize.

A planning entity is a class whose instances the solver can change during optimization. Planning entities contain planning variables that get assigned values.

The @planning_entity Decorator

Mark a class as a planning entity with @planning_entity:

from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import planning_entity, PlanningId, PlanningVariable

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    student_group: str
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)

Planning ID

Every planning entity must have a unique identifier marked with PlanningId:

id: Annotated[str, PlanningId]

The ID is used for:

  • Tracking entities during solving
  • Cloning solutions
  • Score explanation

The ID type can be str, int, or any hashable type.

Genuine vs Shadow Entities

There are two types of planning entities:

Genuine Entities

A genuine planning entity has at least one genuine planning variable that the solver directly assigns:

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    # Genuine variable - solver assigns this
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)

Shadow-Only Entities

A shadow-only entity has only shadow variables (calculated from other entities):

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    # Shadow variable - calculated from vehicle's visit list
    vehicle: Annotated[Vehicle | None, InverseRelationShadowVariable(...)] = field(default=None)

Entity Properties

Immutable Properties

Properties without PlanningVariable annotations are immutable during solving:

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str          # Immutable
    teacher: str          # Immutable
    student_group: str    # Immutable
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)  # Mutable

The solver never changes subject, teacher, or student_group.

Default Values

Planning variables should have default values (typically None) for uninitialized state:

timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)

Multiple Planning Variables

An entity can have multiple planning variables:

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)

Each variable is assigned independently by the solver.

Entity Collections in Solution

Planning entities are collected in the planning solution:

@planning_solution
@dataclass
class Timetable:
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]

The solver iterates over this collection to find entities to optimize.

Nullable Variables

By default, planning variables must be assigned. For nullable variables (when some entities might be unassigned), see Planning Variables.

Best Practices

Do

  • Use @dataclass for clean, simple entity definitions
  • Give each entity a unique, stable ID
  • Initialize planning variables to None
  • Keep entities focused on a single concept

Don’t

  • Put business logic in entities (use constraints instead)
  • Make planning variables required in __init__
  • Use mutable default arguments (use field(default_factory=...) instead)

Example: Shift Assignment

@planning_entity
@dataclass
class Shift:
    id: Annotated[str, PlanningId]
    start_time: datetime
    end_time: datetime
    required_skill: str
    # Assigned by solver
    employee: Annotated[Employee | None, PlanningVariable] = field(default=None)

Example: Vehicle Routing

@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    capacity: int
    home_location: Location
    # List of visits assigned to this vehicle
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)

Next Steps

3.2 - Planning Variables

Define what the solver assigns: simple variables and list variables.

A planning variable is a property of a planning entity that the solver assigns values to during optimization.

Simple Planning Variable

The most common type assigns a single value from a value range:

from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import planning_entity, PlanningId, PlanningVariable

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    # Simple planning variable
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)

How It Works

  1. The solver sees timeslot needs a value
  2. It looks for a ValueRangeProvider for Timeslot in the solution
  3. It tries different values and evaluates the score
  4. It assigns the best value found within the time limit

Planning List Variable

For routing problems where order matters, use PlanningListVariable:

from solverforge_legacy.solver.domain import PlanningListVariable

@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    capacity: int
    home_location: Location
    # List variable - ordered sequence of visits
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)

How It Works

The solver:

  • Assigns visits to vehicles
  • Determines the order of visits within each vehicle’s route
  • Uses moves like insert, swap, and 2-opt for optimization

When to Use List Variables

Use PlanningListVariable when:

  • Order matters (routing, sequencing)
  • Entities belong to groups (visits per vehicle, tasks per worker)
  • Chain relationships exist (predecessor/successor patterns)

Nullable Variables

By default, all planning variables must be assigned. For optional assignments:

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    # This visit might not be assigned to any vehicle
    vehicle: Annotated[Vehicle | None, PlanningVariable(allows_unassigned=True)] = field(default=None)

Note: When using nullable variables, add medium constraints to penalize unassigned entities.

Value Range Providers

Planning variables need a source of possible values. This is configured in the planning solution:

@planning_solution
@dataclass
class Timetable:
    # This list provides values for 'timeslot' variables
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    # This list provides values for 'room' variables
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)

The solver matches variables to value ranges by type:

  • timeslot: Annotated[Timeslot | None, PlanningVariable] uses list[Timeslot]
  • room: Annotated[Room | None, PlanningVariable] uses list[Room]

Variable Configuration Options

Strength Comparator

For construction heuristics, you can specify how to order values:

# Stronger values tried first during construction
timeslot: Annotated[
    Timeslot | None,
    PlanningVariable(value_range_provider_refs=["timeslots"])
] = field(default=None)

Multiple Variables on One Entity

Entities can have multiple independent variables:

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    # Two independent variables
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)

Each variable is optimized independently—assigning timeslot doesn’t affect room.

Chained Variables (Alternative to List)

For simpler routing without list variables, you can use chained planning variables. However, PlanningListVariable is generally easier and more efficient.

Variable Listener Pattern

When one variable affects another, use shadow variables:

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    # Calculated from vehicle's visit list
    vehicle: Annotated[Vehicle | None, InverseRelationShadowVariable(source_variable_name="visits")] = field(default=None)
    # Calculated from previous visit
    arrival_time: Annotated[datetime | None, CascadingUpdateShadowVariable(target_method_name="update_arrival_time")] = field(default=None)

See Shadow Variables for details.

Best Practices

Do

  • Initialize variables to None or empty list
  • Use type hints with | None for nullable types
  • Match value range types exactly

Don’t

  • Mix list variables with simple variables for the same concept
  • Use complex types as planning variables (use references instead)
  • Forget to provide a value range

Common Patterns

Scheduling

timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)

Assignment

employee: Annotated[Employee | None, PlanningVariable] = field(default=None)

Routing

visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)

Next Steps

3.3 - Planning Solutions

Define the container for problem data and solution score.

A planning solution is the container class that holds all problem data, planning entities, and the solution score.

The @planning_solution Decorator

from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import (
    planning_solution,
    ProblemFactCollectionProperty,
    ProblemFactProperty,
    PlanningEntityCollectionProperty,
    ValueRangeProvider,
    PlanningScore,
)
from solverforge_legacy.solver.score import HardSoftScore

@planning_solution
@dataclass
class Timetable:
    id: str
    timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)

Solution Components

A planning solution contains:

  1. Problem Facts - Immutable input data
  2. Planning Entities - Mutable entities with planning variables
  3. Score - Quality measure of the solution

Problem Facts

Problem facts are immutable data that define the problem:

Collection Property

For lists of facts:

timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty]
rooms: Annotated[list[Room], ProblemFactCollectionProperty]
employees: Annotated[list[Employee], ProblemFactCollectionProperty]

Single Property

For single facts:

config: Annotated[ScheduleConfig, ProblemFactProperty]
start_date: Annotated[date, ProblemFactProperty]

Value Range Providers

Value ranges provide possible values for planning variables. Combine with problem fact annotations:

# This list provides values for Timeslot planning variables
timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]

# This list provides values for Room planning variables
rooms: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]

The solver automatically matches variables to value ranges by type:

  • PlanningVariable of type Timeslot uses list[Timeslot]
  • PlanningVariable of type Room uses list[Room]

Multiple Ranges for Same Type

If you have multiple value ranges of the same type, use explicit references:

@planning_solution
@dataclass
class Schedule:
    preferred_timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]
    backup_timeslots: Annotated[list[Timeslot], ProblemFactCollectionProperty, ValueRangeProvider]

Planning Entities

Collect planning entities in the solution:

lessons: Annotated[list[Lesson], PlanningEntityCollectionProperty]

For a single entity:

main_vehicle: Annotated[Vehicle, PlanningEntityProperty]

Score

Every solution needs a score field:

score: Annotated[HardSoftScore, PlanningScore] = field(default=None)

Common score types:

  • SimpleScore - Single level
  • HardSoftScore - Feasibility + optimization
  • HardMediumSoftScore - Three levels
  • BendableScore - Custom levels

The solver calculates and updates this field automatically.

Solution Identity

Include an identifier for tracking:

@planning_solution
@dataclass
class Timetable:
    id: str  # For tracking in SolverManager
    ...

Complete Example

from dataclasses import dataclass, field
from typing import Annotated
from datetime import date

from solverforge_legacy.solver.domain import (
    planning_solution,
    ProblemFactCollectionProperty,
    ProblemFactProperty,
    PlanningEntityCollectionProperty,
    ValueRangeProvider,
    PlanningScore,
)
from solverforge_legacy.solver.score import HardSoftScore


@planning_solution
@dataclass
class EmployeeSchedule:
    # Identity
    id: str

    # Problem facts
    schedule_start: Annotated[date, ProblemFactProperty]
    employees: Annotated[list[Employee], ProblemFactCollectionProperty, ValueRangeProvider]
    skills: Annotated[list[Skill], ProblemFactCollectionProperty]

    # Planning entities
    shifts: Annotated[list[Shift], PlanningEntityCollectionProperty]

    # Score
    score: Annotated[HardSoftScore, PlanningScore] = field(default=None)

Creating Problem Instances

Load or generate problem data:

def load_problem() -> Timetable:
    timeslots = [
        Timeslot("MONDAY", time(8, 30), time(9, 30)),
        Timeslot("MONDAY", time(9, 30), time(10, 30)),
        # ...
    ]

    rooms = [
        Room("Room A"),
        Room("Room B"),
    ]

    lessons = [
        Lesson("1", "Math", "A. Turing", "9th grade"),
        Lesson("2", "Physics", "M. Curie", "9th grade"),
        # ...
    ]

    return Timetable(
        id="problem-001",
        timeslots=timeslots,
        rooms=rooms,
        lessons=lessons,
        score=None,  # Solver will calculate this
    )

Accessing the Solved Solution

After solving, the solution contains assigned variables and score:

solution = solver.solve(problem)

print(f"Score: {solution.score}")
print(f"Is feasible: {solution.score.is_feasible}")

for lesson in solution.lessons:
    print(f"{lesson.subject}: {lesson.timeslot} in {lesson.room}")

Solution Cloning

The solver internally clones solutions to track the best solution. This happens automatically with @dataclass entities.

For custom classes, ensure proper cloning behavior or use @deep_planning_clone:

from solverforge_legacy.solver.domain import deep_planning_clone

@deep_planning_clone
class CustomClass:
    # This class will be deeply cloned during solving
    pass

Best Practices

Do

  • Use @dataclass for solutions
  • Initialize score to None
  • Include all data needed for constraint evaluation
  • Use descriptive field names

Don’t

  • Include data not used in constraints (performance impact)
  • Modify problem facts during solving
  • Forget value range providers for planning variables

Next Steps

3.4 - Shadow Variables

Define calculated variables that update automatically.

A shadow variable is a planning variable whose value is calculated from other variables, not directly assigned by the solver. Shadow variables update automatically when their source variables change.

When to Use Shadow Variables

Use shadow variables for:

  • Derived values - Arrival times calculated from routes
  • Inverse relationships - A visit knowing which vehicle it belongs to
  • Cascading calculations - End times derived from start times and durations

Shadow Variable Types

Inverse Relation Shadow Variable

Maintains a reverse reference when using list variables:

from solverforge_legacy.solver.domain import InverseRelationShadowVariable

@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    # Automatically set to the vehicle that contains this visit
    vehicle: Annotated[
        Vehicle | None,
        InverseRelationShadowVariable(source_variable_name="visits")
    ] = field(default=None)

When a visit is added to vehicle.visits, visit.vehicle is automatically set.

Previous Element Shadow Variable

Tracks the previous element in a list variable:

from solverforge_legacy.solver.domain import PreviousElementShadowVariable

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    # The visit that comes before this one in the route
    previous_visit: Annotated[
        Visit | None,
        PreviousElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)

Next Element Shadow Variable

Tracks the next element in a list variable:

from solverforge_legacy.solver.domain import NextElementShadowVariable

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    # The visit that comes after this one in the route
    next_visit: Annotated[
        Visit | None,
        NextElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)

Index Shadow Variable

Tracks the position in a list variable:

from solverforge_legacy.solver.domain import IndexShadowVariable

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    # Position in the vehicle's visit list (0-based)
    index: Annotated[
        int | None,
        IndexShadowVariable(source_variable_name="visits")
    ] = field(default=None)

Cascading Update Shadow Variable

For custom calculations that depend on other variables:

from solverforge_legacy.solver.domain import CascadingUpdateShadowVariable
from datetime import datetime, timedelta

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    service_duration: timedelta

    vehicle: Annotated[
        Vehicle | None,
        InverseRelationShadowVariable(source_variable_name="visits")
    ] = field(default=None)

    previous_visit: Annotated[
        Visit | None,
        PreviousElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)

    # Calculated arrival time
    arrival_time: Annotated[
        datetime | None,
        CascadingUpdateShadowVariable(target_method_name="update_arrival_time")
    ] = field(default=None)

    def update_arrival_time(self):
        """Called automatically when previous_visit or vehicle changes."""
        if self.vehicle is None:
            self.arrival_time = None
        elif self.previous_visit is None:
            # First visit: departure from depot
            travel_time = self.vehicle.depot.driving_time_to(self.location)
            self.arrival_time = self.vehicle.departure_time + travel_time
        else:
            # Subsequent visit: after previous visit's departure
            travel_time = self.previous_visit.location.driving_time_to(self.location)
            self.arrival_time = self.previous_visit.departure_time + travel_time

    @property
    def departure_time(self) -> datetime | None:
        """Time when service at this visit completes."""
        if self.arrival_time is None:
            return None
        return self.arrival_time + self.service_duration

Piggyback Shadow Variable

For variables that should be updated at the same time as another shadow variable:

from solverforge_legacy.solver.domain import PiggybackShadowVariable

@planning_entity
@dataclass
class Visit:
    arrival_time: Annotated[
        datetime | None,
        CascadingUpdateShadowVariable(target_method_name="update_times")
    ] = field(default=None)

    # Updated by the same method as arrival_time
    departure_time: Annotated[
        datetime | None,
        PiggybackShadowVariable(shadow_variable_name="arrival_time")
    ] = field(default=None)

    def update_times(self):
        # Update both arrival_time and departure_time
        if self.vehicle is None:
            self.arrival_time = None
            self.departure_time = None
        else:
            self.arrival_time = self.calculate_arrival()
            self.departure_time = self.arrival_time + self.service_duration

Complete Vehicle Routing Example

from dataclasses import dataclass, field
from typing import Annotated
from datetime import datetime, timedelta

from solverforge_legacy.solver.domain import (
    planning_entity,
    PlanningId,
    PlanningListVariable,
    InverseRelationShadowVariable,
    PreviousElementShadowVariable,
    NextElementShadowVariable,
    CascadingUpdateShadowVariable,
)


@dataclass
class Location:
    latitude: float
    longitude: float

    def driving_time_to(self, other: "Location") -> timedelta:
        # Simplified: assume 1 second per km
        distance = ((self.latitude - other.latitude)**2 +
                   (self.longitude - other.longitude)**2) ** 0.5
        return timedelta(seconds=int(distance * 1000))


@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    depot: Location
    departure_time: datetime
    capacity: int
    visits: Annotated[list["Visit"], PlanningListVariable] = field(default_factory=list)


@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    location: Location
    demand: int
    service_duration: timedelta
    ready_time: datetime    # Earliest arrival
    due_time: datetime      # Latest arrival

    # Shadow variables
    vehicle: Annotated[
        Vehicle | None,
        InverseRelationShadowVariable(source_variable_name="visits")
    ] = field(default=None)

    previous_visit: Annotated[
        "Visit | None",
        PreviousElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)

    next_visit: Annotated[
        "Visit | None",
        NextElementShadowVariable(source_variable_name="visits")
    ] = field(default=None)

    arrival_time: Annotated[
        datetime | None,
        CascadingUpdateShadowVariable(target_method_name="update_arrival_time")
    ] = field(default=None)

    def update_arrival_time(self):
        if self.vehicle is None:
            self.arrival_time = None
            return

        if self.previous_visit is None:
            # First visit in route
            travel = self.vehicle.depot.driving_time_to(self.location)
            self.arrival_time = self.vehicle.departure_time + travel
        else:
            # After previous visit
            prev_departure = self.previous_visit.departure_time
            if prev_departure is None:
                self.arrival_time = None
                return
            travel = self.previous_visit.location.driving_time_to(self.location)
            self.arrival_time = prev_departure + travel

    @property
    def departure_time(self) -> datetime | None:
        if self.arrival_time is None:
            return None
        # Wait until ready_time if arriving early
        start = max(self.arrival_time, self.ready_time)
        return start + self.service_duration

    def is_late(self) -> bool:
        return self.arrival_time is not None and self.arrival_time > self.due_time

Shadow Variable Evaluation Order

Shadow variables are evaluated in dependency order:

  1. InverseRelationShadowVariable - First (depends only on list variable)
  2. PreviousElementShadowVariable - Second
  3. NextElementShadowVariable - Second
  4. IndexShadowVariable - Second
  5. CascadingUpdateShadowVariable - After their dependencies
  6. PiggybackShadowVariable - With their shadow source

Using Shadow Variables in Constraints

Shadow variables can be used in constraints just like regular properties:

def arrival_after_due_time(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Visit)
        .filter(lambda visit: visit.is_late())
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda visit: int((visit.arrival_time - visit.due_time).total_seconds())
        )
        .as_constraint("Arrival after due time")
    )

Best Practices

Do

  • Use InverseRelationShadowVariable when entities need to know their container
  • Use CascadingUpdateShadowVariable for calculated values like arrival times
  • Keep update methods simple and fast

Don’t

  • Create circular shadow variable dependencies
  • Do expensive calculations in update methods
  • Forget to handle None cases

Next Steps

3.5 - Pinning

Lock specific assignments to prevent the solver from changing them.

Pinning locks certain assignments so the solver cannot change them. This is useful for:

  • Preserving manual decisions
  • Locking in-progress or completed work
  • Incremental planning with fixed history

PlanningPin Annotation

Mark an entity as pinned using the PlanningPin annotation:

from dataclasses import dataclass, field
from typing import Annotated
from solverforge_legacy.solver.domain import (
    planning_entity,
    PlanningId,
    PlanningVariable,
    PlanningPin,
)

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    subject: str
    teacher: str
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)
    room: Annotated[Room | None, PlanningVariable] = field(default=None)
    # When True, solver won't change this lesson's assignments
    pinned: Annotated[bool, PlanningPin] = field(default=False)

When pinned=True, the solver will not modify timeslot or room for this lesson.

Setting Pinned State

At Problem Creation

lessons = [
    Lesson("1", "Math", "A. Turing", timeslot=monday_8am, room=room_a, pinned=True),  # Fixed
    Lesson("2", "Physics", "M. Curie", pinned=False),  # Solver will assign
]

Based on Time

Pin lessons that are already in progress or past:

from datetime import datetime

def create_problem(lessons: list[Lesson], current_time: datetime) -> Timetable:
    for lesson in lessons:
        if lesson.timeslot and lesson.timeslot.start_time <= current_time:
            lesson.pinned = True
    return Timetable(...)

Based on User Decisions

def pin_manual_assignments(lesson: Lesson, is_manual: bool):
    lesson.pinned = is_manual

PlanningPinToIndex for List Variables

For list variables (routing), you can pin elements up to a certain index:

from solverforge_legacy.solver.domain import PlanningPinToIndex

@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)
    # Elements at index 0, 1, ..., (pinned_index-1) are pinned
    pinned_index: Annotated[int, PlanningPinToIndex] = field(default=0)

Example:

  • pinned_index=0 - No visits are pinned (all can be reordered)
  • pinned_index=3 - First 3 visits are locked in place
  • pinned_index=len(visits) - All visits are pinned

Updating Pinned Index

def update_pinned_for_in_progress(vehicle: Vehicle, current_time: datetime):
    """Pin visits that have already started."""
    pinned_count = 0
    for visit in vehicle.visits:
        if visit.arrival_time and visit.arrival_time <= current_time:
            pinned_count += 1
        else:
            break  # Stop at first unstarted visit
    vehicle.pinned_index = pinned_count

Use Cases

Continuous Planning

In continuous planning, pin the past and near future:

def prepare_for_replanning(solution: Schedule, current_time: datetime, buffer: timedelta):
    """
    Pin assignments that:
    - Have already started (in the past)
    - Are starting soon (within buffer time)
    """
    publish_deadline = current_time + buffer

    for shift in solution.shifts:
        if shift.start_time < publish_deadline:
            shift.pinned = True
        else:
            shift.pinned = False

Respecting User Decisions

def load_schedule_with_pins(raw_data) -> Schedule:
    shifts = []
    for data in raw_data:
        shift = Shift(
            id=data["id"],
            employee=find_employee(data["employee_id"]),
            pinned=data.get("manually_assigned", False)
        )
        shifts.append(shift)
    return Schedule(shifts=shifts)

Incremental Solving

Pin everything except new entities:

def add_new_lessons(solution: Timetable, new_lessons: list[Lesson]) -> Timetable:
    # Pin all existing lessons
    for lesson in solution.lessons:
        lesson.pinned = True

    # Add new lessons (unpinned)
    for lesson in new_lessons:
        lesson.pinned = False
        solution.lessons.append(lesson)

    return solution

Behavior Notes

Pinned Entities Still Affect Score

Pinned entities participate in constraint evaluation:

# This constraint still fires if a pinned lesson conflicts with an unpinned one
def room_conflict(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each_unique_pair(Lesson, ...)
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Room conflict")
    )

Initialization

Pinned entities must have their planning variables already assigned:

# Correct: pinned entity has assigned values
Lesson("1", "Math", "Teacher", timeslot=slot, room=room, pinned=True)

# Incorrect: pinned entity without assignment (will cause issues)
Lesson("2", "Physics", "Teacher", timeslot=None, room=None, pinned=True)

Constraints with Pinning

You might want different constraint behavior for pinned vs unpinned:

def prefer_unpinned_over_pinned(factory: ConstraintFactory) -> Constraint:
    """If there's a conflict, prefer to move the unpinned lesson."""
    return (
        factory.for_each(Lesson)
        .filter(lambda lesson: lesson.pinned)
        .join(
            Lesson,
            Joiners.equal(lambda l: l.timeslot),
            Joiners.equal(lambda l: l.room),
            Joiners.filtering(lambda pinned, other: not other.pinned)
        )
        # Penalize the unpinned lesson in conflict
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Conflict with pinned lesson")
    )

Best Practices

Do

  • Pin entities that represent completed or in-progress work
  • Use PlanningPinToIndex for routing problems
  • Ensure pinned entities have valid assignments

Don’t

  • Pin too many entities (solver has less freedom)
  • Forget to unpin entities when requirements change
  • Create infeasible problems by pinning conflicting entities

Next Steps

4 - Constraints

Define constraints using the fluent Constraint Streams API.

Constraints define the rules that make a solution valid and optimal. SolverForge uses a fluent Constraint Streams API that lets you express constraints declaratively.

Topics

Constraint Types

TypePurposeExample
HardMust be satisfied for feasibilityNo two lessons in the same room at the same time
SoftPreferences to optimizeTeachers prefer consecutive lessons
MediumBetween hard and soft (optional)Important but not mandatory constraints

Example

from solverforge_legacy.solver.score import (
    constraint_provider, ConstraintFactory, Constraint, Joiners, HardSoftScore
)

@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory) -> list[Constraint]:
    return [
        room_conflict(constraint_factory),
        teacher_conflict(constraint_factory),
        teacher_room_stability(constraint_factory),
    ]

def room_conflict(constraint_factory: ConstraintFactory) -> Constraint:
    # Hard constraint: No two lessons in the same room at the same time
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.timeslot),
            Joiners.equal(lambda lesson: lesson.room),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Room conflict")
    )

def teacher_room_stability(constraint_factory: ConstraintFactory) -> Constraint:
    # Soft constraint: Teachers prefer teaching in the same room
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.teacher),
        )
        .filter(lambda lesson1, lesson2: lesson1.room != lesson2.room)
        .penalize(HardSoftScore.ONE_SOFT)
        .as_constraint("Teacher room stability")
    )

4.1 - Constraint Streams

Build constraints using the fluent Constraint Streams API.

The Constraint Streams API is a fluent, declarative way to define constraints. It’s inspired by Java Streams and SQL, allowing you to express complex scoring logic concisely.

Basic Structure

Every constraint follows this pattern:

from solverforge_legacy.solver.score import (
    constraint_provider, ConstraintFactory, Constraint, HardSoftScore
)

@constraint_provider
def define_constraints(factory: ConstraintFactory) -> list[Constraint]:
    return [
        my_constraint(factory),
    ]

def my_constraint(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(MyEntity)        # 1. Select entities
        .filter(lambda e: e.is_active)    # 2. Filter matches
        .penalize(HardSoftScore.ONE_HARD) # 3. Apply score impact
        .as_constraint("My constraint")   # 4. Name the constraint
    )

Stream Types

Streams are typed by the number of entities they carry:

Stream TypeEntitiesExample Use
UniConstraintStream1Single entity constraints
BiConstraintStream2Pair constraints
TriConstraintStream3Triple constraints
QuadConstraintStream4Quad constraints

Starting a Stream

for_each()

Start with all instances of an entity class:

factory.for_each(Lesson)
# Stream of: Lesson1, Lesson2, Lesson3, ...

for_each_unique_pair()

Get all unique pairs (no duplicates, no self-pairs):

factory.for_each_unique_pair(Lesson)
# Stream of: (L1,L2), (L1,L3), (L2,L3), ...
# NOT: (L1,L1), (L2,L1), ...

With joiners for efficient filtering:

factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot),
    Joiners.equal(lambda l: l.room),
)
# Only pairs with same timeslot AND same room

for_each_including_unassigned()

Include entities with unassigned planning variables:

factory.for_each_including_unassigned(Lesson)
# Includes lessons where timeslot=None or room=None

Filtering

filter()

Remove non-matching items:

factory.for_each(Lesson)
.filter(lambda lesson: lesson.teacher == "A. Turing")

For bi-streams:

factory.for_each_unique_pair(Lesson)
.filter(lambda l1, l2: l1.room != l2.room)

Joining

join()

Combine streams:

factory.for_each(Lesson)
.join(Room)
# BiStream of (Lesson, Room) for all combinations

With joiners:

factory.for_each(Lesson)
.join(
    Room,
    Joiners.equal(lambda lesson: lesson.room, lambda room: room)
)
# BiStream of (Lesson, Room) where lesson.room == room

See Joiners for available joiner types.

if_exists() / if_not_exists()

Check for existence without creating pairs:

# Lessons that have at least one other lesson in the same room
factory.for_each(Lesson)
.if_exists(
    Lesson,
    Joiners.equal(lambda l: l.room),
    Joiners.filtering(lambda l1, l2: l1.id != l2.id)
)
# Employees not assigned to any shift
factory.for_each(Employee)
.if_not_exists(
    Shift,
    Joiners.equal(lambda emp: emp, lambda shift: shift.employee)
)

Grouping

group_by()

Aggregate entities:

from solverforge_legacy.solver.score import ConstraintCollectors

# Count lessons per teacher
factory.for_each(Lesson)
.group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.count()
)
# BiStream of (teacher, count)

Multiple collectors:

# Get count and list of lessons per teacher
factory.for_each(Lesson)
.group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.count(),
    ConstraintCollectors.to_list(lambda l: l)
)
# TriStream of (teacher, count, lesson_list)

See Collectors for available collector types.

Mapping

map()

Transform stream elements:

factory.for_each(Lesson)
.map(lambda lesson: lesson.teacher)
# UniStream of teachers (with duplicates)

expand()

Add derived values:

factory.for_each(Lesson)
.expand(lambda lesson: lesson.duration_minutes)
# BiStream of (Lesson, duration)

distinct()

Remove duplicates:

factory.for_each(Lesson)
.map(lambda lesson: lesson.teacher)
.distinct()
# UniStream of unique teachers

Scoring

penalize()

Apply negative score for matches:

# Hard constraint
.penalize(HardSoftScore.ONE_HARD)

# Soft constraint
.penalize(HardSoftScore.ONE_SOFT)

# Dynamic weight
.penalize(HardSoftScore.ONE_SOFT, lambda lesson: lesson.priority)

reward()

Apply positive score for matches:

# Reward preferred assignments
.reward(HardSoftScore.ONE_SOFT, lambda lesson: lesson.preference_score)

impact()

Apply positive or negative score based on value:

# Positive values reward, negative values penalize
.impact(HardSoftScore.ONE_SOFT, lambda l: l.score_impact)

Finalizing

as_constraint()

Name the constraint (required):

.as_constraint("Room conflict")

justify_with()

Add custom justification for score explanation:

.penalize(HardSoftScore.ONE_HARD)
.justify_with(lambda l1, l2, score: RoomConflictJustification(l1, l2, score))
.as_constraint("Room conflict")

indict_with()

Specify which entities to blame:

.penalize(HardSoftScore.ONE_HARD)
.indict_with(lambda l1, l2: [l1, l2])
.as_constraint("Room conflict")

Complete Examples

Room Conflict (Hard)

def room_conflict(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda l: l.timeslot),
            Joiners.equal(lambda l: l.room),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Room conflict")
    )

Teacher Room Stability (Soft)

def teacher_room_stability(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda l: l.teacher)
        )
        .filter(lambda l1, l2: l1.room != l2.room)
        .penalize(HardSoftScore.ONE_SOFT)
        .as_constraint("Teacher room stability")
    )

Balance Workload (Soft)

def balance_workload(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Shift)
        .group_by(
            lambda shift: shift.employee,
            ConstraintCollectors.count()
        )
        .filter(lambda employee, count: count > 5)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda employee, count: count - 5  # Penalize excess shifts
        )
        .as_constraint("Balance workload")
    )

Best Practices

Do

  • Use joiners in for_each_unique_pair() for efficiency
  • Name constraints descriptively
  • Break complex constraints into helper functions

Don’t

  • Use filter() when a joiner would work (less efficient)
  • Create overly complex single constraints (split them)
  • Forget to call as_constraint()

Next Steps

4.2 - Joiners

Efficiently filter and match entities in constraint streams.

Joiners efficiently filter pairs of entities during joins and unique pair operations. They’re more efficient than post-join filtering because they use indexing.

Basic Usage

from solverforge_legacy.solver.score import Joiners

factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda lesson: lesson.timeslot),
    Joiners.equal(lambda lesson: lesson.room),
)

Multiple joiners are combined with AND logic.

Available Joiners

equal()

Match when property values are equal:

# Same timeslot
Joiners.equal(lambda lesson: lesson.timeslot)

# In a join, specify both sides
factory.for_each(Lesson).join(
    Room,
    Joiners.equal(lambda lesson: lesson.room, lambda room: room)
)

less_than() / less_than_or_equal()

Match when first value is less than second:

# l1.priority < l2.priority
Joiners.less_than(lambda lesson: lesson.priority)

# l1.start_time <= l2.start_time
Joiners.less_than_or_equal(lambda lesson: lesson.start_time)

greater_than() / greater_than_or_equal()

Match when first value is greater than second:

# l1.priority > l2.priority
Joiners.greater_than(lambda lesson: lesson.priority)

# l1.end_time >= l2.end_time
Joiners.greater_than_or_equal(lambda lesson: lesson.end_time)

overlapping()

Match when ranges overlap:

# Time overlap: [start1, end1) overlaps [start2, end2)
Joiners.overlapping(
    lambda l: l.start_time,   # Start of range 1
    lambda l: l.end_time,     # End of range 1
    lambda l: l.start_time,   # Start of range 2
    lambda l: l.end_time,     # End of range 2
)

For a join between different types:

factory.for_each(Meeting).join(
    Availability,
    Joiners.overlapping(
        lambda m: m.start_time,
        lambda m: m.end_time,
        lambda a: a.start_time,
        lambda a: a.end_time,
    )
)

filtering()

Custom filter function (less efficient, use as last resort):

# Custom logic that can't be expressed with other joiners
Joiners.filtering(lambda l1, l2: l1.is_compatible_with(l2))

Combining Joiners

Joiners are combined with AND:

factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot),    # Same timeslot AND
    Joiners.equal(lambda l: l.room),         # Same room
)

Performance Considerations

Index-Based Joiners (Preferred)

These joiners use internal indexes for O(1) or O(log n) lookup:

  • equal() - Hash index
  • less_than(), greater_than() - Tree index
  • overlapping() - Interval tree

Filtering Joiner (Slower)

filtering() checks every pair, O(n²):

# Avoid when possible - checks all pairs
Joiners.filtering(lambda l1, l2: some_complex_check(l1, l2))

Optimization Tips

Good: Index joiners first, filtering last:

factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot),       # Index first
    Joiners.filtering(lambda l1, l2: custom(l1, l2))  # Filter remaining
)

Bad: Only filtering (checks all pairs):

factory.for_each_unique_pair(
    Lesson,
    Joiners.filtering(lambda l1, l2: l1.timeslot == l2.timeslot and custom(l1, l2))
)

Examples

Time Conflict Detection

def time_conflict(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each_unique_pair(
            Shift,
            Joiners.equal(lambda s: s.employee),
            Joiners.overlapping(
                lambda s: s.start_time,
                lambda s: s.end_time,
                lambda s: s.start_time,
                lambda s: s.end_time,
            ),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Employee time conflict")
    )

Same Day Sequential

def same_day_sequential(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Lesson)
        .join(
            Lesson,
            Joiners.equal(lambda l: l.teacher),
            Joiners.equal(lambda l: l.timeslot.day_of_week),
            Joiners.less_than(lambda l: l.timeslot.start_time),
            Joiners.filtering(lambda l1, l2:
                (l2.timeslot.start_time - l1.timeslot.end_time).seconds <= 1800
            ),
        )
        .reward(HardSoftScore.ONE_SOFT)
        .as_constraint("Teacher consecutive lessons")
    )

Resource Assignment

def resource_assignment(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Task)
        .join(
            Resource,
            Joiners.equal(lambda t: t.required_skill, lambda r: r.skill),
            Joiners.greater_than_or_equal(lambda t: t.priority, lambda r: r.min_priority),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Resource skill match")
    )

Joiner vs Filter

Use Joiner WhenUse Filter When
Checking equalityComplex logic
Comparing valuesMultiple conditions with OR
Range overlapCalling methods on entities
Performance mattersSimple one-off checks

Next Steps

4.3 - Collectors

Aggregate data in constraint streams using collectors.

Collectors aggregate data when grouping entities. They’re used with group_by() to compute counts, sums, lists, and other aggregations.

Basic Usage

from solverforge_legacy.solver.score import ConstraintCollectors

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,       # Group key
    ConstraintCollectors.count()        # Collector
)
# Result: BiStream of (employee, count)

Available Collectors

count()

Count items in each group:

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,
    ConstraintCollectors.count()
)
# (Employee, int)

count_distinct()

Count unique values:

factory.for_each(Lesson)
.group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.count_distinct(lambda l: l.room)
)
# (Teacher, number of distinct rooms)

sum()

Sum numeric values:

factory.for_each(Visit)
.group_by(
    lambda visit: visit.vehicle,
    ConstraintCollectors.sum(lambda v: v.demand)
)
# (Vehicle, total demand)

min() / max()

Find minimum or maximum:

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,
    ConstraintCollectors.min(lambda s: s.start_time)
)
# (Employee, earliest start time)

With comparator:

ConstraintCollectors.max(
    lambda shift: shift,
    key=lambda s: s.priority
)
# Returns the shift with highest priority

average()

Calculate average:

factory.for_each(Task)
.group_by(
    lambda task: task.worker,
    ConstraintCollectors.average(lambda t: t.duration)
)
# (Worker, average task duration)

to_list()

Collect into a list:

factory.for_each(Visit)
.group_by(
    lambda visit: visit.vehicle,
    ConstraintCollectors.to_list(lambda v: v)
)
# (Vehicle, list of visits)

to_set()

Collect into a set (unique values):

factory.for_each(Lesson)
.group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.to_set(lambda l: l.room)
)
# (Teacher, set of rooms)

to_sorted_set()

Collect into a sorted set:

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,
    ConstraintCollectors.to_sorted_set(lambda s: s.start_time)
)
# (Employee, sorted set of start times)

compose()

Combine multiple collectors:

ConstraintCollectors.compose(
    ConstraintCollectors.count(),
    ConstraintCollectors.sum(lambda s: s.hours),
    lambda count, total_hours: (count, total_hours)
)
# Returns (count, sum) tuple

conditional()

Collect only matching items:

ConstraintCollectors.conditional(
    lambda shift: shift.is_night,
    ConstraintCollectors.count()
)
# Count only night shifts

Multiple Collectors

Use multiple collectors in one group_by:

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,
    ConstraintCollectors.count(),
    ConstraintCollectors.sum(lambda s: s.hours),
    ConstraintCollectors.min(lambda s: s.start_time),
)
# QuadStream: (Employee, count, total_hours, earliest_start)

Grouping Patterns

Count Per Category

def balance_shift_count(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Shift)
        .group_by(
            lambda shift: shift.employee,
            ConstraintCollectors.count()
        )
        .filter(lambda employee, count: count > 5)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda employee, count: (count - 5) ** 2
        )
        .as_constraint("Balance shift count")
    )

Sum with Threshold

def vehicle_capacity(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Visit)
        .group_by(
            lambda visit: visit.vehicle,
            ConstraintCollectors.sum(lambda v: v.demand)
        )
        .filter(lambda vehicle, total: total > vehicle.capacity)
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda vehicle, total: total - vehicle.capacity
        )
        .as_constraint("Vehicle capacity")
    )

Load Distribution

def fair_distribution(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Task)
        .group_by(
            lambda task: task.worker,
            ConstraintCollectors.count()
        )
        .group_by(
            ConstraintCollectors.min(lambda worker, count: count),
            ConstraintCollectors.max(lambda worker, count: count),
        )
        .filter(lambda min_count, max_count: max_count - min_count > 2)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda min_count, max_count: max_count - min_count
        )
        .as_constraint("Fair task distribution")
    )

Consecutive Detection

def consecutive_shifts(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Shift)
        .group_by(
            lambda shift: shift.employee,
            ConstraintCollectors.to_sorted_set(lambda s: s.date)
        )
        .filter(lambda employee, dates: has_consecutive_days(dates, 6))
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Max consecutive days")
    )

def has_consecutive_days(dates: set, max_consecutive: int) -> bool:
    sorted_dates = sorted(dates)
    consecutive = 1
    for i in range(1, len(sorted_dates)):
        if (sorted_dates[i] - sorted_dates[i-1]).days == 1:
            consecutive += 1
            if consecutive > max_consecutive:
                return True
        else:
            consecutive = 1
    return False

Performance Tips

Prefer count() over to_list()

# Good: Efficient counting
ConstraintCollectors.count()

# Avoid: Creates list just to count
ConstraintCollectors.to_list(lambda x: x).map(len)

Use conditional() for Filtered Counts

# Good: Single pass
ConstraintCollectors.conditional(
    lambda s: s.is_weekend,
    ConstraintCollectors.count()
)

# Avoid: Filter then count
factory.for_each(Shift)
.filter(lambda s: s.is_weekend)
.group_by(...)

Minimize Data in Collectors

# Good: Collect only needed data
ConstraintCollectors.to_list(lambda s: s.start_time)

# Avoid: Collect entire objects
ConstraintCollectors.to_list(lambda s: s)

Next Steps

4.4 - Score Types

Choose the right score type for your constraints.

Score types determine how constraint violations and rewards are measured. Choose the type that matches your problem’s structure.

Available Score Types

Score TypeLevelsUse Case
SimpleScore1Single optimization objective
HardSoftScore2Feasibility + optimization
HardMediumSoftScore3Hard + important + nice-to-have
BendableScoreNCustom number of levels
*DecimalScore variants-Decimal precision

SimpleScore

For single-objective optimization:

from solverforge_legacy.solver.score import SimpleScore

# In domain model
score: Annotated[SimpleScore, PlanningScore] = field(default=None)

# In constraints
.penalize(SimpleScore.ONE)
.reward(SimpleScore.of(10))

Use when: You only need to maximize or minimize one thing (e.g., total profit, total distance).

HardSoftScore

The most common type—separates feasibility from optimization:

from solverforge_legacy.solver.score import HardSoftScore

# In domain model
score: Annotated[HardSoftScore, PlanningScore] = field(default=None)

# In constraints
.penalize(HardSoftScore.ONE_HARD)     # Broken constraint
.penalize(HardSoftScore.ONE_SOFT)     # Suboptimal
.penalize(HardSoftScore.of_hard(5))   # Weighted hard
.penalize(HardSoftScore.of_soft(10))  # Weighted soft

Hard constraints:

  • Must be satisfied for a feasible solution
  • Score format: Xhard/Ysoft
  • 0hard/*soft = feasible

Soft constraints:

  • Preferences to optimize
  • Better soft scores are preferred among feasible solutions

Use when: You have rules that must be followed AND preferences to optimize.

HardMediumSoftScore

Three levels of priority:

from solverforge_legacy.solver.score import HardMediumSoftScore

# In domain model
score: Annotated[HardMediumSoftScore, PlanningScore] = field(default=None)

# In constraints
.penalize(HardMediumSoftScore.ONE_HARD)    # Must satisfy
.penalize(HardMediumSoftScore.ONE_MEDIUM)  # Important preference
.penalize(HardMediumSoftScore.ONE_SOFT)    # Nice to have

Use when:

  • Medium = “Assign as many as possible”
  • Medium = “Important but not mandatory”
  • Medium = “Prefer over soft, but not as critical as hard”

Example: Meeting scheduling where:

  • Hard: Required attendees must be available
  • Medium: Preferred attendees should attend
  • Soft: Room size preferences

BendableScore

Custom number of hard and soft levels:

from solverforge_legacy.solver.score import BendableScore

# Configure levels (3 hard, 2 soft)
score: Annotated[BendableScore, PlanningScore] = field(default=None)

# In constraints
.penalize(BendableScore.of_hard(0, 1))   # First hard level
.penalize(BendableScore.of_hard(1, 1))   # Second hard level
.penalize(BendableScore.of_soft(0, 1))   # First soft level

Use when: You need more than 3 priority levels.

Decimal Score Variants

For precise calculations:

from solverforge_legacy.solver.score import HardSoftDecimalScore

score: Annotated[HardSoftDecimalScore, PlanningScore] = field(default=None)

# In constraints
from decimal import Decimal
.penalize(HardSoftDecimalScore.of_soft(Decimal("0.01")))

Available variants:

  • SimpleDecimalScore
  • HardSoftDecimalScore
  • HardMediumSoftDecimalScore
  • BendableDecimalScore

Use when: Integer scores aren’t precise enough (e.g., money, distances).

Score Constants

Common score values are predefined:

# SimpleScore
SimpleScore.ZERO
SimpleScore.ONE
SimpleScore.of(n)

# HardSoftScore
HardSoftScore.ZERO
HardSoftScore.ONE_HARD
HardSoftScore.ONE_SOFT
HardSoftScore.of_hard(n)
HardSoftScore.of_soft(n)
HardSoftScore.of(hard, soft)

# HardMediumSoftScore
HardMediumSoftScore.ZERO
HardMediumSoftScore.ONE_HARD
HardMediumSoftScore.ONE_MEDIUM
HardMediumSoftScore.ONE_SOFT
HardMediumSoftScore.of(hard, medium, soft)

Dynamic Weights

Apply weights based on entity properties:

def weighted_penalty(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Task)
        .filter(lambda t: t.is_late())
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda task: task.priority  # High priority = bigger penalty
        )
        .as_constraint("Late task")
    )

Score Comparison

Scores are compared level by level:

# Hard first, then soft
0hard/-100soft > -1hard/0soft    (first is feasible)
-1hard/-50soft > -2hard/-10soft  (first has better hard)
0hard/-50soft > 0hard/-100soft   (same hard, better soft)

Score Properties

score = HardSoftScore.of(-2, -100)

score.is_feasible          # False (hard < 0)
score.hard_score           # -2
score.soft_score           # -100
str(score)                 # "-2hard/-100soft"

HardSoftScore.parse("-2hard/-100soft")  # Parse from string

Choosing a Score Type

QuestionRecommendation
Need feasibility check?Use HardSoftScore
Single objective only?Use SimpleScore
“Assign as many as possible”?Use HardMediumSoftScore
More than 3 priority levels?Use BendableScore
Need decimal precision?Use *DecimalScore variant

Best Practices

Do

  • Use HardSoftScore as default choice
  • Keep hard constraints truly hard (legal requirements, physical limits)
  • Use consistent weight scales within each level

Don’t

  • Use medium level for actual hard constraints
  • Over-complicate with BendableScore when HardMediumSoftScore works
  • Mix units in the same level (e.g., minutes and dollars)

Next Steps

4.5 - Score Analysis

Understand why a solution has its score.

Score analysis helps you understand why a solution received its score. This is essential for debugging constraints and explaining results to users.

SolutionManager

Use SolutionManager to analyze solutions:

from solverforge_legacy.solver import SolverFactory, SolutionManager

solver_factory = SolverFactory.create(solver_config)
solution_manager = SolutionManager.create(solver_factory)

# Analyze a solution
analysis = solution_manager.analyze(solution)

Score Explanation

Get a breakdown of constraint scores:

analysis = solution_manager.analyze(solution)

# Overall score
print(f"Score: {analysis.score}")

# Per-constraint breakdown
for constraint_analysis in analysis.constraint_analyses():
    print(f"{constraint_analysis.constraint_name}: {constraint_analysis.score}")
    print(f"  Match count: {constraint_analysis.match_count}")

Example output:

Score: -2hard/-15soft
Room conflict: -2hard
  Match count: 2
Teacher room stability: -10soft
  Match count: 10
Teacher time efficiency: -5soft
  Match count: 5

Constraint Matches

See exactly which entities triggered each constraint:

for constraint_analysis in analysis.constraint_analyses():
    print(f"\n{constraint_analysis.constraint_name}:")
    for match in constraint_analysis.matches():
        print(f"  Match: {match.justification}")
        print(f"  Score: {match.score}")

Indictments

Find which entities are causing problems:

# Get indictments (entities blamed for score impact)
for indictment in analysis.indictments():
    print(f"\nEntity: {indictment.indicted_object}")
    print(f"Total score impact: {indictment.score}")
    for match in indictment.matches():
        print(f"  - {match.constraint_name}: {match.score}")

Example output:

Entity: Lesson(id=1, subject='Math')
Total score impact: -1hard/-3soft
  - Room conflict: -1hard
  - Teacher room stability: -3soft

Custom Justifications

Add explanations to your constraints:

@dataclass
class RoomConflictJustification:
    lesson1: Lesson
    lesson2: Lesson
    timeslot: Timeslot
    room: Room

    def __str__(self):
        return (f"{self.lesson1.subject} and {self.lesson2.subject} "
                f"both scheduled in {self.room} at {self.timeslot}")

def room_conflict(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda l: l.timeslot),
            Joiners.equal(lambda l: l.room),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .justify_with(lambda l1, l2, score: RoomConflictJustification(
            l1, l2, l1.timeslot, l1.room
        ))
        .as_constraint("Room conflict")
    )

Debugging Constraints

Verify Score Calculation

# Calculate score without solving
score = solution_manager.update(solution)
print(f"Calculated score: {score}")

Find Missing Constraints

If a constraint isn’t firing when expected:

# Check if specific entities match
for constraint_analysis in analysis.constraint_analyses():
    if constraint_analysis.constraint_name == "Room conflict":
        if constraint_analysis.match_count == 0:
            print("No room conflicts detected!")
            # Check your joiners and filters

Verify Feasibility

if not solution.score.is_feasible:
    print("Solution is infeasible!")
    for ca in analysis.constraint_analyses():
        if ca.score.hard_score < 0:
            print(f"Hard constraint broken: {ca.constraint_name}")
            for match in ca.matches():
                print(f"  {match.justification}")

Integration with FastAPI

Expose score analysis in your API:

from fastapi import FastAPI

@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
    solution = solutions.get(job_id)
    if not solution:
        raise HTTPException(404, "Job not found")

    analysis = solution_manager.analyze(solution)

    return {
        "score": str(analysis.score),
        "is_feasible": analysis.score.is_feasible,
        "constraints": [
            {
                "name": ca.constraint_name,
                "score": str(ca.score),
                "match_count": ca.match_count,
            }
            for ca in analysis.constraint_analyses()
        ]
    }

Best Practices

Do

  • Use justify_with() for user-facing explanations
  • Check score analysis when debugging constraints
  • Expose score breakdown in your UI

Don’t

  • Analyze every solution during solving (performance)
  • Ignore indictments when troubleshooting
  • Forget to handle infeasible solutions

Score Comparison

Compare two solutions:

def compare_solutions(old: Timetable, new: Timetable):
    old_analysis = solution_manager.analyze(old)
    new_analysis = solution_manager.analyze(new)

    print(f"Score improved: {old.score} -> {new.score}")

    old_constraints = {ca.constraint_name: ca for ca in old_analysis.constraint_analyses()}
    new_constraints = {ca.constraint_name: ca for ca in new_analysis.constraint_analyses()}

    for name in old_constraints:
        old_ca = old_constraints[name]
        new_ca = new_constraints.get(name)
        if new_ca and old_ca.score != new_ca.score:
            print(f"  {name}: {old_ca.score} -> {new_ca.score}")

Next Steps

4.6 - Constraint Performance

Optimize constraint evaluation for faster solving.

Efficient constraint evaluation is critical for solver performance. Most solving time is spent calculating scores, so optimizing constraints has a direct impact on solution quality.

Performance Principles

1. Use Joiners Instead of Filters

Joiners use indexes for O(1) lookups. Filters check every item.

# Good: Uses index
factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot)
)

# Bad: Checks all pairs
factory.for_each_unique_pair(Lesson)
.filter(lambda l1, l2: l1.timeslot == l2.timeslot)

2. Put Selective Joiners First

More selective joiners reduce the search space faster:

# Good: timeslot has few values, filters early
factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot),  # Few timeslots
    Joiners.equal(lambda l: l.teacher),    # More teachers
)

# Less efficient: teacher might have many values
factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.teacher),    # Many teachers
    Joiners.equal(lambda l: l.timeslot),   # Then timeslot
)

3. Avoid Expensive Lambda Operations

# Good: Simple property access
Joiners.equal(lambda l: l.timeslot)

# Bad: Complex calculation in joiner
Joiners.equal(lambda l: calculate_complex_hash(l))

4. Use Cached Properties

@planning_entity
@dataclass
class Lesson:
    # Pre-calculate expensive values
    @cached_property
    def combined_key(self):
        return (self.timeslot, self.room)

# Use cached property in constraint
Joiners.equal(lambda l: l.combined_key)

Common Optimizations

Replace for_each + filter with for_each_unique_pair

# Before: Inefficient
factory.for_each(Lesson)
.join(Lesson)
.filter(lambda l1, l2: l1.id != l2.id and l1.timeslot == l2.timeslot)

# After: Efficient
factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot)
)

Use if_exists() Instead of Join + group_by

# Before: Creates pairs then groups
factory.for_each(Employee)
.join(Shift, Joiners.equal(lambda e: e, lambda s: s.employee))
.group_by(lambda e, s: e, ConstraintCollectors.count())
.filter(lambda e, count: count > 0)

# After: Just checks existence
factory.for_each(Employee)
.if_exists(Shift, Joiners.equal(lambda e: e, lambda s: s.employee))

Avoid Redundant Constraints

# Redundant: Two constraints that overlap
def constraint1(factory):
    # Penalizes A and B in same room
    ...

def constraint2(factory):
    # Penalizes A and B in same room and same timeslot
    ...  # This overlaps with constraint1!

# Better: One specific constraint
def room_conflict(factory):
    # Only penalizes same room AND same timeslot
    factory.for_each_unique_pair(
        Lesson,
        Joiners.equal(lambda l: l.timeslot),
        Joiners.equal(lambda l: l.room),
    )

Limit Collection Sizes in Collectors

# Bad: Collects everything
ConstraintCollectors.to_list(lambda s: s)

# Better: Collect only what's needed
ConstraintCollectors.to_list(lambda s: s.start_time)

# Best: Use aggregate if possible
ConstraintCollectors.count()

Incremental Score Calculation

SolverForge uses incremental score calculation—only recalculating affected constraints when a move is made. Help this work efficiently:

Keep Constraints Independent

# Good: Constraints don't share state
def room_conflict(factory):
    return factory.for_each_unique_pair(...)

def teacher_conflict(factory):
    return factory.for_each_unique_pair(...)

# Bad: Shared calculation affects both
shared_data = calculate_once()  # Recalculated on every change!

Avoid Global State

# Bad: References external data
external_config = load_config()

def my_constraint(factory):
    return factory.for_each(Lesson)
    .filter(lambda l: l.priority > external_config.threshold)  # External ref

Benchmarking Constraints

Enable Debug Logging

import logging
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)

Time Individual Constraints

import time

def timed_constraint(factory):
    start = time.time()
    result = actual_constraint(factory)
    print(f"Constraint built in {time.time() - start:.3f}s")
    return result

Use the Benchmarker

For systematic comparison, use the Benchmarker (see Benchmarking).

Score Corruption Detection

Enable environment mode for debugging:

from solverforge_legacy.solver.config import EnvironmentMode

SolverConfig(
    environment_mode=EnvironmentMode.FULL_ASSERT,  # Detects score corruption
    ...
)

Modes:

  • NON_REPRODUCIBLE - Fastest, no checks
  • REPRODUCIBLE - Deterministic but no validation
  • FAST_ASSERT - Quick validation checks
  • FULL_ASSERT - Complete validation (slowest)

Use FULL_ASSERT during development, REPRODUCIBLE or NON_REPRODUCIBLE in production.

Common Performance Issues

SymptomLikely CauseSolution
Very slow startComplex constraint buildingSimplify or cache
Slow throughoutFilter instead of joinerUse joiners
Memory issuesLarge collectionsUse aggregates
Score corruptionIncorrect incremental calcEnable FULL_ASSERT

Next Steps

4.7 - Testing Constraints

Test constraints in isolation for correctness.

Testing constraints ensures they behave correctly before integrating with the full solver. This catches bugs early and documents expected behavior.

Basic Constraint Testing

Test individual constraints with minimal data:

import pytest
from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
from datetime import time

from my_app.domain import Timetable, Timeslot, Room, Lesson
from my_app.constraints import define_constraints


@pytest.fixture
def solution_manager():
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(spent_limit=Duration(seconds=1))
    )
    factory = SolverFactory.create(config)
    return SolutionManager.create(factory)


def test_room_conflict(solution_manager):
    """Two lessons in the same room at the same time should penalize."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    # Two lessons in same room and timeslot
    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
    lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    # Should have -1 hard for the room conflict
    assert analysis.score.hard_score == -1


def test_no_room_conflict(solution_manager):
    """Two lessons in different rooms should not conflict."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room_a = Room("Room A")
    room_b = Room("Room B")

    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room_a)
    lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room_b)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room_a, room_b],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    # Should have no hard constraint violations
    assert analysis.score.hard_score == 0

Testing Constraint Weight

Verify the magnitude of penalties:

def test_teacher_room_stability_weight(solution_manager):
    """Teacher using multiple rooms should incur soft penalty per extra room."""
    timeslot1 = Timeslot("MONDAY", time(8, 30), time(9, 30))
    timeslot2 = Timeslot("MONDAY", time(9, 30), time(10, 30))
    room_a = Room("Room A")
    room_b = Room("Room B")

    # Same teacher, different rooms
    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot1, room_a)
    lesson2 = Lesson("2", "Math", "Teacher A", "Group 2", timeslot2, room_b)

    problem = Timetable(
        id="test",
        timeslots=[timeslot1, timeslot2],
        rooms=[room_a, room_b],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    # Should have soft penalty for room instability
    assert analysis.score.soft_score < 0

    # Verify specific constraint triggered
    room_stability = next(
        ca for ca in analysis.constraint_analyses()
        if ca.constraint_name == "Teacher room stability"
    )
    assert room_stability.match_count == 1

Testing with Fixtures

Create reusable test fixtures:

@pytest.fixture
def timeslots():
    return [
        Timeslot("MONDAY", time(8, 30), time(9, 30)),
        Timeslot("MONDAY", time(9, 30), time(10, 30)),
        Timeslot("TUESDAY", time(8, 30), time(9, 30)),
    ]


@pytest.fixture
def rooms():
    return [Room("A"), Room("B"), Room("C")]


@pytest.fixture
def empty_problem(timeslots, rooms):
    return Timetable(
        id="test",
        timeslots=timeslots,
        rooms=rooms,
        lessons=[]
    )


def test_empty_problem_is_feasible(solution_manager, empty_problem):
    """Empty problem should have zero score."""
    analysis = solution_manager.analyze(empty_problem)
    assert analysis.score == HardSoftScore.ZERO

Testing Edge Cases

Unassigned Variables

def test_unassigned_lesson(solution_manager):
    """Unassigned lessons should not cause conflicts."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    # One assigned, one not
    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
    lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", None, None)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    # Should not have room conflict (lesson2 is unassigned)
    assert analysis.score.hard_score == 0

Multiple Violations

def test_multiple_conflicts(solution_manager):
    """Three lessons in same room/time should create multiple conflicts."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    lesson1 = Lesson("1", "Math", "A", "G1", timeslot, room)
    lesson2 = Lesson("2", "Physics", "B", "G2", timeslot, room)
    lesson3 = Lesson("3", "Chemistry", "C", "G3", timeslot, room)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=[lesson1, lesson2, lesson3]
    )

    analysis = solution_manager.analyze(problem)

    # 3 lessons create 3 unique pairs: (1,2), (1,3), (2,3)
    assert analysis.score.hard_score == -3

Feasibility Testing

Test that the solver can find a feasible solution:

def test_feasible_solution():
    """Solver should find a feasible solution for small problems."""
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(spent_limit=Duration(seconds=5))
    )

    factory = SolverFactory.create(config)
    solver = factory.build_solver()

    problem = generate_small_problem()
    solution = solver.solve(problem)

    assert solution.score.is_feasible, f"Solution infeasible: {solution.score}"

Parameterized Tests

Test multiple scenarios efficiently:

@pytest.mark.parametrize("num_lessons,expected_conflicts", [
    (1, 0),  # Single lesson: no conflicts
    (2, 1),  # Two lessons: one pair
    (3, 3),  # Three lessons: three pairs
    (4, 6),  # Four lessons: six pairs
])
def test_all_in_same_room_timeslot(solution_manager, num_lessons, expected_conflicts):
    """n lessons in same room/time should create n*(n-1)/2 conflicts."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    lessons = [
        Lesson(str(i), f"Subject{i}", f"Teacher{i}", "Group", timeslot, room)
        for i in range(num_lessons)
    ]

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=lessons
    )

    analysis = solution_manager.analyze(problem)
    assert analysis.score.hard_score == -expected_conflicts

Testing Justifications

def test_constraint_justification(solution_manager):
    """Constraint should provide meaningful justification."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
    lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    room_conflict_ca = next(
        ca for ca in analysis.constraint_analyses()
        if ca.constraint_name == "Room conflict"
    )

    match = next(room_conflict_ca.matches())
    assert "Room A" in str(match.justification)
    assert "MONDAY" in str(match.justification)

Best Practices

Do

  • Test each constraint in isolation
  • Test both positive and negative cases
  • Test edge cases (empty, unassigned, maximum)
  • Use descriptive test names

Don’t

  • Skip constraint testing
  • Only test happy paths
  • Use production data sizes in unit tests
  • Ignore constraint weights

Next Steps

5 - Solver

Configure and run the solver to find optimal solutions.

The solver is the engine that finds optimal solutions to your planning problems. This section covers how to configure and run it.

Topics

Quick Example

from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)

# Configure the solver
solver_config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(
        spent_limit=Duration(seconds=30)
    )
)

# Create and run the solver
solver_factory = SolverFactory.create(solver_config)
solver = solver_factory.build_solver()

problem = load_problem()  # Your problem data
solution = solver.solve(problem)

print(f"Best score: {solution.score}")

Termination

The solver needs to know when to stop. Common termination conditions:

ConditionDescription
spent_limitStop after a time limit (e.g., 30 seconds)
best_score_limitStop when a target score is reached
unimproved_spent_limitStop if no improvement for a duration
step_count_limitStop after a number of optimization steps

5.1 - Solver Configuration

Configure the solver with SolverConfig and related classes.

Configure the solver using Python dataclasses. This defines what to solve, how to score, and when to stop.

SolverConfig

The main configuration class:

from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
)

solver_config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(
        spent_limit=Duration(seconds=30)
    ),
)

Required Fields

FieldDescription
solution_classThe @planning_solution class
entity_class_listList of @planning_entity classes
score_director_factory_configHow to calculate scores

Optional Fields

FieldDescriptionDefault
termination_configWhen to stopNever (manual termination)
environment_modeValidation levelREPRODUCIBLE
random_seedFor reproducibilityRandom

ScoreDirectorFactoryConfig

Configures constraint evaluation:

ScoreDirectorFactoryConfig(
    constraint_provider_function=define_constraints
)

With Constraint Provider

from my_app.constraints import define_constraints

ScoreDirectorFactoryConfig(
    constraint_provider_function=define_constraints
)

TerminationConfig

Controls when the solver stops:

Time Limit

TerminationConfig(
    spent_limit=Duration(seconds=30)
)

# Other duration units
Duration(minutes=5)
Duration(hours=1)
Duration(milliseconds=500)

Score Target

Stop when a target score is reached:

TerminationConfig(
    best_score_limit="0hard/-10soft"
)

Step Limit

Stop after a number of steps:

TerminationConfig(
    step_count_limit=10000
)

Unimproved Time

Stop if no improvement for a duration:

TerminationConfig(
    unimproved_spent_limit=Duration(seconds=30)
)

Combining Conditions

Multiple conditions use OR logic:

TerminationConfig(
    spent_limit=Duration(minutes=5),
    best_score_limit="0hard/0soft",  # OR achieves perfect
    unimproved_spent_limit=Duration(seconds=60)  # OR stuck
)

Environment Mode

Controls validation and reproducibility:

from solverforge_legacy.solver.config import EnvironmentMode

SolverConfig(
    environment_mode=EnvironmentMode.REPRODUCIBLE,
    ...
)
ModeDescriptionUse Case
NON_REPRODUCIBLEFastest, no validationProduction
REPRODUCIBLEDeterministic resultsDefault
FAST_ASSERTQuick validationTesting
FULL_ASSERTComplete validationDebugging

Debugging Score Corruption

Use FULL_ASSERT to detect score calculation bugs:

SolverConfig(
    environment_mode=EnvironmentMode.FULL_ASSERT,
    ...
)

This validates every score calculation but is slow.

Reproducibility

For reproducible results, set a random seed:

SolverConfig(
    random_seed=42,
    environment_mode=EnvironmentMode.REPRODUCIBLE,
    ...
)

Configuration Overrides

Override configuration when building a solver:

from solverforge_legacy.solver.config import SolverConfigOverride

solver_factory = SolverFactory.create(solver_config)

# Override termination for this solver instance
override = SolverConfigOverride(
    termination_config=TerminationConfig(spent_limit=Duration(seconds=10))
)
solver = solver_factory.build_solver(override)

Complete Example

from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
    EnvironmentMode,
)

from my_app.domain import Timetable, Lesson
from my_app.constraints import define_constraints


def create_solver():
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(minutes=5),
            best_score_limit="0hard/0soft",
        ),
        environment_mode=EnvironmentMode.REPRODUCIBLE,
        random_seed=42,
    )

    factory = SolverFactory.create(config)
    return factory.build_solver()

Configuration Best Practices

Development

SolverConfig(
    environment_mode=EnvironmentMode.FULL_ASSERT,
    termination_config=TerminationConfig(spent_limit=Duration(seconds=10)),
    ...
)

Testing

SolverConfig(
    environment_mode=EnvironmentMode.REPRODUCIBLE,
    random_seed=42,  # Reproducible tests
    termination_config=TerminationConfig(spent_limit=Duration(seconds=5)),
    ...
)

Production

SolverConfig(
    environment_mode=EnvironmentMode.NON_REPRODUCIBLE,
    termination_config=TerminationConfig(
        spent_limit=Duration(minutes=5),
        unimproved_spent_limit=Duration(minutes=1),
    ),
    ...
)

Next Steps

5.2 - Running the Solver

Execute the solver synchronously with Solver.solve().

The simplest way to solve a problem is with Solver.solve(), which blocks until termination.

Basic Usage

from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)

# Configure
config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)

# Create factory and solver
factory = SolverFactory.create(config)
solver = factory.build_solver()

# Load problem
problem = load_problem()

# Solve (blocks until done)
solution = solver.solve(problem)

# Use solution
print(f"Score: {solution.score}")

Event Listeners

Monitor progress with event listeners:

from solverforge_legacy.solver import BestSolutionChangedEvent

def on_best_solution_changed(event: BestSolutionChangedEvent):
    print(f"New best score: {event.new_best_score}")
    print(f"Time spent: {event.time_spent}")

solver.add_event_listener(on_best_solution_changed)
solution = solver.solve(problem)

BestSolutionChangedEvent Properties

PropertyDescription
new_best_scoreThe new best score
new_best_solutionThe new best solution
time_spentDuration since solving started
is_new_best_solution_initializedTrue if all variables are assigned

Removing Listeners

solver.add_event_listener(listener)
# ... later ...
solver.remove_event_listener(listener)

Early Termination

Stop solving before the termination condition:

import threading

def timeout_termination(solver, timeout_seconds):
    """Terminate after timeout."""
    time.sleep(timeout_seconds)
    solver.terminate_early()

# Start termination thread
thread = threading.Thread(target=timeout_termination, args=(solver, 60))
thread.start()

solution = solver.solve(problem)

Manual Termination

# From another thread
solver.terminate_early()

# Check if termination was requested
if solver.is_terminate_early():
    print("Termination was requested")

Checking Solver State

# Is the solver currently running?
if solver.is_solving():
    print("Solver is running")

# Was early termination requested?
if solver.is_terminate_early():
    print("Termination requested")

Problem Changes (Real-Time)

Modify the problem while solving:

from solverforge_legacy.solver import ProblemChange

class AddLessonChange(ProblemChange[Timetable]):
    def __init__(self, lesson: Lesson):
        self.lesson = lesson

    def do_change(self, working_solution: Timetable, score_director):
        # Add to working solution
        working_solution.lessons.append(self.lesson)
        # Notify score director
        score_director.after_entity_added(self.lesson)

# Add change while solving
new_lesson = Lesson("new", "Art", "S. Dali", "Group A")
solver.add_problem_change(AddLessonChange(new_lesson))

See Real-Time Planning for more details.

Solver Reuse

Don’t reuse a solver instance—create a new one for each solve:

# Correct: New solver each time
solver1 = factory.build_solver()
solution1 = solver1.solve(problem1)

solver2 = factory.build_solver()
solution2 = solver2.solve(problem2)

# Incorrect: Reusing solver
solver = factory.build_solver()
solution1 = solver.solve(problem1)
solution2 = solver.solve(problem2)  # Don't do this!

Threading

Solver.solve() blocks the calling thread. For non-blocking operation, use:

  1. Background thread:

    thread = threading.Thread(target=lambda: solver.solve(problem))
    thread.start()
    
  2. SolverManager (recommended for production): See SolverManager

Error Handling

try:
    solution = solver.solve(problem)
except Exception as e:
    print(f"Solving failed: {e}")
    # Handle error (log, retry, etc.)

Complete Example

from solverforge_legacy.solver import SolverFactory, BestSolutionChangedEvent
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("solver")


def solve_timetable(problem: Timetable) -> Timetable:
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(minutes=5),
            unimproved_spent_limit=Duration(seconds=60),
        ),
    )

    factory = SolverFactory.create(config)
    solver = factory.build_solver()

    # Log progress
    def on_progress(event: BestSolutionChangedEvent):
        logger.info(f"Score: {event.new_best_score} at {event.time_spent}")

    solver.add_event_listener(on_progress)

    # Solve
    logger.info("Starting solver...")
    solution = solver.solve(problem)
    logger.info(f"Solving finished. Final score: {solution.score}")

    return solution


if __name__ == "__main__":
    problem = load_problem()
    solution = solve_timetable(problem)
    save_solution(solution)

Next Steps

5.3 - SolverManager

Manage concurrent and asynchronous solving jobs.

SolverManager handles concurrent solving jobs, making it ideal for web applications and services.

Creating a SolverManager

from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)

config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(spent_limit=Duration(minutes=5)),
)

solver_factory = SolverFactory.create(config)
solver_manager = SolverManager.create(solver_factory)

Basic Solving

solve()

Non-blocking solve that returns a future:

import uuid

job_id = str(uuid.uuid4())

# Start solving (non-blocking)
future = solver_manager.solve(job_id, problem)

# ... do other work ...

# Get result (blocks until done)
solution = future.get_final_best_solution()
print(f"Score: {solution.score}")

solve_and_listen()

Solve with progress callbacks:

def on_best_solution_changed(solution: Timetable):
    print(f"New best: {solution.score}")
    # Update UI, save to database, etc.

def on_exception(error):
    print(f"Solving failed: {error}")

solver_manager.solve_and_listen(
    job_id,
    problem_finder=lambda _: problem,
    best_solution_consumer=on_best_solution_changed,
    exception_handler=on_exception,
)

Managing Jobs

Check Job Status

status = solver_manager.get_solver_status(job_id)
# Returns: NOT_SOLVING, SOLVING_ACTIVE, SOLVING_ENDED

Get Current Best Solution

solution = solver_manager.get_best_solution(job_id)
if solution:
    print(f"Current best: {solution.score}")

Terminate Early

solver_manager.terminate_early(job_id)

FastAPI Integration

from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import uuid

solver_manager: SolverManager | None = None
solutions: dict[str, Timetable] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    global solver_manager

    config = SolverConfig(...)
    factory = SolverFactory.create(config)
    solver_manager = SolverManager.create(factory)

    yield

    solver_manager.close()


app = FastAPI(lifespan=lifespan)


@app.post("/solve")
async def start_solving(problem: TimetableRequest) -> str:
    job_id = str(uuid.uuid4())

    def on_best_solution(solution: Timetable):
        solutions[job_id] = solution

    solver_manager.solve_and_listen(
        job_id,
        problem_finder=lambda _: problem.to_domain(),
        best_solution_consumer=on_best_solution,
    )

    return job_id


@app.get("/solution/{job_id}")
async def get_solution(job_id: str):
    if job_id not in solutions:
        raise HTTPException(404, "Job not found")

    solution = solutions[job_id]
    status = solver_manager.get_solver_status(job_id)

    return {
        "status": status.name,
        "score": str(solution.score),
        "solution": TimetableResponse.from_domain(solution),
    }


@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
    solver_manager.terminate_early(job_id)
    return {"status": "terminating"}

Concurrent Jobs

SolverManager handles multiple jobs concurrently:

# Start multiple jobs
job1 = solver_manager.solve("job1", problem1)
job2 = solver_manager.solve("job2", problem2)
job3 = solver_manager.solve("job3", problem3)

# Each runs in its own thread
# Results available when ready
solution1 = job1.get_final_best_solution()

Resource Limits

By default, jobs run with no limit on concurrent execution. For resource management:

# Limit concurrent solvers
solver_manager = SolverManager.create(
    solver_factory,
    parallel_solver_count=4,  # Max 4 concurrent jobs
)

Problem Changes During Solving

Add changes to running jobs:

from solverforge_legacy.solver import ProblemChange

class AddEntity(ProblemChange[Timetable]):
    def __init__(self, entity):
        self.entity = entity

    def do_change(self, working_solution, score_director):
        working_solution.lessons.append(self.entity)
        score_director.after_entity_added(self.entity)

# Add change to running job
solver_manager.add_problem_change(job_id, AddEntity(new_lesson))

Cleanup

Always close the SolverManager when done:

# Using context manager
with SolverManager.create(factory) as manager:
    # ... use manager ...
# Automatically closed

# Manual cleanup
try:
    # ... use manager ...
finally:
    solver_manager.close()

Error Handling

def on_exception(job_id: str, exception: Exception):
    logger.error(f"Job {job_id} failed: {exception}")
    # Clean up, notify user, etc.

solver_manager.solve_and_listen(
    job_id,
    problem_finder=lambda _: problem,
    best_solution_consumer=on_solution,
    exception_handler=on_exception,
)

Best Practices

Do

  • Use solve_and_listen() for progress updates
  • Store solutions externally (database, cache)
  • Handle exceptions properly
  • Close SolverManager on shutdown

Don’t

  • Block the main thread waiting for results
  • Store solutions only in memory (lose on restart)
  • Forget to handle job cleanup

Next Steps

5.4 - SolutionManager

Analyze and explain solutions with SolutionManager.

SolutionManager provides utilities for analyzing solutions without running the solver.

Creating a SolutionManager

from solverforge_legacy.solver import SolverFactory, SolutionManager

solver_factory = SolverFactory.create(config)
solution_manager = SolutionManager.create(solver_factory)

Or from a SolverManager:

solver_manager = SolverManager.create(solver_factory)
solution_manager = SolutionManager.create(solver_manager)

Score Calculation

Calculate the score of a solution without solving:

# Update score in place
solution_manager.update(solution)
print(f"Score: {solution.score}")

This is useful for:

  • Validating manually created solutions
  • Comparing before/after changes
  • Testing constraint configurations

Score Analysis

Get a detailed breakdown of the score:

analysis = solution_manager.analyze(solution)

print(f"Overall score: {analysis.score}")

# Per-constraint breakdown
for constraint in analysis.constraint_analyses():
    print(f"\n{constraint.constraint_name}:")
    print(f"  Score: {constraint.score}")
    print(f"  Matches: {constraint.match_count}")

Constraint Matches

See exactly which entities triggered each constraint:

for constraint in analysis.constraint_analyses():
    print(f"\n{constraint.constraint_name}:")
    for match in constraint.matches():
        print(f"  - {match.justification}: {match.score}")

Indictments

Find which entities are responsible for score impacts:

for indictment in analysis.indictments():
    print(f"\nEntity: {indictment.indicted_object}")
    print(f"  Total impact: {indictment.score}")
    for match in indictment.matches():
        print(f"  - {match.constraint_name}: {match.score}")

Use Cases

Validate User Input

def validate_schedule(schedule: Schedule) -> list[str]:
    """Validate a manually created schedule."""
    solution_manager.update(schedule)

    if schedule.score.is_feasible:
        return []

    # Collect violations
    violations = []
    analysis = solution_manager.analyze(schedule)

    for constraint in analysis.constraint_analyses():
        if constraint.score.hard_score < 0:
            for match in constraint.matches():
                violations.append(str(match.justification))

    return violations

Compare Solutions

def compare_solutions(old: Schedule, new: Schedule) -> dict:
    """Compare two solutions."""
    old_analysis = solution_manager.analyze(old)
    new_analysis = solution_manager.analyze(new)

    return {
        "old_score": str(old_analysis.score),
        "new_score": str(new_analysis.score),
        "improved": new_analysis.score > old_analysis.score,
        "changes": get_constraint_changes(old_analysis, new_analysis),
    }


def get_constraint_changes(old, new):
    old_scores = {c.constraint_name: c.score for c in old.constraint_analyses()}
    changes = []

    for constraint in new.constraint_analyses():
        old_score = old_scores.get(constraint.constraint_name)
        if old_score != constraint.score:
            changes.append({
                "constraint": constraint.constraint_name,
                "old": str(old_score),
                "new": str(constraint.score),
            })

    return changes

Explain to Users

def explain_score(schedule: Schedule) -> dict:
    """Generate user-friendly score explanation."""
    analysis = solution_manager.analyze(schedule)

    hard_violations = []
    soft_penalties = []

    for constraint in analysis.constraint_analyses():
        if constraint.score.hard_score < 0:
            for match in constraint.matches():
                hard_violations.append({
                    "rule": constraint.constraint_name,
                    "details": str(match.justification),
                })
        elif constraint.score.soft_score < 0:
            soft_penalties.append({
                "rule": constraint.constraint_name,
                "impact": constraint.match_count,
            })

    return {
        "is_valid": schedule.score.is_feasible,
        "hard_violations": hard_violations,
        "soft_penalties": soft_penalties,
        "summary": generate_summary(analysis),
    }

API Endpoint

from fastapi import FastAPI

@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
    solution = solutions.get(job_id)
    if not solution:
        raise HTTPException(404)

    analysis = solution_manager.analyze(solution)

    return {
        "score": str(analysis.score),
        "is_feasible": analysis.score.is_feasible,
        "constraints": [
            {
                "name": c.constraint_name,
                "score": str(c.score),
                "matches": c.match_count,
            }
            for c in analysis.constraint_analyses()
        ],
    }

Debugging

Finding Score Corruption

def debug_score(solution):
    """Debug score calculation."""
    # Calculate fresh
    solution_manager.update(solution)
    fresh_score = solution.score

    # Analyze
    analysis = solution_manager.analyze(solution)
    analyzed_score = analysis.score

    if fresh_score != analyzed_score:
        print(f"Score mismatch: {fresh_score} vs {analyzed_score}")

    # Check each constraint
    total_hard = 0
    total_soft = 0
    for c in analysis.constraint_analyses():
        total_hard += c.score.hard_score
        total_soft += c.score.soft_score
        print(f"{c.constraint_name}: {c.score}")

    print(f"\nCalculated: {total_hard}hard/{total_soft}soft")
    print(f"Reported: {analyzed_score}")

Finding Unexpected Matches

def find_unexpected_matches(solution, constraint_name):
    """Debug why a constraint is matching."""
    analysis = solution_manager.analyze(solution)

    for c in analysis.constraint_analyses():
        if c.constraint_name == constraint_name:
            print(f"\n{constraint_name} matches ({c.match_count}):")
            for match in c.matches():
                print(f"  - {match.justification}")
            return

    print(f"Constraint '{constraint_name}' not found")

Performance Notes

  • update() is fast (incremental calculation)
  • analyze() is slower (collects all match details)
  • Cache analysis results if calling repeatedly
  • Don’t analyze every solution during solving

Next Steps

5.5 - Benchmarking

Compare solver configurations and tune performance.

Benchmarking helps you compare different solver configurations and find the best settings for your problem.

Why Benchmark

  • Compare algorithms: Find the best algorithm combination
  • Tune parameters: Optimize termination times, moves, etc.
  • Validate changes: Ensure improvements don’t regress
  • Understand scaling: See how performance changes with problem size

Basic Benchmarking

Create a simple benchmark by running the solver multiple times:

import time
from statistics import mean, stdev

def benchmark_config(config: SolverConfig, problems: list, runs: int = 3):
    """Benchmark a solver configuration."""
    results = []

    for problem in problems:
        problem_results = []
        for run in range(runs):
            factory = SolverFactory.create(config)
            solver = factory.build_solver()

            start = time.time()
            solution = solver.solve(problem)
            elapsed = time.time() - start

            problem_results.append({
                "score": solution.score,
                "time": elapsed,
                "feasible": solution.score.is_feasible,
            })

        results.append({
            "problem": problem.id,
            "avg_score": mean(r["score"].soft_score for r in problem_results),
            "avg_time": mean(r["time"] for r in problem_results),
            "feasibility_rate": sum(r["feasible"] for r in problem_results) / runs,
        })

    return results

Comparing Configurations

def compare_termination_times():
    """Compare different termination durations."""
    base_config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
    )

    durations = [10, 30, 60, 120, 300]  # seconds
    problems = load_benchmark_problems()

    results = {}
    for duration in durations:
        config = SolverConfig(
            **vars(base_config),
            termination_config=TerminationConfig(
                spent_limit=Duration(seconds=duration)
            ),
        )
        results[duration] = benchmark_config(config, problems)

    return results

Benchmark Report

Generate a readable report:

def generate_report(results: dict):
    """Generate benchmark report."""
    print("=" * 60)
    print("BENCHMARK REPORT")
    print("=" * 60)

    for config_name, config_results in results.items():
        print(f"\n{config_name}:")
        print("-" * 40)

        total_score = 0
        total_time = 0
        feasible_count = 0

        for r in config_results:
            print(f"  {r['problem']}: score={r['avg_score']:.1f}, "
                  f"time={r['avg_time']:.1f}s, "
                  f"feasible={r['feasibility_rate']*100:.0f}%")
            total_score += r["avg_score"]
            total_time += r["avg_time"]
            feasible_count += r["feasibility_rate"]

        n = len(config_results)
        print(f"\n  Average: score={total_score/n:.1f}, "
              f"time={total_time/n:.1f}s, "
              f"feasible={feasible_count/n*100:.0f}%")

    print("\n" + "=" * 60)

Problem Datasets

Create consistent benchmark datasets:

class BenchmarkDataset:
    """Collection of benchmark problems."""

    @staticmethod
    def small():
        """Small problems for quick testing."""
        return [
            generate_problem(lessons=20, rooms=3, timeslots=10),
            generate_problem(lessons=30, rooms=4, timeslots=10),
        ]

    @staticmethod
    def medium():
        """Medium problems for standard benchmarks."""
        return [
            generate_problem(lessons=100, rooms=10, timeslots=25),
            generate_problem(lessons=150, rooms=12, timeslots=25),
        ]

    @staticmethod
    def large():
        """Large problems for stress testing."""
        return [
            generate_problem(lessons=500, rooms=20, timeslots=50),
            generate_problem(lessons=1000, rooms=30, timeslots=50),
        ]

Reproducible Benchmarks

For consistent results:

def reproducible_benchmark(config: SolverConfig, problem, seed: int = 42):
    """Run benchmark with fixed seed."""
    config = SolverConfig(
        **vars(config),
        environment_mode=EnvironmentMode.REPRODUCIBLE,
        random_seed=seed,
    )

    factory = SolverFactory.create(config)
    solver = factory.build_solver()

    return solver.solve(problem)

Metrics to Track

Primary Metrics

MetricDescription
Best ScoreFinal solution quality
Time to BestWhen best score was found
Feasibility Rate% of runs finding feasible solution

Secondary Metrics

MetricDescription
Score Over TimeScore improvement curve
Steps per SecondAlgorithm throughput
Memory UsagePeak memory consumption

Score Over Time

Track how score improves:

def benchmark_with_history(config: SolverConfig, problem):
    """Benchmark with score history."""
    history = []

    def on_progress(event):
        history.append({
            "time": event.time_spent.total_seconds(),
            "score": event.new_best_score,
        })

    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    solver.add_event_listener(on_progress)

    solution = solver.solve(problem)

    return {
        "final_score": solution.score,
        "history": history,
    }

Visualization

Plot results with matplotlib:

import matplotlib.pyplot as plt

def plot_score_over_time(results: dict):
    """Plot score improvement over time."""
    plt.figure(figsize=(10, 6))

    for config_name, result in results.items():
        times = [h["time"] for h in result["history"]]
        scores = [h["score"].soft_score for h in result["history"]]
        plt.plot(times, scores, label=config_name)

    plt.xlabel("Time (seconds)")
    plt.ylabel("Soft Score")
    plt.title("Score Improvement Over Time")
    plt.legend()
    plt.grid(True)
    plt.savefig("benchmark_results.png")

CI/CD Integration

Add benchmarks to your pipeline:

# test_benchmark.py
import pytest

def test_minimum_score():
    """Ensure solver achieves minimum score."""
    config = load_production_config()
    problem = BenchmarkDataset.small()[0]

    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    solution = solver.solve(problem)

    assert solution.score.is_feasible, "Solution should be feasible"
    assert solution.score.soft_score >= -100, "Score should be >= -100"


def test_performance_regression():
    """Check for performance regression."""
    config = load_production_config()
    problem = BenchmarkDataset.medium()[0]

    start = time.time()
    factory = SolverFactory.create(config)
    solver = factory.build_solver()
    solution = solver.solve(problem)
    elapsed = time.time() - start

    assert solution.score.is_feasible
    assert elapsed < 120, "Should complete within 2 minutes"

Best Practices

Do

  • Use consistent problem datasets
  • Run multiple times (3-5) for statistical significance
  • Track both score and time
  • Use reproducible mode for comparisons

Don’t

  • Compare results from different machines
  • Use production data for benchmarks (privacy)
  • Optimize for benchmark problems only
  • Ignore feasibility rate

Next Steps

6 - Optimization Algorithms

Understand the algorithms that power SolverForge’s optimization.

SolverForge uses a combination of algorithms to find high-quality solutions efficiently. Understanding these algorithms helps you tune solver performance.

Topics

Algorithm Phases

SolverForge typically runs algorithms in phases:

1. Construction Heuristic
   └── Builds initial solution (fast, may be suboptimal)

2. Local Search
   └── Iteratively improves solution (most time spent here)

3. (Optional) Exhaustive Search
   └── Proves optimality (only feasible for small problems)

Construction Heuristics

Build an initial feasible solution quickly:

AlgorithmDescription
First FitAssign first available value
First Fit DecreasingAssign largest/most constrained entities first
Cheapest InsertionInsert at lowest cost position
Allocate from PoolAllocate entities from a pool

Local Search Algorithms

Iteratively improve the solution:

AlgorithmDescription
Hill ClimbingAccept only improving moves
Tabu SearchTrack recent moves to avoid cycles
Simulated AnnealingAccept worse moves with decreasing probability
Late AcceptanceAccept if better than solution from N steps ago
Great DelugeAccept if within rising threshold

Default Behavior

By default, SolverForge uses:

  1. First Fit Decreasing construction heuristic
  2. Late Acceptance local search

This works well for most problems. Advanced users can customize the algorithm configuration for specific use cases.

6.1 - Construction Heuristics

Build an initial solution quickly with construction heuristics.

A construction heuristic builds an initial solution by assigning values to all planning variables. It runs fast but may not find an optimal solution—that’s the job of local search.

Why Construction Heuristics?

  • Fast initialization: Quickly assigns all variables
  • Warm start: Gives local search a good starting point
  • Automatic termination: Stops when all variables are assigned

First Fit

Algorithm

First Fit cycles through planning entities in default order, assigning each to the best available value:

  1. Take the first unassigned entity
  2. Try each possible value
  3. Assign the value with the best score
  4. Repeat until all entities are assigned

Behavior

Entity 1 → Best value found → Assigned (never changed)
Entity 2 → Best value found → Assigned (never changed)
Entity 3 → Best value found → Assigned (never changed)
...

Limitations

  • Order matters: Early assignments may block better solutions
  • No backtracking: Once assigned, values don’t change
  • May not find feasible solution if early choices are poor

First Fit Decreasing

Algorithm

Like First Fit, but sorts entities by difficulty first:

  1. Sort entities by difficulty (hardest first)
  2. Assign difficult entities first
  3. Easy entities fit in remaining slots

Why It Helps

Difficult entities (those with fewer valid options) are assigned first while there are more options available. Easy entities can usually fit anywhere.

Example

For school timetabling:

  • Teachers with many constraints → assigned first
  • Teachers with few constraints → assigned last

Default Behavior

SolverForge uses First Fit Decreasing by default. This works well for most problems without configuration.

How It Works Internally

Phase: Construction Heuristic
├── Sort entities by difficulty
├── For each unassigned entity:
│   ├── Try each value from value range
│   ├── Calculate score impact
│   └── Assign best value
└── Done when all entities assigned
AspectConstructionLocal Search
PurposeBuild initial solutionImprove existing solution
SpeedVery fastRuns until termination
QualityDecentOptimal/near-optimal
ChangesAssigns unassigned onlyModifies assigned values

When Construction Fails

If construction can’t find a feasible solution:

  1. Overconstrained problem: Not enough resources for all entities
  2. Tight constraints: Early assignments block later ones
  3. Poor entity ordering: Important entities assigned last

Solutions

  • Use medium constraints for “assign as many as possible”
  • Add nullable planning variables
  • Let local search fix infeasibilities

Monitoring Construction

from solverforge_legacy.solver import BestSolutionChangedEvent

def on_progress(event: BestSolutionChangedEvent):
    if not event.is_new_best_solution_initialized:
        print("Construction phase...")
    else:
        print("Local search phase...")

solver.add_event_listener(on_progress)

Performance Tips

Entity Ordering

Entities are processed in declaration order by default. For better results:

  • Define difficult entities first in your entity list
  • Or implement difficulty comparison

Value Ordering

Values are tried in order. Better default values lead to faster construction.

Next Steps

6.2 - Local Search

Improve solutions iteratively with local search algorithms.

Local search algorithms iteratively improve a solution by making small changes called “moves.” This is where the solver spends most of its time finding better solutions.

How It Works

Start with initial solution
Repeat until termination:
    1. Generate possible moves
    2. Evaluate each move's score impact
    3. Select a move based on acceptance criteria
    4. Apply the move
    5. Update best solution if improved

Local Search Algorithms

Late Acceptance

Default algorithm. Accepts moves that improve on the solution from N steps ago.

  • Balances exploration and exploitation
  • Escapes local optima by accepting slightly worse moves
  • Simple and effective for most problems

Hill Climbing

Only accepts moves that improve the score:

  • Fast convergence
  • Gets stuck in local optima
  • Best for easy problems or quick iterations

Maintains a list of recently made moves and forbids reversing them:

  • Avoids cycles and revisiting solutions
  • Explores more of the search space
  • Memory overhead for tabu list

Simulated Annealing

Accepts worse moves with probability that decreases over time:

  • “Temperature” controls acceptance probability
  • High temperature = more exploration
  • Low temperature = more exploitation
  • Inspired by metallurgy annealing process

Great Deluge

Accepts moves above a rising “water level” threshold:

  • Threshold increases over time
  • Forces gradual improvement
  • Similar to simulated annealing

Move Selection

Local search evaluates moves generated by move selectors:

Move Examples:
├── Change Move: lesson.room = Room B → Room C
├── Swap Move: lesson1.room ↔ lesson2.room
├── 2-Opt Move: Reverse segment in route
└── Custom Move: Domain-specific change

See Move Selectors for details.

Score Improvement Curve

Score
  ^
  |     ****
  |    *    **
  |   *       ***
  |  *           ****
  | *                ********
  |*                         ***************
  +---------------------------------> Time
     Construction   Local Search

Rapid improvement early, then diminishing returns.

Local Optima

A local optimum is a solution where no single move improves the score, but better solutions exist:

Score
  ^
  |        *
  |       * *        Global optimum
  |      *   *      ↓
  |     *     *    *
  |    *       *  * *
  |   *    ↑    **   *
  |  *     Local      *
  |        optimum
  +------------------------→ Solution Space

Algorithms like Late Acceptance and Tabu Search help escape local optima.

Termination

Local search runs until termination:

TerminationConfig(
    spent_limit=Duration(minutes=5),      # Time limit
    best_score_limit="0hard/0soft",       # Score target
    unimproved_spent_limit=Duration(seconds=60),  # Plateau detection
)

Choosing Termination Time

Problem SizeSuggested Time
Small (< 100 entities)10-60 seconds
Medium (100-1000)1-10 minutes
Large (> 1000)10-60 minutes

More time generally means better scores, with diminishing returns.

Monitoring Progress

def on_progress(event: BestSolutionChangedEvent):
    print(f"Time: {event.time_spent}")
    print(f"Score: {event.new_best_score}")
    print(f"Initialized: {event.is_new_best_solution_initialized}")

solver.add_event_listener(on_progress)

Score Plateaus

When the score stops improving:

  1. Stuck in local optimum: Algorithm can’t find better moves
  2. Near optimal: Little room for improvement
  3. Constraint conflict: Hard constraints blocking progress

Detecting Plateaus

TerminationConfig(
    unimproved_spent_limit=Duration(seconds=60)  # Stop if no improvement
)

Algorithm Selection

AlgorithmBest For
Late AcceptanceDefault choice, most problems
Hill ClimbingSimple problems, quick checks
Tabu SearchProblems with many local optima
Simulated AnnealingComplex landscapes

Start with the default (Late Acceptance) and only change if benchmarking shows improvement.

Performance Tips

1. Let It Run Longer

More time usually means better scores.

2. Optimize Constraints

Slow constraints = fewer moves evaluated per second.

3. Use Appropriate Moves

Some moves work better for certain problems (see Move Selectors).

4. Benchmark

Test different algorithms and parameters on your specific problem.

Next Steps

6.3 - Exhaustive Search

Find optimal solutions with exhaustive search (for small problems).

Exhaustive search algorithms explore all possible solutions to find the optimal one. They guarantee the best solution but are only practical for small problems.

When to Use

Exhaustive search is only feasible when:

  • Problem is very small (< 20 entities, few values)
  • You need a guaranteed optimal solution
  • You have time to wait for completion

For most problems, local search finds near-optimal solutions much faster.

Branch and Bound

The main exhaustive search algorithm. It systematically explores the solution space while pruning branches that can’t improve on the best solution found.

How It Works

                    Root (no assignments)
                   /    |    \
            Entity1=A  Entity1=B  Entity1=C
              /  \        |          |
        E2=A  E2=B    E2=A         ...
        /  \    |      |
      E3=A ...  X    (pruned)
       |
    (Best?)
  1. Build a tree of partial solutions
  2. At each node, try assigning a value to the next entity
  3. Calculate a score bound for the branch
  4. If bound is worse than best known solution, prune the branch
  5. Continue until all branches are explored or pruned

Pruning

Pruning is key to performance:

Best so far: -5hard/0soft

Current partial: -3hard/?soft
→ Continue (might improve)

Current partial: -10hard/?soft
→ Prune (can't beat best)

Brute Force

Tries every possible combination without pruning:

  • Guarantees optimal solution
  • Extremely slow (exponential time)
  • Only for very small problems or validation

Complexity

For N entities with M possible values each:

  • Combinations: M^N
  • Example: 10 entities × 10 values = 10^10 = 10 billion combinations

Comparison

AspectBranch and BoundBrute Force
OptimalityGuaranteedGuaranteed
SpeedBetter (pruning)Very slow
MemoryHigherLower
Use caseSmall problemsTiny problems

Practical Limits

Problem SizeExhaustive Search Feasibility
< 10 entitiesPossible (seconds to minutes)
10-20 entitiesChallenging (minutes to hours)
> 20 entitiesUsually impractical

When Local Search is Better

For most real problems, local search is the right choice:

ProblemEntitiesExhaustiveLocal Search
Small demo101 second1 second
School timetabling200Years30 seconds
Vehicle routing100Years1 minute

Hybrid Approach

Use exhaustive search to validate local search:

def validate_optimality(problem):
    """
    For small problems, verify local search finds optimal.
    For testing only!
    """
    # Run local search
    local_solution = run_local_search(problem)

    # Run exhaustive search (small problems only!)
    optimal_solution = run_exhaustive(problem)

    assert local_solution.score == optimal_solution.score

Best Practices

Do

  • Use exhaustive search only for very small problems
  • Use it to validate your constraint model on tiny examples
  • Understand that it’s for special cases, not general use

Don’t

  • Expect exhaustive search to scale
  • Use it in production for real-world problems
  • Wait for results on large problems (it won’t finish)

Next Steps

6.4 - Move Selectors

Reference for move types available in local search.

Move selectors generate the moves that local search evaluates. Different move types are effective for different problems.

Move Types

Change Move

Changes one planning variable to a different value:

Before: lesson.room = Room A
After:  lesson.room = Room B

Best for: Assignment problems, scheduling

Swap Move

Swaps values between two entities:

Before: lesson1.room = Room A, lesson2.room = Room B
After:  lesson1.room = Room B, lesson2.room = Room A

Best for: When both changes are needed for improvement

Pillar Change Move

Changes multiple entities with the same value simultaneously:

Before: [lesson1, lesson2, lesson3].room = Room A
After:  [lesson1, lesson2, lesson3].room = Room B

Best for: Grouped entities that should move together

Pillar Swap Move

Swaps values between two groups of entities:

Before: [l1, l2].room = A, [l3, l4].room = B
After:  [l1, l2].room = B, [l3, l4].room = A

Best for: Problems with entity groups

List Change Move (for List Variables)

Changes an element’s position in a list:

Before: vehicle.visits = [A, B, C, D]
Move: Move C from position 2 to position 0
After:  vehicle.visits = [C, A, B, D]

Best for: Routing, sequencing

List Swap Move

Swaps two elements within or between lists:

Before: vehicle1.visits = [A, B], vehicle2.visits = [C, D]
Move: Swap B and C
After:  vehicle1.visits = [A, C], vehicle2.visits = [B, D]

Best for: Rebalancing routes

2-Opt Move

Reverses a segment of a list:

Before: vehicle.visits = [A, B, C, D, E]
Move: Reverse [B, C, D]
After:  vehicle.visits = [A, D, C, B, E]

Best for: Routing (reduces “crossing” paths)

Sublist Change Move

Moves a subsequence to a different position:

Before: vehicle.visits = [A, B, C, D, E]
Move: Move [B, C] to end
After:  vehicle.visits = [A, D, E, B, C]

Best for: Batch relocations

Sublist Swap Move

Swaps two subsequences:

Before: vehicle1.visits = [A, B, C], vehicle2.visits = [X, Y, Z]
Move: Swap [B, C] and [Y, Z]
After:  vehicle1.visits = [A, Y, Z], vehicle2.visits = [X, B, C]

Best for: Inter-route optimization

Default Move Selectors

SolverForge automatically selects appropriate moves based on your variable types:

Variable TypeDefault Moves
PlanningVariableChange, Swap
PlanningListVariableList Change, List Swap, 2-Opt

Move Selection Process

1. Selector generates candidate moves
2. Each move is evaluated (score calculated)
3. Acceptance criteria decides to apply or not
4. Repeat

Move Efficiency

Incremental Scoring

Moves are scored incrementally—only recalculating affected constraints:

Change lesson.room = A → B
Only recalculate:
├── Room conflict (for A and B)
├── Teacher room stability
└── (Other constraints unaffected)

This makes move evaluation fast.

Move Speed

Typical moves evaluated per second:

ScenarioMoves/Second
Simple constraints10,000+
Complex constraints1,000-10,000
Very complex100-1,000

More moves = more exploration = better solutions (usually).

Filtering Moves

The solver automatically filters invalid moves:

  • Moves that don’t change anything (same value)
  • Moves that violate pinning
  • Moves on uninitialized variables

Move Caching

To avoid regenerating the same moves:

  • Construction moves are cached
  • Local search moves are regenerated (solution changes)

Performance Impact

Move selection affects:

  1. Diversity: Different move types explore different parts of the search space
  2. Speed: Some moves are faster to evaluate
  3. Effectiveness: Some moves are more likely to find improvements

Problem-Specific Guidance

Scheduling (Timetabling, Shifts)

  • Change moves: Reassign timeslot, room, employee
  • Swap moves: Exchange assignments
  • Default selection works well

Routing (VRP)

  • List moves: Reorder visits
  • 2-Opt: Eliminate crossing paths
  • Sublist moves: Move segments between vehicles

Assignment (Task Assignment, Bin Packing)

  • Change moves: Reassign to different resource
  • Swap moves: Exchange assignments
  • Pillar moves: Move groups together

Troubleshooting

Slow Moves

If moves are slow:

  1. Check constraint complexity
  2. Optimize filtering (use joiners)
  3. Reduce problem size

Poor Improvement

If solutions don’t improve:

  1. Run longer
  2. Ensure moves can reach better solutions
  3. Check if stuck in local optimum

Next Steps

7 - Design Patterns

Common patterns for handling real-world planning scenarios.

Real-world planning problems often require more than basic optimization. This section covers patterns for common scenarios.

Topics

Real-Time Planning

Handle dynamic changes during solving:

from solverforge_legacy.solver import ProblemChange

class AddLessonChange(ProblemChange[Timetable]):
    def __init__(self, lesson: Lesson):
        self.lesson = lesson

    def do_change(self, working_solution: Timetable, score_director):
        # Add the new lesson to the working solution
        working_solution.lessons.append(self.lesson)
        score_director.after_entity_added(self.lesson)

# Apply change while solver is running
solver.add_problem_change(AddLessonChange(new_lesson))

Continuous Planning

For problems that span long time periods, use a rolling horizon:

  1. Plan Window - Only optimize a subset of the timeline
  2. Published Window - Lock decisions that are being executed
  3. Draft Window - Future decisions that can still change

When to Use These Patterns

ScenarioPattern
New orders arrive during planningReal-Time Planning
Plan extends into the futureContinuous Planning
Daily/weekly batch optimizationRepeated Planning
Vehicle breakdowns, cancellationsReal-Time Planning
Rolling weekly schedulesContinuous Planning

7.1 - Real-Time Planning

Handle changes while the solver is running.

Real-time planning allows you to modify the problem while the solver is running. This is essential for handling dynamic changes like new orders, cancellations, or resource changes.

Problem Changes

Use ProblemChange to modify the working solution:

from solverforge_legacy.solver import ProblemChange

class AddLessonChange(ProblemChange[Timetable]):
    def __init__(self, lesson: Lesson):
        self.lesson = lesson

    def do_change(self, working_solution: Timetable, score_director):
        # Add to solution
        working_solution.lessons.append(self.lesson)
        # Notify score director
        score_director.after_entity_added(self.lesson)

Applying Changes

# With Solver
new_lesson = Lesson("new-1", "Art", "S. Dali", "Group A")
solver.add_problem_change(AddLessonChange(new_lesson))

# With SolverManager
solver_manager.add_problem_change(job_id, AddLessonChange(new_lesson))

Common Change Types

Add Entity

class AddVisitChange(ProblemChange[RoutePlan]):
    def __init__(self, visit: Visit):
        self.visit = visit

    def do_change(self, solution: RoutePlan, score_director):
        solution.visits.append(self.visit)
        score_director.after_entity_added(self.visit)

Remove Entity

class RemoveVisitChange(ProblemChange[RoutePlan]):
    def __init__(self, visit_id: str):
        self.visit_id = visit_id

    def do_change(self, solution: RoutePlan, score_director):
        visit = next(v for v in solution.visits if v.id == self.visit_id)

        # Remove from vehicle if assigned
        if visit.vehicle:
            score_director.before_list_variable_changed(
                visit.vehicle, "visits", visit.vehicle.visits
            )
            visit.vehicle.visits.remove(visit)
            score_director.after_list_variable_changed(
                visit.vehicle, "visits", visit.vehicle.visits
            )

        # Remove from solution
        score_director.before_entity_removed(visit)
        solution.visits.remove(visit)
        score_director.after_entity_removed(visit)

Modify Entity

class UpdateVisitDemandChange(ProblemChange[RoutePlan]):
    def __init__(self, visit_id: str, new_demand: int):
        self.visit_id = visit_id
        self.new_demand = new_demand

    def do_change(self, solution: RoutePlan, score_director):
        visit = next(v for v in solution.visits if v.id == self.visit_id)

        score_director.before_problem_property_changed(visit)
        visit.demand = self.new_demand
        score_director.after_problem_property_changed(visit)

Add Problem Fact

class AddVehicleChange(ProblemChange[RoutePlan]):
    def __init__(self, vehicle: Vehicle):
        self.vehicle = vehicle

    def do_change(self, solution: RoutePlan, score_director):
        solution.vehicles.append(self.vehicle)
        score_director.after_problem_fact_added(self.vehicle)

Score Director Notifications

Always notify the score director of changes:

MethodWhen to Use
after_entity_added()Added planning entity
before/after_entity_removed()Removing planning entity
before/after_variable_changed()Changed planning variable
before/after_list_variable_changed()Changed list variable
before/after_problem_property_changed()Changed entity property
after_problem_fact_added()Added problem fact
before/after_problem_fact_removed()Removing problem fact

Order Matters

For removals and changes, call before_* first:

score_director.before_entity_removed(entity)
# Actually remove
solution.entities.remove(entity)
score_director.after_entity_removed(entity)

Real-Time API Example

from fastapi import FastAPI
from solverforge_legacy.solver import SolverManager, ProblemChange

app = FastAPI()
solver_manager: SolverManager


@app.post("/visits")
async def add_visit(visit: VisitRequest, job_id: str):
    """Add a visit to an active solving job."""
    new_visit = Visit(
        id=str(uuid.uuid4()),
        location=visit.location,
        demand=visit.demand,
    )

    solver_manager.add_problem_change(
        job_id,
        AddVisitChange(new_visit)
    )

    return {"id": new_visit.id, "status": "added"}


@app.delete("/visits/{visit_id}")
async def remove_visit(visit_id: str, job_id: str):
    """Remove a visit from an active solving job."""
    solver_manager.add_problem_change(
        job_id,
        RemoveVisitChange(visit_id)
    )

    return {"status": "removed"}

Change Ordering

Changes are applied in the order they’re submitted:

solver.add_problem_change(change1)  # Applied first
solver.add_problem_change(change2)  # Applied second
solver.add_problem_change(change3)  # Applied third

Best Practices

Do

  • Keep changes small and focused
  • Notify score director of all modifications
  • Use before_* methods for removals/changes
  • Test changes in isolation

Don’t

  • Make changes without notifying score director
  • Modify multiple entities in one complex change
  • Forget to handle entity relationships

Debugging Changes

class DebugChange(ProblemChange[Solution]):
    def __init__(self, inner: ProblemChange):
        self.inner = inner

    def do_change(self, solution, score_director):
        print(f"Before: {len(solution.entities)} entities")
        self.inner.do_change(solution, score_director)
        print(f"After: {len(solution.entities)} entities")

Next Steps

7.2 - Continuous Planning

Rolling horizon and replanning strategies.

Continuous planning handles problems that span long time periods by using a rolling planning window. Instead of planning everything at once, you plan a window and move it forward as time passes.

The Challenge

Planning a full year of shifts at once:

  • Huge problem size
  • Far-future plans become irrelevant
  • Real-world changes invalidate long-term plans

Rolling Horizon

Plan only a window of time, then slide it forward:

Time ──────────────────────────────────────────►

Window 1: [====Plan====]
Window 2:    [====Plan====]
Window 3:       [====Plan====]

Implementation

from datetime import datetime, timedelta

def plan_window(start_date: date, window_days: int, problem: Schedule) -> Schedule:
    """Plan a time window."""
    end_date = start_date + timedelta(days=window_days)

    # Filter entities to window
    window_shifts = [
        s for s in problem.shifts
        if start_date <= s.date < end_date
    ]

    window_problem = Schedule(
        employees=problem.employees,
        shifts=window_shifts,
    )

    solver = create_solver()
    return solver.solve(window_problem)


def continuous_plan(problem: Schedule, window_days: int = 14):
    """Run continuous planning with rolling windows."""
    current_date = date.today()
    end_date = max(s.date for s in problem.shifts)

    while current_date < end_date:
        solution = plan_window(current_date, window_days, problem)
        save_solution(solution)

        # Move window forward
        current_date += timedelta(days=7)  # Overlap

Published vs Draft

Divide the window into published (locked) and draft (changeable):

Time ──────────────────────────────────────────►

      [Published][====Draft====]
      (Locked)   (Can change)

Implementation with Pinning

def prepare_window(problem: Schedule, publish_deadline: datetime):
    """Pin published shifts, leave draft unpinned."""
    for shift in problem.shifts:
        if shift.start_time < publish_deadline:
            shift.pinned = True
        else:
            shift.pinned = False

    return problem

Replanning Triggers

Replan when:

  1. Time-based: Every hour, day, or week
  2. Event-based: New orders, cancellations, resource changes
  3. Threshold-based: When score degrades below threshold

Event-Based Replanning

def on_new_order(order: Order, active_job_id: str):
    """Trigger replanning when new order arrives."""
    solver_manager.terminate_early(active_job_id)

    updated_problem = load_current_state()
    updated_problem.orders.append(order)

    new_job_id = start_solving(updated_problem)
    return new_job_id

Warm Starting

Start from the previous solution to preserve good assignments:

def warm_start_plan(previous: Schedule, new_shifts: list[Shift]) -> Schedule:
    """Start from previous solution, add new shifts."""
    # Keep previous assignments (pinned or as starting point)
    for shift in previous.shifts:
        if shift.employee is not None:
            shift.pinned = True  # Or just leave assigned

    # Add new unassigned shifts
    for shift in new_shifts:
        shift.employee = None
        shift.pinned = False
        previous.shifts.append(shift)

    return solve(previous)

Time Windows

Sliding Window

Week 1: Plan days 1-14
Week 2: Plan days 8-21 (7-day overlap)
Week 3: Plan days 15-28

The overlap allows replanning of near-future assignments.

Growing Window

For finite problems, grow the window:

Day 1: Plan days 1-7
Day 2: Plan days 1-14
Day 3: Plan days 1-21
...until complete

Handling Conflicts

When replanning conflicts with executed work:

def merge_with_reality(planned: Schedule, actual: Schedule) -> Schedule:
    """Merge planned schedule with actual execution."""
    for planned_shift in planned.shifts:
        actual_shift = find_actual(actual, planned_shift.id)

        if actual_shift and actual_shift.is_started:
            # Can't change started shifts
            planned_shift.employee = actual_shift.employee
            planned_shift.pinned = True

    return planned

Best Practices

Do

  • Use overlapping windows for smoother transitions
  • Pin executed/committed work
  • Warm start from previous solutions
  • Handle edge cases (window boundaries)

Don’t

  • Plan too far ahead (changes will invalidate)
  • Forget to merge with reality
  • Ignore the transition between windows

Example: Weekly Scheduling

class WeeklyScheduler:
    def __init__(self):
        self.solver_manager = create_solver_manager()

    def plan_next_week(self):
        """Run weekly planning cycle."""
        # Load current state
        current = load_current_schedule()

        # Determine window
        today = date.today()
        window_start = today + timedelta(days=(7 - today.weekday()))  # Next Monday
        window_end = window_start + timedelta(days=14)

        # Pin this week (being executed)
        for shift in current.shifts:
            if shift.date < window_start:
                shift.pinned = True
            elif shift.date < window_end:
                shift.pinned = False  # Can replan
            else:
                continue  # Outside window

        # Solve
        solution = self.solve(current)

        # Publish next week
        publish_week(solution, window_start, window_start + timedelta(days=7))

        return solution

Next Steps

7.3 - Repeated Planning

Batch optimization and periodic replanning.

Repeated planning runs the solver on a regular schedule, optimizing batches of work. Unlike continuous planning, each run is independent.

Use Cases

  • Daily route optimization
  • Weekly shift scheduling
  • Periodic resource allocation
  • Batch order assignment

Basic Pattern

from datetime import datetime
import schedule
import time

def daily_optimization():
    """Run optimization every day at 2 AM."""
    # Load today's problem
    problem = load_todays_problem()

    # Solve
    solver = create_solver()
    solution = solver.solve(problem)

    # Save results
    save_solution(solution)
    notify_stakeholders(solution)

# Schedule daily run
schedule.every().day.at("02:00").do(daily_optimization)

while True:
    schedule.run_pending()
    time.sleep(60)

Batch Processing

Process multiple independent problems:

def optimize_all_regions():
    """Optimize each region independently."""
    regions = load_regions()
    results = {}

    for region in regions:
        problem = load_region_problem(region)
        solution = solve(problem)
        results[region] = solution
        save_solution(region, solution)

    return results

Parallel Batch Processing

from concurrent.futures import ThreadPoolExecutor

def optimize_regions_parallel():
    """Optimize regions in parallel."""
    regions = load_regions()

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(solve_region, region): region
            for region in regions
        }

        results = {}
        for future in futures:
            region = futures[future]
            results[region] = future.result()

    return results

Time-Based Replanning

Fixed Schedule

# Every hour
schedule.every().hour.do(replan)

# Every day at specific time
schedule.every().day.at("06:00").do(replan)

# Every Monday
schedule.every().monday.at("00:00").do(weekly_plan)

Cron-Based

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

# Run at 2 AM every day
scheduler.add_job(daily_optimization, 'cron', hour=2)

# Run every 30 minutes
scheduler.add_job(frequent_replan, 'cron', minute='*/30')

scheduler.start()

Handling Failures

def robust_optimization():
    """Optimization with retry and fallback."""
    max_retries = 3

    for attempt in range(max_retries):
        try:
            problem = load_problem()
            solution = solve(problem)
            save_solution(solution)
            return solution

        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(60)  # Wait before retry
            else:
                # Use previous solution as fallback
                return load_previous_solution()

Comparing Solutions

Track solution quality over time:

def track_solution_quality(solution: Schedule):
    """Log solution metrics for analysis."""
    metrics = {
        "timestamp": datetime.now().isoformat(),
        "score": str(solution.score),
        "feasible": solution.score.is_feasible,
        "entity_count": len(solution.shifts),
        "assigned_count": sum(1 for s in solution.shifts if s.employee),
    }

    log_metrics(metrics)

    # Alert if quality degrades
    if not solution.score.is_feasible:
        send_alert("Infeasible solution generated!")

Incremental vs Fresh

Fresh Start

Each run starts from scratch:

def fresh_optimization():
    problem = load_problem()
    # All entities unassigned
    for entity in problem.entities:
        entity.planning_variable = None
    return solve(problem)

Incremental (Warm Start)

Start from previous solution:

def incremental_optimization():
    previous = load_previous_solution()

    # Keep good assignments, clear bad ones
    for entity in previous.entities:
        if should_keep(entity):
            entity.pinned = True
        else:
            entity.planning_variable = None
            entity.pinned = False

    return solve(previous)

Monitoring

class OptimizationMonitor:
    def __init__(self):
        self.runs = []

    def record_run(self, solution, duration):
        self.runs.append({
            "time": datetime.now(),
            "score": solution.score,
            "duration": duration,
            "feasible": solution.score.is_feasible,
        })

    def get_statistics(self):
        if not self.runs:
            return None

        feasible_rate = sum(r["feasible"] for r in self.runs) / len(self.runs)
        avg_duration = sum(r["duration"] for r in self.runs) / len(self.runs)

        return {
            "total_runs": len(self.runs),
            "feasibility_rate": feasible_rate,
            "avg_duration_seconds": avg_duration,
        }

Best Practices

Do

  • Log all runs for analysis
  • Implement retry logic
  • Monitor solution quality trends
  • Use appropriate scheduling library

Don’t

  • Run optimization during peak hours
  • Ignore failures silently
  • Forget to save results
  • Overload with too frequent replanning

Next Steps

8 - Integration

Integrate SolverForge with web frameworks and other systems.

SolverForge integrates easily with Python web frameworks and data systems.

Topics

FastAPI Example

from fastapi import FastAPI
from solverforge_legacy.solver import SolverManager

app = FastAPI()
solver_manager = SolverManager.create(solver_factory)

@app.post("/solve")
async def solve(problem: Timetable) -> str:
    job_id = str(uuid.uuid4())
    solver_manager.solve_and_listen(
        job_id,
        lambda _: problem,
        on_best_solution_changed
    )
    return job_id

@app.get("/solution/{job_id}")
async def get_solution(job_id: str) -> Timetable:
    return solver_manager.get_best_solution(job_id)

@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
    solver_manager.terminate_early(job_id)

Serialization

SolverForge domain objects are standard Python dataclasses, making them easy to serialize:

import json
from dataclasses import asdict

# Serialize to JSON
json_str = json.dumps(asdict(solution))

# With Pydantic for validation
from pydantic.dataclasses import dataclass as pydantic_dataclass

@pydantic_dataclass
class TimetableDTO:
    timeslots: list[TimeslotDTO]
    rooms: list[RoomDTO]
    lessons: list[LessonDTO]

Database Integration

Use any Python ORM (SQLAlchemy, Django ORM, etc.) for persistence:

  1. Load data from database into domain objects
  2. Run the solver
  3. Save results back to database

The solver works with in-memory Python objects, so any data source that can produce those objects will work.

8.1 - FastAPI Integration

Build REST APIs for your solver with FastAPI.

FastAPI is a modern Python web framework that works well with SolverForge. This guide shows common patterns for building solver APIs.

Basic Setup

from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import uuid

from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)


# Global state
solver_manager: SolverManager | None = None
solutions: dict[str, Solution] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize solver on startup, cleanup on shutdown."""
    global solver_manager

    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(spent_limit=Duration(minutes=5)),
    )

    solver_factory = SolverFactory.create(config)
    solver_manager = SolverManager.create(solver_factory)

    yield

    if solver_manager:
        solver_manager.close()


app = FastAPI(
    title="Solver API",
    description="Planning optimization API",
    lifespan=lifespan,
)

API Endpoints

Submit Problem

@app.post("/solve", response_model=str)
async def submit_problem(request: ProblemRequest) -> str:
    """Submit a problem for solving. Returns job ID."""
    job_id = str(uuid.uuid4())
    problem = request.to_domain()

    def on_best_solution(solution):
        solutions[job_id] = solution

    solver_manager.solve_and_listen(
        job_id,
        problem_finder=lambda _: problem,
        best_solution_consumer=on_best_solution,
    )

    return job_id

Get Solution

@app.get("/solution/{job_id}", response_model=SolutionResponse)
async def get_solution(job_id: str) -> SolutionResponse:
    """Get the current best solution."""
    if job_id not in solutions:
        raise HTTPException(404, "Job not found")

    solution = solutions[job_id]
    status = solver_manager.get_solver_status(job_id)

    return SolutionResponse.from_domain(solution, status)

Stop Solving

@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
    """Stop solving early."""
    solver_manager.terminate_early(job_id)
    return {"status": "terminating"}

Get Score Analysis

@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
    """Get detailed score analysis."""
    if job_id not in solutions:
        raise HTTPException(404, "Job not found")

    solution = solutions[job_id]
    analysis = solution_manager.analyze(solution)

    return {
        "score": str(analysis.score),
        "is_feasible": analysis.score.is_feasible,
        "constraints": [
            {
                "name": c.constraint_name,
                "score": str(c.score),
                "matches": c.match_count,
            }
            for c in analysis.constraint_analyses()
        ],
    }

Request/Response Models

Use Pydantic for validation:

from pydantic import BaseModel
from datetime import time


class TimeslotDTO(BaseModel):
    day: str
    start_time: str
    end_time: str

    def to_domain(self) -> Timeslot:
        return Timeslot(
            self.day,
            time.fromisoformat(self.start_time),
            time.fromisoformat(self.end_time),
        )

    @classmethod
    def from_domain(cls, timeslot: Timeslot) -> "TimeslotDTO":
        return cls(
            day=timeslot.day,
            start_time=timeslot.start_time.isoformat(),
            end_time=timeslot.end_time.isoformat(),
        )


class ProblemRequest(BaseModel):
    timeslots: list[TimeslotDTO]
    rooms: list[RoomDTO]
    lessons: list[LessonDTO]

    def to_domain(self) -> Timetable:
        timeslots = [t.to_domain() for t in self.timeslots]
        rooms = [r.to_domain() for r in self.rooms]
        lessons = [l.to_domain(timeslots, rooms) for l in self.lessons]
        return Timetable("api", timeslots, rooms, lessons)


class SolutionResponse(BaseModel):
    status: str
    score: str | None
    is_feasible: bool | None
    lessons: list[LessonDTO]

    @classmethod
    def from_domain(cls, solution: Timetable, status) -> "SolutionResponse":
        return cls(
            status=status.name,
            score=str(solution.score) if solution.score else None,
            is_feasible=solution.score.is_feasible if solution.score else None,
            lessons=[LessonDTO.from_domain(l) for l in solution.lessons],
        )

Real-Time Updates

Problem Changes

@app.post("/solve/{job_id}/lessons")
async def add_lesson(job_id: str, lesson: LessonDTO):
    """Add a lesson to an active job."""
    new_lesson = lesson.to_domain()

    solver_manager.add_problem_change(
        job_id,
        AddLessonChange(new_lesson)
    )

    return {"status": "added", "id": new_lesson.id}

WebSocket Updates

from fastapi import WebSocket

@app.websocket("/ws/{job_id}")
async def websocket_updates(websocket: WebSocket, job_id: str):
    await websocket.accept()

    async def send_update(solution):
        await websocket.send_json({
            "score": str(solution.score),
            "timestamp": datetime.now().isoformat(),
        })

    # Register listener
    # (Implementation depends on your event system)

    try:
        while True:
            await asyncio.sleep(1)
            if job_id in solutions:
                await send_update(solutions[job_id])
    except WebSocketDisconnect:
        pass

Error Handling

from fastapi import HTTPException
from fastapi.responses import JSONResponse

@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    return JSONResponse(
        status_code=500,
        content={"error": str(exc)},
    )


@app.get("/solution/{job_id}")
async def get_solution(job_id: str):
    if job_id not in solutions:
        raise HTTPException(
            status_code=404,
            detail=f"Job {job_id} not found"
        )
    # ...

Testing

from fastapi.testclient import TestClient

def test_submit_and_get():
    client = TestClient(app)

    # Submit problem
    response = client.post("/solve", json=problem_data)
    assert response.status_code == 200
    job_id = response.json()

    # Wait for solving
    time.sleep(5)

    # Get solution
    response = client.get(f"/solution/{job_id}")
    assert response.status_code == 200
    assert response.json()["is_feasible"]

Deployment

Docker

FROM python:3.11-slim

# Install JDK
RUN apt-get update && apt-get install -y openjdk-17-jdk

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Running

# Development
uvicorn main:app --reload

# Production
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

Next Steps

8.2 - Serialization

JSON serialization with dataclasses and Pydantic.

SolverForge domain objects are Python dataclasses, making them easy to serialize to JSON for APIs and storage.

Basic JSON Serialization

Using dataclasses

from dataclasses import dataclass, asdict
import json

@dataclass
class Timeslot:
    day: str
    start_time: str
    end_time: str

@dataclass
class Room:
    name: str

# Serialize
timeslot = Timeslot("MONDAY", "08:30", "09:30")
json_str = json.dumps(asdict(timeslot))
# {"day": "MONDAY", "start_time": "08:30", "end_time": "09:30"}

# Deserialize
data = json.loads(json_str)
timeslot = Timeslot(**data)

Handling Complex Types

For types like datetime and time:

from dataclasses import dataclass
from datetime import time
import json

class TimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, time):
            return obj.isoformat()
        return super().default(obj)

def time_decoder(dct):
    for key, value in dct.items():
        if key.endswith('_time') and isinstance(value, str):
            try:
                dct[key] = time.fromisoformat(value)
            except ValueError:
                pass
    return dct

# Serialize
json_str = json.dumps(asdict(obj), cls=TimeEncoder)

# Deserialize
data = json.loads(json_str, object_hook=time_decoder)

Pydantic Integration

Pydantic provides automatic validation and serialization:

DTO Pattern

Separate API models from domain models:

from pydantic import BaseModel
from datetime import time

# API model
class TimeslotDTO(BaseModel):
    day: str
    start_time: str
    end_time: str

    def to_domain(self) -> Timeslot:
        return Timeslot(
            self.day,
            time.fromisoformat(self.start_time),
            time.fromisoformat(self.end_time),
        )

    @classmethod
    def from_domain(cls, timeslot: Timeslot) -> "TimeslotDTO":
        return cls(
            day=timeslot.day,
            start_time=timeslot.start_time.isoformat(),
            end_time=timeslot.end_time.isoformat(),
        )


# Domain model (unchanged)
@dataclass
class Timeslot:
    day: str
    start_time: time
    end_time: time

Full Example

from pydantic import BaseModel
from datetime import time


class LessonDTO(BaseModel):
    id: str
    subject: str
    teacher: str
    student_group: str
    timeslot: TimeslotDTO | None = None
    room: RoomDTO | None = None

    def to_domain(self, timeslots: list[Timeslot], rooms: list[Room]) -> Lesson:
        ts = None
        if self.timeslot:
            ts = find_timeslot(timeslots, self.timeslot)

        rm = None
        if self.room:
            rm = find_room(rooms, self.room)

        return Lesson(
            id=self.id,
            subject=self.subject,
            teacher=self.teacher,
            student_group=self.student_group,
            timeslot=ts,
            room=rm,
        )

    @classmethod
    def from_domain(cls, lesson: Lesson) -> "LessonDTO":
        return cls(
            id=lesson.id,
            subject=lesson.subject,
            teacher=lesson.teacher,
            student_group=lesson.student_group,
            timeslot=TimeslotDTO.from_domain(lesson.timeslot) if lesson.timeslot else None,
            room=RoomDTO.from_domain(lesson.room) if lesson.room else None,
        )


class TimetableDTO(BaseModel):
    id: str
    timeslots: list[TimeslotDTO]
    rooms: list[RoomDTO]
    lessons: list[LessonDTO]
    score: str | None = None

    def to_domain(self) -> Timetable:
        timeslots = [t.to_domain() for t in self.timeslots]
        rooms = [r.to_domain() for r in self.rooms]
        lessons = [l.to_domain(timeslots, rooms) for l in self.lessons]
        return Timetable(self.id, timeslots, rooms, lessons)

    @classmethod
    def from_domain(cls, timetable: Timetable) -> "TimetableDTO":
        return cls(
            id=timetable.id,
            timeslots=[TimeslotDTO.from_domain(t) for t in timetable.timeslots],
            rooms=[RoomDTO.from_domain(r) for r in timetable.rooms],
            lessons=[LessonDTO.from_domain(l) for l in timetable.lessons],
            score=str(timetable.score) if timetable.score else None,
        )

Reference Resolution

When deserializing, resolve references to shared objects:

def find_timeslot(timeslots: list[Timeslot], dto: TimeslotDTO) -> Timeslot:
    """Find matching timeslot by properties."""
    for ts in timeslots:
        if (ts.day == dto.day and
            ts.start_time.isoformat() == dto.start_time):
            return ts
    raise ValueError(f"Timeslot not found: {dto}")


def find_room(rooms: list[Room], dto: RoomDTO) -> Room:
    """Find matching room by name."""
    for room in rooms:
        if room.name == dto.name:
            return room
    raise ValueError(f"Room not found: {dto}")

Score Serialization

from solverforge_legacy.solver.score import HardSoftScore

# To string
score_str = str(solution.score)  # "-2hard/-15soft"

# From string
score = HardSoftScore.parse("-2hard/-15soft")

# To dict
score_dict = {
    "hard": solution.score.hard_score,
    "soft": solution.score.soft_score,
    "feasible": solution.score.is_feasible,
}

Database Persistence

SQLAlchemy Example

from sqlalchemy import Column, String, Integer, ForeignKey
from sqlalchemy.orm import relationship

class TimeslotModel(Base):
    __tablename__ = "timeslots"

    id = Column(Integer, primary_key=True)
    day = Column(String)
    start_time = Column(String)
    end_time = Column(String)

    def to_domain(self) -> Timeslot:
        return Timeslot(
            self.day,
            time.fromisoformat(self.start_time),
            time.fromisoformat(self.end_time),
        )


class LessonModel(Base):
    __tablename__ = "lessons"

    id = Column(String, primary_key=True)
    subject = Column(String)
    teacher = Column(String)
    student_group = Column(String)
    timeslot_id = Column(Integer, ForeignKey("timeslots.id"), nullable=True)
    room_id = Column(Integer, ForeignKey("rooms.id"), nullable=True)

    timeslot = relationship("TimeslotModel")
    room = relationship("RoomModel")

Best Practices

Do

  • Use DTOs for API boundaries
  • Validate input with Pydantic
  • Handle None values explicitly
  • Use consistent naming conventions

Don’t

  • Serialize domain objects directly (may expose internals)
  • Forget to handle score serialization
  • Ignore reference resolution
  • Mix API and domain models

Next Steps

8.3 - Logging

Configure logging for debugging and monitoring.

Configure Python logging to monitor solver behavior and debug issues.

Basic Configuration

import logging

# Configure root logger
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Get logger for your app
logger = logging.getLogger("my_app")

Solver Logging

The solver uses the ai.timefold logger hierarchy:

# Enable solver debug logging
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)

# Or just specific components
logging.getLogger("ai.timefold.solver").setLevel(logging.DEBUG)

Log Levels

LevelUse Case
DEBUGDetailed solver internals
INFOProgress updates, scores
WARNINGPotential issues
ERRORFailures

Progress Logging

Log solver progress with event listeners:

from solverforge_legacy.solver import BestSolutionChangedEvent

logger = logging.getLogger("solver")

def on_progress(event: BestSolutionChangedEvent):
    logger.info(
        f"Score: {event.new_best_score} | "
        f"Time: {event.time_spent} | "
        f"Initialized: {event.is_new_best_solution_initialized}"
    )

solver.add_event_listener(on_progress)

File Logging

Write logs to a file:

import logging

# Create file handler
file_handler = logging.FileHandler("solver.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
))

# Add to logger
logging.getLogger().addHandler(file_handler)

Structured Logging

For production, use structured logging:

import json
import logging

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if hasattr(record, "score"):
            log_data["score"] = record.score
        if hasattr(record, "job_id"):
            log_data["job_id"] = record.job_id
        return json.dumps(log_data)


handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.getLogger().addHandler(handler)

Logging with Context

def log_with_context(logger, job_id, message, **kwargs):
    extra = {"job_id": job_id, **kwargs}
    logger.info(message, extra=extra)

# Usage
log_with_context(logger, "job-123", "Solving started", entities=100)

FastAPI Logging

from fastapi import FastAPI, Request
import logging
import time

logger = logging.getLogger("api")

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = time.time() - start

    logger.info(
        f"{request.method} {request.url.path} "
        f"- {response.status_code} - {duration:.3f}s"
    )
    return response

Debugging Tips

Enable Verbose Logging

# Maximum verbosity
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)

Log Constraint Matches

def debug_constraints(solution):
    logger = logging.getLogger("constraints")
    analysis = solution_manager.analyze(solution)

    for constraint in analysis.constraint_analyses():
        logger.debug(
            f"{constraint.constraint_name}: "
            f"score={constraint.score}, matches={constraint.match_count}"
        )
        for match in constraint.matches():
            logger.debug(f"  - {match.justification}")

Log Configuration

def log_config(config: SolverConfig):
    logger = logging.getLogger("config")
    logger.info(f"Solution class: {config.solution_class}")
    logger.info(f"Entity classes: {config.entity_class_list}")
    logger.info(f"Termination: {config.termination_config}")

Production Recommendations

Log Aggregation

Send logs to a central system:

# Example with Python logging to stdout (for container orchestration)
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',  # JSON formatted
    stream=sys.stdout,
)

Metrics

Track key metrics:

from dataclasses import dataclass
from datetime import datetime

@dataclass
class SolveMetrics:
    job_id: str
    start_time: datetime
    end_time: datetime | None = None
    final_score: str | None = None
    is_feasible: bool | None = None

    def log(self):
        duration = (self.end_time - self.start_time).total_seconds() if self.end_time else 0
        logger.info(
            f"Job {self.job_id}: "
            f"duration={duration:.1f}s, "
            f"score={self.final_score}, "
            f"feasible={self.is_feasible}"
        )

Alerting

Alert on issues:

def check_solution_quality(solution, job_id):
    if not solution.score.is_feasible:
        logger.warning(f"Job {job_id} produced infeasible solution!")
        send_alert(f"Infeasible solution for job {job_id}")

    if solution.score.soft_score < -10000:
        logger.warning(f"Job {job_id} has poor soft score: {solution.score}")

Next Steps

9 - Reference

API reference and frequently asked questions.

Quick reference guides and answers to common questions.

Topics

  • API Summary - Quick reference for key classes and functions
  • FAQ - Frequently asked questions

Key Imports

# Domain modeling
from solverforge_legacy.solver.domain import (
    planning_entity,
    planning_solution,
    PlanningId,
    PlanningVariable,
    PlanningListVariable,
    PlanningEntityCollectionProperty,
    ProblemFactCollectionProperty,
    ValueRangeProvider,
    PlanningScore,
    PlanningPin,
    InverseRelationShadowVariable,
    PreviousElementShadowVariable,
    NextElementShadowVariable,
    CascadingUpdateShadowVariable,
)

# Constraints
from solverforge_legacy.solver.score import (
    constraint_provider,
    ConstraintFactory,
    Constraint,
    Joiners,
    ConstraintCollectors,
    HardSoftScore,
    HardMediumSoftScore,
    SimpleScore,
)

# Solver
from solverforge_legacy.solver import (
    SolverFactory,
    SolverManager,
    SolutionManager,
    ProblemChange,
)

# Configuration
from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
)

Score Types

Score TypeLevelsUse Case
SimpleScore1Single optimization objective
HardSoftScore2Feasibility (hard) + optimization (soft)
HardMediumSoftScore3Hard + important preferences + nice-to-have
BendableScoreNCustom number of levels
*DecimalScore-Decimal precision variants

9.1 - API Summary

Quick reference for SolverForge Python API.

Quick reference for commonly used SolverForge APIs.

Domain Decorators

from solverforge_legacy.solver.domain import (
    planning_entity,
    planning_solution,
)
DecoratorPurpose
@planning_entityMark a class as a planning entity
@planning_solutionMark a class as the planning solution

Type Annotations

from solverforge_legacy.solver.domain import (
    PlanningId,
    PlanningVariable,
    PlanningListVariable,
    PlanningEntityCollectionProperty,
    ProblemFactCollectionProperty,
    ValueRangeProvider,
    PlanningScore,
    PlanningPin,
    PlanningPinToIndex,
)
AnnotationUse WithPurpose
PlanningIdEntity fieldUnique identifier
PlanningVariableEntity fieldVariable to optimize
PlanningListVariableEntity fieldOrdered list of entities
PlanningEntityCollectionPropertySolution fieldCollection of entities
ProblemFactCollectionPropertySolution fieldImmutable input data
ValueRangeProviderSolution fieldPossible values for variables
PlanningScoreSolution fieldWhere score is stored
PlanningPinEntity fieldLock entity assignment
PlanningPinToIndexEntity fieldLock list position

Usage Pattern

from typing import Annotated
from dataclasses import dataclass, field

@planning_entity
@dataclass
class Lesson:
    id: Annotated[str, PlanningId]
    timeslot: Annotated[Timeslot | None, PlanningVariable] = field(default=None)

Shadow Variable Annotations

from solverforge_legacy.solver.domain import (
    InverseRelationShadowVariable,
    PreviousElementShadowVariable,
    NextElementShadowVariable,
    CascadingUpdateShadowVariable,
)
AnnotationPurpose
InverseRelationShadowVariableBack-reference to list owner
PreviousElementShadowVariablePrevious element in list
NextElementShadowVariableNext element in list
CascadingUpdateShadowVariableComputed value that cascades

Score Types

from solverforge_legacy.solver.score import (
    SimpleScore,
    HardSoftScore,
    HardMediumSoftScore,
    HardSoftDecimalScore,
)
TypeLevelsExample
SimpleScore1-5
HardSoftScore2-2hard/-15soft
HardMediumSoftScore3-1hard/-3medium/-10soft
HardSoftDecimalScore2 (decimal)-2hard/-15.5soft

Common Operations

score = HardSoftScore.of(-2, -15)
score.hard_score      # -2
score.soft_score      # -15
score.is_feasible     # False (hard_score < 0)

# Constants
HardSoftScore.ZERO
HardSoftScore.ONE_HARD
HardSoftScore.ONE_SOFT

Constraint Streams

from solverforge_legacy.solver.score import (
    constraint_provider,
    ConstraintFactory,
    Constraint,
    Joiners,
    ConstraintCollectors,
)

ConstraintFactory Methods

MethodPurpose
for_each(Class)Start stream with all instances
for_each_unique_pair(Class, *Joiners)All unique pairs
for_each_including_unassigned(Class)Include entities with null variables

Stream Operations

MethodPurpose
.filter(predicate)Filter elements
.join(Class, *Joiners)Join with another class
.if_exists(Class, *Joiners)Keep if matching exists
.if_not_exists(Class, *Joiners)Keep if no matching exists
.group_by(groupKey, collector)Group and aggregate
.flatten_last(mapper)Expand collection
.map(mapper)Transform elements
.complement(Class, filler)Add missing elements

Terminal Operations

MethodPurpose
.penalize(Score)Add penalty
.penalize(Score, weigher)Weighted penalty
.reward(Score)Add reward
.reward(Score, weigher)Weighted reward
.penalize_decimal(Score, weigher)Decimal penalty
.as_constraint(name)Name the constraint

Joiners

from solverforge_legacy.solver.score import Joiners
JoinerPurpose
Joiners.equal(extractor)Match on equality
Joiners.equal(extractorA, extractorB)Match properties
Joiners.less_than(extractorA, extractorB)A < B
Joiners.less_than_or_equal(extractorA, extractorB)A <= B
Joiners.greater_than(extractorA, extractorB)A > B
Joiners.greater_than_or_equal(extractorA, extractorB)A >= B
Joiners.overlapping(startA, endA, startB, endB)Time overlap
Joiners.overlapping(startA, endA)Same start/end extractors
Joiners.filtering(predicate)Custom filter

Collectors

from solverforge_legacy.solver.score import ConstraintCollectors
CollectorResult
count()Number of items
count_distinct(mapper)Distinct count
sum(mapper)Sum of values
min(mapper)Minimum value
max(mapper)Maximum value
average(mapper)Average value
to_list(mapper)Collect to list
to_set(mapper)Collect to set
load_balance(keyMapper, loadMapper)Fairness measure
compose(c1, c2, combiner)Combine collectors

Solver Configuration

from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
)

SolverConfig

config = SolverConfig(
    solution_class=Timetable,
    entity_class_list=[Lesson],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(
        spent_limit=Duration(seconds=30)
    ),
)

TerminationConfig Options

PropertyTypePurpose
spent_limitDurationTime limit
unimproved_spent_limitDurationTime without improvement
best_score_limitstrTarget score
best_score_feasibleboolStop when feasible

Solver API

from solverforge_legacy.solver import (
    SolverFactory,
    SolverManager,
    SolutionManager,
    SolverStatus,
)

SolverFactory

solver_factory = SolverFactory.create(config)
solver = solver_factory.build_solver()
solution = solver.solve(problem)

SolverManager

solver_manager = SolverManager.create(solver_factory)

# Async solving
solver_manager.solve_and_listen(
    problem_id,
    problem_finder=lambda _: problem,
    best_solution_consumer=on_best_solution,
)

# Control
solver_manager.terminate_early(problem_id)
status = solver_manager.get_solver_status(problem_id)
solver_manager.close()

SolverStatus

StatusMeaning
NOT_SOLVINGNot started
SOLVING_ACTIVECurrently solving
SOLVING_SCHEDULEDQueued

SolutionManager

solution_manager = SolutionManager.create(solver_factory)
analysis = solution_manager.analyze(solution)
score = solution_manager.update(solution)

Duration

from solverforge_legacy.solver.config import Duration

Duration(seconds=30)
Duration(minutes=5)
Duration(hours=1)

Event Listeners

from solverforge_legacy.solver import BestSolutionChangedEvent

def on_best_solution(event: BestSolutionChangedEvent):
    print(f"Score: {event.new_best_score}")
    print(f"Time: {event.time_spent}")

Problem Changes

from solverforge_legacy.solver import ProblemChange

class AddLessonChange(ProblemChange):
    def __init__(self, lesson: Lesson):
        self.lesson = lesson

    def do_change(self, solution: Timetable, problem_change_director):
        problem_change_director.add_entity(
            self.lesson,
            lambda l: solution.lessons.append(l)
        )

Import Summary

# Domain modeling
from solverforge_legacy.solver.domain import (
    planning_entity,
    planning_solution,
    PlanningId,
    PlanningVariable,
    PlanningListVariable,
    PlanningEntityCollectionProperty,
    ProblemFactCollectionProperty,
    ValueRangeProvider,
    PlanningScore,
    PlanningPin,
    InverseRelationShadowVariable,
    PreviousElementShadowVariable,
    NextElementShadowVariable,
    CascadingUpdateShadowVariable,
)

# Scores and constraints
from solverforge_legacy.solver.score import (
    constraint_provider,
    ConstraintFactory,
    Constraint,
    Joiners,
    ConstraintCollectors,
    HardSoftScore,
    HardMediumSoftScore,
    SimpleScore,
)

# Solver
from solverforge_legacy.solver import (
    SolverFactory,
    SolverManager,
    SolutionManager,
    SolverStatus,
    BestSolutionChangedEvent,
    ProblemChange,
)

# Configuration
from solverforge_legacy.solver.config import (
    SolverConfig,
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
)

9.2 - FAQ

Frequently asked questions about SolverForge.

General

What is SolverForge?

SolverForge is a Python constraint solver for planning and scheduling optimization problems. It uses constraint streams to define rules and metaheuristic algorithms to find high-quality solutions.

What problems can SolverForge solve?

SolverForge excels at:

  • Employee scheduling: Shift assignment with skills, availability, fairness
  • Vehicle routing: Route optimization with capacity, time windows
  • School timetabling: Class scheduling with room and teacher constraints
  • Meeting scheduling: Room booking with attendee conflicts
  • Task assignment: Job shop, bin packing, resource allocation

How is SolverForge licensed?

SolverForge is open source software released under the Apache License 2.0. This allows commercial use, modification, and distribution.

Installation

What are the requirements?

  • Python 3.10 or later
  • JDK 17 or later (SolverForge uses the JVM for solving)

Why does SolverForge need Java?

SolverForge’s solving engine runs on the JVM for performance. The Python API communicates with the JVM transparently via JPype.

How do I install it?

pip install solverforge-legacy

Make sure JAVA_HOME is set or Java is on your PATH.

Modeling

What’s the difference between problem facts and planning entities?

TypeChanges During SolvingExample
Problem factsNo (input data)Rooms, Timeslots, Employees
Planning entitiesYes (variables assigned)Lessons, Shifts, Visits

When should I use PlanningVariable vs PlanningListVariable?

TypeUse CaseExample
PlanningVariableAssign one valueLesson → Timeslot
PlanningListVariableOrdered list of entitiesVehicle → list of Visits

Can I use Pydantic instead of dataclasses?

Yes. Both dataclasses and Pydantic models work. The quickstart examples show both patterns.

How do I pin (lock) an assignment?

Add PlanningPin to a boolean field:

@planning_entity
class Lesson:
    pinned: Annotated[bool, PlanningPin] = False

Set pinned=True to prevent the solver from changing that entity’s variables.

Constraints

What’s the difference between hard and soft constraints?

TypeMeaningExample
HardMust not violateNo room conflicts
SoftPrefer to satisfyTeacher prefers certain room

Hard constraints define feasibility. Soft constraints define quality.

Why use for_each_unique_pair instead of for_each + join?

for_each_unique_pair is more efficient and avoids counting conflicts twice:

# Good - each pair counted once
constraint_factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot),
    Joiners.equal(lambda l: l.room),
)

# Less efficient - (A,B) and (B,A) both matched
constraint_factory.for_each(Lesson).join(Lesson, ...)

How do I debug a constraint?

  1. Use SolutionManager.analyze() to see the score breakdown:
analysis = solution_manager.analyze(solution)
for c in analysis.constraint_analyses():
    print(f"{c.constraint_name}: {c.score}")
  1. Examine individual matches:
for match in constraint_analysis.matches():
    print(f"  {match.justification}")

Why is my score always infeasible?

Common causes:

  • Not enough resources (rooms, timeslots, employees) for entities
  • Conflicting hard constraints that can’t all be satisfied
  • Uninitialized entities (variables still None)

Try:

  • Increasing termination time
  • Relaxing some hard constraints to soft
  • Adding more resources

Solving

How long should I let the solver run?

Depends on problem size and complexity:

Problem SizeTypical Time
Small (< 100 entities)10-60 seconds
Medium (100-1000 entities)1-10 minutes
Large (> 1000 entities)10+ minutes

Use benchmarking to find the optimal time for your problem.

Why isn’t the score improving?

Possible causes:

  • Stuck in local optimum (try different algorithm)
  • All hard constraints satisfied (now optimizing soft)
  • Constraints are too restrictive

Try:

  • Simulated Annealing or Late Acceptance instead of Tabu Search
  • Longer termination time
  • Review constraint design

How do I stop solving early?

With Solver:

# External termination (from another thread)
solver.terminate_early()

With SolverManager:

solver_manager.terminate_early(problem_id)

Can I get progress updates during solving?

Yes, use SolverManager with a listener:

solver_manager.solve_and_listen(
    problem_id,
    problem_finder=lambda _: problem,
    best_solution_consumer=lambda solution: print(f"Score: {solution.score}"),
)

Performance

How do I make constraints faster?

  1. Use Joiners instead of filter():
# Fast - indexing
Joiners.equal(lambda lesson: lesson.timeslot)

# Slower - Python filter
.filter(lambda l1, l2: l1.timeslot == l2.timeslot)
  1. Cache computed values in entity fields
  2. Avoid expensive operations in lambdas

How do I scale to larger problems?

  • Increase termination time
  • Use more efficient constraints
  • Consider partitioning large problems
  • Use PlanningListVariable for routing problems

Should I use multiple threads?

The solver is single-threaded by design for score calculation consistency. Use SolverManager to solve multiple problems concurrently.

Integration

Can I use SolverForge with FastAPI?

Yes! See the FastAPI Integration guide. Key pattern:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global solver_manager
    solver_manager = SolverManager.create(solver_factory)
    yield
    solver_manager.close()

How do I serialize solutions to JSON?

Use Pydantic models or dataclasses.asdict():

# With dataclasses
import json
from dataclasses import asdict

json.dumps(asdict(solution))

# With Pydantic
solution.model_dump_json()

See Serialization for handling references.

Can I save and load solutions?

Yes, serialize to JSON and deserialize back:

# Save
with open("solution.json", "w") as f:
    json.dump(solution_to_dict(solution), f)

# Load
with open("solution.json") as f:
    problem = dict_to_solution(json.load(f))

Troubleshooting

“No JVM shared library file (libjvm.so) found”

Java isn’t installed or JAVA_HOME isn’t set:

# Check Java
java -version

# Set JAVA_HOME (example for Linux)
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk

“Score corruption detected”

A constraint is producing inconsistent scores. Common causes:

  • Non-deterministic lambdas
  • External state changes
  • Incorrect shadow variable updates

Run with RUST_LOG=debug to see details.

“OutOfMemoryError” in the JVM

Increase JVM heap:

export JAVA_TOOL_OPTIONS="-Xmx4g"

More Help