Getting Started

Quickstart Guides — repository layout, prerequisites, and how to run examples locally.

Legacy Implementation Guides

These quickstart guides use solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. The legacy implementation is archived and will receive no further maintenance once SolverForge’s native Python bindings are production-ready.

SolverForge has been completely rewritten as a native constraint solver in Rust, with its own solving engine built from scratch. These guides are preserved as:

  • Reference material for understanding constraint solving concepts
  • Educational examples of constraint modeling patterns
  • Demonstration of optimization problem domains

The JPype bridge and Timefold-based architecture described in these guides do not apply to current SolverForge.

Native Python bindings for the Rust implementation are under active development.

Choose a Quickstart

Employee Scheduling

Assign staff to shifts based on skills and availability. Perfect for learning core optimization concepts. [Start Tutorial →](employee-scheduling/)

Meeting Scheduling

Find optimal times and rooms for meetings while avoiding conflicts. [Start Tutorial →](meeting-scheduling/)

Vehicle Routing

Plan delivery routes that minimize travel time with capacity constraints. [Start Tutorial →](vehicle-routing/)

School Timetabling

Schedule lessons to rooms and timeslots without teacher or room conflicts. [Start Tutorial →](school-timetabling/)

Portfolio Optimization

Select stocks for a diversified portfolio while maximizing expected returns. [Start Tutorial →](portfolio-optimization/)

VM Placement

Place virtual machines on servers respecting capacity, affinity, and consolidation goals. [Start Tutorial →](vm-placement/)

Rust Quickstart

Build a solver using the core Rust library directly. For advanced users interested in the internals. [Start Tutorial →](rust-quickstart/)


This page covers:

  • Repository layout and quickstart variants
  • Prerequisites and installation notes
  • How to run an example locally
  • Where to find benchmarks, technical notes and individual quickstart READMEs

Repository layout

The repository is organised so you can choose between pedagogical reference implementations and optimized, performance-minded variants:

  • legacy/ — Refactored quickstarts that minimize runtime overhead by constraining Pydantic to the API boundary and using lighter-weight models during solver moves.
  • benchmarks/ — Benchmarks, results and a short performance report comparing implementations and use cases.

Common quickstarts available now:

  • legacy/meeting-scheduling-fast
  • legacy/vehicle-routing-fast
  • legacy/employee-scheduling-fast
  • legacy/portfolio-optimization-fast
  • legacy/vm-placement-fast

Each use case folder includes a README describing how to run the example, expected inputs, and any implementation-specific details.

Prerequisites

Typical requirements (may vary per quickstart):

  • Python 3.8+ (use a virtual environment)
  • pip to install dependencies
  • Optional: Docker if you prefer containerised execution

Some examples expose a small FastAPI UI or HTTP API and will list FastAPI and related packages in their requirements.txt or pyproject.toml.

Installation

  1. Clone or download the SolverForge quickstarts repository.

  2. Create and activate a virtual environment:

    • Unix/macOS:
      • python -m venv .venv
      • source .venv/bin/activate
    • Windows:
      • python -m venv .venv
      • .\.venv\Scripts\activate
  3. Install dependencies from the chosen quickstart directory:

    • pip install -r requirements.txt
    • Or follow the quickstart’s pyproject.toml instructions if provided.

Each quickstart README documents any extra dependencies or optional tooling.

Setup

  • Inspect the quickstart folder for example data, configuration files, and environment variables.
  • If the quickstart includes Docker assets, follow the README for Docker or docker-compose instructions.
  • Confirm any required ports or external resources before starting the example.

Try it out!

Most quickstarts offer one or both run modes:

  • A minimal FastAPI service that serves a tiny UI and HTTP endpoints.
  • A CLI script that runs the solver on example data and outputs results.

To try a quick example:

  1. Open the quickstart folder of interest (for example legacy/meeting-scheduling-fast).
  2. Follow the run instructions in that folder’s README. Common commands are:
    • python -m <module> or uvicorn for FastAPI-based examples.
    • python run_demo.py or similar CLI entrypoints described in the README.

Check these README files for concrete run commands:

  • legacy/vehicle-routing/README.MD
  • legacy/vehicle-routing-fast/README.MD
  • legacy/meeting-scheduling-fast/README.adoc
  • legacy/employee-scheduling-fast/README.MD

Benchmarks & performance

Performance-focused work and benchmark artifacts live in the benchmarks/ folder:

  • benchmarks/results_meeting-scheduling.md
  • benchmarks/results_vehicle-routing.md
  • benchmarks/report.md

Where to read more

  • Start at the repository top-level README for an overview and the full use-case list.
  • Read the individual quickstart READMEs for run instructions, configuration and design notes.
  • Consult benchmarks/ for performance comparisons and technical rationale.

This repository derives from prior quickstarts and carries permissive licensing details documented in the top-level README and LICENSE files. Refer to those files for full copyright and licensing information.

1 - Employee Scheduling (Rust)

Build efficient employee scheduling with SolverForge’s native Rust constraint solver

Native Rust Implementation

This guide uses SolverForge’s native Rust constraint solver — a fully monomorphized, zero-erasure implementation with no JVM bridge. All constraints compile to concrete types at build time, enabling aggressive compiler optimizations and true native performance.

If you’re looking for the Python implementation (legacy JPype bridge), see Employee Scheduling (Python).


Table of Contents

  1. Introduction
  2. Getting Started
  3. The Problem We’re Solving
  4. Understanding the Data Model
  5. How Optimization Works
  6. Writing Constraints: The Business Rules
  7. The Solver Engine
  8. Web Interface and API
  9. Making Your First Customization
  10. Advanced Constraint Patterns
  11. Quick Reference

Introduction

What You’ll Learn

This guide walks you through a complete employee scheduling application built with SolverForge, a native Rust constraint-based optimization framework. You’ll learn:

  • How to model real-world scheduling problems as optimization problems using Rust’s type system
  • How to express business rules as constraints using a fluent API
  • How SolverForge’s zero-erasure architecture enables native performance
  • How to customize the system for your specific needs

No optimization background required — we’ll explain concepts as we encounter them in the code.

Prerequisites

  • Rust knowledge (structs, traits, closures, derive macros)
  • Familiarity with REST APIs
  • Comfort with command-line operations

What is Constraint-Based Optimization?

Traditional programming: You write explicit logic that says “do this, then that.”

Constraint-based optimization: You describe what a good solution looks like and the solver figures out how to achieve it.

Think of it like describing what puzzle pieces you have and what rules they must follow — then having a computer try millions of arrangements per second to find the best fit.

Why Native Rust?

SolverForge’s Rust implementation offers key advantages:

  • Zero-erasure architecture: All generic types resolve at compile time — no Box<dyn Trait>, no Arc, no runtime dispatch
  • Full monomorphization: Each constraint compiles to specialized machine code
  • Memory efficiency: Index-based references instead of string cloning
  • True parallelism: No GIL, no JVM pauses, native threading

Getting Started

Running the Application

  1. Clone the quickstarts repository:

    git clone https://github.com/SolverForge/solverforge-quickstarts
    cd ./solverforge-quickstarts/rust/employee-scheduling
    
  2. Build the project:

    cargo build --release
    
  3. Run the server:

    cargo run --release
    
  4. Open your browser:

    http://localhost:7860
    

You’ll see a scheduling interface with employees, shifts and a “Solve” button. Click it and watch the solver automatically assign employees to shifts while respecting business rules.

File Structure Overview

rust/employee-scheduling/
├── src/
│   ├── main.rs           # Axum server entry point
│   ├── lib.rs            # Library crate root
│   ├── domain.rs         # Data models (Employee, Shift, EmployeeSchedule)
│   ├── constraints.rs    # Business rules (90% of customization happens here)
│   ├── api.rs            # REST endpoint handlers
│   ├── dto.rs            # JSON serialization types
│   └── demo_data.rs      # Sample data generation
├── static/
│   ├── index.html        # Web UI
│   └── app.js            # UI logic and visualization
└── Cargo.toml            # Dependencies

Key insight: Most business customization happens in constraints.rs alone. You rarely need to modify other files.


The Problem We’re Solving

The Scheduling Challenge

You need to assign employees to shifts while satisfying rules like:

Hard constraints (must be satisfied):

  • Employee must have the required skill for the shift
  • Employee can’t work overlapping shifts
  • Employee needs 10 hours rest between shifts
  • Employee can’t work more than one shift per day
  • Employee can’t work on days they’re unavailable

Soft constraints (preferences to optimize):

  • Avoid scheduling on days the employee marked as “undesired”
  • Prefer scheduling on days the employee marked as “desired”
  • Balance workload fairly across all employees

Why This is Hard

Even for 20 shifts and 10 employees, there are 10^20 possible assignments (100 quintillion). A human can’t evaluate them all; even a computer trying random assignments would take years.

Optimization algorithms use smart strategies to explore this space efficiently, finding high-quality solutions in seconds.
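
To make the magnitude concrete, here is a quick back-of-the-envelope check in plain Rust (no SolverForge APIs involved):

```rust
fn main() {
    // Each of 20 shifts can go to any of 10 employees: 10^20 assignments.
    let combinations: u128 = 10u128.pow(20);
    assert_eq!(combinations, 100_000_000_000_000_000_000); // 100 quintillion

    // Even at a billion score evaluations per second, brute force
    // would take roughly 3,170 years.
    let seconds = combinations / 1_000_000_000;
    let years = seconds / (60 * 60 * 24 * 365);
    println!("~{} years of brute force", years);
}
```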


Understanding the Data Model

Let’s examine the three core structs that model our problem. Open src/domain.rs:

The Employee Struct (Problem Fact)

#[problem_fact]
#[derive(Serialize, Deserialize)]
pub struct Employee {
    /// Index of this employee in `EmployeeSchedule.employees` for O(1) join matching.
    pub index: usize,
    pub name: String,
    pub skills: HashSet<String>,
    #[serde(rename = "unavailableDates", default)]
    pub unavailable_dates: HashSet<NaiveDate>,
    #[serde(rename = "undesiredDates", default)]
    pub undesired_dates: HashSet<NaiveDate>,
    #[serde(rename = "desiredDates", default)]
    pub desired_dates: HashSet<NaiveDate>,
    /// Sorted unavailable dates for `flatten_last` compatibility.
    #[serde(skip)]
    pub unavailable_days: Vec<NaiveDate>,
    #[serde(skip)]
    pub undesired_days: Vec<NaiveDate>,
    #[serde(skip)]
    pub desired_days: Vec<NaiveDate>,
}

What it represents: A person who can be assigned to shifts.

Key fields:

  • index: Position in the employees array — enables O(1) lookups without string comparison
  • name: Human-readable identifier
  • skills: What skills this employee possesses (e.g., {"Doctor", "Cardiology"})
  • unavailable_dates: Days the employee absolutely cannot work (hard constraint)
  • undesired_dates / desired_dates: Soft preference fields

Rust-specific design:

  • #[problem_fact]: Derive macro that marks this as immutable solver data
  • HashSet<NaiveDate> for O(1) membership testing during JSON deserialization
  • Vec<NaiveDate> sorted copies for flatten_last stream compatibility

The Builder Pattern with finalize()

The Employee struct uses a builder pattern with explicit finalization:

impl Employee {
    pub fn new(index: usize, name: impl Into<String>) -> Self {
        Self {
            index,
            name: name.into(),
            skills: HashSet::new(),
            // ... fields initialized empty
        }
    }

    /// Populates derived Vec fields from HashSets for zero-erasure stream compatibility.
    /// Must be called after all dates have been added to HashSets.
    pub fn finalize(&mut self) {
        self.unavailable_days = self.unavailable_dates.iter().copied().collect();
        self.unavailable_days.sort();
        // ... same for undesired_days, desired_days
    }

    pub fn with_skill(mut self, skill: impl Into<String>) -> Self {
        self.skills.insert(skill.into());
        self
    }
}

Why finalize()? The constraint stream API’s flatten_last operation requires sorted slices for O(1) index-based lookups. After JSON deserialization or programmatic construction, finalize() converts HashSets to sorted Vecs.
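
A minimal sketch of what finalize() accomplishes, using plain day numbers in place of NaiveDate (finalize_days is an illustrative helper, not part of the crate):

```rust
use std::collections::HashSet;

// Convert an unordered HashSet into the sorted Vec that binary search needs.
fn finalize_days(set: &HashSet<u32>) -> Vec<u32> {
    let mut days: Vec<u32> = set.iter().copied().collect();
    days.sort();
    days
}

fn main() {
    let unavailable: HashSet<u32> = [27, 25, 26].into_iter().collect();
    let sorted = finalize_days(&unavailable);
    assert_eq!(sorted, vec![25, 26, 27]);

    // binary_search is only correct on sorted slices — hence finalize().
    assert!(sorted.binary_search(&26).is_ok());
    assert!(sorted.binary_search(&30).is_err());
}
```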

The Shift Struct (Planning Entity)

#[planning_entity]
#[derive(Serialize, Deserialize)]
pub struct Shift {
    #[planning_id]
    pub id: String,
    pub start: NaiveDateTime,
    pub end: NaiveDateTime,
    pub location: String,
    #[serde(rename = "requiredSkill")]
    pub required_skill: String,
    /// Index into `EmployeeSchedule.employees` (O(1) lookup, no String cloning).
    #[planning_variable(allows_unassigned = true)]
    pub employee_idx: Option<usize>,
}

What it represents: A time slot that needs an employee assignment.

Key annotations:

  • #[planning_entity]: Derive macro marking this as containing decisions to optimize
  • #[planning_id]: Marks id as the unique identifier
  • #[planning_variable(allows_unassigned = true)]: The decision variable — what the solver assigns

Critical design choice — index-based references:

pub employee_idx: Option<usize>  // ✓ O(1) lookup, no allocation
// NOT: pub employee: Option<String>  // ✗ String clone on every comparison

Using usize indices instead of employee names provides:

  • O(1) lookups via schedule.employees[idx]
  • Zero allocations during constraint evaluation
  • Direct equality comparison (integer vs string)
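
These benefits are easy to see in miniature with plain Rust (no SolverForge types):

```rust
// Integer indices are Copy: comparing them is a single machine compare,
// with no allocation and no byte-by-byte string walk.
fn same_employee(a: Option<usize>, b: Option<usize>) -> bool {
    a == b
}

fn main() {
    let employees = vec!["Amy Cole".to_string(), "Beth Fox".to_string()];
    let employee_idx: Option<usize> = Some(1);

    // O(1) lookup from index to the full employee record:
    if let Some(idx) = employee_idx {
        assert_eq!(employees[idx], "Beth Fox");
    }
    assert!(same_employee(Some(1), Some(1)));
    assert!(!same_employee(Some(1), None));
}
```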

The EmployeeSchedule Struct (Planning Solution)

#[planning_solution]
#[basic_variable_config(
    entity_collection = "shifts",
    variable_field = "employee_idx",
    variable_type = "usize",
    value_range = "employees"
)]
#[solverforge_constraints_path = "crate::constraints::create_fluent_constraints"]
#[derive(Serialize, Deserialize)]
pub struct EmployeeSchedule {
    #[problem_fact_collection]
    pub employees: Vec<Employee>,
    #[planning_entity_collection]
    pub shifts: Vec<Shift>,
    #[planning_score]
    pub score: Option<HardSoftDecimalScore>,
    #[serde(rename = "solverStatus", skip_serializing_if = "Option::is_none")]
    pub solver_status: Option<String>,
}

What it represents: The complete problem and its solution.

Annotations explained:

  • #[planning_solution]: Top-level problem definition
  • #[basic_variable_config(...)]: Declarative configuration specifying:
    • Which collection contains planning entities (shifts)
    • Which field is the planning variable (employee_idx)
    • The variable’s type (usize)
    • Where valid values come from (employees)
  • #[solverforge_constraints_path]: Points to the constraint factory function
  • #[problem_fact_collection]: Immutable data (doesn’t change during solving)
  • #[planning_entity_collection]: Entities being optimized
  • #[planning_score]: Where the solver stores the calculated score

How Optimization Works

Before diving into constraints, let’s understand how the solver finds solutions.

The Search Process (Simplified)

  1. Start with an initial solution (often random or all unassigned)
  2. Evaluate the score using your constraint functions
  3. Make a small change (assign a different employee to one shift)
  4. Evaluate the new score
  5. Keep the change if it improves the score (with some controlled randomness)
  6. Repeat millions of times in seconds
  7. Return the best solution found
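
The seven steps above can be sketched as a deliberately tiny hill climber. This is illustrative only — SolverForge’s real engine layers the metaheuristics below on top of a far more sophisticated loop:

```rust
// Toy local search: random single-assignment changes, keep non-worsening moves.
fn solve(
    mut assignment: Vec<usize>,
    n_employees: usize,
    score: impl Fn(&[usize]) -> i64,
    steps: u32,
) -> Vec<usize> {
    let mut best = score(&assignment);
    let mut rng: u64 = 42; // tiny deterministic LCG, good enough for a sketch
    for _ in 0..steps {
        rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
        let shift = (rng >> 33) as usize % assignment.len();
        let new_emp = (rng >> 12) as usize % n_employees;
        let old_emp = assignment[shift];
        assignment[shift] = new_emp;     // 3. make a small change
        let s = score(&assignment);      // 4. evaluate the new score
        if s >= best {
            best = s;                    // 5. keep the change if it helps
        } else {
            assignment[shift] = old_emp; // ...otherwise undo it
        }
    }
    assignment // 7. return the best solution found
}

fn main() {
    // Toy score: penalize every shift still assigned to employee 0.
    let score = |a: &[usize]| -(a.iter().filter(|&&e| e == 0).count() as i64);
    let result = solve(vec![0; 10], 3, score, 10_000);
    assert_eq!(score(&result), 0);
}
```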

Why This Works: Metaheuristics

SolverForge uses sophisticated metaheuristic algorithms like:

  • Tabu Search: Remembers recent moves to avoid cycling
  • Simulated Annealing: Occasionally accepts worse solutions to escape local optima
  • Late Acceptance: Compares the current solution to recent history, not just the immediately preceding solution

These techniques efficiently explore the massive solution space without getting stuck.

The Score: How “Good” is a Solution?

Every solution gets a score with two parts:

0hard/-45soft
  • Hard score: Counts hard constraint violations (must be 0 for a valid solution)
  • Soft score: Counts soft constraint violations/rewards (higher is better)

Scoring rules:

  • Hard score must be 0 or positive (negative = invalid/infeasible)
  • Among valid solutions (hard score = 0), highest soft score wins
  • Hard score always takes priority over soft score
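
The priority rule can be illustrated with a plain struct whose derived Ord compares hard before soft (this Score is a stand-in, not SolverForge’s actual HardSoftDecimalScore):

```rust
// Derived Ord is lexicographic by field order: hard is compared first,
// so feasibility always dominates soft preferences.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)]
struct Score {
    hard: i64,
    soft: i64,
}

fn main() {
    let infeasible = Score { hard: -2, soft: 0 };
    let feasible_poor = Score { hard: 0, soft: -45 };
    let feasible_good = Score { hard: 0, soft: -12 };

    // Any feasible solution beats any infeasible one, regardless of soft score:
    assert!(feasible_poor > infeasible);
    // Among feasible solutions, the higher soft score wins:
    assert!(feasible_good > feasible_poor);
}
```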

Writing Constraints: The Business Rules

Now the heart of the system. Open src/constraints.rs.

The Constraint Factory Pattern

All constraints are created in one function:

pub fn create_fluent_constraints() -> impl ConstraintSet<EmployeeSchedule, HardSoftDecimalScore> {
    let factory = ConstraintFactory::<EmployeeSchedule, HardSoftDecimalScore>::new();

    // Build each constraint...
    let required_skill = factory.clone()
        .for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
        // ...

    // Return all constraints as a tuple
    (
        required_skill,
        no_overlap,
        at_least_10_hours,
        one_per_day,
        unavailable,
        undesired,
        desired,
        balanced,
    )
}

The function returns impl ConstraintSet — a trait implemented for tuples of constraints. Each constraint is fully typed with no runtime dispatch.

Hard Constraint: Required Skill

Business rule: “An employee assigned to a shift must have the required skill.”

let required_skill = factory
    .clone()
    .for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
    .join(
        |s: &EmployeeSchedule| s.employees.as_slice(),
        equal_bi(
            |shift: &Shift| shift.employee_idx,
            |emp: &Employee| Some(emp.index),
        ),
    )
    .filter(|shift: &Shift, emp: &Employee| {
        shift.employee_idx.is_some() && !emp.skills.contains(&shift.required_skill)
    })
    .penalize(HardSoftDecimalScore::ONE_HARD)
    .as_constraint("Required skill");

How to read this:

  1. for_each(|s| s.shifts.as_slice()): Stream over all shifts
  2. .join(..., equal_bi(...)): Join with employees where shift.employee_idx == Some(emp.index)
  3. .filter(...): Keep only where employee lacks the required skill
  4. .penalize(ONE_HARD): Each violation subtracts 1 from hard score
  5. .as_constraint(...): Name for debugging

Key Rust adaptation — equal_bi joiner:

equal_bi(
    |shift: &Shift| shift.employee_idx,      // Option<usize>
    |emp: &Employee| Some(emp.index),         // Option<usize>
)

The equal_bi joiner takes two closures — one for each side of the join. This enables joining different types with potentially different key extraction logic.

Hard Constraint: No Overlapping Shifts

Business rule: “An employee can’t work two shifts that overlap in time.”

let no_overlap = factory
    .clone()
    .for_each_unique_pair(
        |s: &EmployeeSchedule| s.shifts.as_slice(),
        joiner::equal(|shift: &Shift| shift.employee_idx),
    )
    .filter(|a: &Shift, b: &Shift| {
        a.employee_idx.is_some() && a.start < b.end && b.start < a.end
    })
    .penalize_hard_with(|a: &Shift, b: &Shift| {
        HardSoftDecimalScore::of_hard_scaled(overlap_minutes(a, b) * 100000)
    })
    .as_constraint("Overlapping shift");

How to read this:

  1. for_each_unique_pair(...): Create pairs of shifts from the same collection
  2. joiner::equal(|shift| shift.employee_idx): Only pair shifts with the same employee
  3. .filter(...): Check time overlap with interval comparison
  4. .penalize_hard_with(...): Variable penalty based on overlap duration

Optimization concept: for_each_unique_pair ensures we don’t count violations twice (A,B vs B,A). The joiner uses hash indexing for O(1) pair matching.
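
The interval test and pair counting are worth checking in isolation. Here is a sketch on plain minute offsets (this overlap_minutes is a simplified stand-in for the helper the real constraint uses):

```rust
// Two half-open intervals [a_start, a_end) and [b_start, b_end) overlap
// iff a_start < b_end && b_start < a_end; the overlap length is then
// min(ends) - max(starts).
fn overlap_minutes(a_start: i64, a_end: i64, b_start: i64, b_end: i64) -> i64 {
    (a_end.min(b_end) - a_start.max(b_start)).max(0)
}

fn main() {
    assert_eq!(overlap_minutes(0, 480, 420, 900), 60); // one hour of overlap
    assert_eq!(overlap_minutes(0, 480, 480, 960), 0);  // back-to-back: none

    // Unique pairs avoid double counting: n shifts yield n*(n-1)/2 pairs.
    let n = 4;
    let pairs: Vec<(usize, usize)> = (0..n)
        .flat_map(|i| ((i + 1)..n).map(move |j| (i, j)))
        .collect();
    assert_eq!(pairs.len(), n * (n - 1) / 2); // 6, not 12
}
```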

Hard Constraint: Rest Between Shifts

Business rule: “Employees need at least 10 hours rest between shifts.”

let at_least_10_hours = factory
    .clone()
    .for_each_unique_pair(
        |s: &EmployeeSchedule| s.shifts.as_slice(),
        joiner::equal(|shift: &Shift| shift.employee_idx),
    )
    .filter(|a: &Shift, b: &Shift| a.employee_idx.is_some() && gap_penalty_minutes(a, b) > 0)
    .penalize_hard_with(|a: &Shift, b: &Shift| {
        HardSoftDecimalScore::of_hard_scaled(gap_penalty_minutes(a, b) * 100000)
    })
    .as_constraint("At least 10 hours between 2 shifts");

Helper function:

fn gap_penalty_minutes(a: &Shift, b: &Shift) -> i64 {
    const MIN_GAP_MINUTES: i64 = 600;  // 10 hours

    let (earlier, later) = if a.end <= b.start {
        (a, b)
    } else if b.end <= a.start {
        (b, a)
    } else {
        return 0;  // Overlapping, handled by different constraint
    };

    let gap = (later.start - earlier.end).num_minutes();
    if (0..MIN_GAP_MINUTES).contains(&gap) {
        MIN_GAP_MINUTES - gap
    } else {
        0
    }
}

Optimization concept: The penalty 600 - actual_gap creates incremental guidance. 9 hours rest (penalty 60) is better than 5 hours rest (penalty 300).

Hard Constraint: One Shift Per Day

Business rule: “Employees can work at most one shift per calendar day.”

let one_per_day = factory
    .clone()
    .for_each_unique_pair(
        |s: &EmployeeSchedule| s.shifts.as_slice(),
        joiner::equal(|shift: &Shift| (shift.employee_idx, shift.date())),
    )
    .filter(|a: &Shift, b: &Shift| a.employee_idx.is_some() && b.employee_idx.is_some())
    .penalize(HardSoftDecimalScore::ONE_HARD)
    .as_constraint("One shift per day");

Key pattern — tuple joiner:

joiner::equal(|shift: &Shift| (shift.employee_idx, shift.date()))

The joiner matches on a tuple (Option<usize>, NaiveDate) — same employee AND same date.

Hard Constraint: Unavailable Employee

Business rule: “Employees cannot work on days they marked as unavailable.”

let unavailable = factory
    .clone()
    .for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
    .join(
        |s: &EmployeeSchedule| s.employees.as_slice(),
        equal_bi(
            |shift: &Shift| shift.employee_idx,
            |emp: &Employee| Some(emp.index),
        ),
    )
    .flatten_last(
        |emp: &Employee| emp.unavailable_days.as_slice(),
        |date: &NaiveDate| *date,      // C → index key
        |shift: &Shift| shift.date(),  // A → lookup key
    )
    .filter(|shift: &Shift, date: &NaiveDate| {
        shift.employee_idx.is_some() && shift_date_overlap_minutes(shift, *date) > 0
    })
    .penalize_hard_with(|shift: &Shift, date: &NaiveDate| {
        HardSoftDecimalScore::of_hard_scaled(shift_date_overlap_minutes(shift, *date) * 100000)
    })
    .as_constraint("Unavailable employee");

The flatten_last Operation:

.flatten_last(
    |emp: &Employee| emp.unavailable_days.as_slice(),  // Collection to flatten
    |date: &NaiveDate| *date,                          // Index key extractor
    |shift: &Shift| shift.date(),                      // Lookup key extractor
)

This is a powerful pattern unique to SolverForge’s fluent API:

  1. Takes the last element of the current tuple (the Employee)
  2. Flattens their unavailable_days collection
  3. Pre-indexes by the date key
  4. On lookup, finds matching dates via binary search using the shift’s date

Why sorted Vecs? The flatten_last operation uses binary search internally, requiring sorted input. That’s why Employee::finalize() sorts the date vectors.

Soft Constraint: Undesired Days

Business rule: “Prefer not to schedule employees on days they marked as undesired.”

let undesired = factory
    .clone()
    .for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
    .join(
        |s: &EmployeeSchedule| s.employees.as_slice(),
        equal_bi(
            |shift: &Shift| shift.employee_idx,
            |emp: &Employee| Some(emp.index),
        ),
    )
    .flatten_last(
        |emp: &Employee| emp.undesired_days.as_slice(),
        |date: &NaiveDate| *date,
        |shift: &Shift| shift.date(),
    )
    .filter(|shift: &Shift, _date: &NaiveDate| shift.employee_idx.is_some())
    .penalize(HardSoftDecimalScore::ONE_SOFT)
    .as_constraint("Undesired day for employee");

Key difference: Uses ONE_SOFT instead of ONE_HARD. The solver will try to avoid undesired days but may violate this if necessary.

Soft Constraint: Desired Days (Reward)

Business rule: “Prefer to schedule employees on days they marked as desired.”

let desired = factory
    .clone()
    .for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
    .join(
        |s: &EmployeeSchedule| s.employees.as_slice(),
        equal_bi(
            |shift: &Shift| shift.employee_idx,
            |emp: &Employee| Some(emp.index),
        ),
    )
    .flatten_last(
        |emp: &Employee| emp.desired_days.as_slice(),
        |date: &NaiveDate| *date,
        |shift: &Shift| shift.date(),
    )
    .filter(|shift: &Shift, _date: &NaiveDate| shift.employee_idx.is_some())
    .reward(HardSoftDecimalScore::ONE_SOFT)
    .as_constraint("Desired day for employee");

Key difference: Uses .reward() instead of .penalize(). Rewards increase the score.

Soft Constraint: Load Balancing

Business rule: “Distribute shifts fairly across employees.”

let balanced = factory
    .for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
    .balance(|shift: &Shift| shift.employee_idx)
    .penalize(HardSoftDecimalScore::of_soft(1))
    .as_constraint("Balance employee assignments");

The balance() Operation:

This is the simplest and most powerful load balancing pattern:

  1. Groups shifts by the grouping key (employee index)
  2. Calculates standard deviation incrementally
  3. Penalizes based on unfairness metric

Unlike manual group_by + count + math, the balance() operation:

  • Maintains O(1) incremental updates during solving
  • Handles edge cases (empty groups, single element)
  • Provides mathematically sound fairness calculation
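
A from-scratch sketch of the kind of unfairness metric balance() maintains — here, the population standard deviation of per-employee shift counts. The real operation updates this incrementally; this version recomputes from scratch:

```rust
// Standard deviation of the counts: 0.0 when perfectly balanced,
// growing as the distribution becomes more lopsided.
fn unfairness(counts: &[f64]) -> f64 {
    let mean = counts.iter().sum::<f64>() / counts.len() as f64;
    let variance = counts
        .iter()
        .map(|c| (c - mean).powi(2))
        .sum::<f64>()
        / counts.len() as f64;
    variance.sqrt()
}

fn main() {
    // Perfectly balanced: zero penalty.
    assert_eq!(unfairness(&[4.0, 4.0, 4.0]), 0.0);
    // One employee doing everything scores far worse than a mild skew.
    assert!(unfairness(&[12.0, 0.0, 0.0]) > unfairness(&[5.0, 4.0, 3.0]));
}
```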

The Solver Engine

Configuration via Derive Macros

Unlike configuration files, SolverForge uses compile-time configuration through derive macros:

#[planning_solution]
#[basic_variable_config(
    entity_collection = "shifts",
    variable_field = "employee_idx",
    variable_type = "usize",
    value_range = "employees"
)]
#[solverforge_constraints_path = "crate::constraints::create_fluent_constraints"]
pub struct EmployeeSchedule { ... }

This generates:

  • The Solvable trait implementation
  • Variable descriptor metadata
  • Move selector configuration
  • Constraint factory wiring

The Solvable Trait

The derive macro implements Solvable:

// Generated by #[planning_solution]
impl Solvable for EmployeeSchedule {
    type Score = HardSoftDecimalScore;

    fn solve(self, config: Option<SolverConfig>, callback: Sender<...>) {
        // Metaheuristic search loop
    }
}

Starting the Solver

In src/api.rs:

async fn create_schedule(
    State(state): State<Arc<AppState>>,
    Json(dto): Json<ScheduleDto>,
) -> String {
    let id = Uuid::new_v4().to_string();
    let schedule = dto.to_domain();

    // Store initial state
    {
        let mut jobs = state.jobs.write();
        jobs.insert(id.clone(), SolveJob {
            solution: schedule.clone(),
            solver_status: "SOLVING".to_string(),
        });
    }

    // Create channel for solution updates
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

    // Spawn async task to receive updates
    tokio::spawn(async move {
        while let Some((solution, _score)) = rx.recv().await {
            // Update stored solution
        }
    });

    // Spawn solver on rayon thread pool
    rayon::spawn(move || {
        schedule.solve(None, tx);
    });

    id
}

Architecture notes:

  • Solving runs on rayon thread pool (CPU-bound work)
  • Updates sent via tokio::sync::mpsc channel
  • Async Axum handler for non-blocking HTTP
  • parking_lot::RwLock for thread-safe state access
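
The same producer/consumer shape can be sketched with only the standard library: a solver thread streams best-so-far solutions over a channel while a handler keeps the latest one (the real code uses rayon::spawn and tokio’s mpsc, as shown above):

```rust
use std::sync::mpsc;
use std::thread;

// Spawn a mock "solver" that emits improving (solution, score) updates,
// and keep only the most recent one, like the stored SolveJob.
fn run_solver_job() -> (Vec<usize>, i64) {
    let (tx, rx) = mpsc::channel::<(Vec<usize>, i64)>();

    let solver = thread::spawn(move || {
        for step in 1..=3usize {
            let solution = vec![step; 4];
            let score = -(10 - step as i64);
            tx.send((solution, score)).unwrap(); // best-so-far update
        }
        // tx drops here, closing the channel and ending the receive loop.
    });

    let mut latest = (Vec::new(), i64::MIN);
    while let Ok(update) = rx.recv() {
        latest = update; // overwrite: only the newest solution matters
    }
    solver.join().unwrap();
    latest
}

fn main() {
    let (solution, score) = run_solver_job();
    assert_eq!(score, -7); // the last (best) update wins
    assert_eq!(solution, vec![3, 3, 3, 3]);
}
```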

TypedScoreDirector for Analysis

For score breakdown without solving:

async fn analyze_schedule(Json(dto): Json<ScheduleDto>) -> Json<AnalyzeResponse> {
    let schedule = dto.to_domain();
    let constraints = create_fluent_constraints();
    let director = TypedScoreDirector::new(schedule, constraints);

    let score = director.get_score();
    let analyses = director
        .constraints()
        .evaluate_detailed(director.working_solution());

    // Convert to DTO...
}

The TypedScoreDirector:

  • Evaluates all constraints against a solution
  • Returns detailed match information per constraint
  • No actual solving — just score calculation

Web Interface and API

REST API Endpoints

The API is built with Axum (src/api.rs):

pub fn router(state: Arc<AppState>) -> Router {
    Router::new()
        .route("/health", get(health))
        .route("/healthz", get(health))
        .route("/info", get(info))
        .route("/demo-data", get(list_demo_data))
        .route("/demo-data/{id}", get(get_demo_data))
        .route("/schedules", post(create_schedule))
        .route("/schedules", get(list_schedules))
        .route("/schedules/analyze", put(analyze_schedule))
        .route("/schedules/{id}", get(get_schedule))
        .route("/schedules/{id}/status", get(get_schedule_status))
        .route("/schedules/{id}", delete(stop_solving))
        .with_state(state)
}

GET /demo-data

Returns available demo datasets:

["SMALL", "LARGE"]

GET /demo-data/{id}

Generates and returns sample data:

{
  "employees": [
    {
      "name": "Amy Cole",
      "skills": ["Doctor", "Cardiology"],
      "unavailableDates": ["2026-01-25"],
      "undesiredDates": ["2026-01-26"],
      "desiredDates": ["2026-01-27"]
    }
  ],
  "shifts": [
    {
      "id": "0",
      "start": "2026-01-25T06:00:00",
      "end": "2026-01-25T14:00:00",
      "location": "Ambulatory care",
      "requiredSkill": "Doctor",
      "employee": null
    }
  ]
}

POST /schedules

Submit a schedule to solve. Returns job ID:

"a1b2c3d4-e5f6-7890-abcd-ef1234567890"

GET /schedules/{id}

Get current solution state:

{
  "employees": [...],
  "shifts": [...],
  "score": "0hard/-12soft",
  "solverStatus": "SOLVING"
}

DELETE /schedules/{id}

Stop solving early. Returns 204 No Content.

PUT /schedules/analyze

Analyze a schedule without solving:

{
  "score": "-2hard/-45soft",
  "constraints": [
    {
      "name": "Required skill",
      "constraintType": "hard",
      "weight": "1hard",
      "score": "-2hard",
      "matches": [
        {
          "score": "-1hard",
          "justification": "Shift 5 assigned to Amy Cole without required skill"
        }
      ]
    }
  ]
}

Server Entry Point

src/main.rs:

#[tokio::main]
async fn main() {
    solverforge::console::init();

    let state = Arc::new(api::AppState::new());

    let cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_methods(Any)
        .allow_headers(Any);

    let static_path = if PathBuf::from("examples/employee-scheduling/static").exists() {
        "examples/employee-scheduling/static"
    } else {
        "static"
    };

    let app = api::router(state)
        .fallback_service(ServeDir::new(static_path))
        .layer(cors);

    let addr = SocketAddr::from(([0, 0, 0, 0], 7860));
    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();

    println!("Server running at http://localhost:{}", addr.port());
    axum::serve(listener, app).await.unwrap();
}

Making Your First Customization

Let’s modify an existing constraint to understand the pattern.

Adjusting Constraint Weights

The balancing constraint currently uses a weight of 1:

let balanced = factory
    .for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
    .balance(|shift: &Shift| shift.employee_idx)
    .penalize(HardSoftDecimalScore::of_soft(1))  // Weight: 1
    .as_constraint("Balance employee assignments");

To make fairness more important relative to other soft constraints:

    .penalize(HardSoftDecimalScore::of_soft(10))  // Weight: 10

Now each unit of imbalance costs 10 soft points instead of 1, making the solver prioritize fair distribution over other soft preferences.
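The effect of a weight change can be sketched in plain Python (this is an illustration of the arithmetic, not the SolverForge API; the function name is hypothetical):

```python
# Sketch: how a constraint weight scales each match's soft-score contribution.
def soft_penalty(imbalance_units: int, weight: int) -> int:
    """Each unit of imbalance costs `weight` soft points."""
    return -(imbalance_units * weight)

# The same imbalance hurts 10x more once the weight is raised from 1 to 10,
# so the solver will trade other soft preferences for fairness sooner.
before = soft_penalty(3, 1)    # weight 1  -> -3 soft
after = soft_penalty(3, 10)    # weight 10 -> -30 soft
```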

Adding a New Hard Constraint

Let’s add: “No employee can work more than 5 shifts total.”

In src/constraints.rs, add the constraint:

use solverforge::stream::collector::count;

// Inside create_fluent_constraints()
let max_shifts = factory
    .clone()
    .for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
    .filter(|shift: &Shift| shift.employee_idx.is_some())
    .group_by(|shift: &Shift| shift.employee_idx, count())
    .penalize_hard_with(|shift_count: &usize| {
        if *shift_count > 5 {
            HardSoftDecimalScore::of_hard((*shift_count - 5) as i64)
        } else {
            HardSoftDecimalScore::ZERO
        }
    })
    .as_constraint("Max 5 shifts per employee");

Then add it to the return tuple:

(
    required_skill,
    no_overlap,
    at_least_10_hours,
    one_per_day,
    unavailable,
    undesired,
    desired,
    balanced,
    max_shifts,  // Add here
)

Rebuild and test:

cargo build --release
cargo run --release

Advanced Constraint Patterns

Zero-Erasure Architecture

SolverForge’s constraints are fully monomorphized — every generic parameter resolves to concrete types at compile time:

// This type is FULLY concrete at compile time:
ConstraintStream<
    EmployeeSchedule,
    HardSoftDecimalScore,
    (Shift, Employee),  // Tuple of concrete types
    JoinedMatcher<...>  // Concrete matcher type
>

No Box<dyn Constraint>, no Arc<dyn Stream>, no vtable dispatch. The compiler sees the entire constraint graph and can inline, vectorize, and optimize aggressively.

Index-Based References

The pattern of using indices instead of cloned values:

// In Shift
pub employee_idx: Option<usize>  // Index into employees array

// In constraint - O(1) lookup
.filter(|shift: &Shift, emp: &Employee| {
    shift.employee_idx == Some(emp.index)
})

Benefits:

  • Integer comparison vs string comparison
  • No allocation during constraint evaluation
  • Cache-friendly memory access patterns

The flatten_last Pattern

For constraints involving collections on joined entities:

.join(employees, equal_bi(...))
.flatten_last(
    |emp| emp.unavailable_days.as_slice(),  // What to flatten
    |date| *date,                            // Index key
    |shift| shift.date(),                    // Lookup key
)

This creates an indexed structure during stream setup, enabling O(1) lookups during constraint evaluation.
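The idea behind that index can be sketched in plain Python (data and names are hypothetical; the real structure lives inside the stream engine):

```python
from datetime import date

# Instead of scanning every (employee, date) pair for every shift, build a
# date-keyed index once at setup time, then look shifts up in O(1).
unavailable = {
    "ann": [date(2026, 1, 25), date(2026, 1, 26)],
    "beth": [date(2026, 1, 26)],
}

# Index: date -> employees unavailable on that day (built once).
by_date: dict[date, list[str]] = {}
for name, days in unavailable.items():
    for d in days:
        by_date.setdefault(d, []).append(name)

# Constraint evaluation: a single dict lookup per shift date.
hits = by_date.get(date(2026, 1, 26), [])
```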

Custom Penalty Functions

Variable penalties guide the solver more effectively:

.penalize_hard_with(|a: &Shift, b: &Shift| {
    let overlap = overlap_minutes(a, b);
    HardSoftDecimalScore::of_hard_scaled(overlap * 100000)
})

The 100000 multiplier ensures minute-level granularity affects the score meaningfully compared to unit penalties.
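A plain-Python comparison (function names hypothetical) shows why the scaled penalty guides the solver better than a flat one:

```python
# With a flat unit penalty, a 5-minute and a 5-hour overlap look identical
# to the solver; scaling by minutes gives it a gradient to descend.
def flat_penalty(overlap_minutes: int) -> int:
    return 1 if overlap_minutes > 0 else 0

def scaled_penalty(overlap_minutes: int) -> int:
    return overlap_minutes * 100_000
```

Under the flat penalty both overlaps score the same, so shrinking one has no visible effect until it hits zero; under the scaled penalty, every minute of improvement moves the score.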


Quick Reference

File Locations

Need to…                       Edit this file
Add/change business rules      src/constraints.rs
Add field to Employee/Shift    src/domain.rs + src/dto.rs
Change API endpoints           src/api.rs
Change demo data               src/demo_data.rs
Change UI                      static/index.html, static/app.js

Common Constraint Patterns

Unary constraint (examine one entity):

factory.for_each(|s| s.shifts.as_slice())
    .filter(|shift| /* condition */)
    .penalize(HardSoftDecimalScore::ONE_HARD)

Binary constraint with join:

factory.for_each(|s| s.shifts.as_slice())
    .join(|s| s.employees.as_slice(), equal_bi(...))
    .filter(|shift, emp| /* condition */)
    .penalize(...)

Unique pairs (same collection):

factory.for_each_unique_pair(
    |s| s.shifts.as_slice(),
    joiner::equal(|shift| shift.employee_idx),
)

Flatten collections:

.flatten_last(
    |emp| emp.dates.as_slice(),
    |date| *date,
    |shift| shift.date(),
)

Load balancing:

factory.for_each(|s| s.shifts.as_slice())
    .balance(|shift| shift.employee_idx)
    .penalize(HardSoftDecimalScore::of_soft(1))

Python → Rust Translation

Python                                Rust
@dataclass                            #[derive(...)] struct
@planning_entity decorator            #[planning_entity] derive macro
PlanningId annotation                 #[planning_id] attribute
PlanningVariable annotation           #[planning_variable] attribute
constraint_factory.for_each(Shift)    factory.for_each(|s| s.shifts.as_slice())
Joiners.equal(lambda: ...)            joiner::equal(|x| x.field)
lambda shift: shift.employee          |shift: &Shift| shift.employee_idx
FastAPI server                        Axum server
pip install                           cargo build

Debugging Tips

Enable verbose logging:

// In Cargo.toml
solverforge = { ..., features = ["verbose-logging"] }

Print in constraints (debug only):

.filter(|shift: &Shift, emp: &Employee| {
    eprintln!("Checking shift {} with {}", shift.id, emp.name);
    !emp.skills.contains(&shift.required_skill)
})

Use the analyze endpoint:

curl -X PUT http://localhost:7860/schedules/analyze \
  -H "Content-Type: application/json" \
  -d @schedule.json

Common Gotchas

  1. Forgot to call finalize() on employees after construction

    • Symptom: flatten_last constraints don’t match anything
  2. Index out of sync — employee indices don’t match array positions

    • Always use enumerate() when constructing employees
  3. Missing factory.clone() — factory is consumed by each constraint

    • Clone before each constraint chain
  4. Forgot to add constraint to return tuple

    • Constraint silently not evaluated
  5. Using String instead of usize for references

    • Performance degradation and allocation overhead

Additional Resources

2 - Employee Scheduling

A comprehensive quickstart guide to understanding and building intelligent employee scheduling with SolverForge

Legacy Implementation Guide

This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.

SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.


Table of Contents

  1. Introduction
  2. Getting Started
  3. The Problem We’re Solving
  4. Understanding the Data Model
  5. How Optimization Works
  6. Writing Constraints: The Business Rules
  7. The Solver Engine
  8. Web Interface and API
  9. Making Your First Customization
  10. Advanced Constraint Patterns
  11. Testing and Validation
  12. Quick Reference

Introduction

What You’ll Learn

This guide walks you through a complete employee scheduling application built with SolverForge, a constraint-based optimization framework. You’ll learn:

  • How to model real-world scheduling problems as optimization problems
  • How to express business rules as constraints that guide the solution
  • How optimization algorithms find high-quality solutions automatically
  • How to customize the system for your specific needs

No optimization background required — we’ll explain concepts as we encounter them in the code.

Architecture Note: This guide uses the “fast” implementation pattern with dataclass domain models and Pydantic only at API boundaries. For the architectural reasoning behind this design, see Dataclasses vs Pydantic in Constraint Solvers.

Prerequisites

  • Basic Python knowledge (classes, functions, type annotations)
  • Familiarity with REST APIs
  • Comfort with command-line operations

What is Constraint-Based Optimization?

Traditional programming: You write explicit logic that says “do this, then that.”

Constraint-based optimization: You describe what a good solution looks like and the solver figures out how to achieve it.

Think of it like describing what puzzle pieces you have and what rules they must follow — then having a computer try millions of arrangements per second to find the best fit.


Getting Started

Running the Application

  1. Download and navigate to the project directory:

    git clone https://github.com/SolverForge/solverforge-quickstarts
    cd ./solverforge-quickstarts/legacy/employee-scheduling-fast
    
  2. Create and activate virtual environment:

    python -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    
  3. Install the package:

    pip install -e .
    
  4. Start the server:

    run-app
    
  5. Open your browser:

    http://localhost:8080
    

You’ll see a scheduling interface with employees, shifts and a “Solve” button. Click it and watch the solver automatically assign employees to shifts while respecting business rules.

File Structure Overview

legacy/employee-scheduling-fast/
├── src/employee_scheduling/
│   ├── domain.py              # Data classes (Employee, Shift, Schedule)
│   ├── constraints.py         # Business rules (90% of customization happens here)
│   ├── solver.py              # Solver configuration
│   ├── demo_data.py           # Sample data generation
│   ├── rest_api.py            # HTTP API endpoints
│   ├── converters.py          # REST ↔ Domain model conversion
│   ├── json_serialization.py  # JSON helpers
│   └── score_analysis.py      # Score breakdown DTOs
├── static/
│   ├── index.html             # Web UI
│   └── app.js                 # UI logic and visualization
└── tests/
    ├── test_constraints.py    # Unit tests for constraints
    └── test_feasible.py       # Integration tests

Key insight: Most business customization happens in constraints.py alone. You rarely need to modify other files.


The Problem We’re Solving

The Scheduling Challenge

You need to assign employees to shifts while satisfying rules like:

Hard constraints (must be satisfied):

  • Employee must have the required skill for the shift
  • Employee can’t work overlapping shifts
  • Employee needs 10 hours rest between shifts
  • Employee can’t work more than one shift per day
  • Employee can’t work on days they’re unavailable

Soft constraints (preferences to optimize):

  • Avoid scheduling on days the employee marked as “undesired”
  • Prefer scheduling on days the employee marked as “desired”
  • Balance workload fairly across all employees

Why This is Hard

For even 20 shifts and 10 employees, each shift can be assigned any of the 10 employees, giving 10^20 possible assignments (100 quintillion). A human can’t evaluate them all. Even a computer trying random assignments would take years.

Optimization algorithms use smart strategies to explore this space efficiently, finding high-quality solutions in seconds.
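A quick back-of-envelope calculation makes the point concrete (the evaluation rate is a generous assumption):

```python
# Each of the 20 shifts can be assigned any of 10 employees.
search_space = 10 ** 20  # 100 quintillion candidate assignments

# Even at a billion full evaluations per second, exhaustive search is hopeless:
evals_per_second = 1_000_000_000
seconds = search_space / evals_per_second
years = seconds / (60 * 60 * 24 * 365)  # on the order of thousands of years
```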


Understanding the Data Model

Let’s examine the three core classes that model our problem. Open src/employee_scheduling/domain.py:

Domain Model Architecture

This quickstart separates domain models (dataclasses) from API models (Pydantic):

  • Domain layer (domain.py lines 17-39): Pure @dataclass models for solver operations
  • API layer (domain.py lines 46-75): Pydantic BaseModel classes for REST endpoints
  • Converters (converters.py): Translate between the two layers

This separation improves solving performance: Pydantic’s validation overhead becomes expensive when constraints are evaluated millions of times per second. See the architecture article for benchmark comparisons. Note that while benchmarks on small problems show comparable iteration counts between Python and Java, the JPype bridge overhead may compound at larger scales.

The Employee Class

@dataclass
class Employee:
    name: Annotated[str, PlanningId]
    skills: set[str] = field(default_factory=set)
    unavailable_dates: set[date] = field(default_factory=set)
    undesired_dates: set[date] = field(default_factory=set)
    desired_dates: set[date] = field(default_factory=set)

What it represents: A person who can be assigned to shifts.

Key fields:

  • name: Unique identifier (the PlanningId annotation tells SolverForge this is the primary key)
  • skills: What skills this employee possesses (e.g., {"Doctor", "Cardiology"})
  • unavailable_dates: Days the employee absolutely cannot work (hard constraint)
  • undesired_dates: Days the employee prefers not to work (soft constraint)
  • desired_dates: Days the employee wants to work (soft constraint)

Optimization concept: These availability fields demonstrate hard vs soft constraints. Unavailable is non-negotiable; undesired is a preference the solver will try to honor but may violate if necessary.

The Shift Class (Planning Entity)

@planning_entity
@dataclass
class Shift:
    id: Annotated[str, PlanningId]
    start: datetime
    end: datetime
    location: str
    required_skill: str
    employee: Annotated[Employee | None, PlanningVariable] = None

What it represents: A time slot that needs an employee assignment.

Key fields:

  • id: Unique identifier
  • start/end: When the shift occurs
  • location: Where the work happens
  • required_skill: What skill is needed (must match employee’s skills)
  • employee: The assignment decision — this is what the solver optimizes!

Important annotations:

  • @planning_entity: Tells SolverForge this class contains decisions to make
  • PlanningVariable: Marks employee as the decision variable

Optimization concept: This is a planning variable — the value the solver assigns. Each shift starts with employee=None (unassigned). The solver tries different employee assignments, evaluating each according to your constraints.

The EmployeeSchedule Class (Planning Solution)

@planning_solution
@dataclass
class EmployeeSchedule:
    employees: Annotated[list[Employee], ProblemFactCollectionProperty, ValueRangeProvider]
    shifts: Annotated[list[Shift], PlanningEntityCollectionProperty]
    score: Annotated[HardSoftDecimalScore | None, PlanningScore] = None
    solver_status: SolverStatus = SolverStatus.NOT_SOLVING

What it represents: The complete problem and its solution.

Key fields:

  • employees: All available employees (these are the possible values for assignments)
  • shifts: All shifts that need assignment (the planning entities)
  • score: Solution quality metric (calculated by constraints)
  • solver_status: Whether solving is active

Annotations explained:

  • @planning_solution: Marks this as the top-level problem definition
  • ProblemFactCollectionProperty: Immutable data (doesn’t change during solving)
  • PlanningEntityCollectionProperty: The entities being optimized
  • ValueRangeProvider: Tells the solver which employees can be assigned to shifts
  • PlanningScore: Where the solver stores the calculated score

Optimization concept: This demonstrates the declarative modeling approach. You describe the problem structure (what can be assigned to what) and the solver handles the search process.


How Optimization Works

Before diving into constraints, let’s understand how the solver finds solutions.

The Search Process (Simplified)

  1. Start with an initial solution (often random or all unassigned)
  2. Evaluate the score using your constraint functions
  3. Make a small change (assign a different employee to one shift)
  4. Evaluate the new score
  5. Keep the change if it improves the score (with some controlled randomness)
  6. Repeat millions of times in seconds
  7. Return the best solution found
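The steps above can be sketched as a toy local search on a made-up balancing problem (this mirrors the loop, not the real engine’s algorithms):

```python
import random

# Toy problem: assign each of 6 "shifts" one of 3 "employees" so that
# workload is spread evenly. Higher score is better.
random.seed(0)
EMPLOYEES, SHIFTS = 3, 6

def score(assignment: list[int]) -> int:
    counts = [assignment.count(e) for e in range(EMPLOYEES)]
    return -sum(c * c for c in counts)  # penalize concentrated workloads

current = [0] * SHIFTS                            # 1. initial solution (all on employee 0)
best, best_score = current[:], score(current)     # 2. evaluate
for _ in range(2000):                             # 6. repeat many times
    candidate = current[:]
    candidate[random.randrange(SHIFTS)] = random.randrange(EMPLOYEES)  # 3. small change
    if score(candidate) >= score(current):        # 4-5. keep non-worsening moves
        current = candidate
        if score(current) > best_score:           # 7. track the best found
            best, best_score = current[:], score(current)
```

The perfectly balanced assignment (2 shifts per employee) scores -12; the loop reliably finds it from the worst-possible starting point.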

Why This Works: Metaheuristics

Timefold (the engine behind solverforge-legacy) uses sophisticated metaheuristic algorithms like:

  • Tabu Search: Remembers recent moves to avoid cycling
  • Simulated Annealing: Occasionally accepts worse solutions to escape local optima
  • Late Acceptance: Compares current solution to recent history, not just the immediate previous

These techniques efficiently explore the massive solution space without getting stuck.

The Score: How “Good” is a Solution?

Every solution gets a score with two parts:

0hard/-45soft
  • Hard score: Counts hard constraint violations (must be 0 for a valid solution)
  • Soft score: Counts soft constraint violations/rewards (higher is better)

Scoring rules:

  • Hard score must be 0 or positive (negative = invalid/infeasible)
  • Among valid solutions (hard score = 0), highest soft score wins
  • Hard score always takes priority over soft score

Optimization concept: This is multi-objective optimization with a lexicographic ordering. We absolutely prioritize hard constraints, then optimize soft ones.
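Python tuples compare element-by-element, which is exactly this lexicographic ordering; a small sketch (parser name hypothetical) makes the priority rule concrete:

```python
# Parse "0hard/-45soft" into a (hard, soft) tuple; tuple comparison then
# matches the solver's ordering: hard first, soft as the tiebreaker.
def parse_score(s: str) -> tuple[int, int]:
    hard, soft = s.split("/")
    return int(hard.removesuffix("hard")), int(soft.removesuffix("soft"))

infeasible = parse_score("-1hard/0soft")
feasible_poor = parse_score("0hard/-45soft")
feasible_good = parse_score("0hard/-12soft")
```

Any feasible solution beats any infeasible one, no matter how good the infeasible one’s soft score is.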


Writing Constraints: The Business Rules

Now the heart of the system. Open src/employee_scheduling/constraints.py.

The Constraint Provider Pattern

All constraints are registered in one function:

@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
    return [
        # Hard constraints
        required_skill(constraint_factory),
        no_overlapping_shifts(constraint_factory),
        at_least_10_hours_between_two_shifts(constraint_factory),
        one_shift_per_day(constraint_factory),
        unavailable_employee(constraint_factory),
        # Soft constraints
        undesired_day_for_employee(constraint_factory),
        desired_day_for_employee(constraint_factory),
        balance_employee_shift_assignments(constraint_factory),
    ]

Each constraint is a function returning a Constraint object. Let’s examine them from simple to complex.

Domain Model Methods for Constraints

The Shift class in domain.py includes helper methods that support datetime calculations used by multiple constraints. Following object-oriented design principles, these methods are part of the domain model rather than standalone functions:

def has_required_skill(self) -> bool:
    """Check if assigned employee has the required skill."""
    if self.employee is None:
        return False
    return self.required_skill in self.employee.skills

def is_overlapping_with_date(self, dt: date) -> bool:
    """Check if shift overlaps with a specific date."""
    return self.start.date() == dt or self.end.date() == dt

These methods encapsulate shift-related logic within the domain model, making constraints more readable and maintainable.

Implementation Note: For datetime overlap calculations in constraint penalty lambdas, the codebase uses inline calculations with explicit time(0, 0, 0) and time(23, 59, 59) rather than calling domain methods. This avoids transpilation issues with datetime.min.time() and datetime.max.time() in the constraint stream API.

Hard Constraint: Required Skill

Business rule: “An employee assigned to a shift must have the required skill.”

def required_skill(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Shift)
        .filter(lambda shift: not shift.has_required_skill())
        .penalize(HardSoftDecimalScore.ONE_HARD)
        .as_constraint("Missing required skill")
    )

How to read this:

  1. for_each(Shift): Consider every shift in the schedule
  2. .filter(...): Keep only shifts where the employee lacks the required skill
  3. .penalize(ONE_HARD): Each violation subtracts 1 from the hard score
  4. .as_constraint(...): Give it a name for debugging

Optimization concept: This is a unary constraint — it examines one entity at a time. The filter identifies violations and the penalty quantifies the impact.

Note: There’s no null check for shift.employee because for_each(Shift) only yields shifts whose planning variable has been assigned; unassigned shifts never reach the filter during scoring.

Hard Constraint: No Overlapping Shifts

Business rule: “An employee can’t work two shifts that overlap in time.”

def no_overlapping_shifts(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each_unique_pair(
            Shift,
            Joiners.equal(lambda shift: shift.employee.name),
            Joiners.overlapping(lambda shift: shift.start, lambda shift: shift.end),
        )
        .penalize(HardSoftDecimalScore.ONE_HARD, get_minute_overlap)
        .as_constraint("Overlapping shift")
    )

How to read this:

  1. for_each_unique_pair(Shift, ...): Create pairs of shifts
  2. Joiners.equal(lambda shift: shift.employee.name): Only pair shifts assigned to the same employee
  3. Joiners.overlapping(...): Only pair shifts that overlap in time
  4. .penalize(ONE_HARD, get_minute_overlap): Penalize by the number of overlapping minutes

Optimization concept: This is a binary constraint — it examines pairs of entities. The for_each_unique_pair ensures we don’t count each violation twice (e.g., comparing shift A to B and B to A).

Helper function:

def get_minute_overlap(shift1: Shift, shift2: Shift) -> int:
    # total_seconds() returns a float, so convert to match the int annotation
    return int(
        (min(shift1.end, shift2.end) - max(shift1.start, shift2.start)).total_seconds() // 60
    )

Why penalize by minutes? This creates a graded penalty. A 5-minute overlap is less bad than a 5-hour overlap, giving the solver better guidance.

Hard Constraint: Rest Between Shifts

Business rule: “Employees need at least 10 hours rest between shifts.”

def at_least_10_hours_between_two_shifts(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Shift)
        .join(
            Shift,
            Joiners.equal(lambda shift: shift.employee.name),
            Joiners.less_than_or_equal(
                lambda shift: shift.end, lambda shift: shift.start
            ),
        )
        .filter(
            lambda first_shift, second_shift: (
                second_shift.start - first_shift.end
            ).total_seconds() // (60 * 60) < 10
        )
        .penalize(
            HardSoftDecimalScore.ONE_HARD,
            lambda first_shift, second_shift: 600 - (
                (second_shift.start - first_shift.end).total_seconds() // 60
            ),
        )
        .as_constraint("At least 10 hours between 2 shifts")
    )

How to read this:

  1. for_each(Shift): Start with all shifts
  2. .join(Shift, ...): Pair with other shifts
  3. Joiners.equal(...): Same employee
  4. Joiners.less_than_or_equal(...): First shift ends before or when second starts (ensures ordering)
  5. .filter(...): Keep only pairs with less than 10 hours gap
  6. .penalize(...): Penalize by 600 - actual_minutes (the deficit from required 10 hours)

Optimization concept: The penalty function 600 - actual_minutes creates incremental guidance. 9 hours rest (penalty 60) is better than 5 hours rest (penalty 300), helping the solver navigate toward feasibility.
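The penalty shape is easy to verify standalone (function name hypothetical):

```python
# Deficit from the required 10 hours (600 minutes) of rest; 0 once satisfied.
def rest_penalty(rest_minutes: int) -> int:
    return max(0, 600 - rest_minutes)

nine_hours = rest_penalty(9 * 60)   # small deficit -> small penalty
five_hours = rest_penalty(5 * 60)   # large deficit -> large penalty
```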

Hard Constraint: One Shift Per Day

Business rule: “Employees can work at most one shift per calendar day.”

def one_shift_per_day(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each_unique_pair(
            Shift,
            Joiners.equal(lambda shift: shift.employee.name),
            Joiners.equal(lambda shift: shift.start.date()),
        )
        .penalize(HardSoftDecimalScore.ONE_HARD)
        .as_constraint("Max one shift per day")
    )

How to read this:

  1. for_each_unique_pair(Shift, ...): Create pairs of shifts
  2. First joiner: Same employee
  3. Second joiner: Same date (shift.start.date() extracts calendar day)
  4. Each pair found is a violation

Hard Constraint: Unavailable Dates

Business rule: “Employees cannot work on days they marked as unavailable.”

from datetime import time

def unavailable_employee(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Shift)
        .join(
            Employee,
            Joiners.equal(lambda shift: shift.employee, lambda employee: employee),
        )
        .flatten_last(lambda employee: employee.unavailable_dates)
        .filter(lambda shift, unavailable_date: shift.is_overlapping_with_date(unavailable_date))
        .penalize(
            HardSoftDecimalScore.ONE_HARD,
            lambda shift, unavailable_date: int((
                min(shift.end, datetime.combine(unavailable_date, time(23, 59, 59)))
                - max(shift.start, datetime.combine(unavailable_date, time(0, 0, 0)))
            ).total_seconds() / 60),
        )
        .as_constraint("Unavailable employee")
    )

How to read this:

  1. for_each(Shift): All shifts
  2. .join(Employee, ...): Join with the assigned employee
  3. .flatten_last(lambda employee: employee.unavailable_dates): Expand each employee’s unavailable_dates set
  4. .filter(...): Keep only when shift overlaps the unavailable date
  5. .penalize(...): Penalize by overlapping duration in minutes (calculated inline)

Optimization concept: The flatten_last operation demonstrates constraint streaming with collections. We iterate over each date in the employee’s unavailable set, creating (shift, date) pairs to check.

Why inline calculation? The penalty lambda uses explicit time(0, 0, 0) and time(23, 59, 59) rather than datetime.min.time() or calling domain methods. This is required because certain datetime methods don’t transpile correctly to the constraint stream engine.
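Pulled out of the lambda, the inline calculation looks like this (the helper name is ours, for illustration):

```python
from datetime import date, datetime, time

# Minutes of a shift falling on a given calendar day, with the day clamped
# to [00:00:00, 23:59:59] exactly as the inline penalty lambdas do.
def overlap_minutes_on(day: date, start: datetime, end: datetime) -> int:
    day_start = datetime.combine(day, time(0, 0, 0))
    day_end = datetime.combine(day, time(23, 59, 59))
    return int((min(end, day_end) - max(start, day_start)).total_seconds() / 60)

# A day shift entirely on Jan 25: all 480 minutes count.
minutes = overlap_minutes_on(
    date(2026, 1, 25), datetime(2026, 1, 25, 6, 0), datetime(2026, 1, 25, 14, 0)
)
# An overnight shift only counts the portion before the 23:59:59 cutoff.
night = overlap_minutes_on(
    date(2026, 1, 25), datetime(2026, 1, 25, 22, 0), datetime(2026, 1, 26, 6, 0)
)
```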

Soft Constraint: Undesired Days

Business rule: “Prefer not to schedule employees on days they marked as undesired.”

def undesired_day_for_employee(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Shift)
        .join(
            Employee,
            Joiners.equal(lambda shift: shift.employee, lambda employee: employee),
        )
        .flatten_last(lambda employee: employee.undesired_dates)
        .filter(lambda shift, undesired_date: shift.is_overlapping_with_date(undesired_date))
        .penalize(
            HardSoftDecimalScore.ONE_SOFT,
            lambda shift, undesired_date: int((
                min(shift.end, datetime.combine(undesired_date, time(23, 59, 59)))
                - max(shift.start, datetime.combine(undesired_date, time(0, 0, 0)))
            ).total_seconds() / 60),
        )
        .as_constraint("Undesired day for employee")
    )

Key difference from hard constraints: Uses ONE_SOFT instead of ONE_HARD.

Optimization concept: The solver will try to avoid undesired days but may violate this if necessary to satisfy hard constraints or achieve better overall soft score.

Soft Constraint: Desired Days (Reward)

Business rule: “Prefer to schedule employees on days they marked as desired.”

def desired_day_for_employee(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Shift)
        .join(
            Employee,
            Joiners.equal(lambda shift: shift.employee, lambda employee: employee),
        )
        .flatten_last(lambda employee: employee.desired_dates)
        .filter(lambda shift, desired_date: shift.is_overlapping_with_date(desired_date))
        .reward(
            HardSoftDecimalScore.ONE_SOFT,
            lambda shift, desired_date: int((
                min(shift.end, datetime.combine(desired_date, time(23, 59, 59)))
                - max(shift.start, datetime.combine(desired_date, time(0, 0, 0)))
            ).total_seconds() / 60),
        )
        .as_constraint("Desired day for employee")
    )

Key difference: Uses .reward() instead of .penalize().

Optimization concept: Rewards increase the score instead of decreasing it. This constraint actively pulls the solution toward desired assignments.

Soft Constraint: Load Balancing

Business rule: “Distribute shifts fairly across employees.”

def balance_employee_shift_assignments(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Shift)
        .group_by(lambda shift: shift.employee, ConstraintCollectors.count())
        .complement(Employee, lambda e: 0)
        .group_by(
            ConstraintCollectors.load_balance(
                lambda employee, shift_count: employee,
                lambda employee, shift_count: shift_count,
            )
        )
        .penalize_decimal(
            HardSoftDecimalScore.ONE_SOFT,
            lambda load_balance: load_balance.unfairness(),
        )
        .as_constraint("Balance employee shift assignments")
    )

How to read this:

  1. for_each(Shift): All shifts
  2. .group_by(..., ConstraintCollectors.count()): Count shifts per employee
  3. .complement(Employee, lambda e: 0): Include employees with 0 shifts
  4. .group_by(ConstraintCollectors.load_balance(...)): Calculate fairness metric
  5. .penalize_decimal(..., unfairness()): Penalize by the unfairness amount

Optimization concept: This uses a sophisticated load balancing collector that calculates variance/unfairness in workload distribution. It’s more nuanced than simple quadratic penalties — it measures how far the distribution is from perfectly balanced.

Why complement? Without it, employees with zero shifts wouldn’t appear in the grouping, skewing the fairness calculation.
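The collector’s exact unfairness metric is internal to the engine, but population standard deviation of per-employee shift counts gives the right intuition (zero when perfectly balanced, growing with skew):

```python
import statistics

# Four employees, twelve shifts in both cases; only the distribution differs.
balanced = [3, 3, 3, 3]
skewed = [6, 3, 2, 1]

balanced_unfairness = statistics.pstdev(balanced)  # 0.0: nothing to penalize
skewed_unfairness = statistics.pstdev(skewed)      # positive: penalized
```

This also shows why the complement matters: leaving out an employee with 0 shifts would shrink the spread and understate the unfairness.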


The Solver Engine

Now let’s see how the solver is configured. Open src/employee_scheduling/solver.py:

solver_config = SolverConfig(
    solution_class=EmployeeSchedule,
    entity_class_list=[Shift],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)

solver_manager = SolverManager.create(SolverFactory.create(solver_config))
solution_manager = SolutionManager.create(solver_manager)

Configuration Breakdown

solution_class: Your planning solution class (EmployeeSchedule)

entity_class_list: Planning entities to optimize ([Shift])

score_director_factory_config: Contains the constraint provider function

  • Note: This is nested inside ScoreDirectorFactoryConfig, not directly in SolverConfig

termination_config: When to stop solving

  • spent_limit=Duration(seconds=30): Stop after 30 seconds

SolverManager: Asynchronous Solving

SolverManager handles solving in the background without blocking your API:

# Start solving (non-blocking)
solver_manager.solve_and_listen(job_id, schedule, callback_function)

# Check status
status = solver_manager.get_status(job_id)

# Get current best solution
solution = solver_manager.get_solution(job_id)

# Stop early
solver_manager.terminate_early(job_id)

Optimization concept: Real-world problems may take minutes to hours. Anytime algorithms like metaheuristics continuously improve solutions over time, so you can stop whenever you’re satisfied with the quality.

Solving Timeline

Small problems (10-20 shifts, 5-10 employees):

  • Initial valid solution: < 1 second
  • Good solution: 5-10 seconds
  • High-quality: 30 seconds

Medium problems (50-100 shifts, 20-30 employees):

  • Initial valid solution: 1-5 seconds
  • Good solution: 30-60 seconds
  • High-quality: 5-10 minutes

Factors affecting speed:

  • Number of employees × shifts (search space size)
  • Constraint complexity
  • How “tight” constraints are (fewer valid solutions = harder)

Web Interface and API

REST API Endpoints

Open src/employee_scheduling/rest_api.py to see the API:

GET /demo-data

Returns available demo datasets:

["SMALL", "LARGE"]

GET /demo-data/{dataset_id}

Generates and returns sample data:

{
  "employees": [
    {
      "name": "Amy Cole",
      "skills": ["Doctor", "Cardiology"],
      "unavailableDates": ["2025-11-25"],
      "undesiredDates": ["2025-11-26"],
      "desiredDates": ["2025-11-27"]
    }
  ],
  "shifts": [
    {
      "id": "0",
      "start": "2025-11-25T06:00:00",
      "end": "2025-11-25T14:00:00",
      "location": "Ambulatory care",
      "requiredSkill": "Doctor",
      "employee": null
    }
  ]
}

Note: Field names use camelCase in JSON (REST API convention) but snake_case in Python (domain model). The converters.py handles this translation.
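The translation converters.py performs can be sketched for flat field names (a minimal version; the real converters also handle nested models):

```python
import re

# camelCase (JSON) -> snake_case (Python): insert "_" before each interior
# uppercase letter, then lowercase everything.
def to_snake(name: str) -> str:
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

# snake_case -> camelCase: capitalize every segment after the first.
def to_camel(name: str) -> str:
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)
```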

POST /schedules

Submit a schedule to solve:

Request body: Same format as demo-data response

Response: Job ID as plain text

"a1b2c3d4-e5f6-7890-abcd-ef1234567890"

Implementation:

@app.post("/schedules")
async def solve_timetable(schedule_model: EmployeeScheduleModel) -> str:
    job_id = str(uuid4())
    schedule = model_to_schedule(schedule_model)
    data_sets[job_id] = schedule
    solver_manager.solve_and_listen(
        job_id, 
        schedule,
        lambda solution: update_schedule(job_id, solution)
    )
    return job_id

Key detail: Uses solve_and_listen() with a callback that updates the stored solution in real-time as solving progresses.

GET /schedules/{problem_id}

Check solving status and get current solution:

Response (while solving):

{
  "employees": [...],
  "shifts": [...],
  "score": "0hard/-45soft",
  "solverStatus": "SOLVING_ACTIVE"
}

Response (finished):

{
  "employees": [...],
  "shifts": [...],
  "score": "0hard/-12soft",
  "solverStatus": "NOT_SOLVING"
}

GET /schedules

List all active job IDs:

Response:

["a1b2c3d4-e5f6-7890-abcd-ef1234567890", "b2c3d4e5-f6a7-8901-bcde-f23456789012"]

GET /schedules/{problem_id}/status

Lightweight status check (score and solver status only):

Response:

{
  "score": {
    "hardScore": 0,
    "softScore": -12
  },
  "solverStatus": "SOLVING_ACTIVE"
}

DELETE /schedules/{problem_id}

Stop solving early and return best solution found so far:

@app.delete("/schedules/{problem_id}")
async def stop_solving(problem_id: str) -> EmployeeScheduleModel:
    solver_manager.terminate_early(problem_id)
    return await get_timetable(problem_id)

PUT /schedules/analyze

Analyze a schedule without solving to understand constraint violations:

Request body: Same format as demo-data response

Response:

{
  "constraints": [
    {
      "name": "Required skill",
      "weight": "1hard",
      "score": "-2hard",
      "matches": [
        {
          "name": "Required skill",
          "score": "-1hard",
          "justification": "Shift(id=5) assigned to Employee(Amy Cole)"
        }
      ]
    }
  ]
}

Web UI Flow

The static/app.js implements this polling workflow:

  1. User opens page → Load demo data (GET /demo-data/SMALL)
  2. Display employees and shifts in timeline visualization
  3. User clicks “Solve” → POST /schedules (get job ID back)
  4. Poll GET /schedules/{id} every 2 seconds
  5. Update UI with latest assignments in real-time
  6. When solverStatus === "NOT_SOLVING" → Stop polling
  7. Display final score and solution
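The same workflow can be scripted outside the browser. A minimal polling client, assuming the server runs at localhost:8080 as in the quickstarts:

```python
import json
import time
import urllib.request

BASE = "http://localhost:8080"  # assumed default port for the quickstarts

def solving_finished(payload: dict) -> bool:
    """The UI stops polling once the solver reports NOT_SOLVING."""
    return payload.get("solverStatus") == "NOT_SOLVING"

def fetch_json(url: str) -> dict:
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)

def solve_and_wait(schedule: dict, poll_seconds: float = 2.0) -> dict:
    """Submit a schedule, then poll every 2 seconds like the web UI does."""
    request = urllib.request.Request(
        f"{BASE}/schedules",
        data=json.dumps(schedule).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as resp:
        job_id = json.load(resp)  # the job ID comes back as a JSON string
    while True:
        payload = fetch_json(f"{BASE}/schedules/{job_id}")
        if solving_finished(payload):
            return payload
        time.sleep(poll_seconds)

if __name__ == "__main__":
    demo = fetch_json(f"{BASE}/demo-data/SMALL")
    solution = solve_and_wait(demo)
    print(solution["score"])
```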

Visual feedback: The UI uses vis-timeline library to show:

  • Shifts color-coded by availability (red=unavailable, orange=undesired, green=desired, blue=normal)
  • Skills color-coded (red=missing skill, green=has skill)
  • Two views: by employee and by location

Making Your First Customization

The quickstart includes an optional cardinality constraint that demonstrates a common pattern. Let’s understand how it works and then learn how to create similar constraints.

Understanding the Max Shifts Constraint

The codebase includes max_shifts_per_employee which limits workload imbalance. This constraint is disabled by default (commented out in define_constraints()) but serves as a useful example:

Business rule: “No employee can work more than 12 shifts in the schedule period.”

This is a hard constraint (must be satisfied when enabled).

The Constraint Implementation

This constraint is already in src/employee_scheduling/constraints.py:

def max_shifts_per_employee(constraint_factory: ConstraintFactory):
    """
    Hard constraint: No employee can have more than 12 shifts.

    The limit of 12 is chosen based on the demo data dimensions:
    - SMALL dataset: 139 shifts / 15 employees = ~9.3 average
    - This provides headroom while preventing extreme imbalance

    Note: A limit that's too low (e.g., 5) would make the problem infeasible.
    Always ensure your constraints are compatible with your data dimensions.
    """
    return (
        constraint_factory.for_each(Shift)
        .group_by(lambda shift: shift.employee, ConstraintCollectors.count())
        .filter(lambda employee, shift_count: shift_count > 12)
        .penalize(
            HardSoftDecimalScore.ONE_HARD,
            lambda employee, shift_count: shift_count - 12,
        )
        .as_constraint("Max 12 shifts per employee")
    )

How this works:

  1. Group shifts by employee and count them
  2. Filter to employees with more than 12 shifts
  3. Penalize by the excess amount (13 shifts = penalty 1, 14 shifts = penalty 2, etc.)

Why 12? The demo data has 139 shifts and 15 employees (~9.3 shifts per employee on average). A limit that’s too low (e.g., 5) would make the problem infeasible — there simply aren’t enough employees to cover all shifts. Always ensure your constraints are compatible with your problem’s dimensions.

How It’s Registered

To enable this constraint, uncomment it in define_constraints():

@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
    return [
        # Hard constraints
        required_skill(constraint_factory),
        no_overlapping_shifts(constraint_factory),
        at_least_10_hours_between_two_shifts(constraint_factory),
        one_shift_per_day(constraint_factory),
        unavailable_employee(constraint_factory),
        # max_shifts_per_employee(constraint_factory),  # ← Uncomment to enable
        # Soft constraints
        undesired_day_for_employee(constraint_factory),
        desired_day_for_employee(constraint_factory),
        balance_employee_shift_assignments(constraint_factory),
    ]

Experimenting With It

Try enabling and modifying the constraint to see its effect:

  1. Uncomment max_shifts_per_employee(constraint_factory), in constraints.py
  2. Change the limit from 12 to 8 if desired
  3. Restart the server: python -m employee_scheduling.rest_api
  4. Load demo data and click “Solve”
  5. Observe how the constraint affects the solution

Note: A very low limit (e.g., 5) will make the problem infeasible.

Why Unit Testing Constraints Matters

The quickstart includes unit tests in tests/test_constraints.py using ConstraintVerifier. Run them with:

pytest tests/test_constraints.py -v

Testing catches critical issues early. When we initially implemented this constraint with a limit of 5, the feasibility test (test_feasible.py) failed — the solver couldn’t find a valid solution because there weren’t enough employees to cover all shifts within that limit. Without tests, this would have silently broken the scheduling system. Always test new constraints — a typo in a filter or an overly restrictive limit can make your problem unsolvable.

Understanding What You Did

You just enabled and tuned a cardinality constraint, one that limits the count of something. This pattern is extremely common in scheduling:

  • Maximum hours per week
  • Minimum shifts per employee
  • Exact number of nurses per shift

The pattern is always:

  1. Group by what you’re counting
  2. Collect the count
  3. Filter by your limit
  4. Penalize/reward appropriately
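The four steps can be sketched in plain Python, outside the solver, to see how the penalty is computed (a toy illustration of the pattern, not SolverForge code):

```python
from collections import Counter

MAX_SHIFTS = 12

def cardinality_penalty(assignments: list[str], limit: int = MAX_SHIFTS) -> int:
    """Mirror the constraint: group by employee, count, filter, penalize excess."""
    counts = Counter(assignments)                          # steps 1-2: group and count
    over = {e: c for e, c in counts.items() if c > limit}  # step 3: filter by the limit
    return sum(c - limit for c in over.values())           # step 4: penalize the excess

# 13 shifts for Amy exceeds the limit by 1; Beth at 12 is fine
shifts = ["Amy"] * 13 + ["Beth"] * 12
print(cardinality_penalty(shifts))  # 1
```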

Advanced Constraint Patterns

Pattern 1: Weighted Penalties

Scenario: Some skills are harder to staff — penalize their absence more heavily.

def preferred_skill_coverage(constraint_factory: ConstraintFactory):
    """
    Soft constraint: Prefer specialized skills when available.
    """
    SPECIALTY_SKILLS = {"Cardiology", "Anaesthetics", "Radiology"}
    
    return (
        constraint_factory.for_each(Shift)
        .filter(lambda shift: shift.required_skill in SPECIALTY_SKILLS)
        .filter(lambda shift: shift.required_skill in shift.employee.skills)
        .reward(
            HardSoftDecimalScore.of_soft(10),  # 10x normal reward
        )
        .as_constraint("Preferred specialty coverage")
    )

Optimization concept: Weighted constraints let you express relative importance. This rewards specialty matches 10 times more than standard matches.

Pattern 2: Conditional Constraints

Scenario: Night shifts (after 6 PM) require two employees at the same location.

def night_shift_minimum_staff(constraint_factory: ConstraintFactory):
    """
    Hard constraint: Night shifts need at least 2 employees per location.
    """
    def is_night_shift(shift: Shift) -> bool:
        return shift.start.hour >= 18  # 6 PM or later
    
    return (
        constraint_factory.for_each(Shift)
        .filter(is_night_shift)
        .group_by(
            lambda shift: (shift.start, shift.location),
            ConstraintCollectors.count()
        )
        .filter(lambda timeslot_location, count: count < 2)
        .penalize(
            HardSoftDecimalScore.ONE_HARD,
            lambda timeslot_location, count: 2 - count
        )
        .as_constraint("Night shift minimum 2 staff")
    )

Pattern 3: Employee Pairing (Incompatibility)

Scenario: Certain employees shouldn’t work the same shift.

First, add the field to domain.py:

@dataclass
class Employee:
    name: Annotated[str, PlanningId]
    skills: set[str] = field(default_factory=set)
    # ... existing fields ...
    incompatible_with: set[str] = field(default_factory=set)  # employee names

Then the constraint:

def avoid_incompatible_pairs(constraint_factory: ConstraintFactory):
    """
    Hard constraint: Incompatible employees can't work overlapping shifts.
    """
    return (
        constraint_factory.for_each(Shift)
        .join(
            Shift,
            Joiners.equal(lambda shift: shift.location),
            Joiners.overlapping(lambda shift: shift.start, lambda shift: shift.end),
        )
        .filter(
            lambda shift1, shift2: 
                shift2.employee.name in shift1.employee.incompatible_with
        )
        .penalize(HardSoftDecimalScore.ONE_HARD)
        .as_constraint("Avoid incompatible pairs")
    )

Pattern 4: Time-Based Accumulation

Scenario: Limit total hours worked per week.

def max_hours_per_week(constraint_factory: ConstraintFactory):
    """
    Hard constraint: Maximum 40 hours per employee per week.
    """
    def get_shift_hours(shift: Shift) -> float:
        return (shift.end - shift.start).total_seconds() / 3600
    
    def get_week(shift: Shift) -> int:
        return shift.start.isocalendar()[1]  # ISO week number
    
    return (
        constraint_factory.for_each(Shift)
        .group_by(
            lambda shift: (shift.employee, get_week(shift)),
            ConstraintCollectors.sum(get_shift_hours)
        )
        .filter(lambda employee_week, total_hours: total_hours > 40)
        .penalize(
            HardSoftDecimalScore.ONE_HARD,
            lambda employee_week, total_hours: int(total_hours - 40)
        )
        .as_constraint("Max 40 hours per week")
    )

Optimization concept: This uses temporal aggregation — grouping by time periods (weeks) and summing durations. Common in workforce scheduling.
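The grouping logic can be checked outside the solver. A plain-Python sketch of the same ISO-week bucketing and hour summing:

```python
from collections import defaultdict
from datetime import datetime

def shift_hours(start: datetime, end: datetime) -> float:
    """Duration of one shift in hours."""
    return (end - start).total_seconds() / 3600

def weekly_hours(shifts: list[tuple[str, datetime, datetime]]) -> dict:
    """Sum hours per (employee, ISO week), mirroring the constraint's group_by."""
    totals: dict = defaultdict(float)
    for employee, start, end in shifts:
        totals[(employee, start.isocalendar()[1])] += shift_hours(start, end)
    return dict(totals)

shifts = [
    ("Amy", datetime(2025, 11, 24, 9), datetime(2025, 11, 24, 17)),  # Mon, ISO week 48
    ("Amy", datetime(2025, 11, 25, 9), datetime(2025, 11, 25, 17)),  # Tue, ISO week 48
]
print(weekly_hours(shifts))  # {('Amy', 48): 16.0}
```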


Testing and Validation

Unit Testing Constraints

Best practice: Test each constraint in isolation.

Create tests/test_my_constraints.py:

from datetime import datetime, date
from employee_scheduling.domain import Employee, Shift, EmployeeSchedule
from employee_scheduling.solver import solver_config
from solverforge_legacy.solver import SolverFactory

def test_max_shifts_constraint_violation():
    """Test that exceeding the limit creates a hard violation (assumes the limit was lowered to 5, per "Experimenting With It")."""
    
    employee = Employee(
        name="Test Employee",
        skills={"Doctor"}
    )
    
    # Create 6 shifts assigned to same employee
    shifts = []
    for i in range(6):
        shifts.append(Shift(
            id=str(i),
            start=datetime(2025, 11, 25 + i, 9, 0),
            end=datetime(2025, 11, 25 + i, 17, 0),
            location="Test Location",
            required_skill="Doctor",
            employee=employee
        ))
    
    schedule = EmployeeSchedule(
        employees=[employee],
        shifts=shifts
    )
    
    # Score the solution
    solver_factory = SolverFactory.create(solver_config)
    score_director = solver_factory.get_score_director_factory().build_score_director()
    score_director.set_working_solution(schedule)
    score = score_director.calculate_score()
    
    # Verify hard constraint violation
    assert score.hard_score == -1, f"Expected -1 hard score, got {score.hard_score}"
    
def test_max_shifts_constraint_satisfied():
    """Test that 5 or fewer shifts don't violate the constraint (limit of 5 assumed)."""
    
    employee = Employee(
        name="Test Employee",
        skills={"Doctor"}
    )
    
    # Create only 5 shifts
    shifts = []
    for i in range(5):
        shifts.append(Shift(
            id=str(i),
            start=datetime(2025, 11, 25 + i, 9, 0),
            end=datetime(2025, 11, 25 + i, 17, 0),
            location="Test Location",
            required_skill="Doctor",
            employee=employee
        ))
    
    schedule = EmployeeSchedule(
        employees=[employee],
        shifts=shifts
    )
    
    solver_factory = SolverFactory.create(solver_config)
    score_director = solver_factory.get_score_director_factory().build_score_director()
    score_director.set_working_solution(schedule)
    score = score_director.calculate_score()
    
    # No violation from this constraint (may have soft penalties from balancing, etc.)
    assert score.hard_score >= 0, f"Expected non-negative hard score, got {score.hard_score}"

Run with:

pytest tests/test_my_constraints.py -v

Integration Testing: Full Solve

Test the complete solving cycle in tests/test_feasible.py:

import time
from employee_scheduling.demo_data import DemoData, generate_demo_data
from employee_scheduling.solver import solver_manager
from solverforge_legacy.solver import SolverStatus

def test_solve_small_dataset():
    """Test that solver finds a feasible solution for small dataset."""
    
    # Generate problem
    schedule = generate_demo_data(DemoData.SMALL)
    
    # Verify initially unassigned
    assert all(shift.employee is None for shift in schedule.shifts)
    
    # Solve
    job_id = "test-job"
    solver_manager.solve(job_id, schedule)
    
    # Wait for completion (with timeout)
    timeout_seconds = 60
    start_time = time.time()
    while solver_manager.get_solver_status(job_id) != SolverStatus.NOT_SOLVING:
        if time.time() - start_time > timeout_seconds:
            solver_manager.terminate_early(job_id)
            break
        time.sleep(1)
    
    # Get solution
    solution = solver_manager.get_solution(job_id)
    
    # Verify all shifts assigned
    assert all(shift.employee is not None for shift in solution.shifts), \
        "Not all shifts were assigned"
    
    # Verify feasible (hard score = 0)
    assert solution.score.hard_score == 0, \
        f"Solution is infeasible with hard score {solution.score.hard_score}"
    
    print(f"Final score: {solution.score}")

Manual Testing via UI

  1. Start the application: python -m employee_scheduling.rest_api
  2. Open browser console (F12) to see API calls
  3. Load “SMALL” demo data
  4. Verify data displays correctly (employees with skills, shifts unassigned)
  5. Click “Solve” and watch:
    • Score improving in real-time
    • Shifts getting assigned (colored by availability)
    • Final hard score reaches 0
  6. Manually verify constraint satisfaction:
    • Check that assigned employees have required skills (green badges)
    • Verify no overlapping shifts (timeline shouldn’t show overlaps)
    • Confirm unavailable days are respected (no shifts on red-highlighted dates)

Quick Reference

File Locations

| Need to… | Edit this file |
|---|---|
| Add/change business rule | src/employee_scheduling/constraints.py |
| Add field to Employee | src/employee_scheduling/domain.py + converters.py |
| Add field to Shift | src/employee_scheduling/domain.py + converters.py |
| Change solve time | src/employee_scheduling/solver.py |
| Add REST endpoint | src/employee_scheduling/rest_api.py |
| Change demo data | src/employee_scheduling/demo_data.py |
| Change UI | static/index.html, static/app.js |

Common Constraint Patterns

Unary constraint (examine one entity):

constraint_factory.for_each(Shift)
    .filter(lambda shift: ...)  # your condition here
    .penalize(HardSoftDecimalScore.ONE_HARD)

Binary constraint (examine pairs):

constraint_factory.for_each_unique_pair(
    Shift,
    Joiners.equal(lambda shift: shift.employee.name)
)
    .penalize(HardSoftDecimalScore.ONE_HARD)

Grouping and counting:

constraint_factory.for_each(Shift)
    .group_by(
        lambda shift: shift.employee,
        ConstraintCollectors.count()
    )
    .filter(lambda employee, count: count > MAX)
    .penalize(...)

Reward instead of penalize:

.reward(HardSoftDecimalScore.ONE_SOFT)

Variable penalty:

.penalize(
    HardSoftDecimalScore.ONE_HARD,
    lambda shift: calculate_penalty_amount(shift)
)

Working with collections (flatten):

constraint_factory.for_each(Shift)
    .join(Employee, Joiners.equal(...))
    .flatten_last(lambda employee: employee.unavailable_dates)
    .filter(...)

Common Joiners

| Joiner | Purpose |
|---|---|
| Joiners.equal(lambda x: x.field) | Match entities with the same field value |
| Joiners.less_than(lambda x: x.field) | First entity's field < second's (ensures ordering) |
| Joiners.overlapping(start, end) | Time intervals overlap |
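The ordering note on Joiners.less_than matters because a symmetric self-join matches every pair twice, once as (A, B) and once as (B, A). A plain-Python illustration:

```python
from itertools import product

ids = ["s1", "s2", "s3"]

# A symmetric self-join (equal-style condition) matches every ordered pair
symmetric = [(a, b) for a, b in product(ids, ids) if a != b]

# Adding a less_than-style ordering keeps each unordered pair exactly once
ordered = [(a, b) for a, b in product(ids, ids) if a < b]

print(len(symmetric))  # 6 -> each pair counted twice
print(len(ordered))    # 3 -> each pair counted once
```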

Debugging Tips

Enable verbose logging:

import logging
logging.basicConfig(level=logging.DEBUG)

Test constraint in isolation:

# Create minimal test case with just the constraint you're debugging
schedule = EmployeeSchedule(
    employees=[test_employee],
    shifts=[test_shift]
)

solver_factory = SolverFactory.create(solver_config)
score_director = solver_factory.get_score_director_factory().build_score_director()
score_director.set_working_solution(schedule)
score = score_director.calculate_score()

print(f"Score: {score}")

Check constraint matches: Add print statements (remove in production):

.filter(lambda shift: (
    print(f"Checking shift {shift.id}") or  # Debug print
    shift.required_skill not in shift.employee.skills
))

Common Gotchas

  1. Forgot to register constraint in define_constraints() return list

    • Symptom: Constraint not enforced
  2. Using wrong Joiner

    • Joiners.equal when you need Joiners.less_than
    • Symptom: Pairs counted twice or constraint not working
  3. Expensive operations in constraint functions

    • Database/API calls in filters
    • Symptom: Solving extremely slow
  4. Score sign confusion

    • Higher soft score is better (not worse!)
    • Hard score must be ≥ 0 for feasible solution
  5. Field name mismatch

    • Guide said skill_set, actual is skills
    • Guide said employee_list, actual is employees

Additional Resources

3 - Portfolio Optimization

A comprehensive quickstart guide to understanding and building intelligent stock portfolio optimization with SolverForge

Legacy Implementation Guide

This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.

SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.


Table of Contents

  1. Introduction
  2. Getting Started
  3. The Problem We’re Solving
  4. Understanding the Data Model
  5. How Optimization Works
  6. Writing Constraints: The Business Rules
  7. The Solver Engine
  8. Web Interface and API
  9. Making Your First Customization
  10. Advanced Constraint Patterns
  11. Testing and Validation
  12. Quick Reference

Introduction

What You’ll Learn

This guide walks you through a complete stock portfolio optimization application built with SolverForge, a constraint-based optimization framework. You’ll learn:

  • How to model investment decisions as optimization problems
  • How to express diversification rules as constraints that guide the solution
  • How optimization algorithms find high-quality portfolios automatically
  • How to customize the system for your specific investment strategies

No optimization or finance background required — we’ll explain both optimization and finance concepts as we encounter them in the code.

Architecture Note: This guide uses the “fast” implementation pattern with dataclass domain models and Pydantic only at API boundaries. For the architectural reasoning behind this design, see Dataclasses vs Pydantic in Constraint Solvers.

Prerequisites

  • Basic Python knowledge (classes, functions, type annotations)
  • Familiarity with REST APIs
  • Comfort with command-line operations

What is Portfolio Optimization?

Traditional portfolio selection: You write explicit rules like “pick the 20 stocks with highest predicted returns.”

Constraint-based portfolio optimization: You describe what a good portfolio looks like (diversified, high-return, exactly 20 stocks) and the solver figures out which specific stocks to select.

Think of it like describing the ideal portfolio characteristics and having a computer try millions of combinations per second to find the best fit.

Finance Concepts (Quick Primer)

| Term | Definition | Example |
|---|---|---|
| Portfolio | Collection of investments you own | 20 stocks |
| Weight | Percentage of money in each investment | 5% per stock (equal weight) |
| Sector | Industry category | Technology, Healthcare, Finance, Energy |
| Predicted Return | Expected profit/loss percentage | 12% means $12 profit per $100 invested |
| Diversification | Spreading risk across sectors | Don't put all eggs in one basket |

Getting Started

Running the Application

  1. Download and navigate to the project directory:

    git clone https://github.com/SolverForge/solverforge-quickstarts
    cd ./solverforge-quickstarts/legacy/portfolio-optimization-fast
    
  2. Create and activate virtual environment:

    python -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    
  3. Install the package:

    pip install -e .
    
  4. Start the server:

    run-app
    
  5. Open your browser:

    http://localhost:8080
    

You’ll see a portfolio optimization interface with stocks, sectors, and a “Solve” button. Click it and watch the solver automatically select the optimal stocks while respecting diversification rules.

File Structure Overview

fast/portfolio-optimization-fast/
├── src/portfolio_optimization/
│   ├── domain.py              # Data classes (StockSelection, PortfolioOptimizationPlan)
│   ├── constraints.py         # Business rules (90% of customization happens here)
│   ├── solver.py              # Solver configuration
│   ├── demo_data.py           # Sample stock data with ML predictions
│   ├── rest_api.py            # HTTP API endpoints
│   ├── converters.py          # REST ↔ Domain model conversion
│   ├── json_serialization.py  # JSON helpers
│   └── score_analysis.py      # Score breakdown DTOs
├── static/
│   ├── index.html             # Web UI
│   └── app.js                 # UI logic and visualization
├── scripts/
│   └── comparison.py          # Greedy vs Solver comparison
└── tests/
    ├── test_constraints.py    # Unit tests for constraints
    └── test_feasible.py       # Integration tests

Key insight: Most business customization happens in constraints.py alone. You rarely need to modify other files.


The Problem We’re Solving

The Investment Challenge

You have $100,000 to invest and must select 20 stocks from a pool of candidates. Each stock has an ML-predicted return (e.g., “Apple is expected to return 12%”).

Hard constraints (must be satisfied):

  • Select exactly 20 stocks
  • No sector can exceed 25% of the portfolio (max 5 stocks per sector)

Soft constraints (preferences to optimize):

  • Maximize total expected return (pick stocks with highest predictions)

Why Use a Constraint Solver?

For this simplified quickstart (Boolean selection with sector limits), a well-implemented greedy algorithm can find near-optimal solutions. So why use a constraint solver?

1. Declarative vs Imperative: With constraints, you describe what you want, not how to achieve it. Adding a new rule is one function, not a rewrite of your algorithm.

2. Constraint Interactions: As constraints multiply, greedy logic becomes brittle. Consider adding:

  • Minimum 2 stocks per sector (diversification floor)
  • No more than 3 correlated stocks together
  • ESG score requirements

Each new constraint in greedy code means more if/else branches and edge cases. In a constraint solver, you just add another constraint function.

3. Real-World Complexity: Production portfolios have weight optimization (not just in/out), correlation matrices, risk budgets, and regulatory requirements. These create solution spaces where greedy approaches fail.


Understanding the Data Model

Let’s examine the core classes that model our problem. Open src/portfolio_optimization/domain.py:

Domain Model Architecture

This quickstart separates domain models (dataclasses) from API models (Pydantic):

  • Domain layer (domain.py lines 32-169): Pure @dataclass models for solver operations
  • API layer (domain.py lines 268-307): Pydantic BaseModel classes for REST endpoints
  • Converters (converters.py): Translate between the two layers

The StockSelection Class (Planning Entity)

@planning_entity
@dataclass
class StockSelection:
    stock_id: Annotated[str, PlanningId]       # "AAPL", "GOOGL", etc.
    stock_name: str                             # "Apple Inc."
    sector: str                                 # "Technology"
    predicted_return: float                     # 0.12 = 12%
    selection: Annotated[SelectionValue | None, PlanningVariable] = None

What it represents: A stock that could be included in the portfolio.

Key fields:

  • stock_id: Unique identifier (ticker symbol)
  • stock_name: Human-readable company name
  • sector: Industry classification for diversification
  • predicted_return: ML model’s expected return (decimal: 0.12 = 12%)
  • selection: The decision — should this stock be in the portfolio?

Important annotations:

  • @planning_entity: Tells SolverForge this class contains decisions to make
  • PlanningVariable: Marks selection as the decision variable

Optimization concept: This is a planning variable — the value the solver assigns. Each stock starts with selection=None (undecided). The solver tries SELECTED vs NOT_SELECTED for each stock, evaluating according to your constraints.

The SelectionValue Pattern

Unlike employee scheduling where the planning variable is a reference to another entity, portfolio optimization uses a Boolean selection pattern:

@dataclass
class SelectionValue:
    """Wrapper for True/False selection state."""
    value: bool

SELECTED = SelectionValue(True)
NOT_SELECTED = SelectionValue(False)

Why a wrapper? SolverForge needs reference types for value ranges. We wrap the boolean in a dataclass so the solver can work with it.

Convenience property:

@property
def selected(self) -> bool | None:
    """Check if stock is selected."""
    if self.selection is None:
        return None
    return self.selection.value

The PortfolioConfig Class (Problem Fact)

@dataclass
class PortfolioConfig:
    target_count: int = 20           # How many stocks to select
    max_per_sector: int = 5          # Max stocks in any sector
    unselected_penalty: int = 10000  # Soft penalty per unselected stock

What it represents: Configurable parameters that constraints read.

Optimization concept: This is a problem fact — immutable data that doesn’t change during solving but influences constraint behavior. Making these configurable allows users to adjust the optimization without modifying constraint code.

The PortfolioOptimizationPlan Class (Planning Solution)

@planning_solution
@dataclass
class PortfolioOptimizationPlan:
    # Non-default fields must precede fields with defaults in a dataclass
    stocks: Annotated[list[StockSelection], PlanningEntityCollectionProperty, ValueRangeProvider]
    portfolio_config: Annotated[PortfolioConfig, ProblemFactProperty]
    selection_range: Annotated[list[SelectionValue], ValueRangeProvider(id="selection_range")]
    target_position_count: int = 20
    max_sector_percentage: float = 0.25
    score: Annotated[HardSoftScore | None, PlanningScore] = None
    solver_status: SolverStatus = SolverStatus.NOT_SOLVING

What it represents: The complete problem and its solution.

Key fields:

  • stocks: All candidate stocks (planning entities)
  • portfolio_config: Configuration parameters (problem fact)
  • selection_range: [SELECTED, NOT_SELECTED] — possible values for each stock
  • score: Solution quality metric (calculated by constraints)
  • solver_status: Whether solving is active

Annotations explained:

  • @planning_solution: Marks this as the top-level problem definition
  • PlanningEntityCollectionProperty: The entities being optimized
  • ValueRangeProvider: Tells solver what values can be assigned
  • ProblemFactProperty: Immutable configuration data
  • PlanningScore: Where the solver stores the calculated score

Helper Methods for Business Metrics

The PortfolioOptimizationPlan class includes useful analytics:

def get_selected_stocks(self) -> list[StockSelection]:
    """Return only stocks that are selected."""
    return [s for s in self.stocks if s.selected is True]

def get_expected_return(self) -> float:
    """Calculate total expected portfolio return."""
    weight = self.get_weight_per_stock()
    return sum(s.predicted_return * weight for s in self.get_selected_stocks())

def get_sector_weights(self) -> dict[str, float]:
    """Calculate weight per sector."""
    weight = self.get_weight_per_stock()
    sector_weights = {}
    for stock in self.get_selected_stocks():
        sector_weights[stock.sector] = sector_weights.get(stock.sector, 0) + weight
    return sector_weights
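As a sketch of the arithmetic behind these helpers (assuming get_weight_per_stock(), not shown above, returns the equal weight 1 / target count):

```python
# Toy recomputation of the helper methods for an equal-weight portfolio.
# Assumes get_weight_per_stock() == 1 / target_position_count.
selected = [
    ("AAPL", "Technology", 0.12),
    ("JNJ", "Healthcare", 0.08),
]
target_count = 20
weight = 1 / target_count  # 5% per position

# get_expected_return: sum of predicted returns weighted by position size
expected_return = sum(ret * weight for _, _, ret in selected)

# get_sector_weights: accumulate weight per sector
sector_weights: dict[str, float] = {}
for _, sector, _ in selected:
    sector_weights[sector] = sector_weights.get(sector, 0) + weight

print(round(expected_return, 3))  # 0.01 -> 1% from these two positions
print(sector_weights)             # {'Technology': 0.05, 'Healthcare': 0.05}
```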

How Optimization Works

Before diving into constraints, let’s understand how the solver finds solutions.

The Search Process (Simplified)

  1. Start with an initial solution (often random selections)
  2. Evaluate the score using your constraint functions
  3. Make a small change (toggle one stock’s selection)
  4. Evaluate the new score
  5. Keep the change if it improves the score (with some controlled randomness)
  6. Repeat millions of times in seconds
  7. Return the best solution found

The Search Space

For a portfolio problem with 50 candidate stocks selecting exactly 20, there are trillions of valid combinations. The solver explores this space using smart heuristics, not brute force.
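"Trillions" is not an exaggeration; the number of ways to choose exactly 20 of 50 stocks can be computed directly:

```python
import math

# Number of ways to pick exactly 20 stocks from 50 candidates
combinations = math.comb(50, 20)
print(f"{combinations:,}")  # tens of trillions (~4.7e13)
```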

The Score: How “Good” is a Portfolio?

Every solution gets a score with two parts:

0hard/-45000soft

  • Hard score: Counts hard constraint violations (must be 0 for a valid portfolio)
  • Soft score: Reflects optimization quality (higher/less negative is better)

Scoring rules:

  • Hard score must be 0 or positive (negative = invalid portfolio)
  • Among valid portfolios (hard score = 0), highest soft score wins
  • Hard score always takes priority over soft score

Portfolio example:

-2hard/-35000soft  → Invalid: 2 constraint violations
0hard/-50000soft   → Valid but low quality
0hard/-25000soft   → Valid and better quality
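Score comparison is lexicographic: hard first, then soft. Modeling scores as (hard, soft) tuples, Python's built-in tuple ordering illustrates the rule:

```python
# Scores as (hard, soft) tuples; higher is better, and hard dominates soft
scores = [
    (-2, -35000),  # invalid: 2 hard constraint violations
    (0, -50000),   # valid but low quality
    (0, -25000),   # valid and better quality
]

best = max(scores)  # tuple comparison: hard score first, then soft
print(best)  # (0, -25000)
```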

Writing Constraints: The Business Rules

Now the heart of the system. Open src/portfolio_optimization/constraints.py.

The Constraint Provider Pattern

All constraints are registered in one function:

@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory) -> list[Constraint]:
    return [
        # Hard constraints
        must_select_target_count(constraint_factory),
        sector_exposure_limit(constraint_factory),
        # Soft constraints
        penalize_unselected_stock(constraint_factory),
        maximize_expected_return(constraint_factory),
    ]

Each constraint is a function returning a Constraint object. Let’s examine them.

Hard Constraint: Must Select Target Count

Business rule: “Don’t select more than N stocks” (default 20)

def must_select_target_count(constraint_factory: ConstraintFactory) -> Constraint:
    return (
        constraint_factory.for_each(StockSelection)
        .filter(lambda stock: stock.selected is True)
        .group_by(ConstraintCollectors.count())
        .join(PortfolioConfig)
        .filter(lambda count, config: count > config.target_count)
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda count, config: count - config.target_count
        )
        .as_constraint("Must select target count")
    )

How to read this:

  1. for_each(StockSelection): Consider every stock
  2. .filter(...): Keep only selected stocks
  3. .group_by(count()): Count how many are selected
  4. .join(PortfolioConfig): Access the configuration
  5. .filter(...): Keep only if count exceeds target
  6. .penalize(ONE_HARD, ...): Each extra stock adds 1 hard penalty

Why only “not more than”? A group_by(count()) over selected stocks emits nothing at all when no stock is selected, so a naive “at least N” hard constraint would never fire. Instead, a separate soft constraint drives selection up toward the target, which handles that empty-stream edge case cleanly.

Soft Constraint: Penalize Unselected Stock

Business rule: “Strongly prefer selecting stocks to meet the target”

def penalize_unselected_stock(constraint_factory: ConstraintFactory) -> Constraint:
    return (
        constraint_factory.for_each(StockSelection)
        .filter(lambda stock: stock.selected is False)
        .join(PortfolioConfig)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda stock, config: config.unselected_penalty  # Default 10000
        )
        .as_constraint("Penalize unselected stock")
    )

How to read this:

  1. for_each(StockSelection): Consider every stock
  2. .filter(...): Keep only unselected stocks
  3. .join(PortfolioConfig): Access the penalty value
  4. .penalize(ONE_SOFT, 10000): Each unselected stock costs 10000 soft points

Why 10000? This penalty is higher than the maximum return reward (~2000 per stock). This ensures the solver prioritizes reaching 20 stocks before optimizing returns.

Example with 25 stocks:

  • Optimal: 20 selected + 5 unselected = -50000 soft penalty
  • If returns reward is ~30000, final soft score is around -20000soft
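That arithmetic can be spelled out (using the round numbers assumed in the text):

```python
# 25 candidate stocks, target 20, unselected penalty 10000 each.
unselected_penalty = 10_000
unselected = 25 - 20                       # 5 stocks left out at the optimum
penalty = unselected * unselected_penalty  # 50000 soft points lost
return_reward = 30_000                     # assumed total of int(return * 10000)
soft_score = return_reward - penalty
print(soft_score)  # -20000
```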

Hard Constraint: Sector Exposure Limit

Business rule: “No sector can exceed N stocks” (default 5 = 25% of 20)

def sector_exposure_limit(constraint_factory: ConstraintFactory) -> Constraint:
    return (
        constraint_factory.for_each(StockSelection)
        .filter(lambda stock: stock.selected is True)
        .group_by(
            lambda stock: stock.sector,      # Group by sector name
            ConstraintCollectors.count()      # Count stocks per sector
        )
        .join(PortfolioConfig)
        .filter(lambda sector, count, config: count > config.max_per_sector)
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda sector, count, config: count - config.max_per_sector
        )
        .as_constraint("Max stocks per sector")
    )

How to read this:

  1. for_each(StockSelection): All stocks
  2. .filter(...): Keep only selected stocks
  3. .group_by(sector, count()): Count selected stocks in each sector
  4. .join(PortfolioConfig): Access the sector limit
  5. .filter(...): Keep sectors exceeding the limit
  6. .penalize(...): Penalty = stocks over limit per sector

Finance concept: This enforces diversification. If Tech crashes 50%, you only lose 25% × 50% = 12.5% of your portfolio, not 40% × 50% = 20%.
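The loss arithmetic in that sentence, spelled out:

```python
# A 50% crash in a sector costs (sector weight) x (crash size) of the portfolio.
capped_loss = 0.25 * 0.50    # Tech capped at 25% of the portfolio
uncapped_loss = 0.40 * 0.50  # Tech allowed to drift to 40%
print(capped_loss, uncapped_loss)  # 0.125 0.2
```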

Soft Constraint: Maximize Expected Return

Business rule: “Among valid portfolios, prefer stocks with higher predicted returns”

def maximize_expected_return(constraint_factory: ConstraintFactory) -> Constraint:
    return (
        constraint_factory.for_each(StockSelection)
        .filter(lambda stock: stock.selected is True)
        .reward(
            HardSoftScore.ONE_SOFT,
            lambda stock: int(stock.predicted_return * 10000)
        )
        .as_constraint("Maximize expected return")
    )

How to read this:

  1. for_each(StockSelection): All stocks
  2. .filter(...): Keep only selected stocks
  3. .reward(...): Add points based on predicted return

Why multiply by 10000? Converts decimal returns (0.12) to integer scores (1200). A stock with 12% predicted return adds 1200 soft points.

Example calculation:

Stock   Return   Score Contribution
NVDA    20%      +2000
AAPL    12%      +1200
JPM     8%       +800
XOM     4%       +400

The Solver Engine

Now let’s see how the solver is configured. Open src/portfolio_optimization/solver.py:

def create_solver_config(termination_seconds: int = 30) -> SolverConfig:
    return SolverConfig(
        solution_class=PortfolioOptimizationPlan,
        entity_class_list=[StockSelection],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(
            spent_limit=Duration(seconds=termination_seconds)
        ),
    )

solver_config = create_solver_config()
solver_manager = SolverManager.create(SolverFactory.create(solver_config))
solution_manager = SolutionManager.create(solver_manager)

Configuration Breakdown

solution_class: Your planning solution class (PortfolioOptimizationPlan)

entity_class_list: Planning entities to optimize ([StockSelection])

score_director_factory_config: Contains the constraint provider function

  • Note: Nested inside ScoreDirectorFactoryConfig, not directly in SolverConfig

termination_config: When to stop solving

  • spent_limit=Duration(seconds=30): Stop after 30 seconds

SolverManager: Asynchronous Solving

SolverManager handles solving in the background without blocking your API:

# Start solving (non-blocking)
solver_manager.solve_and_listen(job_id, portfolio, callback_function)

# Check status
status = solver_manager.get_solver_status(job_id)

# Get current best solution
solution = solver_manager.get_solution(job_id)

# Stop early
solver_manager.terminate_early(job_id)

Solving Timeline

Small problems (25 stocks, select 20):

  • Initial valid solution: < 1 second
  • Good solution: 5-10 seconds
  • Optimal or near-optimal: 30 seconds

Large problems (50+ stocks, select 20):

  • Initial valid solution: 1-3 seconds
  • Good solution: 15-30 seconds
  • High-quality: 60-120 seconds

Factors affecting speed:

  • Number of candidate stocks (search space size)
  • Sector distribution (tighter constraints = harder)
  • How many stocks to select vs available

Web Interface and API

REST API Endpoints

Open src/portfolio_optimization/rest_api.py to see the API:

GET /demo-data

Returns available demo datasets:

["SMALL", "LARGE"]

GET /demo-data/{dataset_id}

Generates and returns sample stock data:

{
  "stocks": [
    {
      "stockId": "AAPL",
      "stockName": "Apple Inc.",
      "sector": "Technology",
      "predictedReturn": 0.12,
      "selected": null
    }
  ],
  "targetPositionCount": 20,
  "maxSectorPercentage": 0.25
}

POST /portfolios

Submit a portfolio for optimization:

Request body: Same format as demo-data response

Response: Job ID as plain text

"a1b2c3d4-e5f6-7890-abcd-ef1234567890"

Implementation highlights:

@app.post("/portfolios")
async def solve_portfolio(plan_model: PortfolioOptimizationPlanModel) -> str:
    job_id = str(uuid4())
    plan = model_to_plan(plan_model)
    data_sets[job_id] = plan

    # Support custom termination time
    termination_seconds = 30
    if plan_model.solver_config:
        termination_seconds = plan_model.solver_config.termination_seconds

    config = create_solver_config(termination_seconds)
    manager = SolverManager.create(SolverFactory.create(config))
    solver_managers[job_id] = manager

    manager.solve_and_listen(job_id, plan, lambda sol: update_portfolio(job_id, sol))
    return job_id

GET /portfolios/{problem_id}

Get current solution:

{
  "stocks": [...],
  "targetPositionCount": 20,
  "maxSectorPercentage": 0.25,
  "score": "0hard/-25000soft",
  "solverStatus": "SOLVING_ACTIVE"
}

GET /portfolios/{problem_id}/status

Lightweight status check with metrics:

{
  "score": {
    "hardScore": 0,
    "softScore": -25000
  },
  "solverStatus": "SOLVING_ACTIVE",
  "selectedCount": 20,
  "expectedReturn": 0.1125,
  "sectorWeights": {
    "Technology": 0.25,
    "Healthcare": 0.25,
    "Finance": 0.25,
    "Energy": 0.25
  }
}

DELETE /portfolios/{problem_id}

Stop solving early and return best solution found.

PUT /portfolios/analyze

Analyze a portfolio’s constraint violations in detail:

{
  "constraints": [
    {
      "name": "Max stocks per sector",
      "weight": "1hard",
      "score": "-1hard",
      "matches": [
        {
          "name": "Max stocks per sector",
          "score": "-1hard",
          "justification": "Technology: 6 stocks (limit 5)"
        }
      ]
    }
  ]
}

Web UI Flow

The static/app.js implements this polling workflow:

  1. User opens page → Load demo data (GET /demo-data/SMALL)
  2. Display stocks grouped by sector with predicted returns
  3. User clicks “Solve”POST /portfolios (get job ID back)
  4. Poll GET /portfolios/{id}/status every 500ms
  5. Update UI with latest selections and score in real-time
  6. When solverStatus === "NOT_SOLVING" → Stop polling
  7. Display final score, selected stocks, and sector allocation chart
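The same polling loop can be sketched in Python, decoupled from HTTP so it is easy to test — `get_status` stands in for the GET /portfolios/{id}/status call (an illustrative helper, not part of the quickstart):

```python
import time

def poll_until_done(get_status, interval=0.5, max_polls=240):
    """Poll a status source until the solver reports NOT_SOLVING.

    get_status: any zero-argument callable returning the status JSON dict;
    in the browser UI this is the fetch of /portfolios/{id}/status.
    """
    for _ in range(max_polls):
        status = get_status()
        if status["solverStatus"] == "NOT_SOLVING":
            return status
        time.sleep(interval)
    raise TimeoutError("solver did not finish within the polling budget")

# Exercise the loop with a canned sequence instead of a live server:
responses = iter([
    {"solverStatus": "SOLVING_ACTIVE"},
    {"solverStatus": "NOT_SOLVING", "selectedCount": 20},
])
final = poll_until_done(lambda: next(responses), interval=0)
print(final["selectedCount"])  # 20
```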

Making Your First Customization

The quickstart includes a tutorial constraint that demonstrates a common pattern. Let’s understand how it works and then learn how to create similar constraints.

Understanding the Preferred Sector Bonus

The codebase includes preferred_sector_bonus which gives a small bonus to Technology and Healthcare stocks. This constraint is disabled by default (commented out in define_constraints()) but serves as a useful example.

Business rule: “Slightly favor Technology and Healthcare stocks (higher growth sectors)”

The Constraint Implementation

Find this in src/portfolio_optimization/constraints.py around line 200:

# TUTORIAL: Uncomment this constraint to add sector preference
# def preferred_sector_bonus(constraint_factory: ConstraintFactory):
#     """Soft constraint: Give a small bonus to stocks from preferred sectors."""
#     PREFERRED_SECTORS = {"Technology", "Healthcare"}
#     BONUS_POINTS = 50  # Small bonus per preferred stock
#
#     return (
#         constraint_factory.for_each(StockSelection)
#         .filter(lambda stock: stock.selected is True)
#         .filter(lambda stock: stock.sector in PREFERRED_SECTORS)
#         .reward(
#             HardSoftScore.ONE_SOFT,
#             lambda stock: BONUS_POINTS
#         )
#         .as_constraint("Preferred sector bonus")
#     )

How this works:

  1. Find all selected stocks
  2. Keep only those in preferred sectors
  3. Reward each with 50 bonus points

Enabling the Constraint

  1. Uncomment the function (remove the # comment markers)

  2. Register it in define_constraints():

    return [
        must_select_target_count(constraint_factory),
        sector_exposure_limit(constraint_factory),
        penalize_unselected_stock(constraint_factory),
        maximize_expected_return(constraint_factory),
        preferred_sector_bonus(constraint_factory),  # ADD THIS LINE
    ]
    
  3. Restart the server and solve again

  4. Observe the portfolio now slightly favors Tech and Healthcare stocks

Why 50 Points?

The bonus is intentionally small (50) compared to return rewards (1000-2000 per stock). This makes it a tiebreaker rather than an override:

  • If two stocks have similar predicted returns, prefer the one in a preferred sector
  • Don’t sacrifice significant returns just to pick a preferred sector
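A quick back-of-the-envelope check of the tiebreaker, using the weights described above (`soft_points` is an illustrative helper, not quickstart code):

```python
def soft_points(predicted_return, preferred):
    """Soft score contribution of one selected stock: the return reward
    (return * 10000) plus the 50-point preferred-sector bonus."""
    return int(predicted_return * 10_000) + (50 if preferred else 0)

# Equal returns: the preferred sector wins the tie.
print(soft_points(0.12, True), soft_points(0.12, False))   # 1250 1200

# A clearly higher return still beats the small bonus.
print(soft_points(0.10, True), soft_points(0.12, False))   # 1050 1200
```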

Adding a Test

Add a test class to tests/test_constraints.py:

from portfolio_optimization.constraints import preferred_sector_bonus

class TestPreferredSectorBonus:
    def test_technology_stock_rewarded(self) -> None:
        """Technology stocks should receive bonus."""
        stock = create_stock("TECH1", sector="Technology", selected=True)

        constraint_verifier.verify_that(preferred_sector_bonus).given(
            stock
        ).rewards_with(50)

    def test_energy_stock_not_rewarded(self) -> None:
        """Energy stocks should not receive bonus."""
        stock = create_stock("ENGY1", sector="Energy", selected=True)

        constraint_verifier.verify_that(preferred_sector_bonus).given(
            stock
        ).rewards(0)

    def test_unselected_tech_not_rewarded(self) -> None:
        """Unselected stocks don't receive bonus even if in preferred sector."""
        stock = create_stock("TECH1", sector="Technology", selected=False)

        constraint_verifier.verify_that(preferred_sector_bonus).given(
            stock
        ).rewards(0)

Run with:

pytest tests/test_constraints.py::TestPreferredSectorBonus -v

Advanced Constraint Patterns

Pattern 1: Volatility Risk Penalty

Scenario: Penalize portfolios with high variance in predicted returns (higher risk).

def penalize_high_volatility(constraint_factory: ConstraintFactory) -> Constraint:
    """
    Soft constraint: Penalize portfolios with high return variance.

    Risk-averse investors prefer consistent returns over volatile ones.
    """
    return (
        constraint_factory.for_each(StockSelection)
        .filter(lambda stock: stock.selected is True)
        .group_by(
            ConstraintCollectors.to_list(lambda stock: stock.predicted_return)
        )
        .filter(lambda returns: len(returns) >= 2)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda returns: int(calculate_variance(returns) * 10000)
        )
        .as_constraint("High volatility penalty")
    )

def calculate_variance(returns: list[float]) -> float:
    mean = sum(returns) / len(returns)
    return sum((r - mean) ** 2 for r in returns) / len(returns)

Pattern 2: Minimum Sector Representation

Scenario: Ensure each sector has at least 2 stocks (broader diversification).

def minimum_sector_representation(constraint_factory: ConstraintFactory) -> Constraint:
    """
    Hard constraint: Each sector must have at least 2 stocks.

    Ensures broader diversification beyond just max limits.
    """
    MIN_PER_SECTOR = 2
    KNOWN_SECTORS = {"Technology", "Healthcare", "Finance", "Energy"}

    return (
        constraint_factory.for_each(StockSelection)
        .filter(lambda stock: stock.selected is True)
        .group_by(lambda stock: stock.sector, ConstraintCollectors.count())
        .filter(lambda sector, count: sector in KNOWN_SECTORS and count < MIN_PER_SECTOR)
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda sector, count: MIN_PER_SECTOR - count
        )
        .as_constraint("Minimum sector representation")
    )

Pattern 3: Exclude Specific Stocks

Scenario: Some stocks are on a “do not buy” list (regulatory, ethical, etc.).

def exclude_blacklisted_stocks(constraint_factory: ConstraintFactory) -> Constraint:
    """
    Hard constraint: Never select blacklisted stocks.

    Useful for regulatory compliance or ethical investing.
    """
    BLACKLIST = {"TOBACCO1", "GAMBLING2", "WEAPONS3"}

    return (
        constraint_factory.for_each(StockSelection)
        .filter(lambda stock: stock.selected is True)
        .filter(lambda stock: stock.stock_id in BLACKLIST)
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Exclude blacklisted stocks")
    )

Pattern 4: Sector Weight Balance

Scenario: Prefer portfolios where no sector is significantly larger than others.

def balance_sector_weights(constraint_factory: ConstraintFactory) -> Constraint:
    """
    Soft constraint: Prefer balanced sector allocation.

    Uses load balancing to penalize uneven distribution.
    """
    return (
        constraint_factory.for_each(StockSelection)
        .filter(lambda stock: stock.selected is True)
        .group_by(lambda stock: stock.sector, ConstraintCollectors.count())
        .group_by(
            ConstraintCollectors.load_balance(
                lambda sector, count: sector,
                lambda sector, count: count
            )
        )
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda balance: int(balance.unfairness() * 100)
        )
        .as_constraint("Balance sector weights")
    )

Testing and Validation

Unit Testing Constraints

The quickstart uses ConstraintVerifier for isolated constraint testing. See tests/test_constraints.py:

from solverforge_legacy.solver.test import ConstraintVerifier

constraint_verifier = ConstraintVerifier.build(
    define_constraints, PortfolioOptimizationPlan, StockSelection
)

DEFAULT_CONFIG = PortfolioConfig(target_count=20, max_per_sector=5, unselected_penalty=10000)

def create_stock(stock_id, sector="Technology", predicted_return=0.10, selected=True):
    selection_value = SELECTED if selected else NOT_SELECTED
    return StockSelection(
        stock_id=stock_id,
        stock_name=f"{stock_id} Corp",
        sector=sector,
        predicted_return=predicted_return,
        selection=selection_value,
    )

Test patterns:

Verify no penalty:

def test_at_limit_no_penalty(self):
    stocks = [create_stock(f"TECH{i}", sector="Technology") for i in range(5)]
    constraint_verifier.verify_that(sector_exposure_limit).given(
        *stocks, DEFAULT_CONFIG
    ).penalizes(0)

Verify exact penalty amount:

def test_one_over_limit_penalizes_1(self):
    stocks = [create_stock(f"TECH{i}", sector="Technology") for i in range(6)]
    constraint_verifier.verify_that(sector_exposure_limit).given(
        *stocks, DEFAULT_CONFIG
    ).penalizes_by(1)

Verify reward amount:

def test_high_return_stock_rewarded(self):
    stock = create_stock("AAPL", predicted_return=0.12, selected=True)
    constraint_verifier.verify_that(maximize_expected_return).given(
        stock
    ).rewards_with(1200)  # 0.12 * 10000

Running Tests

# All tests
pytest

# Verbose output
pytest -v

# Specific test file
pytest tests/test_constraints.py

# Specific test class
pytest tests/test_constraints.py::TestSectorExposureLimit

# With coverage
pytest --cov=portfolio_optimization

Integration Testing: Full Solve

Test the complete solving cycle in tests/test_feasible.py:

def test_small_dataset_feasible():
    """Solver should find a feasible solution for small dataset."""
    plan = generate_demo_data(DemoData.SMALL)

    # All stocks start unselected
    assert all(s.selection is None for s in plan.stocks)

    # Solve for 10 seconds
    solution = solve_for_seconds(plan, 10)

    # Should select exactly 20 stocks
    assert solution.get_selected_count() == 20

    # Should have no sector over 25%
    for sector, weight in solution.get_sector_weights().items():
        assert weight <= 0.26, f"{sector} at {weight*100}%"

    # Should have 0 hard score (feasible)
    assert solution.score.hard_score == 0

Quick Reference

File Locations

Need to…                       Edit this file
Add/change business rule       src/portfolio_optimization/constraints.py
Add field to StockSelection    src/portfolio_optimization/domain.py + converters.py
Change default config          src/portfolio_optimization/domain.py (PortfolioConfig)
Change solve time              src/portfolio_optimization/solver.py or API parameter
Add REST endpoint              src/portfolio_optimization/rest_api.py
Change demo data               src/portfolio_optimization/demo_data.py
Change UI                      static/index.html, static/app.js

Common Constraint Patterns

Count selected stocks:

constraint_factory.for_each(StockSelection)
    .filter(lambda stock: stock.selected is True)
    .group_by(ConstraintCollectors.count())
    .filter(lambda count: count > MAX)
    .penalize(...)

Group by sector and count:

constraint_factory.for_each(StockSelection)
    .filter(lambda stock: stock.selected is True)
    .group_by(lambda stock: stock.sector, ConstraintCollectors.count())
    .filter(lambda sector, count: count > MAX)
    .penalize(...)

Reward based on attribute:

constraint_factory.for_each(StockSelection)
    .filter(lambda stock: stock.selected is True)
    .reward(HardSoftScore.ONE_SOFT, lambda stock: int(stock.attribute * 10000))

Filter by set membership:

constraint_factory.for_each(StockSelection)
    .filter(lambda stock: stock.selected is True)
    .filter(lambda stock: stock.sector in PREFERRED_SECTORS)
    .reward(...)

Access configurable parameters:

constraint_factory.for_each(StockSelection)
    .filter(...)
    .join(PortfolioConfig)
    .filter(lambda stock, config: some_condition(stock, config))
    .penalize(HardSoftScore.ONE_HARD, lambda stock, config: penalty(config))

Common Gotchas

  1. Forgot to register constraint in define_constraints() return list

    • Symptom: Constraint not enforced
  2. Using wrong score type

    • HardSoftScore.ONE_HARD for must-satisfy rules
    • HardSoftScore.ONE_SOFT for preferences
  3. Boolean vs SelectionValue confusion

    • Use stock.selected is True (the derived boolean property)
    • Not stock.selection == True (that compares a SelectionValue against a bool, which is never equal)
  4. Empty stream returns nothing, not 0

    • If no stocks are selected, group_by(count()) produces nothing
    • Can’t use “must have at least N” pattern naively
  5. Score sign confusion

    • Higher soft score is better (less negative)
    • Use .reward() to add points, .penalize() to subtract
  6. Forgetting to pass config to constraint tests

    • Parameterized constraints need PortfolioConfig as a problem fact
    • constraint_verifier.verify_that(...).given(*stocks, DEFAULT_CONFIG)
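Gotcha 3 in miniature — a stripped-down sketch of the selection pattern (class shapes are illustrative; the real domain classes carry solver annotations):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class SelectionValue:
    value: bool

SELECTED = SelectionValue(True)
NOT_SELECTED = SelectionValue(False)

@dataclass
class Stock:
    stock_id: str
    selection: Optional[SelectionValue] = None

    @property
    def selected(self) -> bool:
        # Derived boolean used by constraints: True only when assigned SELECTED.
        return self.selection is not None and self.selection.value

s = Stock("AAPL", selection=SELECTED)
print(s.selected is True)   # True  — the property yields a real bool
print(s.selection == True)  # False — a SelectionValue never equals a bool
```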

Debugging Tips

Enable verbose logging:

import logging
logging.basicConfig(level=logging.DEBUG)

Use the /analyze endpoint:

curl -X PUT http://localhost:8080/portfolios/analyze \
  -H "Content-Type: application/json" \
  -d @my_portfolio.json

Print in constraints (temporary debugging):

.filter(lambda stock: (
    print(f"Checking {stock.stock_id}: {stock.sector}") or
    stock.sector in PREFERRED_SECTORS
))

Additional Resources

4 - VM Placement

A comprehensive quickstart guide to understanding and building intelligent virtual machine placement optimization with SolverForge

Legacy Implementation Guide

This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.

SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.


Table of Contents

  1. Introduction
  2. Getting Started
  3. The Problem We’re Solving
  4. Understanding the Data Model
  5. How Optimization Works
  6. Writing Constraints: The Business Rules
  7. The Solver Engine
  8. Web Interface and API
  9. Making Your First Customization
  10. Advanced Constraint Patterns
  11. Testing and Validation
  12. Quick Reference

Introduction

What You’ll Learn

This guide walks you through a complete virtual machine placement application built with SolverForge, a constraint-based optimization framework. You’ll learn:

  • How to model resource allocation decisions as optimization problems
  • How to express capacity limits and placement rules as constraints
  • How optimization algorithms find efficient placements automatically
  • How to customize the system for your specific infrastructure requirements

No optimization or cloud infrastructure background required — we’ll explain both optimization and datacenter concepts as we encounter them in the code.

Architecture Note: This guide uses the “fast” implementation pattern with dataclass domain models and Pydantic only at API boundaries. For the architectural reasoning behind this design, see Dataclasses vs Pydantic in Constraint Solvers.

Prerequisites

  • Basic Python knowledge (classes, functions, type annotations)
  • Familiarity with REST APIs
  • Comfort with command-line operations

What is VM Placement Optimization?

Traditional VM placement: You write explicit rules like “sort VMs by size and pack them onto servers using first-fit decreasing.”

Constraint-based VM placement: You describe what a valid placement looks like (capacity respected, replicas separated, load balanced) and the solver figures out which VM goes where.

Think of it like describing the ideal datacenter state and having a computer try millions of placement combinations per second to find the best fit.

Datacenter Concepts (Quick Primer)

Term           Definition                                          Example
Server         Physical machine with CPU, memory, and storage      32 cores, 128 GB RAM, 2 TB storage
VM             Virtual machine requiring resources from a server   4 cores, 16 GB RAM, 100 GB storage
Rack           Physical grouping of servers in a datacenter        Rack A contains 8 servers
Affinity       VMs that should run on the same server              Web app and its cache
Anti-Affinity  VMs that must run on different servers              Database primary and replica
Consolidation  Using fewer servers to reduce power/cooling costs   Pack VMs tightly

Getting Started

Running the Application

  1. Download and navigate to the project directory:

    git clone https://github.com/SolverForge/solverforge-quickstarts
    cd ./solverforge-quickstarts/legacy/vm-placement-fast
    
  2. Create and activate virtual environment:

    python -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    
  3. Install the package:

    pip install -e .
    
  4. Start the server:

    run-app
    
  5. Open your browser:

    http://localhost:8080
    

You’ll see a VM placement interface with server racks, VMs, and a “Solve” button. Click it and watch the solver automatically assign VMs to servers while respecting capacity limits and placement rules.

File Structure Overview

legacy/vm-placement-fast/
├── src/vm_placement/
│   ├── domain.py              # Data classes (Server, VM, VMPlacementPlan)
│   ├── constraints.py         # Business rules (90% of customization happens here)
│   ├── solver.py              # Solver configuration
│   ├── demo_data.py           # Sample infrastructure and VMs
│   ├── rest_api.py            # HTTP API endpoints
│   ├── converters.py          # REST ↔ Domain model conversion
│   └── json_serialization.py  # JSON helpers
├── static/
│   ├── index.html             # Web UI with rack visualization
│   ├── app.js                 # UI logic and visualization
│   └── config.js              # Advanced settings sliders
└── tests/
    └── test_constraints.py    # Unit tests for constraints

Key insight: Most business customization happens in constraints.py alone. You rarely need to modify other files.


The Problem We’re Solving

The Infrastructure Challenge

You manage a datacenter with physical servers organized in racks, and must place virtual machines onto those servers. Each server has limited CPU cores, memory, and storage capacity.

Hard constraints (must be satisfied):

  • Never exceed a server’s CPU, memory, or storage capacity
  • Keep database replicas on different servers (anti-affinity)

Soft constraints (preferences to optimize):

  • Place related services together when possible (affinity)
  • Minimize the number of active servers (consolidation)
  • Balance load across active servers
  • Place higher-priority VMs first

Why Use a Constraint Solver?

For simple bin-packing (fit VMs into servers by size), a well-implemented first-fit-decreasing algorithm works. So why use a constraint solver?

1. Declarative vs Imperative: With constraints, you describe what you want, not how to achieve it. Adding a new rule is one function, not a rewrite of your algorithm.

2. Constraint Interactions: As constraints multiply, greedy logic becomes brittle. Consider adding:

  • Anti-affinity for database replicas
  • Affinity for microservice tiers
  • GPU requirements for ML workloads
  • Rack-aware fault tolerance

Each new constraint in greedy code means more if/else branches and edge cases. In a constraint solver, you just add another constraint function.

3. Real-World Complexity: Production datacenters have migration costs, maintenance windows, SLA requirements, and live traffic patterns. These create solution spaces where greedy approaches fail.
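For contrast, the greedy baseline mentioned above fits in a few lines — a CPU-only first-fit-decreasing sketch with toy numbers (a single resource; real placement also tracks memory and storage):

```python
def first_fit_decreasing(vm_sizes, server_capacity):
    """Greedy bin packing: place each VM (largest first) on the first
    server with room, opening a new server when none fits."""
    servers = []     # remaining capacity of each opened server
    placement = {}   # vm index -> server index
    for i, size in sorted(enumerate(vm_sizes), key=lambda p: -p[1]):
        for s, free in enumerate(servers):
            if free >= size:
                servers[s] -= size
                placement[i] = s
                break
        else:
            servers.append(server_capacity - size)
            placement[i] = len(servers) - 1
    return placement, len(servers)

placement, used = first_fit_decreasing([4, 8, 2, 6, 4], server_capacity=10)
print(used)  # 3 — e.g. servers holding [8, 2], [6, 4], [4]
```

Every new rule (anti-affinity, rack awareness, GPU matching) would have to be threaded through that inner loop by hand — which is exactly the brittleness the constraint approach avoids.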


Understanding the Data Model

Let’s examine the core classes that model our problem. Open src/vm_placement/domain.py:

Domain Model Architecture

This quickstart separates domain models (dataclasses) from API models (Pydantic):

  • Domain layer (domain.py lines 26-156): Pure @dataclass models for solver operations
  • API layer (domain.py lines 159-207): Pydantic BaseModel classes for REST endpoints
  • Converters (converters.py): Translate between the two layers

The Server Class (Problem Fact)

@dataclass
class Server:
    id: Annotated[str, PlanningId]
    name: str
    cpu_cores: int
    memory_gb: int
    storage_gb: int
    rack: Optional[str] = None

What it represents: A physical server that can host virtual machines.

Key fields:

  • id: Unique identifier for the server
  • name: Human-readable server name
  • cpu_cores: Available CPU cores
  • memory_gb: Available memory in gigabytes
  • storage_gb: Available storage in gigabytes
  • rack: Which physical rack contains this server

Optimization concept: This is a problem fact — immutable data that doesn’t change during solving. Servers are the targets for VM placement, not the decisions themselves.

The VM Class (Planning Entity)

@planning_entity
@dataclass
class VM:
    id: Annotated[str, PlanningId]
    name: str
    cpu_cores: int
    memory_gb: int
    storage_gb: int
    priority: int = 1
    affinity_group: Optional[str] = None
    anti_affinity_group: Optional[str] = None
    server: Annotated[Optional[Server], PlanningVariable] = None

What it represents: A virtual machine that needs to be placed on a server.

Key fields:

  • id: Unique identifier (VM ID)
  • name: Human-readable VM name
  • cpu_cores, memory_gb, storage_gb: Resource requirements
  • priority: Importance level (1-5, higher = more important)
  • affinity_group: Group name for VMs that should be together
  • anti_affinity_group: Group name for VMs that must be separated
  • server: The assignment decision — which server hosts this VM?

Important annotations:

  • @planning_entity: Tells SolverForge this class contains decisions to make
  • PlanningVariable: Marks server as the decision variable

Optimization concept: This is a planning variable — the value the solver assigns. Each VM starts with server=None (unassigned). The solver tries different server assignments, evaluating according to your constraints.

The Assignment Pattern

Unlike portfolio optimization, where the planning variable is a two-valued selection (SELECTED/NOT_SELECTED), VM placement uses a reference assignment pattern:

server: Annotated[Optional[Server], PlanningVariable] = None

Why a reference? Each VM can be assigned to any server from the value range. The solver picks from the list of available servers, or leaves the VM unassigned (None).

The VMPlacementPlan Class (Planning Solution)

@planning_solution
@dataclass
class VMPlacementPlan:
    name: str
    servers: Annotated[list[Server], ProblemFactCollectionProperty, ValueRangeProvider]
    vms: Annotated[list[VM], PlanningEntityCollectionProperty]
    score: Annotated[Optional[HardSoftScore], PlanningScore] = None
    solver_status: SolverStatus = SolverStatus.NOT_SOLVING

What it represents: The complete problem and its solution.

Key fields:

  • name: Problem instance name (e.g., “Datacenter Alpha”)
  • servers: All physical servers (problem facts + value range)
  • vms: All VMs to place (planning entities)
  • score: Solution quality metric (calculated by constraints)
  • solver_status: Whether solving is active

Annotations explained:

  • @planning_solution: Marks this as the top-level problem definition
  • ProblemFactCollectionProperty: Immutable problem data
  • ValueRangeProvider: Servers are valid values for VM.server
  • PlanningEntityCollectionProperty: The entities being optimized
  • PlanningScore: Where the solver stores the calculated score

Helper Methods for Business Metrics

The VMPlacementPlan class includes useful analytics:

def get_vms_on_server(self, server: Server) -> list:
    """Get all VMs assigned to a specific server."""
    return [vm for vm in self.vms if vm.server == server]

def get_server_used_cpu(self, server: Server) -> int:
    """Get total CPU cores used on a server."""
    return sum(vm.cpu_cores for vm in self.vms if vm.server == server)

@property
def active_servers(self) -> int:
    """Count servers that have at least one VM assigned."""
    active_server_ids = set(vm.server.id for vm in self.vms if vm.server is not None)
    return len(active_server_ids)

@property
def unassigned_vms(self) -> int:
    """Count VMs without a server assignment."""
    return sum(1 for vm in self.vms if vm.server is None)
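The same arithmetic can be reproduced with plain dataclasses and made-up data (a self-contained sketch; the real classes carry the solver annotations):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Server:
    id: str
    cpu_cores: int

@dataclass
class VM:
    id: str
    cpu_cores: int
    server: Optional[Server] = None

s1, s2 = Server("s1", 16), Server("s2", 16)
vms = [VM("vm1", 4, s1), VM("vm2", 8, s1), VM("vm3", 2, s2), VM("vm4", 4)]

used_cpu_s1 = sum(vm.cpu_cores for vm in vms if vm.server == s1)   # 12 of 16 cores
active_servers = len({vm.server.id for vm in vms if vm.server})    # s1 and s2 -> 2
unassigned = sum(1 for vm in vms if vm.server is None)             # vm4 -> 1
```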

How Optimization Works

Before diving into constraints, let’s understand how the solver finds solutions.

The Search Process (Simplified)

  1. Start with an initial solution (often all VMs unassigned)
  2. Evaluate the score using your constraint functions
  3. Make a small change (assign one VM to a server, or move it)
  4. Evaluate the new score
  5. Keep the change if it improves the score (with some controlled randomness)
  6. Repeat millions of times in seconds
  7. Return the best solution found
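The steps above can be sketched as a toy hill climber over random single-VM moves (illustrative only; the real engine layers metaheuristics such as tabu search on top of this skeleton):

```python
import random

def solve(vms, servers, score, steps=10_000, seed=0):
    """Toy local search: random single-assignment moves, keep non-worsening ones."""
    rng = random.Random(seed)
    assignment = {vm: None for vm in vms}              # 1. start all unassigned
    best, best_score = dict(assignment), score(assignment)
    for _ in range(steps):                             # 6. repeat many times
        vm = rng.choice(vms)                           # 3. make a small change
        old = assignment[vm]
        assignment[vm] = rng.choice(servers + [None])
        new_score = score(assignment)                  # 4. evaluate the new score
        if new_score >= best_score:                    # 5. keep if not worse
            best, best_score = dict(assignment), new_score
        else:
            assignment[vm] = old                       #    otherwise revert
    return best, best_score                            # 7. best solution found

# Toy scoring function: -1 per unassigned VM (higher is better).
vms, servers = ["vm1", "vm2", "vm3"], ["s1", "s2"]
best, best_score = solve(vms, servers, lambda a: -sum(1 for s in a.values() if s is None))
```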

The Search Space

For a placement problem with 12 servers and 30 VMs, each VM can go on any of 12 servers (or stay unassigned). That’s 13^30 possible combinations — far too many to enumerate. The solver explores this space using smart heuristics, not brute force.
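The count is easy to verify: each of the 30 VMs independently takes one of 13 values (12 servers or None):

```python
servers, vm_count = 12, 30
combinations = (servers + 1) ** vm_count   # 13 choices per VM
# roughly 2.6e33 placements -- far too many to enumerate
```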

The Score: How “Good” is a Placement?

Every solution gets a score with two parts:

0hard/-2500soft
  • Hard score: Counts hard constraint violations (must be 0 for a valid placement)
  • Soft score: Reflects optimization quality (higher/less negative is better)

Scoring rules:

  • Hard score must be 0 or positive (negative = invalid placement)
  • Among valid placements (hard score = 0), highest soft score wins
  • Hard score always takes priority over soft score

Placement example:

-4hard/-1200soft  → Invalid: 4 capacity violations
0hard/-3000soft   → Valid but many servers active
0hard/-1500soft   → Valid with better consolidation
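Because the hard part always dominates, a HardSoftScore orders exactly like a (hard, soft) tuple; Python's lexicographic tuple comparison reproduces the ranking of the three placements above (a sketch of the semantics, not the library type):

```python
# (hard, soft) pairs for the three example placements
scores = [(-4, -1200), (0, -3000), (0, -1500)]
best = max(scores)   # tuples compare hard first, then soft
# best == (0, -1500): the valid placement with better consolidation
```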

Writing Constraints: The Business Rules

Now the heart of the system. Open src/vm_placement/constraints.py.

The Constraint Provider Pattern

All constraints are registered in one function:

@constraint_provider
def define_constraints(factory: ConstraintFactory):
    return [
        # Hard constraints
        cpu_capacity(factory),
        memory_capacity(factory),
        storage_capacity(factory),
        anti_affinity(factory),
        # Soft constraints
        affinity(factory),
        minimize_servers_used(factory),
        balance_utilization(factory),
        prioritize_placement(factory),
    ]

Each constraint is a function returning a Constraint object. Let’s examine them.

Hard Constraint: CPU Capacity

Business rule: “Server CPU capacity cannot be exceeded”

def cpu_capacity(factory: ConstraintFactory):
    return (
        factory.for_each(VM)
        .filter(lambda vm: vm.server is not None)
        .group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.cpu_cores))
        .filter(lambda server, total_cpu: total_cpu > server.cpu_cores)
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda server, total_cpu: total_cpu - server.cpu_cores,
        )
        .as_constraint("cpuCapacity")
    )

How to read this:

  1. for_each(VM): Consider every VM
  2. .filter(...): Keep only assigned VMs (server is not None)
  3. .group_by(server, sum(cpu_cores)): Sum CPU cores per server
  4. .filter(...): Keep only overloaded servers
  5. .penalize(ONE_HARD, excess): Each excess core adds 1 hard penalty

Example: Server with 16 cores hosting VMs totaling 20 cores = penalty of 4
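In plain Python, the stream above amounts to grouping assigned VMs by server, summing cores, and penalizing the excess (a sketch of the semantics with hypothetical data):

```python
from collections import defaultdict

server_capacity = {"s1": 16}                         # a 16-core server
assignments = [("vm1", 12, "s1"), ("vm2", 8, "s1")]  # (vm, cores, server)

totals = defaultdict(int)
for _vm, cores, server in assignments:               # group_by + sum
    totals[server] += cores

penalty = sum(
    total - server_capacity[s]                       # penalize the excess
    for s, total in totals.items()
    if total > server_capacity[s]                    # filter overloaded servers
)
# penalty == 4  (20 cores requested on a 16-core server)
```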

Hard Constraint: Memory Capacity

Business rule: “Server memory capacity cannot be exceeded”

def memory_capacity(factory: ConstraintFactory):
    return (
        factory.for_each(VM)
        .filter(lambda vm: vm.server is not None)
        .group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.memory_gb))
        .filter(lambda server, total_memory: total_memory > server.memory_gb)
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda server, total_memory: total_memory - server.memory_gb,
        )
        .as_constraint("memoryCapacity")
    )

Same pattern as CPU capacity, applied to memory.

Hard Constraint: Storage Capacity

Business rule: “Server storage capacity cannot be exceeded”

def storage_capacity(factory: ConstraintFactory):
    return (
        factory.for_each(VM)
        .filter(lambda vm: vm.server is not None)
        .group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.storage_gb))
        .filter(lambda server, total_storage: total_storage > server.storage_gb)
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda server, total_storage: total_storage - server.storage_gb,
        )
        .as_constraint("storageCapacity")
    )

Same pattern as CPU capacity, applied to storage.

Hard Constraint: Anti-Affinity

Business rule: “VMs in the same anti-affinity group must be on different servers”

def anti_affinity(factory: ConstraintFactory):
    return (
        factory.for_each_unique_pair(
            VM,
            Joiners.equal(lambda vm: vm.anti_affinity_group),
            Joiners.equal(lambda vm: vm.server),
        )
        .filter(lambda vm1, vm2: vm1.anti_affinity_group is not None)
        .filter(lambda vm1, vm2: vm1.server is not None)
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("antiAffinity")
    )

How to read this:

  1. for_each_unique_pair(VM, ...): Find pairs of VMs
  2. Joiners.equal(anti_affinity_group): Same anti-affinity group
  3. Joiners.equal(server): On the same server
  4. .filter(...): Group must be set (not None)
  5. .filter(...): Both must be assigned
  6. .penalize(ONE_HARD): Each violating pair adds 1 hard penalty

Use case: Database replicas should never be on the same physical server. If one server fails, the other replica survives.

Soft Constraint: Affinity

Business rule: “VMs in the same affinity group should be on the same server”

def affinity(factory: ConstraintFactory):
    return (
        factory.for_each_unique_pair(
            VM,
            Joiners.equal(lambda vm: vm.affinity_group),
        )
        .filter(lambda vm1, vm2: vm1.affinity_group is not None)
        .filter(lambda vm1, vm2: vm1.server is not None and vm2.server is not None)
        .filter(lambda vm1, vm2: vm1.server != vm2.server)
        .penalize(HardSoftScore.ONE_SOFT, lambda vm1, vm2: 100)
        .as_constraint("affinity")
    )

How to read this:

  1. Find pairs of VMs in the same affinity group
  2. Both must be assigned
  3. Penalize if they’re on different servers
  4. Each separated pair costs 100 soft points

Use case: Web servers and their cache should be together for low latency.

Soft Constraint: Minimize Servers Used

Business rule: “Use fewer servers to reduce power and cooling costs”

def minimize_servers_used(factory: ConstraintFactory):
    return (
        factory.for_each(VM)
        .filter(lambda vm: vm.server is not None)
        .group_by(lambda vm: vm.server, ConstraintCollectors.count())
        .penalize(HardSoftScore.ONE_SOFT, lambda server, count: 100)
        .as_constraint("minimizeServersUsed")
    )

How to read this:

  1. Find all assigned VMs
  2. Group by server and count VMs
  3. Each active server (with at least 1 VM) costs 100 soft points

Business concept: Server consolidation. An idle server still consumes power for cooling, lighting, and baseline operations. Packing VMs onto fewer servers reduces operational costs.

Soft Constraint: Balance Utilization

Business rule: “Distribute load evenly across active servers”

def balance_utilization(factory: ConstraintFactory):
    return (
        factory.for_each(VM)
        .filter(lambda vm: vm.server is not None)
        .group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.cpu_cores))
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda server, total_cpu: int((total_cpu / server.cpu_cores) ** 2 * 10) if server.cpu_cores > 0 else 0,
        )
        .as_constraint("balanceUtilization")
    )

How to read this:

  1. Sum CPU usage per server
  2. Calculate utilization ratio (used/capacity)
  3. Apply squared penalty — heavily loaded servers cost more

Why squared? A server at 90% utilization is riskier than two servers at 45%. Squaring creates a “fairness” preference that spreads load.

Example:

| Scenario   | Server A  | Server B  | Total Penalty |
|------------|-----------|-----------|---------------|
| Imbalanced | 90% = 8.1 | 10% = 0.1 | 8.2           |
| Balanced   | 50% = 2.5 | 50% = 2.5 | 5.0           |
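The table's numbers fall straight out of the formula, shown here before the int() truncation that the constraint applies:

```python
def raw_penalty(used_cores, capacity_cores):
    return (used_cores / capacity_cores) ** 2 * 10   # squared utilization * 10

imbalanced = raw_penalty(90, 100) + raw_penalty(10, 100)   # 8.1 + 0.1 = 8.2
balanced = raw_penalty(50, 100) + raw_penalty(50, 100)     # 2.5 + 2.5 = 5.0
```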

Soft Constraint: Prioritize Placement

Business rule: “Higher-priority VMs should be placed first”

def prioritize_placement(factory: ConstraintFactory):
    return (
        factory.for_each(VM)
        .filter(lambda vm: vm.server is None)
        .penalize(HardSoftScore.ONE_SOFT, lambda vm: 10000 + vm.priority * 1000)
        .as_constraint("prioritizePlacement")
    )

How to read this:

  1. Find unassigned VMs
  2. Penalize each based on priority
  3. Higher priority = larger penalty when unassigned

Why these weights? The base penalty (10000) ensures VMs get placed before other soft constraints are optimized. The priority multiplier (1000) makes high-priority VMs more “expensive” to leave unassigned.

Example penalties:

| Priority     | Unassigned Penalty |
|--------------|--------------------|
| 1 (low)      | 11000              |
| 3 (medium)   | 13000              |
| 5 (critical) | 15000              |
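The formula from prioritize_placement reproduces these penalties directly:

```python
def unassigned_penalty(priority):
    return 10000 + priority * 1000   # base penalty + priority multiplier

penalties = {p: unassigned_penalty(p) for p in (1, 3, 5)}
# {1: 11000, 3: 13000, 5: 15000}
```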

The Solver Engine

Now let’s see how the solver is configured. Open src/vm_placement/solver.py:

from solverforge_legacy.solver import (
    SolverManager,
    SolverConfig,
    SolverFactory,
    SolutionManager,
)
from solverforge_legacy.solver.config import (
    ScoreDirectorFactoryConfig,
    TerminationConfig,
    Duration,
)
from .constraints import define_constraints
from .domain import VMPlacementPlan, VM

solver_config = SolverConfig(
    solution_class=VMPlacementPlan,
    entity_class_list=[VM],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(
        spent_limit=Duration(seconds=30)
    ),
)

solver_manager = SolverManager.create(SolverFactory.create(solver_config))
solution_manager = SolutionManager.create(solver_manager)

Configuration Breakdown

solution_class: Your planning solution class (VMPlacementPlan)

entity_class_list: Planning entities to optimize ([VM])

  • Note: Only VM is listed, not Server — servers are problem facts

score_director_factory_config: Contains the constraint provider function

termination_config: When to stop solving

  • spent_limit=Duration(seconds=30): Stop after 30 seconds

SolverManager: Asynchronous Solving

SolverManager handles solving in the background without blocking your API:

# Start solving (non-blocking)
solver_manager.solve_and_listen(job_id, placement, callback_function)

# Check status
status = solver_manager.get_solver_status(job_id)

# Stop early
solver_manager.terminate_early(job_id)

Solving Timeline

Small problems (12 servers, 30 VMs):

  • Initial valid placement: < 1 second
  • Good placement: 5-10 seconds
  • Near-optimal: 30 seconds

Large problems (50+ servers, 200 VMs):

  • Initial valid placement: 2-5 seconds
  • Good placement: 30-60 seconds
  • High-quality: 2-5 minutes

Factors affecting speed:

  • Number of servers and VMs (search space size)
  • Constraint tightness (capacity headroom)
  • Anti-affinity groups (placement restrictions)

Web Interface and API

REST API Endpoints

Open src/vm_placement/rest_api.py to see the API:

GET /demo-data

Returns available demo datasets:

["SMALL", "MEDIUM", "LARGE"]

GET /demo-data/{dataset_id}

Generates and returns sample infrastructure data:

{
  "name": "Small Datacenter",
  "servers": [
    {
      "id": "server-1",
      "name": "Rack-A-Server-1",
      "cpuCores": 32,
      "memoryGb": 128,
      "storageGb": 2000,
      "rack": "Rack-A"
    }
  ],
  "vms": [
    {
      "id": "vm-1",
      "name": "DB-Primary",
      "cpuCores": 8,
      "memoryGb": 32,
      "storageGb": 500,
      "priority": 5,
      "antiAffinityGroup": "db-replicas",
      "server": null
    }
  ]
}

POST /demo-data/generate

Generate custom infrastructure with configurable parameters:

Request body:

{
  "rack_count": 3,
  "servers_per_rack": 4,
  "vm_count": 30
}

Response: Same format as demo-data response

POST /placements

Submit a placement problem for optimization:

Request body: Same format as demo-data response

Response: Job ID as plain text

"a1b2c3d4-e5f6-7890-abcd-ef1234567890"

GET /placements/{problem_id}

Get current solution:

{
  "name": "Small Datacenter",
  "servers": [...],
  "vms": [...],
  "score": "0hard/-2500soft",
  "solverStatus": "SOLVING_ACTIVE",
  "totalServers": 12,
  "activeServers": 6,
  "unassignedVms": 0
}

GET /placements/{problem_id}/status

Lightweight status check with metrics:

{
  "name": "Small Datacenter",
  "score": "0hard/-2500soft",
  "solverStatus": "SOLVING_ACTIVE",
  "activeServers": 6,
  "unassignedVms": 0
}

DELETE /placements/{problem_id}

Stop solving early and return best solution found.

PUT /placements/analyze

Analyze a placement’s constraint violations in detail:

{
  "constraints": [
    {
      "name": "cpuCapacity",
      "weight": "1hard",
      "score": "-2hard",
      "matches": [
        {
          "name": "cpuCapacity",
          "score": "-2hard",
          "justification": "Server-1: 34 cores used, 32 available"
        }
      ]
    }
  ]
}

Web UI Flow

The static/app.js implements this polling workflow:

  1. User opens page → Load demo data (GET /demo-data/SMALL)
  2. Display servers organized by rack with utilization bars
  3. User clicks “Solve”POST /placements (get job ID back)
  4. Poll GET /placements/{id} every 2 seconds
  5. Update UI with latest assignments and score in real-time
  6. When solverStatus === "NOT_SOLVING" → Stop polling
  7. Display final score, server utilization, and VM assignments
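The same loop can be scripted in Python for headless use. In this sketch, `fetch_status` stands in for an HTTP GET of /placements/{id}; it is injected so the loop is testable without a running server:

```python
import time

def poll_until_solved(fetch_status, interval_seconds=2.0, sleep=time.sleep):
    """Poll a status callable until the solver reports NOT_SOLVING."""
    while True:
        status = fetch_status()                      # e.g. GET /placements/{id}
        if status["solverStatus"] == "NOT_SOLVING":
            return status                            # final solution payload
        sleep(interval_seconds)

# Simulate three polls: solving, solving, done.
responses = iter([
    {"solverStatus": "SOLVING_ACTIVE", "score": "0hard/-3000soft"},
    {"solverStatus": "SOLVING_ACTIVE", "score": "0hard/-2500soft"},
    {"solverStatus": "NOT_SOLVING", "score": "0hard/-1500soft"},
])
final = poll_until_solved(lambda: next(responses), sleep=lambda _: None)
```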

Advanced Settings

The web UI includes configurable sliders (in static/config.js):

  • Racks: Number of server racks (1-8)
  • Servers per Rack: Servers in each rack (2-10)
  • VMs: Number of VMs to place (5-200)
  • Solver Time: How long to optimize (5s-2min)

Click “Generate New Data” to create custom scenarios.


Making Your First Customization

Let’s add a new constraint that demonstrates a common datacenter pattern: rack-aware fault tolerance.

The Rack Diversity Constraint

Business rule: “VMs in the same anti-affinity group should be spread across different racks, not just different servers”

Why This Matters

If two database replicas are on different servers but the same rack, a rack-level failure (power, networking, cooling) takes out both. True high availability requires rack diversity.

Implementation

Add this to src/vm_placement/constraints.py:

def rack_diversity(factory: ConstraintFactory):
    """
    Soft constraint: Anti-affinity VMs should be on different racks.

    Provides rack-level fault tolerance beyond just server separation.
    """
    return (
        factory.for_each_unique_pair(
            VM,
            Joiners.equal(lambda vm: vm.anti_affinity_group),
        )
        .filter(lambda vm1, vm2: vm1.anti_affinity_group is not None)
        .filter(lambda vm1, vm2: vm1.server is not None and vm2.server is not None)
        .filter(lambda vm1, vm2: vm1.server.rack == vm2.server.rack)
        .penalize(HardSoftScore.ONE_SOFT, lambda vm1, vm2: 50)
        .as_constraint("rackDiversity")
    )

How to read this:

  1. Find pairs of VMs in the same anti-affinity group
  2. Both must be assigned
  3. Penalize if they’re on the same rack (even if different servers)
  4. Each same-rack pair costs 50 soft points

Registering the Constraint

Add it to define_constraints():

@constraint_provider
def define_constraints(factory: ConstraintFactory):
    return [
        # Hard constraints
        cpu_capacity(factory),
        memory_capacity(factory),
        storage_capacity(factory),
        anti_affinity(factory),
        # Soft constraints
        affinity(factory),
        minimize_servers_used(factory),
        balance_utilization(factory),
        prioritize_placement(factory),
        rack_diversity(factory),  # ADD THIS LINE
    ]

Adding Tests

Add to tests/test_constraints.py:

from vm_placement.constraints import rack_diversity

def test_rack_diversity_same_rack():
    """Anti-affinity VMs on same rack should be penalized."""
    server1 = Server(id="s1", name="Server1", cpu_cores=32, memory_gb=128, storage_gb=2000, rack="Rack-A")
    server2 = Server(id="s2", name="Server2", cpu_cores=32, memory_gb=128, storage_gb=2000, rack="Rack-A")
    vm1 = VM(id="vm1", name="DB-Primary", cpu_cores=8, memory_gb=32, storage_gb=500, anti_affinity_group="db-replicas")
    vm2 = VM(id="vm2", name="DB-Replica", cpu_cores=8, memory_gb=32, storage_gb=500, anti_affinity_group="db-replicas")
    assign(server1, vm1)
    assign(server2, vm2)

    (
        constraint_verifier.verify_that(rack_diversity)
        .given(server1, server2, vm1, vm2)
        .penalizes_by(50)
    )


def test_rack_diversity_different_racks():
    """Anti-affinity VMs on different racks should not be penalized."""
    server1 = Server(id="s1", name="Server1", cpu_cores=32, memory_gb=128, storage_gb=2000, rack="Rack-A")
    server2 = Server(id="s2", name="Server2", cpu_cores=32, memory_gb=128, storage_gb=2000, rack="Rack-B")
    vm1 = VM(id="vm1", name="DB-Primary", cpu_cores=8, memory_gb=32, storage_gb=500, anti_affinity_group="db-replicas")
    vm2 = VM(id="vm2", name="DB-Replica", cpu_cores=8, memory_gb=32, storage_gb=500, anti_affinity_group="db-replicas")
    assign(server1, vm1)
    assign(server2, vm2)

    (
        constraint_verifier.verify_that(rack_diversity)
        .given(server1, server2, vm1, vm2)
        .penalizes_by(0)
    )

Run with:

pytest tests/test_constraints.py -v -k "rack_diversity"

Advanced Constraint Patterns

Pattern 1: GPU Requirement

Scenario: Some VMs need GPU-equipped servers.

First, add a has_gpu field to Server and requires_gpu to VM in domain.py:

@dataclass
class Server:
    # ... existing fields ...
    has_gpu: bool = False

@planning_entity
@dataclass
class VM:
    # ... existing fields ...
    requires_gpu: bool = False

Then add the constraint:

def gpu_requirement(factory: ConstraintFactory):
    """
    Hard constraint: GPU VMs must be placed on GPU servers.
    """
    return (
        factory.for_each(VM)
        .filter(lambda vm: vm.server is not None)
        .filter(lambda vm: vm.requires_gpu)
        .filter(lambda vm: not vm.server.has_gpu)
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("gpuRequirement")
    )

Pattern 2: Maintenance Window Avoidance

Scenario: Some servers are scheduled for maintenance and shouldn’t receive new VMs.

@dataclass
class Server:
    # ... existing fields ...
    in_maintenance: bool = False

def avoid_maintenance_servers(factory: ConstraintFactory):
    """
    Soft constraint: Prefer servers not in maintenance window.

    VMs can still be placed there if necessary, but it's discouraged.
    """
    return (
        factory.for_each(VM)
        .filter(lambda vm: vm.server is not None)
        .filter(lambda vm: vm.server.in_maintenance)
        .penalize(HardSoftScore.ONE_SOFT, lambda vm: 500)
        .as_constraint("avoidMaintenanceServers")
    )

Pattern 3: Memory Overcommit Limit

Scenario: Allow memory overcommit up to 120%, but heavily penalize beyond that.

def memory_overcommit_limit(factory: ConstraintFactory):
    """
    Soft constraint: Penalize memory overcommit beyond 120%.

    Many hypervisors support memory overcommit, but excessive overcommit
    causes performance degradation.
    """
    OVERCOMMIT_RATIO = 1.2

    return (
        factory.for_each(VM)
        .filter(lambda vm: vm.server is not None)
        .group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.memory_gb))
        .filter(lambda server, total_mem: total_mem > server.memory_gb * OVERCOMMIT_RATIO)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda server, total_mem: int((total_mem - server.memory_gb * OVERCOMMIT_RATIO) * 100)
        )
        .as_constraint("memoryOvercommitLimit")
    )
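A quick check of the penalty arithmetic with hypothetical numbers:

```python
OVERCOMMIT_RATIO = 1.2

def overcommit_penalty(total_mem_gb, capacity_gb):
    limit = capacity_gb * OVERCOMMIT_RATIO           # e.g. 100 GB -> 120 GB allowed
    return int((total_mem_gb - limit) * 100) if total_mem_gb > limit else 0

p_within = overcommit_penalty(115, 100)   # under the 120 GB limit -> 0
p_over = overcommit_penalty(130, 100)     # 10 GB over the limit -> 1000
```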

Pattern 4: Prefer Same Rack for Affinity

Scenario: When VMs can’t be on the same server, prefer the same rack.

def affinity_same_rack_preference(factory: ConstraintFactory):
    """
    Soft constraint: Affinity VMs on different servers should prefer same rack.

    Provides lower latency than cross-rack communication.
    """
    return (
        factory.for_each_unique_pair(
            VM,
            Joiners.equal(lambda vm: vm.affinity_group),
        )
        .filter(lambda vm1, vm2: vm1.affinity_group is not None)
        .filter(lambda vm1, vm2: vm1.server is not None and vm2.server is not None)
        .filter(lambda vm1, vm2: vm1.server != vm2.server)
        .filter(lambda vm1, vm2: vm1.server.rack != vm2.server.rack)
        .penalize(HardSoftScore.ONE_SOFT, lambda vm1, vm2: 25)
        .as_constraint("affinitySameRackPreference")
    )

Testing and Validation

Unit Testing Constraints

The quickstart uses ConstraintVerifier for isolated constraint testing. See tests/test_constraints.py:

from solverforge_legacy.solver.test import ConstraintVerifier

from vm_placement.domain import Server, VM, VMPlacementPlan
from vm_placement.constraints import define_constraints

# VM is the only planning entity (Server is a problem fact)
constraint_verifier = ConstraintVerifier.build(
    define_constraints, VMPlacementPlan, VM
)

def assign(server: Server, *vms: VM):
    """Helper to assign VMs to a server."""
    for vm in vms:
        vm.server = server

Test patterns:

Verify no penalty:

def test_cpu_capacity_not_exceeded():
    server = Server(id="s1", name="Server1", cpu_cores=16, memory_gb=64, storage_gb=500)
    vm1 = VM(id="vm1", name="VM1", cpu_cores=4, memory_gb=8, storage_gb=50)
    vm2 = VM(id="vm2", name="VM2", cpu_cores=8, memory_gb=16, storage_gb=100)
    assign(server, vm1, vm2)

    (
        constraint_verifier.verify_that(cpu_capacity)
        .given(server, vm1, vm2)
        .penalizes_by(0)
    )

Verify exact penalty amount:

def test_cpu_capacity_exceeded():
    server = Server(id="s1", name="Server1", cpu_cores=16, memory_gb=64, storage_gb=500)
    vm1 = VM(id="vm1", name="VM1", cpu_cores=12, memory_gb=8, storage_gb=50)
    vm2 = VM(id="vm2", name="VM2", cpu_cores=8, memory_gb=16, storage_gb=100)
    assign(server, vm1, vm2)

    # 12 + 8 = 20 cores, capacity = 16, excess = 4
    (
        constraint_verifier.verify_that(cpu_capacity)
        .given(server, vm1, vm2)
        .penalizes_by(4)
    )

Verify anti-affinity violation:

def test_anti_affinity_violated():
    server = Server(id="s1", name="Server1", cpu_cores=16, memory_gb=64, storage_gb=500)
    vm1 = VM(id="vm1", name="DB-Primary", cpu_cores=4, memory_gb=8, storage_gb=50, anti_affinity_group="db-replicas")
    vm2 = VM(id="vm2", name="DB-Replica", cpu_cores=4, memory_gb=8, storage_gb=50, anti_affinity_group="db-replicas")
    assign(server, vm1, vm2)

    # Both VMs on same server with same anti-affinity group = 1 violation
    (
        constraint_verifier.verify_that(anti_affinity)
        .given(server, vm1, vm2)
        .penalizes_by(1)
    )

Running Tests

# All tests
pytest

# Verbose output
pytest -v

# Specific test file
pytest tests/test_constraints.py

# Specific test function
pytest tests/test_constraints.py::test_cpu_capacity_exceeded

# With coverage
pytest --cov=vm_placement

Quick Reference

File Locations

| Need to…                  | Edit this file                             |
|---------------------------|--------------------------------------------|
| Add/change business rule  | src/vm_placement/constraints.py            |
| Add field to VM or Server | src/vm_placement/domain.py + converters.py |
| Change solve time         | src/vm_placement/solver.py                 |
| Add REST endpoint         | src/vm_placement/rest_api.py               |
| Change demo data          | src/vm_placement/demo_data.py              |
| Change UI                 | static/index.html, static/app.js           |

Common Constraint Patterns

Sum resource usage per server:

constraint_factory.for_each(VM)
    .filter(lambda vm: vm.server is not None)
    .group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.cpu_cores))
    .filter(lambda server, total: total > server.cpu_cores)
    .penalize(...)

Find pairs with same group:

constraint_factory.for_each_unique_pair(
    VM,
    Joiners.equal(lambda vm: vm.some_group),
    Joiners.equal(lambda vm: vm.server),  # On same server
)
.filter(lambda vm1, vm2: vm1.some_group is not None)
.penalize(...)

Count active servers:

constraint_factory.for_each(VM)
    .filter(lambda vm: vm.server is not None)
    .group_by(lambda vm: vm.server, ConstraintCollectors.count())
    .penalize(...)  # Each active server incurs cost

Penalize unassigned entities:

constraint_factory.for_each(VM)
    .filter(lambda vm: vm.server is None)
    .penalize(HardSoftScore.ONE_SOFT, lambda vm: 10000 + vm.priority * 1000)

Common Gotchas

  1. Forgot to register constraint in define_constraints() return list

    • Symptom: Constraint not enforced
  2. Using wrong score type

    • HardSoftScore.ONE_HARD for must-satisfy rules
    • HardSoftScore.ONE_SOFT for preferences
  3. Server is a problem fact, not an entity

    • Don’t add Server to entity_class_list
    • Don’t add Server to ConstraintVerifier.build()
  4. Forgetting to check for None

    • Always filter vm.server is not None before accessing server properties
  5. Score sign confusion

    • Higher soft score is better (less negative)
    • Use .reward() to add points, .penalize() to subtract
  6. Forgetting to include problem facts in tests

    • constraint_verifier.verify_that(...).given(server, vm1, vm2) — servers must be included

Debugging Tips

Enable verbose logging:

import logging
logging.basicConfig(level=logging.DEBUG)

Use the /analyze endpoint:

curl -X PUT http://localhost:8080/placements/analyze \
  -H "Content-Type: application/json" \
  -d @my_placement.json

Print in constraints (temporary debugging):

.filter(lambda vm: (
    print(f"Checking {vm.name}: server={vm.server}") or
    vm.server is not None
))

Additional Resources

5 - Meeting Scheduling

A comprehensive quickstart guide to understanding and building intelligent meeting scheduling with SolverForge

Legacy Implementation Guide

This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.

SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.


Table of Contents

  1. Introduction
  2. Getting Started
  3. The Problem We’re Solving
  4. Understanding the Data Model
  5. How Scheduling Optimization Works
  6. Writing Constraints: The Business Rules
  7. The Solver Engine
  8. Web Interface and API
  9. Making Your First Customization
  10. Advanced Constraint Patterns
  11. Testing and Validation
  12. Production Considerations
  13. Quick Reference

Introduction

What You’ll Learn

This guide walks you through a complete meeting scheduling application built with SolverForge, a constraint-based optimization framework. You’ll learn:

  • How to model complex scheduling problems with multiple resource types (time slots, rooms, people)
  • How to handle hierarchical constraints with different priority levels
  • How to balance competing objectives (minimize conflicts, pack meetings early, encourage breaks)
  • How to customize the system for your organization’s meeting policies

No optimization background required — we’ll explain concepts as we encounter them in the code.

Prerequisites

  • Basic Python knowledge (classes, functions, type annotations)
  • Familiarity with REST APIs
  • Comfort with command-line operations
  • Understanding of calendar/scheduling concepts

What is Meeting Scheduling Optimization?

Traditional approach: Manually coordinate calendars, send emails back and forth, book rooms one by one.

Meeting scheduling optimization: You describe your meetings, attendees, available rooms, and constraints — the solver automatically finds a schedule that satisfies requirements while minimizing conflicts.

Think of it like having an executive assistant who can evaluate millions of scheduling combinations per second to find arrangements that work for everyone.


Getting Started

Running the Application

  1. Navigate to the project directory:

    cd /srv/lab/dev/solverforge/solverforge-quickstarts/legacy/meeting-scheduling-fast
    
  2. Create and activate virtual environment:

    python -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    
  3. Install the package:

    pip install -e .
    
  4. Start the server:

    run-app
    
  5. Open your browser:

    http://localhost:8080
    

You’ll see a calendar interface with meetings, rooms, and people. Click “Solve” and watch the solver automatically schedule meetings into time slots and rooms while avoiding conflicts.

File Structure Overview

src/meeting_scheduling/
├── domain.py              # Data classes (Meeting, Person, Room, TimeGrain)
├── constraints.py         # Business rules (conflicts, capacity, timing)
├── solver.py              # Solver configuration
├── demo_data.py           # Sample data generation
├── rest_api.py            # HTTP API endpoints
├── converters.py          # REST ↔ Domain model conversion
├── json_serialization.py  # JSON serialization helpers
└── score_analysis.py      # Score breakdown DTOs

static/
├── index.html             # Web UI
└── app.js                 # UI logic and visualization

tests/
├── test_constraints.py    # Unit tests for constraints
└── test_feasible.py       # Integration tests

Key insight: Most business customization happens in constraints.py. The domain model defines what can be scheduled, but constraints define what makes a good schedule.


The Problem We’re Solving

The Meeting Scheduling Challenge

You need to assign meetings to time slots and rooms while satisfying rules like:

Hard constraints (must be satisfied):

  • Rooms cannot be double-booked (no overlapping meetings in same room)
  • Required attendees cannot be double-booked
  • Meetings must fit within available time slots (no overtime)
  • Rooms must have sufficient capacity for all attendees
  • Meetings cannot span multiple days (must start and end same day)

Medium constraints (strong preferences):

  • Avoid conflicts where a person is required at one meeting and preferred at another
  • Avoid conflicts for preferred attendees

Soft constraints (optimization goals):

  • Schedule meetings as early as possible
  • Encourage breaks between consecutive meetings
  • Minimize general overlapping meetings
  • Use larger rooms first (efficient room utilization)
  • Minimize room switches for attendees

Why This is Hard

For even 10 meetings, 5 rooms, and 20 time slots per day, there are over 10 trillion possible schedules. With 24 meetings like in the demo, the possibilities become astronomical.

The challenges:

  • Resource coordination: Must simultaneously allocate time, rooms, and people
  • Conflict resolution: One assignment affects availability of all resources
  • Priority balancing: Hard constraints must hold while optimizing preferences
  • Temporal dependencies: Meeting times affect break patterns and room switch distances

Scheduling optimization algorithms use sophisticated strategies to explore this space efficiently, finding high-quality schedules in seconds.


Understanding the Data Model

Let’s examine the core classes that model our scheduling problem. Open src/meeting_scheduling/domain.py:

The Person Class

@dataclass
class Person:
    id: Annotated[str, PlanningId]
    full_name: str

What it represents: An attendee who can be required or preferred at meetings.

Key fields:

  • id: Unique identifier (the PlanningId annotation tells SolverForge this is the primary key)
  • full_name: Display name (e.g., “Amy Cole”)

Optimization concept: People are resources that can be allocated to meetings. Unlike employee scheduling where employees are assigned to shifts, here people’s attendance at meetings creates constraints but isn’t directly optimized.

The TimeGrain Class

# Time slot granularity (configurable)
GRAIN_LENGTH_IN_MINUTES = 15

@dataclass
class TimeGrain:
    grain_index: Annotated[int, PlanningId]
    day_of_year: int
    starting_minute_of_day: int
    
    @property
    def ending_minute_of_day(self) -> int:
        return self.starting_minute_of_day + GRAIN_LENGTH_IN_MINUTES

What it represents: A discrete time slot (default: 15 minutes).

Time grain granularity: The GRAIN_LENGTH_IN_MINUTES constant (defined at the top of domain.py) controls the scheduling precision. The default 15-minute grain balances precision with search space size. Smaller grains (5 minutes) offer more flexibility but slower solving. Larger grains (30 minutes) solve faster but with less scheduling flexibility.

Key fields:

  • grain_index: Sequential index across all days (0, 1, 2, … for the entire planning horizon)
  • day_of_year: Which day (1-365)
  • starting_minute_of_day: Time within the day (e.g., 480 = 8:00 AM, 540 = 9:00 AM)

Why discrete time slots?

Instead of continuous time, meetings snap to 15-minute intervals. This:

  • Simplifies conflict detection (integer comparisons)
  • Matches real-world calendar behavior
  • Reduces search space (finite number of start times)

Optimization concept: This is time discretization — converting continuous time into discrete slots. It’s a common technique in scheduling to make problems tractable.

Example time grains:

grain_index=0  → Day 1, 8:00-8:15 AM  (starting_minute_of_day=480)
grain_index=1  → Day 1, 8:15-8:30 AM  (starting_minute_of_day=495)
grain_index=39 → Day 1, 5:45-6:00 PM  (starting_minute_of_day=1065)
grain_index=40 → Day 2, 8:00-8:15 AM  (starting_minute_of_day=480)
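
These mappings follow from simple arithmetic on the grain index. A minimal sketch (the helper below is illustrative and assumes a 40-grain day starting at 8:00 AM; it is not part of domain.py):

```python
GRAIN_LENGTH_IN_MINUTES = 15
GRAINS_PER_DAY = 40        # 8:00 AM - 6:00 PM at 15-minute grains
DAY_START_MINUTE = 480     # 8:00 AM

def grain_to_day_and_minute(grain_index: int) -> tuple[int, int]:
    """Map a global grain index to (day number, starting_minute_of_day)."""
    day = grain_index // GRAINS_PER_DAY + 1
    minute = DAY_START_MINUTE + (grain_index % GRAINS_PER_DAY) * GRAIN_LENGTH_IN_MINUTES
    return day, minute

print(grain_to_day_and_minute(0))    # (1, 480)  -> day 1, 8:00 AM
print(grain_to_day_and_minute(39))   # (1, 1065) -> day 1, 5:45 PM
print(grain_to_day_and_minute(40))   # (2, 480)  -> day 2, 8:00 AM
```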

The Room Class

@dataclass
class Room:
    id: Annotated[str, PlanningId]
    name: str
    capacity: int

What it represents: A physical meeting room.

Key fields:

  • name: Display name (e.g., “Room A”, “Conference Room”)
  • capacity: Maximum number of people it can hold

Optimization concept: Rooms are constrained resources. Each room can host at most one meeting at a time, and must be large enough for attendees.

The Meeting Class

@dataclass
class Meeting:
    id: Annotated[str, PlanningId]
    topic: str
    duration_in_grains: int
    required_attendances: list[RequiredAttendance]
    preferred_attendances: list[PreferredAttendance]
    speakers: list[Person]
    entire_group_meeting: bool

What it represents: A meeting that needs to be scheduled.

Key fields:

  • topic: Meeting subject (e.g., “Sprint Planning”, “Budget Review”)
  • duration_in_grains: Length in 15-minute slots (e.g., 8 = 2 hours)
  • required_attendances: People who must attend (hard constraint)
  • preferred_attendances: People who should attend if possible (soft constraint)
  • speakers: Optional presenter list
  • entire_group_meeting: Flag for all-hands meetings

Attendance types:

Required attendance (hard constraint):

@dataclass
class RequiredAttendance:
    id: Annotated[str, PlanningId]
    person: Person
    meeting: Meeting

The person must be available at the meeting time. Conflicts are hard constraint violations.

Preferred attendance (soft constraint):

@dataclass
class PreferredAttendance:
    id: Annotated[str, PlanningId]
    person: Person
    meeting: Meeting

The person should attend if possible, but conflicts are only soft penalties.

Optimization concept: This is hierarchical attendance — distinguishing must-have from nice-to-have attendees. It allows flexible scheduling when not everyone can attend.

The MeetingAssignment Class (Planning Entity)

@planning_entity
@dataclass
class MeetingAssignment:
    id: Annotated[str, PlanningId]
    meeting: Meeting
    starting_time_grain: Annotated[TimeGrain | None, PlanningVariable] = None
    room: Annotated[Room | None, PlanningVariable] = None
    pinned: bool = False

What it represents: A decision about when and where to hold a meeting.

Key fields:

  • meeting: Reference to the Meeting object (immutable)
  • starting_time_grain: When the meeting starts — this is a planning variable!
  • room: Where the meeting is held — this is also a planning variable!
  • pinned: If True, this assignment is fixed and won’t be changed by the solver

Annotations:

  • @planning_entity: Tells SolverForge this class contains decisions to make
  • PlanningVariable: Marks fields as decision variables

Optimization concept: Unlike employee scheduling (one variable: which employee) or vehicle routing (one list variable: which visits), this problem has two independent planning variables per entity. The solver must simultaneously decide both time and room.

Why multiple planning variables matter: Having two planning variables (time and room) per entity creates a larger search space but more flexibility. The dataclass-based domain model enables efficient evaluation of variable combinations. For architectural details on why dataclasses outperform Pydantic in constraint evaluation, see Dataclasses vs Pydantic in Constraint Solvers.

Important methods:

def get_last_time_grain_index(self) -> int:
    """Calculate when meeting ends."""
    return self.starting_time_grain.grain_index + self.meeting.duration_in_grains - 1

def calculate_overlap(self, other: 'MeetingAssignment') -> int:
    """Calculate overlap in time grains with another meeting."""
    if self.starting_time_grain is None or other.starting_time_grain is None:
        return 0
    
    start1 = self.starting_time_grain.grain_index
    end1 = self.get_last_time_grain_index()
    start2 = other.starting_time_grain.grain_index
    end2 = other.get_last_time_grain_index()
    
    # Interval intersection
    overlap_start = max(start1, start2)
    overlap_end = min(end1, end2)
    
    return max(0, overlap_end - overlap_start + 1)

This helper enables efficient overlap detection in constraints.
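
The interval arithmetic can be checked in isolation. A standalone sketch of the same inclusive-interval intersection (not part of domain.py):

```python
def overlap_in_grains(start1: int, end1: int, start2: int, end2: int) -> int:
    """Inclusive-interval intersection, mirroring calculate_overlap."""
    return max(0, min(end1, end2) - max(start1, start2) + 1)

# Meeting 1 occupies grains 0-7, meeting 2 occupies grains 4-11:
print(overlap_in_grains(0, 7, 4, 11))   # 4 (grains 4, 5, 6, 7)

# Disjoint meetings overlap by zero:
print(overlap_in_grains(0, 7, 8, 15))   # 0
```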

The MeetingSchedule Class (Planning Solution)

@planning_solution
@dataclass
class MeetingSchedule:
    day_list: Annotated[list[int], ProblemFactCollectionProperty, ValueRangeProvider]
    time_grain_list: Annotated[list[TimeGrain], ProblemFactCollectionProperty, ValueRangeProvider]
    room_list: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
    person_list: Annotated[list[Person], ProblemFactCollectionProperty]
    meeting_list: Annotated[list[Meeting], ProblemFactCollectionProperty]
    meeting_assignment_list: Annotated[list[MeetingAssignment], PlanningEntityCollectionProperty]
    score: Annotated[HardMediumSoftScore | None, PlanningScore] = None
    solver_status: SolverStatus = SolverStatus.NOT_SOLVING

What it represents: The complete scheduling problem and its solution.

Key fields:

  • time_grain_list: All available time slots (value range for starting_time_grain)
  • room_list: All available rooms (value range for room)
  • person_list: All people who might attend meetings
  • meeting_list: All meetings that need scheduling
  • meeting_assignment_list: The planning entities (what the solver optimizes)
  • score: Solution quality metric
  • solver_status: Whether solving is active

Annotations explained:

  • @planning_solution: Marks this as the top-level problem definition
  • ProblemFactCollectionProperty: Immutable input data
  • ValueRangeProvider: Collections that provide possible values for planning variables
  • PlanningEntityCollectionProperty: The entities being optimized
  • PlanningScore: Where the solver stores calculated quality

Optimization concept: The ValueRangeProvider annotations tell the solver: “When assigning starting_time_grain, choose from time_grain_list; when assigning room, choose from room_list.”


How Scheduling Optimization Works

Before diving into constraints, let’s understand the scheduling process.

The Three-Tier Scoring System

Unlike employee scheduling (Hard/Soft) or vehicle routing (Hard/Soft), meeting scheduling uses three levels:

Score format: "0hard/0medium/-1234soft"

Hard constraints (priority 1):

  • Room conflicts
  • Required attendee conflicts
  • Room capacity
  • Meetings within available time
  • Same-day constraints

Medium constraints (priority 2):

  • Required vs preferred attendee conflicts
  • Preferred attendee conflicts

Soft constraints (priority 3):

  • Schedule meetings early
  • Breaks between meetings
  • Minimize overlaps
  • Room utilization
  • Room stability

Why three tiers?

This creates a hierarchy: hard > medium > soft. A solution with 0hard/-50medium/-5000soft is better than 0hard/-100medium/-1000soft even though its soft score is worse, because medium takes priority.

Optimization concept: This is lexicographic scoring with three levels instead of two. It’s useful when you have multiple categories of preferences with clear priority relationships.
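
Python's built-in tuple comparison is lexicographic, so the priority rule can be illustrated directly (an illustration only; the solver uses its own HardMediumSoftScore type):

```python
# Scores as (hard, medium, soft) tuples; higher is better.
solution_a = (0, -50, -5000)
solution_b = (0, -100, -1000)

# Hard levels tie, so medium decides; a's worse soft score is irrelevant.
print(solution_a > solution_b)  # True
```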

The Search Process

  1. Initial solution: Often all meetings unassigned or randomly assigned
  2. Evaluate score: Calculate all constraint penalties across three tiers
  3. Make a move:
    • Change a meeting’s time slot
    • Change a meeting’s room
    • Swap two meetings’ times or rooms
  4. Re-evaluate score (incrementally)
  5. Accept if improvement (considering all three score levels)
  6. Repeat millions of times
  7. Return best solution found
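
The loop above can be sketched as a generic greedy local search (a simplified illustration; the real solver uses incremental score calculation and smarter acceptance criteria such as tabu search or simulated annealing):

```python
import random

def local_search(initial, score, random_move, steps=10_000):
    """Greedy local search: keep a random move only if it doesn't worsen the score."""
    best, best_score = initial, score(initial)
    for _ in range(steps):
        candidate = random_move(best)          # e.g. change a time slot or a room
        candidate_score = score(candidate)
        if candidate_score >= best_score:      # accept improvements (and ties)
            best, best_score = candidate, candidate_score
    return best

# Toy demo: the "schedule" is one integer and the optimum is 42.
random.seed(0)
best = local_search(
    initial=0,
    score=lambda x: (0, 0, -abs(x - 42)),      # (hard, medium, soft) tuple
    random_move=lambda x: x + random.choice([-1, 1]),
)
print(best)  # 42
```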

Move types specific to meeting scheduling:

  • Change time: Move meeting to different time slot
  • Change room: Move meeting to different room
  • Change both: Simultaneously change time and room
  • Swap times: Exchange times of two meetings
  • Swap rooms: Exchange rooms of two meetings

Why Multiple Planning Variables Matter

Having two planning variables (time and room) per entity creates interesting dynamics:

Independent optimization: The solver can change time without changing room, or vice versa.

Coordinated moves: Sometimes changing both together is better than changing separately.

Search space: With T time slots and R rooms, each meeting has T × R possible assignments (much larger than T or R alone).


Writing Constraints: The Business Rules

Now the heart of the system. Open src/meeting_scheduling/constraints.py.

The Constraint Provider Pattern

All constraints are registered in one function:

@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
    return [
        # Hard constraints
        room_conflict(constraint_factory),
        avoid_overtime(constraint_factory),
        required_attendance_conflict(constraint_factory),
        required_room_capacity(constraint_factory),
        start_and_end_on_same_day(constraint_factory),
        
        # Medium constraints
        required_and_preferred_attendance_conflict(constraint_factory),
        preferred_attendance_conflict(constraint_factory),
        
        # Soft constraints
        do_meetings_as_soon_as_possible(constraint_factory),
        one_break_between_consecutive_meetings(constraint_factory),
        overlapping_meetings(constraint_factory),
        assign_larger_rooms_first(constraint_factory),
        room_stability(constraint_factory),
    ]

Let’s examine each constraint category.


Hard Constraints

Hard Constraint: Room Conflict

Business rule: “No two meetings can use the same room at overlapping times.”

def room_conflict(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each_unique_pair(
            MeetingAssignment,
            Joiners.equal(lambda meeting_assignment: meeting_assignment.room),
        )
        .filter(lambda meeting1, meeting2: meeting1.calculate_overlap(meeting2) > 0)
        .penalize(
            HardMediumSoftScore.ONE_HARD,
            lambda meeting1, meeting2: meeting1.calculate_overlap(meeting2)
        )
        .as_constraint("Room conflict")
    )

How to read this:

  1. for_each_unique_pair(MeetingAssignment, ...): Create pairs of meeting assignments
  2. Joiners.equal(...): Only pair meetings assigned to the same room
  3. .filter(...): Keep only pairs that overlap in time
  4. .penalize(ONE_HARD, ...): Penalize by number of overlapping time grains

Example scenario:

Room A, Time grains 0-11 (8:00 AM - 11:00 AM):

  • Meeting 1: Time grains 0-7 (8:00-10:00 AM, 2 hours)
  • Meeting 2: Time grains 4-11 (9:00-11:00 AM, 2 hours)
  • Overlap: Time grains 4-7 (9:00-10:00 AM) = 4 grains → Penalty: 4 hard points

Optimization concept: This is a resource conflict constraint. The resource (room) has limited capacity (one meeting at a time), and the penalty is proportional to the conflict severity (overlap duration).

Hard Constraint: Avoid Overtime

Business rule: “Meetings cannot extend beyond available time slots.”

def avoid_overtime(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(MeetingAssignment)
        .filter(lambda meeting_assignment: 
            meeting_assignment.get_last_time_grain_index() >= len(time_grain_list))
        .penalize(
            HardMediumSoftScore.ONE_HARD,
            lambda meeting_assignment: 
                meeting_assignment.get_last_time_grain_index() - len(time_grain_list) + 1
        )
        .as_constraint("Don't go in overtime")
    )

How to read this:

  1. for_each(MeetingAssignment): Consider every meeting
  2. .filter(...): Keep meetings that end beyond the last available time grain
  3. .penalize(...): Penalize by how far past the boundary

Example scenario:

Time grains available: 0-159 (160 total grains = 4 days × 40 grains/day)

  • Meeting starts at grain 154, duration 8 grains
  • Ends at grain 161 (154 + 8 - 1 = 161)
  • Overtime: 161 - 159 = 2 grains → Penalty: 2 hard points

Note: This constraint assumes time_grain_list is available in the scope. In practice, it’s passed via closure or accessed from the solution object.

Hard Constraint: Required Attendance Conflict

Business rule: “Required attendees cannot be double-booked.”

def required_attendance_conflict(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(RequiredAttendance)
        .join(
            RequiredAttendance,
            Joiners.equal(lambda attendance: attendance.person),
            Joiners.less_than(lambda attendance: attendance.id)
        )
        .filter(
            lambda attendance1, attendance2:
                attendance1.meeting.meeting_assignment.calculate_overlap(
                    attendance2.meeting.meeting_assignment
                ) > 0
        )
        .penalize(
            HardMediumSoftScore.ONE_HARD,
            lambda attendance1, attendance2:
                attendance1.meeting.meeting_assignment.calculate_overlap(
                    attendance2.meeting.meeting_assignment
                )
        )
        .as_constraint("Required attendance conflict")
    )

How to read this:

  1. for_each(RequiredAttendance): Consider every required attendance
  2. .join(RequiredAttendance, ...): Pair with other required attendances
  3. Joiners.equal(...): Only pair attendances for the same person
  4. Joiners.less_than(...): Ensure each pair counted once (ordering by ID)
  5. .filter(...): Keep pairs where meetings overlap in time
  6. .penalize(...): Penalize by overlap duration

Example scenario:

Person “Amy Cole” is required at:

  • Meeting A: Time grains 0-7 (8:00-10:00 AM)
  • Meeting B: Time grains 6-13 (9:30-11:30 AM)
  • Overlap: Time grains 6-7 (9:30-10:00 AM) = 2 grains → Penalty: 2 hard points

Optimization concept: This is a person-centric conflict constraint. Unlike room conflicts (resource conflict), this is about a person’s availability (capacity of 1 meeting at a time).

Why join on RequiredAttendance instead of MeetingAssignment?

By joining on attendance records, we automatically filter to only the meetings each person is required at. This is more efficient than checking all meeting pairs and then filtering by attendees.

Hard Constraint: Required Room Capacity

Business rule: “Rooms must have enough capacity for all attendees (required + preferred).”

def required_room_capacity(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(MeetingAssignment)
        .filter(
            lambda meeting_assignment:
                len(meeting_assignment.meeting.required_attendances) +
                len(meeting_assignment.meeting.preferred_attendances) >
                meeting_assignment.room.capacity
        )
        .penalize(
            HardMediumSoftScore.ONE_HARD,
            lambda meeting_assignment:
                len(meeting_assignment.meeting.required_attendances) +
                len(meeting_assignment.meeting.preferred_attendances) -
                meeting_assignment.room.capacity
        )
        .as_constraint("Required room capacity")
    )

How to read this:

  1. for_each(MeetingAssignment): Consider every assigned meeting
  2. .filter(...): Keep meetings where attendee count exceeds room capacity
  3. .penalize(...): Penalize by the capacity shortage

Example scenario:

Meeting has:

  • Required attendees: 8 people
  • Preferred attendees: 4 people
  • Total: 12 people

Assigned to Room B (capacity 10):

  • Shortage: 12 - 10 = 2 people → Penalty: 2 hard points

Design choice: Count both required and preferred attendees. You could alternatively only count required attendees (and make preferred overflow a soft constraint).

Hard Constraint: Start and End on Same Day

Business rule: “Meetings cannot span multiple days.”

def start_and_end_on_same_day(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(MeetingAssignment)
        .join(
            TimeGrain,
            Joiners.equal(
                lambda meeting_assignment: meeting_assignment.get_last_time_grain_index(),
                lambda time_grain: time_grain.grain_index
            )
        )
        .filter(
            lambda meeting_assignment, last_time_grain:
                meeting_assignment.starting_time_grain.day_of_year != 
                last_time_grain.day_of_year
        )
        .penalize(HardMediumSoftScore.ONE_HARD)
        .as_constraint("Start and end on same day")
    )

How to read this:

  1. for_each(MeetingAssignment): All meetings
  2. .join(TimeGrain, ...): Join with the time grain where meeting ends
  3. .filter(...): Keep meetings where start day ≠ end day
  4. .penalize(ONE_HARD): Simple binary penalty

Example scenario:

Meeting starts at:

  • Time grain 35 (day 1, 4:45 PM, starting_minute_of_day = 1005)
  • Duration: 8 grains (2 hours)
  • Ends at grain 42 (day 2, 8:30 AM)
  • Start day (1) ≠ End day (2) → Penalty: 1 hard point

Optimization concept: This enforces a temporal boundary constraint. Meetings respect day boundaries, which is realistic for most organizations.


Medium Constraints

Medium constraints sit between hard (must satisfy) and soft (nice to have). They represent strong preferences that should rarely be violated.

Medium Constraint: Required and Preferred Attendance Conflict

Business rule: “Strongly discourage conflicts where a person is required at one meeting and preferred at another.”

def required_and_preferred_attendance_conflict(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(RequiredAttendance)
        .join(
            PreferredAttendance,
            Joiners.equal(
                lambda required: required.person,
                lambda preferred: preferred.person
            )
        )
        .filter(
            lambda required, preferred:
                required.meeting.meeting_assignment.calculate_overlap(
                    preferred.meeting.meeting_assignment
                ) > 0
        )
        .penalize(
            HardMediumSoftScore.ONE_MEDIUM,
            lambda required, preferred:
                required.meeting.meeting_assignment.calculate_overlap(
                    preferred.meeting.meeting_assignment
                )
        )
        .as_constraint("Required and preferred attendance conflict")
    )

How to read this:

  1. for_each(RequiredAttendance): All required attendances
  2. .join(PreferredAttendance, ...): Pair with preferred attendances for same person
  3. .filter(...): Keep pairs where meetings overlap
  4. .penalize(ONE_MEDIUM, ...): Medium-level penalty by overlap

Example scenario:

Person “Bob Smith”:

  • Required at Meeting A: Time grains 0-7
  • Preferred at Meeting B: Time grains 4-11
  • Overlap: 4 grains → Penalty: 4 medium points

Why medium instead of hard?

This is a degraded service scenario. The person can’t attend the preferred meeting (unfortunate) but can fulfill their required attendance (essential). It’s not ideal but acceptable.

Medium Constraint: Preferred Attendance Conflict

Business rule: “Discourage conflicts between two preferred attendances for the same person.”

def preferred_attendance_conflict(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(PreferredAttendance)
        .join(
            PreferredAttendance,
            Joiners.equal(lambda attendance: attendance.person),
            Joiners.less_than(lambda attendance: attendance.id)
        )
        .filter(
            lambda attendance1, attendance2:
                attendance1.meeting.meeting_assignment.calculate_overlap(
                    attendance2.meeting.meeting_assignment
                ) > 0
        )
        .penalize(
            HardMediumSoftScore.ONE_MEDIUM,
            lambda attendance1, attendance2:
                attendance1.meeting.meeting_assignment.calculate_overlap(
                    attendance2.meeting.meeting_assignment
                )
        )
        .as_constraint("Preferred attendance conflict")
    )

Similar to required attendance conflict but for preferred attendees.

Why medium instead of soft?

Preferred attendees are still important — just not critical. Medium priority expresses “try hard to avoid this” without making it a hard requirement.


Soft Constraints

Soft constraints represent optimization goals and nice-to-have preferences.

Soft Constraint: Schedule Meetings Early

Business rule: “Prefer scheduling meetings earlier in the day/week rather than later.”

def do_meetings_as_soon_as_possible(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(MeetingAssignment)
        .penalize(
            HardMediumSoftScore.ONE_SOFT,
            lambda meeting_assignment: meeting_assignment.get_last_time_grain_index()
        )
        .as_constraint("Do all meetings as soon as possible")
    )

How to read this:

  1. for_each(MeetingAssignment): All meetings
  2. .penalize(ONE_SOFT, ...): Penalize by the ending time grain index

Why penalize by end time?

The later a meeting ends, the higher the penalty. This naturally pushes meetings toward earlier time slots.

Example scenarios:

  • Meeting ends at grain 10 → Penalty: 10 soft points
  • Meeting ends at grain 50 → Penalty: 50 soft points
  • Meeting ends at grain 100 → Penalty: 100 soft points

The solver will prefer the first meeting’s timing.

Alternative formulation:

You could penalize by start time instead:

.penalize(ONE_SOFT, lambda ma: ma.starting_time_grain.grain_index)

Penalizing by end time accounts for both start time and duration, which can be more balanced.

Soft Constraint: Breaks Between Meetings

Business rule: “Encourage at least one 15-minute break between consecutive meetings.”

def one_break_between_consecutive_meetings(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(MeetingAssignment)
        .join(
            MeetingAssignment,
            Joiners.less_than(
                lambda meeting: meeting.get_last_time_grain_index(),
                lambda meeting: meeting.starting_time_grain.grain_index
            )
        )
        .filter(
            lambda meeting1, meeting2:
                meeting1.get_last_time_grain_index() + 1 ==
                meeting2.starting_time_grain.grain_index
        )
        .penalize(HardMediumSoftScore.of_soft(100))
        .as_constraint("One time grain break between two consecutive meetings")
    )

How to read this:

  1. for_each(MeetingAssignment): All meetings
  2. .join(MeetingAssignment, ...): Pair with meetings that start after this one ends
  3. .filter(...): Keep pairs that are back-to-back (no gap)
  4. .penalize(100 soft): Fixed penalty for consecutive meetings

Example scenario:

  • Meeting A: Time grains 0-7 (ends at grain 7)
  • Meeting B: Time grains 8-15 (starts at grain 8)
  • Back-to-back: 7 + 1 == 8 → Penalty: 100 soft points

Why not check for shared attendees?

This constraint applies globally — any consecutive meetings are discouraged. You could enhance it to only penalize when attendees overlap:

.filter(lambda m1, m2: has_shared_attendees(m1, m2))

Soft Constraint: Minimize Overlapping Meetings

Business rule: “Generally discourage overlapping meetings, even in different rooms.”

def overlapping_meetings(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each_unique_pair(
            MeetingAssignment,
            Joiners.less_than(lambda meeting: meeting.id)
        )
        .filter(lambda meeting1, meeting2: meeting1.calculate_overlap(meeting2) > 0)
        .penalize(
            HardMediumSoftScore.of_soft(10),
            lambda meeting1, meeting2: meeting1.calculate_overlap(meeting2)
        )
        .as_constraint("Overlapping meetings")
    )

How to read this:

  1. for_each_unique_pair(MeetingAssignment, ...): All pairs of meetings
  2. .filter(...): Keep overlapping pairs
  3. .penalize(10 soft, ...): Penalize by overlap × 10

Why discourage overlaps in different rooms?

This creates a temporal spread of meetings. Benefits:

  • Reduces hallway congestion
  • Easier to find substitute attendees
  • Better utilization of time slots

Weight tuning: The penalty weight (10) can be adjusted based on preference. Higher values more strongly discourage overlaps.

Soft Constraint: Assign Larger Rooms First

Business rule: “Prefer using larger rooms over smaller rooms.”

def assign_larger_rooms_first(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(MeetingAssignment)
        .join(
            Room,
            Joiners.greater_than(
                lambda meeting_assignment: meeting_assignment.room.capacity,
                lambda room: room.capacity
            )
        )
        .penalize(
            HardMediumSoftScore.ONE_SOFT,
            lambda meeting_assignment, room:
                room.capacity - meeting_assignment.room.capacity
        )
        .as_constraint("Assign larger rooms first")
    )

How to read this:

  1. for_each(MeetingAssignment): All meetings
  2. .join(Room, ...): Join with rooms larger than the assigned room
  3. .penalize(ONE_SOFT, ...): Penalize by capacity difference

Example scenario:

Available rooms: 30, 20, 16 capacity

Meeting assigned to Room B (capacity 20):

  • Room A (capacity 30) exists and is larger
  • Penalty: 30 - 20 = 10 soft points

If all larger rooms are used, no penalty.

Why prefer larger rooms?

This implements conservative resource allocation — use larger rooms by default, save smaller rooms for when larger ones are occupied. This maximizes flexibility.

Alternative approach: You could prefer smaller rooms that fit (minimize waste):

# Prefer the smallest room that still fits
.filter(lambda ma: ma.room.capacity >= required_capacity)
.penalize(ONE_SOFT, lambda ma: ma.room.capacity - required_capacity)

The choice depends on your organization’s room utilization patterns.

Soft Constraint: Room Stability

Business rule: “Encourage attendees to stay in the same room for nearby meetings.”

This constraint handles both required and preferred attendees using concat() to combine both attendance types into a single stream:

def room_stability(constraint_factory: ConstraintFactory):
    """
    Soft constraint: Encourages room stability for people attending multiple meetings.

    Penalizes when a person attends meetings in different rooms that are close in time,
    encouraging room stability. This handles both required and preferred attendees by
    creating separate constraint streams that are combined.

    Since Python doesn't have a common Attendance base class for RequiredAttendance
    and PreferredAttendance, we use concat() to combine both attendance types into
    a single stream.
    """
    # Create a stream that combines both required and preferred attendances
    return (
        constraint_factory.for_each(RequiredAttendance)
        .map(lambda ra: (ra.person, ra.meeting_id))
        .concat(
            constraint_factory.for_each(PreferredAttendance)
            .map(lambda pa: (pa.person, pa.meeting_id))
        )
        .join(
            constraint_factory.for_each(RequiredAttendance)
            .map(lambda ra: (ra.person, ra.meeting_id))
            .concat(
                constraint_factory.for_each(PreferredAttendance)
                .map(lambda pa: (pa.person, pa.meeting_id))
            ),
            Joiners.equal(
                lambda left: left[0],  # person
                lambda right: right[0],  # person
            ),
            Joiners.filtering(
                lambda left, right: left[1] != right[1]  # different meeting_id
            ),
        )
        .join(
            MeetingAssignment,
            Joiners.equal(
                lambda left, right: left[1],  # left.meeting_id
                lambda assignment: assignment.meeting.id,
            ),
        )
        .join(
            MeetingAssignment,
            Joiners.equal(
                lambda left, right, left_assignment: right[1],  # right.meeting_id
                lambda assignment: assignment.meeting.id,
            ),
            Joiners.less_than(
                lambda left, right, left_assignment: left_assignment.get_grain_index(),
                lambda assignment: assignment.get_grain_index(),
            ),
            Joiners.filtering(
                lambda left, right, left_assignment, right_assignment:
                    left_assignment.room != right_assignment.room
            ),
            Joiners.filtering(
                lambda left, right, left_assignment, right_assignment:
                    right_assignment.get_grain_index()
                    - left_assignment.meeting.duration_in_grains
                    - left_assignment.get_grain_index()
                    <= 2
            ),
        )
        .penalize(HardMediumSoftScore.ONE_SOFT)
        .as_constraint("Room stability")
    )

How to read this:

  1. Create a combined stream of (person, meeting_id) tuples from both RequiredAttendance and PreferredAttendance using concat()
  2. Self-join on same person, different meetings
  3. Join to MeetingAssignment to get time and room for left meeting
  4. Join to MeetingAssignment to get time and room for right meeting
  5. Filter: different rooms and close in time (within 2 grains gap)
  6. .penalize(ONE_SOFT): Simple penalty for room switches

Why use concat()?

Python doesn’t have a common base class for RequiredAttendance and PreferredAttendance. The concat() method combines two constraint streams into one, allowing us to treat both attendance types uniformly.

Example scenario:

Person “Carol Johnson”:

  • Required at Meeting A at 9:00 AM in Room A
  • Preferred at Meeting B at 9:30 AM in Room B (different room, close in time)
  • Penalty: 1 soft point (room switch)

If Meeting B were also in Room A, no penalty.

Optimization concept: This is a locality constraint — encouraging spatial proximity for temporally close activities. It reduces attendee movement for both required and preferred attendees.

Time threshold: The <= 2 filter means within 2 time grains gap after the first meeting ends. Adjust this based on building size and walking times.
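
The filter expression reduces to a small gap calculation. A standalone sketch of the same arithmetic (a hypothetical helper, not in the source):

```python
def gap_in_grains(left_start: int, left_duration: int, right_start: int) -> int:
    """Free grains between the end of the left meeting and the start of the right."""
    return right_start - left_start - left_duration

# Left meeting occupies grains 0-7 (start 0, duration 8); right starts at grain 10:
print(gap_in_grains(0, 8, 10))   # 2 -> within the <= 2 threshold, so a switch is penalized

# A longer break escapes the penalty:
print(gap_in_grains(0, 8, 12))   # 4
```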


The Solver Engine

Now let’s see how the solver is configured. Open src/meeting_scheduling/solver.py:

solver_config = SolverConfig(
    solution_class=MeetingSchedule,
    entity_class_list=[MeetingAssignment],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)

solver_manager = SolverManager.create(solver_config)
solution_manager = SolutionManager.create(solver_manager)

Configuration Breakdown

solution_class: MeetingSchedule (the top-level planning solution)

entity_class_list: [MeetingAssignment] (the planning entities with variables)

score_director_factory_config: Links to define_constraints function

termination_config: Stops after 30 seconds

Multiple Planning Variables

Unlike simpler problems with one variable per entity, MeetingAssignment has two planning variables:

  • starting_time_grain (when)
  • room (where)

The solver must optimize both simultaneously. This creates:

More complex search space: T × R possible combinations per meeting

More move types: Can change time, change room, or change both

Better flexibility: Can optimize time and room independently

Optimization concept: This is multi-variable planning. The solver uses specialized move selectors that understand how to efficiently explore both variables.
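A back-of-envelope calculation shows how quickly the combined search space grows, using the demo-data sizes as an example:

```python
# Rough search-space size for two planning variables per meeting,
# using the demo-data sizes (156 time grains, 3 rooms, 24 meetings).
time_grains = 156
rooms = 3
meetings = 24

per_meeting = time_grains * rooms          # T × R combinations per meeting
combinations = per_meeting ** meetings     # for the whole schedule

assert per_meeting == 468
assert len(str(combinations)) == 65        # a 65-digit number of states
```

Exhaustive search is hopeless at this scale, which is why the solver relies on heuristic moves over both variables instead.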

SolverManager: Asynchronous Solving

Meeting scheduling can take time for large problems. SolverManager enables non-blocking solving:

# Start solving (returns immediately)
solver_manager.solve_and_listen(job_id, schedule, callback_function)

# Check status
status = solver_manager.get_solver_status(job_id)

# Get current best solution (updates live)
solution = solver_manager.get_final_best_solution(job_id)

# Stop early
solver_manager.terminate_early(job_id)

SolutionManager: Score Analysis

The solution_manager provides detailed score breakdowns:

# Analyze solution
analysis = solution_manager.analyze(schedule)

# See which constraints fired
for constraint_analysis in analysis.constraint_analyses:
    print(f"{constraint_analysis.name}: {constraint_analysis.score}")
    for match in constraint_analysis.matches:
        print(f"  {match.justification}")

This shows exactly which constraints are violated and by how much — invaluable for debugging.

Solving Timeline

Small problems (10-15 meetings, 2-3 rooms, 2 days):

  • Initial feasible solution: < 1 second
  • Good solution: 5-10 seconds
  • High-quality: 30 seconds

Medium problems (20-30 meetings, 3-5 rooms, 4 days):

  • Initial feasible solution: 1-5 seconds
  • Good solution: 30-60 seconds
  • High-quality: 2-5 minutes

Large problems (50+ meetings, 5+ rooms, 5+ days):

  • Initial feasible solution: 5-30 seconds
  • Good solution: 5-10 minutes
  • High-quality: 15-30 minutes

Factors affecting speed:

  • Number of meetings (primary factor)
  • Number of attendees per meeting (affects conflict constraints)
  • Time grain granularity (finer = more options = slower)
  • Constraint complexity
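The attendee effect comes from pairwise conflict checks, which grow quadratically with the number of attendance records per person:

```python
# Conflict constraints compare pairs of attendance records per person,
# so the work grows quadratically with attendee count.
def pair_count(n: int) -> int:
    return n * (n - 1) // 2

assert pair_count(10) == 45
assert pair_count(20) == 190   # doubling the records roughly quadruples the pairs
```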

Web Interface and API

REST API Endpoints

Open src/meeting_scheduling/rest_api.py to see the API. It runs on port 8080.

GET /demo-data

Returns generated demo data:

Response:

{
  "dayList": [1, 2, 3, 4],
  "timeGrainList": [
    {"grainIndex": 0, "dayOfYear": 1, "startingMinuteOfDay": 480},
    {"grainIndex": 1, "dayOfYear": 1, "startingMinuteOfDay": 495},
    ...
  ],
  "roomList": [
    {"id": "room_0", "name": "Room 0", "capacity": 30},
    {"id": "room_1", "name": "Room 1", "capacity": 20},
    {"id": "room_2", "name": "Room 2", "capacity": 16}
  ],
  "personList": [
    {"id": "person_0", "fullName": "Amy Cole"},
    ...
  ],
  "meetingList": [
    {
      "id": "meeting_0",
      "topic": "Strategize B2B",
      "durationInGrains": 8,
      "requiredAttendances": [...],
      "preferredAttendances": [...],
      "entireGroupMeeting": false
    },
    ...
  ],
  "meetingAssignmentList": [
    {
      "id": "assignment_0",
      "meeting": "meeting_0",
      "startingTimeGrain": null,
      "room": null,
      "pinned": false
    },
    ...
  ]
}

Demo data specs:

  • 4 days
  • 156 time grains (39 per day, 8 AM - 6 PM)
  • 3 rooms (capacities 30, 20, 16)
  • 20 people
  • 24 meetings

POST /schedules

Submit a schedule for solving:

Request body: Same format as demo data

Response: Job ID

"a1b2c3d4-e5f6-7890-abcd-ef1234567890"

Implementation:

@app.post("/schedules")
async def solve(schedule: MeetingScheduleModel) -> str:
    job_id = str(uuid4())
    meeting_schedule = model_to_schedule(schedule)
    data_sets[job_id] = meeting_schedule
    
    solver_manager.solve_and_listen(
        job_id,
        meeting_schedule,
        lambda solution: update_solution(job_id, solution)
    )
    
    return job_id

The solver runs in the background, continuously updating the best solution.

GET /schedules

List all active job IDs:

Response:

["a1b2c3d4-e5f6-7890-abcd-ef1234567890", "b2c3d4e5-f6a7-8901-bcde-f23456789012"]

GET /schedules/{schedule_id}

Get current solution:

Response (while solving):

{
  "meetingAssignmentList": [
    {
      "id": "assignment_0",
      "meeting": "meeting_0",
      "startingTimeGrain": {"grainIndex": 5, ...},
      "room": {"id": "room_1", ...}
    },
    ...
  ],
  "score": "0hard/-150medium/-8945soft",
  "solverStatus": "SOLVING_ACTIVE"
}

Response (finished):

{
  "score": "0hard/0medium/-6234soft",
  "solverStatus": "NOT_SOLVING"
}
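Client code often wants the numeric score levels. A small helper (hypothetical, not part of the API) for parsing the score string:

```python
# Hypothetical client-side helper: split a score string such as
# "0hard/-150medium/-8945soft" into its three integer levels.
import re

def parse_score(score: str) -> dict:
    match = re.fullmatch(r"(-?\d+)hard/(-?\d+)medium/(-?\d+)soft", score)
    hard, medium, soft = (int(g) for g in match.groups())
    return {"hard": hard, "medium": medium, "soft": soft}

parsed = parse_score("0hard/-150medium/-8945soft")
assert parsed == {"hard": 0, "medium": -150, "soft": -8945}
assert parsed["hard"] == 0   # feasible whenever the hard level is 0
```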

GET /schedules/{problem_id}/status

Lightweight status check (doesn’t return full solution):

Response:

{
  "score": "0hard/0medium/-6234soft",
  "solverStatus": "NOT_SOLVING"
}

DELETE /schedules/{problem_id}

Stop solving early:

@app.delete("/schedules/{problem_id}")
async def stop_solving(problem_id: str) -> None:
    solver_manager.terminate_early(problem_id)

Returns best solution found so far.

PUT /schedules/analyze

Analyze a solution’s score:

Request body: Complete schedule with assignments

Response:

{
  "score": "-2hard/-50medium/-8945soft",
  "constraints": [
    {
      "name": "Room conflict",
      "score": "-2hard/0medium/0soft",
      "matches": [
        {
          "justification": "Room room_1: Meeting A (grains 5-12) overlaps Meeting B (grains 10-17) by 2 grains",
          "indictedObjects": ["assignment_5", "assignment_12"]
        }
      ]
    },
    {
      "name": "Required attendance conflict",
      "score": "0hard/-50medium/0soft",
      "matches": [...]
    }
  ]
}

This endpoint is extremely useful for understanding why a solution has a particular score.

Web UI Flow

The static/app.js implements this workflow:

  1. Load demo data → GET /demo-data
  2. Display unscheduled meetings and available resources
  3. User clicks “Solve” → POST /schedules (get job ID)
  4. Poll GET /schedules/{id}/status every 2 seconds
  5. Update visualization with current assignments
  6. When solverStatus === "NOT_SOLVING" → Stop polling
  7. Display final schedule in timeline view

Visualization features:

  • Timeline view by room or by person
  • Color-coded meetings (by topic or priority)
  • Hover details (attendees, time, room)
  • Unassigned meetings highlighted
  • Score breakdown panel
  • Constraint analysis tab

Making Your First Customization

Let’s add a new constraint step-by-step.

Scenario: Limit Meetings Per Day

New business rule: “No more than 5 meetings should be scheduled on any single day.”

This is a soft constraint (preference, not requirement).

Step 1: Open constraints.py

Navigate to src/meeting_scheduling/constraints.py.

Step 2: Write the Constraint Function

Add this function:

def max_meetings_per_day(constraint_factory: ConstraintFactory):
    """
    Soft constraint: Discourage having more than 5 meetings on the same day.
    """
    MAX_MEETINGS_PER_DAY = 5
    
    return (
        constraint_factory.for_each(MeetingAssignment)
        .group_by(
            lambda meeting: meeting.starting_time_grain.day_of_year,
            ConstraintCollectors.count()
        )
        .filter(lambda day, count: count > MAX_MEETINGS_PER_DAY)
        .penalize(
            HardMediumSoftScore.ONE_SOFT,
            lambda day, count: (count - MAX_MEETINGS_PER_DAY) * 100
        )
        .as_constraint("Max meetings per day")
    )

How this works:

  1. Group meetings by day
  2. Count meetings per day
  3. Filter to days exceeding 5 meetings
  4. Penalize by excess × 100

Example:

  • Day 1: 7 meetings → Excess: 2 → Penalty: 200 soft points
  • Day 2: 4 meetings → No penalty
  • Day 3: 6 meetings → Excess: 1 → Penalty: 100 soft points
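The penalty arithmetic from this example, checked in plain Python:

```python
# Plain-Python check of the excess-times-100 penalty from the example above.
MAX_MEETINGS_PER_DAY = 5

def excess_penalty(count: int) -> int:
    return max(0, count - MAX_MEETINGS_PER_DAY) * 100

assert excess_penalty(7) == 200   # Day 1: 7 meetings
assert excess_penalty(4) == 0     # Day 2: 4 meetings
assert excess_penalty(6) == 100   # Day 3: 6 meetings
```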

Step 3: Register the Constraint

Add to define_constraints:

@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
    return [
        # ... existing constraints ...
        # Soft constraints
        do_meetings_as_soon_as_possible(constraint_factory),
        one_break_between_consecutive_meetings(constraint_factory),
        overlapping_meetings(constraint_factory),
        assign_larger_rooms_first(constraint_factory),
        room_stability(constraint_factory),
        max_meetings_per_day(constraint_factory),  # ← Add here
    ]

Step 4: Test It

  1. Restart the server:

    run-app
    
  2. Load demo data and solve:

    • Open http://localhost:8080
    • Click “Solve”
    • Check meeting distribution across days
  3. Verify meetings are more evenly spread across days

Testing tip: To see the effect more clearly, increase the penalty weight (e.g., × 1000 instead of × 100) or lower the threshold to 3 meetings per day.

Step 5: Add Unit Test

Create a test in tests/test_constraints.py:

def test_max_meetings_per_day():
    """Test that exceeding 5 meetings per day creates penalty."""
    from meeting_scheduling.constraints import max_meetings_per_day
    
    # Create 6 meetings on the same day, all in the same test room
    test_room = Room(id="room_0", name="Room 0", capacity=20)
    time_grain_day1 = TimeGrain(grain_index=0, day_of_year=1, starting_minute_of_day=480)
    
    meetings = []
    for i in range(6):
        meeting = Meeting(
            id=f"meeting_{i}",
            topic=f"Meeting {i}",
            duration_in_grains=4,
            required_attendances=[],
            preferred_attendances=[],
            speakers=[],
            entire_group_meeting=False
        )
        assignment = MeetingAssignment(
            id=f"assignment_{i}",
            meeting=meeting,
            starting_time_grain=time_grain_day1,
            room=test_room
        )
        meetings.append(assignment)
    
    # Should penalize by (6 - 5) × 100 = 100
    constraint_verifier.verify_that(max_meetings_per_day) \
        .given(*meetings) \
        .penalizes_by(100)

Run with:

pytest tests/test_constraints.py::test_max_meetings_per_day -v

Advanced Constraint Patterns

Pattern 1: Minimum Meeting Spacing

Scenario: Require at least 30 minutes between any two meetings (not just consecutive ones).

def minimum_meeting_spacing(constraint_factory: ConstraintFactory):
    """
    Soft constraint: Encourage a 30-minute gap between any two meetings.
    """
    MIN_SPACING_GRAINS = 2  # 30 minutes

    def gap_in_grains(m1, m2):
        # Free grains between the earlier meeting's end and the later
        # meeting's start; negative when the meetings overlap.
        later_start = max(m1.starting_time_grain.grain_index,
                          m2.starting_time_grain.grain_index)
        earlier_end = min(
            m1.starting_time_grain.grain_index + m1.meeting.duration_in_grains,
            m2.starting_time_grain.grain_index + m2.meeting.duration_in_grains)
        return later_start - earlier_end

    return (
        constraint_factory.for_each_unique_pair(MeetingAssignment)
        .filter(lambda m1, m2: 0 <= gap_in_grains(m1, m2) < MIN_SPACING_GRAINS)
        .penalize(
            HardMediumSoftScore.ONE_SOFT,
            lambda m1, m2: MIN_SPACING_GRAINS - gap_in_grains(m1, m2)
        )
        .as_constraint("Minimum meeting spacing")
    )

Pattern 2: Preferred Time Slots

Scenario: Some meetings should preferably be in the morning (before noon).

First, add a field to Meeting:

@dataclass
class Meeting:
    # ... existing fields ...
    preferred_time: str = "anytime"  # "morning", "afternoon", "anytime"

Then the constraint:

def preferred_time_slot(constraint_factory: ConstraintFactory):
    """
    Soft constraint: Honor preferred time slots.
    """
    NOON_MINUTE = 12 * 60  # 720 minutes
    
    return (
        constraint_factory.for_each(MeetingAssignment)
        .filter(lambda ma: ma.meeting.preferred_time == "morning")
        .filter(lambda ma: 
            ma.starting_time_grain.starting_minute_of_day >= NOON_MINUTE)
        .penalize(
            HardMediumSoftScore.of_soft(500),
            lambda ma: 
                ma.starting_time_grain.starting_minute_of_day - NOON_MINUTE
        )
        .as_constraint("Preferred time slot")
    )

Penalty increases with how far past noon the meeting is scheduled.
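The penalty weight is simply the start time’s distance past noon, in minutes:

```python
# Plain-Python check of the minutes-past-noon penalty weight.
NOON_MINUTE = 12 * 60  # 720

def minutes_past_noon(starting_minute_of_day: int) -> int:
    # The constraint's filter excludes morning starts; max() models that here.
    return max(0, starting_minute_of_day - NOON_MINUTE)

assert minutes_past_noon(13 * 60) == 60   # 1:00 PM start → weight 60
assert minutes_past_noon(9 * 60) == 0     # 9:00 AM start → no penalty
```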

Pattern 3: VIP Attendee Priority

Scenario: Meetings with executives should get preferred time slots and rooms.

Add a field to Person:

@dataclass
class Person:
    id: str
    full_name: str
    is_vip: bool = False

Then prioritize their meetings:

def vip_meeting_priority(constraint_factory: ConstraintFactory):
    """
    Soft constraint: VIP meetings scheduled early with best rooms.
    """
    return (
        constraint_factory.for_each(MeetingAssignment)
        .join(
            RequiredAttendance,
            Joiners.equal(
                lambda ma: ma.meeting,
                lambda att: att.meeting
            )
        )
        .filter(lambda ma, att: att.person.is_vip)
        .penalize(
            HardMediumSoftScore.of_soft(10),
            lambda ma, att: ma.starting_time_grain.grain_index
        )
        .as_constraint("VIP meeting priority")
    )

This penalizes later times more for VIP meetings, pushing them earlier.

Pattern 4: Recurring Meeting Consistency

Scenario: Recurring meetings should be at the same time each day.

Add a field to identify recurring meetings:

@dataclass
class Meeting:
    # ... existing fields ...
    recurrence_group: Optional[str] = None  # "weekly-standup", "daily-sync", etc.

Then enforce consistency:

def recurring_meeting_consistency(constraint_factory: ConstraintFactory):
    """
    Soft constraint: Recurring meetings at same time each occurrence.
    """
    return (
        constraint_factory.for_each(MeetingAssignment)
        .filter(lambda ma: ma.meeting.recurrence_group is not None)
        .join(
            MeetingAssignment,
            Joiners.equal(
                lambda ma1: ma1.meeting.recurrence_group,
                lambda ma2: ma2.meeting.recurrence_group
            ),
            Joiners.less_than(lambda ma: ma.id)
        )
        .filter(lambda ma1, ma2:
            ma1.starting_time_grain.starting_minute_of_day !=
            ma2.starting_time_grain.starting_minute_of_day
        )
        .penalize(
            HardMediumSoftScore.ONE_SOFT,
            lambda ma1, ma2:
                abs(ma1.starting_time_grain.starting_minute_of_day -
                    ma2.starting_time_grain.starting_minute_of_day)
        )
        .as_constraint("Recurring meeting consistency")
    )

Pattern 5: Department Room Preference

Scenario: Departments prefer certain rooms (closer to their area).

Add department info:

@dataclass
class Person:
    # ... existing fields ...
    department: str = "General"

@dataclass
class Room:
    # ... existing fields ...
    preferred_department: Optional[str] = None

Then reward matches:

def department_room_preference(constraint_factory: ConstraintFactory):
    """
    Soft constraint: Assign rooms preferred by attendees' departments.
    """
    return (
        constraint_factory.for_each(MeetingAssignment)
        .join(
            RequiredAttendance,
            Joiners.equal(
                lambda ma: ma.meeting,
                lambda att: att.meeting
            )
        )
        .filter(lambda ma, att:
            ma.room.preferred_department is not None and
            ma.room.preferred_department == att.person.department
        )
        .reward(HardMediumSoftScore.of_soft(50))
        .as_constraint("Department room preference")
    )

Each department match adds 50 soft points (reward).


Testing and Validation

Unit Testing Constraints

Best practice: Test constraints in isolation.

Open tests/test_constraints.py to see examples:

from meeting_scheduling.domain import *
from meeting_scheduling.constraints import define_constraints
from solverforge_legacy.test import ConstraintVerifier

# Create verifier
constraint_verifier = ConstraintVerifier.build(
    define_constraints,
    MeetingSchedule,
    MeetingAssignment
)

Example: Test Room Conflict

def test_room_conflict_penalized():
    """Two meetings in same room at overlapping times should penalize."""
    
    room = Room(id="room1", name="Room 1", capacity=20)
    
    # Meeting 1: Grains 0-7 (2 hours)
    meeting1 = create_test_meeting(id="m1", duration=8)
    assignment1 = MeetingAssignment(
        id="a1",
        meeting=meeting1,
        starting_time_grain=TimeGrain(0, 1, 480),
        room=room
    )
    
    # Meeting 2: Grains 5-12 (overlaps grains 5-7 = 3 grains)
    meeting2 = create_test_meeting(id="m2", duration=8)
    assignment2 = MeetingAssignment(
        id="a2",
        meeting=meeting2,
        starting_time_grain=TimeGrain(5, 1, 555),
        room=room
    )
    
    # Verify penalty of 3 hard points (overlap duration)
    constraint_verifier.verify_that(room_conflict) \
        .given(assignment1, assignment2) \
        .penalizes_by(3)
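To sanity-check the expected overlap outside the solver, the interval arithmetic (assuming each meeting occupies the half-open grain range [start, start + duration)) is:

```python
# Reference overlap arithmetic, assuming a meeting occupies the half-open
# grain range [start, start + duration).
def overlap_in_grains(start1: int, dur1: int, start2: int, dur2: int) -> int:
    return max(0, min(start1 + dur1, start2 + dur2) - max(start1, start2))

assert overlap_in_grains(0, 8, 5, 8) == 3    # grains 5-7 shared, as above
assert overlap_in_grains(0, 8, 10, 8) == 0   # disjoint ranges
```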

Example: Test No Conflict

def test_room_conflict_not_penalized():
    """Meetings in same room without overlap should not penalize."""
    
    room = Room(id="room1", name="Room 1", capacity=20)
    
    # Meeting 1: Grains 0-7
    assignment1 = MeetingAssignment(
        id="a1",
        meeting=create_test_meeting(id="m1", duration=8),
        starting_time_grain=TimeGrain(0, 1, 480),
        room=room
    )
    
    # Meeting 2: Grains 10-17 (no overlap)
    assignment2 = MeetingAssignment(
        id="a2",
        meeting=create_test_meeting(id="m2", duration=8),
        starting_time_grain=TimeGrain(10, 1, 630),
        room=room
    )
    
    # No penalty
    constraint_verifier.verify_that(room_conflict) \
        .given(assignment1, assignment2) \
        .penalizes_by(0)

Helper function:

def create_test_meeting(id: str, duration: int) -> Meeting:
    """Create a minimal meeting for testing."""
    return Meeting(
        id=id,
        topic=f"Test Meeting {id}",
        duration_in_grains=duration,
        required_attendances=[],
        preferred_attendances=[],
        speakers=[],
        entire_group_meeting=False
    )

Run tests:

pytest tests/test_constraints.py -v

Integration Testing: Full Solve

Test the complete solving cycle in tests/test_feasible.py:

def test_feasible():
    """Test that solver finds feasible solution for demo data."""
    import time
    
    # Get demo problem
    schedule = generate_demo_data()
    
    # Verify initially unassigned
    assert all(ma.starting_time_grain is None for ma in schedule.meeting_assignment_list)
    assert all(ma.room is None for ma in schedule.meeting_assignment_list)
    
    # Solve
    job_id = "test-feasible"
    solver_manager.solve(job_id, schedule)
    
    # Wait for completion
    timeout = 120  # 2 minutes
    start = time.time()
    
    while solver_manager.get_solver_status(job_id) == "SOLVING_ACTIVE":
        if time.time() - start > timeout:
            solver_manager.terminate_early(job_id)
            break
        time.sleep(2)
    
    # Get solution
    solution = solver_manager.get_final_best_solution(job_id)
    
    # Verify all assigned
    unassigned = [ma for ma in solution.meeting_assignment_list 
                  if ma.starting_time_grain is None or ma.room is None]
    assert len(unassigned) == 0, f"{len(unassigned)} meetings unassigned"
    
    # Verify feasible
    assert solution.score is not None
    assert solution.score.hard_score == 0, \
        f"Solution infeasible: {solution.score}"
    
    print(f"Final score: {solution.score}")

Manual Testing via UI

  1. Start application:

    run-app
    
  2. Open browser console (F12) to monitor API calls

  3. Load and inspect data:

    • Verify 24 meetings, 20 people, 3 rooms displayed
    • Check time grains span 4 days
  4. Solve and observe:

    • Click “Solve”
    • Watch score improve in real-time
    • See meetings get assigned to rooms and times
    • Monitor constraint violations decrease
  5. Verify solution quality:

    • Hard score should be 0 (feasible)
    • All meetings assigned (no unassigned list)
    • Room capacity respected (check stats)
    • No double-bookings (visual timeline check)
  6. Test constraint analysis:

    • Click “Analyze” tab
    • Review constraint breakdown
    • Verify matches make sense
  7. Test early termination:

    • Start solving
    • Click “Stop solving” after 5 seconds
    • Verify partial solution returned

Production Considerations

Performance: Constraint Evaluation Speed

Constraints are evaluated millions of times during solving. Performance matters.

❌ DON’T: Complex calculations in constraints

def bad_constraint(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(MeetingAssignment)
        .filter(lambda ma: 
            expensive_api_call(ma.meeting.topic))  # SLOW!
        .penalize(HardMediumSoftScore.ONE_SOFT)
        .as_constraint("Bad")
    )

✅ DO: Pre-compute before solving

# Before solving, once
blocked_topics = set(fetch_blocked_topics_from_api())

def good_constraint(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(MeetingAssignment)
        .filter(lambda ma: ma.meeting.topic in blocked_topics)  # Fast set lookup
        .penalize(HardMediumSoftScore.ONE_SOFT)
        .as_constraint("Good")
    )
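The pre-computation pays off even at micro-benchmark scale; a quick check with hypothetical topic names:

```python
# Micro-benchmark: O(1) set membership vs. O(n) list scan.
import timeit

blocked_set = {f"topic_{i}" for i in range(1000)}
blocked_list = list(blocked_set)

# Looking up a missing key forces the list to scan all 1000 entries.
set_time = timeit.timeit(lambda: "topic_missing" in blocked_set, number=50_000)
list_time = timeit.timeit(lambda: "topic_missing" in blocked_list, number=50_000)

assert set_time < list_time   # the hash lookup wins by a wide margin
```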

Time Grain Granularity

The default is 15-minute grains. Consider trade-offs:

Finer granularity (5 minutes):

  • ✅ More scheduling flexibility
  • ✅ Better fits meeting durations
  • ❌ 3× more time slots → larger search space → slower

Coarser granularity (30 minutes):

  • ✅ Fewer time slots → faster solving
  • ❌ Less flexibility (all meetings snap to 30-min intervals)

Recommendation: 15 minutes is a good balance for most organizations.
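The slot-count arithmetic behind the trade-off, for a 10-hour scheduling day:

```python
# Slots per day at different grain sizes, assuming a 10-hour (600-minute) day.
day_minutes = 10 * 60

slots = {grain: day_minutes // grain for grain in (5, 15, 30)}
assert slots == {5: 120, 15: 40, 30: 20}   # 5-minute grains → 3× the slots of 15
```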

Scaling Strategies

Problem size guidelines (30 second solve):

  • Up to 30 meetings, 5 rooms, 5 days: Good solutions
  • 30-50 meetings: Increase solve time to 2-5 minutes
  • 50-100 meetings: Consider decomposition

Decomposition approaches:

By time period:

# Schedule week 1, then week 2
week1_meetings = [m for m in meetings if m.week == 1]
week2_meetings = [m for m in meetings if m.week == 2]

solution_week1 = solve(week1_meetings, rooms)
solution_week2 = solve(week2_meetings, rooms)

By department:

# Schedule each department separately
for dept in ["Engineering", "Sales", "Marketing"]:
    dept_meetings = [m for m in meetings if m.department == dept]
    dept_solution = solve(dept_meetings, dept_rooms)

By priority:

# Schedule high-priority meetings first, then fill in rest
high_pri = [m for m in meetings if m.priority == "high"]
solution_high = solve(high_pri, rooms)

# Pin high-priority assignments
for assignment in solution_high.meeting_assignment_list:
    assignment.pinned = True

# Add low-priority meetings and re-solve
all_meetings = high_pri + low_priority_meetings
final_solution = solve(all_meetings, rooms)

The pinned field prevents the solver from changing certain assignments.

Handling Infeasible Problems

Sometimes no feasible solution exists (e.g., too many meetings, insufficient rooms).

Detect and diagnose:

solution = solver_manager.get_final_best_solution(job_id)

if solution.score.hard_score < 0:
    # Analyze what's infeasible
    analysis = solution_manager.analyze(solution)
    
    violations = {}
    for constraint in analysis.constraint_analyses:
        if constraint.score.hard_score < 0:
            violations[constraint.name] = {
                "score": constraint.score.hard_score,
                "count": len(constraint.matches)
            }
    
    return {
        "status": "infeasible",
        "hard_score": solution.score.hard_score,
        "violations": violations,
        "suggestions": generate_suggestions(violations)
    }

def generate_suggestions(violations):
    suggestions = []
    if "Room conflict" in violations:
        suggestions.append("Add more rooms or reduce meeting durations")
    if "Required attendance conflict" in violations:
        suggestions.append("Mark some attendees as 'preferred' instead of 'required'")
    if "Required room capacity" in violations:
        suggestions.append("Use larger rooms or reduce attendee counts")
    return suggestions

Real-Time Rescheduling

Scenario: Need to reschedule due to:

  • Meeting canceled
  • Room unavailable
  • Attendee conflict added

Incremental re-solving:

def cancel_meeting(schedule: MeetingSchedule, meeting_id: str):
    """Remove a meeting and re-optimize."""
    
    # Find and remove the assignment
    schedule.meeting_assignment_list = [
        ma for ma in schedule.meeting_assignment_list
        if ma.meeting.id != meeting_id
    ]
    
    # Re-solve (starting from current solution)
    job_id = f"replan-{uuid4()}"
    solver_manager.solve_and_listen(job_id, schedule, callback)
    
    return job_id

def add_urgent_meeting(schedule: MeetingSchedule, new_meeting: Meeting):
    """Add urgent meeting and re-optimize."""
    
    # Add meeting to schedule
    schedule.meeting_list.append(new_meeting)
    
    # Create assignment (initially unassigned)
    new_assignment = MeetingAssignment(
        id=f"assignment_{new_meeting.id}",
        meeting=new_meeting,
        starting_time_grain=None,
        room=None
    )
    schedule.meeting_assignment_list.append(new_assignment)
    
    # Re-solve
    solver_manager.solve_and_listen(f"urgent-{uuid4()}", schedule, callback)

Optimization concept: Warm starting from the current solution makes re-scheduling fast — the solver only adjusts what’s necessary.

Monitoring and Logging

Track key metrics:

import logging

logger = logging.getLogger(__name__)

start_time = time.time()
solver_manager.solve_and_listen(job_id, schedule, callback)

# ... wait for completion ...

solution = solver_manager.get_final_best_solution(job_id)
duration = time.time() - start_time

# Metrics
total_meetings = len(solution.meeting_assignment_list)
assigned = sum(1 for ma in solution.meeting_assignment_list 
               if ma.starting_time_grain and ma.room)

logger.info(
    f"Solved schedule {job_id}: "
    f"duration={duration:.1f}s, "
    f"score={solution.score}, "
    f"assigned={assigned}/{total_meetings}, "
    f"feasible={solution.score.hard_score == 0}"
)

# Alert if infeasible
if solution.score.hard_score < 0:
    logger.warning(
        f"Infeasible schedule {job_id}: "
        f"hard_score={solution.score.hard_score}"
    )

Quick Reference

File Locations

| Need to… | Edit this file |
| --- | --- |
| Add/change business rule | src/meeting_scheduling/constraints.py |
| Add field to Meeting | src/meeting_scheduling/domain.py + converters.py |
| Add field to Person/Room | src/meeting_scheduling/domain.py + converters.py |
| Change solve time | src/meeting_scheduling/solver.py |
| Change time grain size | src/meeting_scheduling/domain.py (GRAIN_LENGTH_IN_MINUTES) |
| Add REST endpoint | src/meeting_scheduling/rest_api.py |
| Change demo data | src/meeting_scheduling/demo_data.py |
| Change UI | static/index.html, static/app.js |

Common Constraint Patterns

Unary constraint (single meeting):

constraint_factory.for_each(MeetingAssignment)
    .filter(lambda ma: # condition)
    .penalize(HardMediumSoftScore.ONE_HARD)

Binary constraint (pairs of meetings):

constraint_factory.for_each_unique_pair(
    MeetingAssignment,
    Joiners.equal(lambda ma: ma.room)  # Same room
)
    .filter(lambda ma1, ma2: ma1.calculate_overlap(ma2) > 0)
    .penalize(HardMediumSoftScore.ONE_HARD, 
              lambda ma1, ma2: ma1.calculate_overlap(ma2))

Attendance-based constraint:

constraint_factory.for_each(RequiredAttendance)
    .join(RequiredAttendance,
          Joiners.equal(lambda att: att.person))
    .filter(lambda att1, att2: # overlapping meetings)
    .penalize(...)

Grouping and counting:

constraint_factory.for_each(MeetingAssignment)
    .group_by(
        lambda ma: ma.starting_time_grain.day_of_year,
        ConstraintCollectors.count()
    )
    .filter(lambda day, count: count > MAX)
    .penalize(...)

Reward instead of penalize:

.reward(HardMediumSoftScore.ONE_SOFT)

Common Domain Patterns

Check if meeting assigned:

if ma.starting_time_grain is not None and ma.room is not None:
    # Meeting is fully assigned

Calculate meeting end time:

end_grain_index = ma.get_last_time_grain_index()
# or
end_grain_index = ma.starting_time_grain.grain_index + ma.meeting.duration_in_grains - 1

Check overlap:

overlap_grains = meeting1.calculate_overlap(meeting2)
if overlap_grains > 0:
    # Meetings overlap

Get attendee count:

total_attendees = (
    len(meeting.required_attendances) + 
    len(meeting.preferred_attendances)
)

Time grain conversions:

# Grain index to time
hour = time_grain.starting_minute_of_day // 60
minute = time_grain.starting_minute_of_day % 60

# Check if morning
is_morning = time_grain.starting_minute_of_day < 12 * 60

Debugging Tips

Enable verbose logging:

import logging
logging.basicConfig(level=logging.DEBUG)

Analyze solution score:

from meeting_scheduling.solver import solution_manager

analysis = solution_manager.analyze(schedule)

for constraint in analysis.constraint_analyses:
    print(f"{constraint.name}: {constraint.score}")
    for match in constraint.matches[:5]:  # First 5 matches
        print(f"  {match.justification}")

Test constraint in isolation:

from solverforge_legacy.test import ConstraintVerifier

verifier = ConstraintVerifier.build(
    define_constraints,
    MeetingSchedule,
    MeetingAssignment
)

verifier.verify_that(room_conflict) \
    .given(assignment1, assignment2) \
    .penalizes_by(expected_penalty)

Print meeting details:

def print_schedule(schedule: MeetingSchedule):
    """Debug helper."""
    for ma in schedule.meeting_assignment_list:
        if ma.starting_time_grain and ma.room:
            start_min = ma.starting_time_grain.starting_minute_of_day
            hour = start_min // 60
            minute = start_min % 60
            print(f"{ma.meeting.topic}: Day {ma.starting_time_grain.day_of_year}, "
                  f"{hour:02d}:{minute:02d} in {ma.room.name}")
        else:
            print(f"{ma.meeting.topic}: UNASSIGNED")

Common Gotchas

  1. Forgot to handle None values

    • Check ma.starting_time_grain is not None before accessing properties
    • Symptom: AttributeError: ‘NoneType’ object has no attribute ‘grain_index’
  2. Time grain list not in scope

    • The avoid_overtime constraint needs access to time_grain_list
    • Solution: Pass via closure or access from solution object
    • Symptom: NameError: name ‘time_grain_list’ is not defined
  3. Overlapping vs touching meetings

    • Meeting ends at grain 7, next starts at grain 8: not overlapping
    • Use calculate_overlap() > 0 to check
    • Symptom: False positives in conflict detection
  4. Forgot to register constraint

    • Add to define_constraints() return list
    • Symptom: Constraint not enforced
  5. Score level confusion

    • Hard: HardMediumSoftScore.ONE_HARD
    • Medium: HardMediumSoftScore.ONE_MEDIUM
    • Soft: HardMediumSoftScore.ONE_SOFT
    • Or: HardMediumSoftScore.of_soft(100)
    • Symptom: Constraint at wrong priority level
  6. Attendance navigation

    • RequiredAttendance has .person and .meeting
    • Meeting has .meeting_assignment
    • Person doesn’t directly link to meetings
    • Symptom: Can’t navigate relationship

Performance Benchmarks

Typical evaluation speeds (on modern hardware):

| Problem Size | Evaluations/Second | 30-Second Results |
| --- | --- | --- |
| 10 meetings, 3 rooms, 2 days | 5,000+ | Near-optimal |
| 24 meetings, 3 rooms, 4 days | 2,000+ | High quality |
| 50 meetings, 5 rooms, 5 days | 500-1000 | Good quality |
| 100 meetings, 8 rooms, 10 days | 200-500 | Decent quality |

If significantly slower, review constraint complexity and look for expensive operations.


Conclusion

You now have a complete understanding of constraint-based meeting scheduling:

  • Multi-resource modeling — Coordinating time slots, rooms, and people simultaneously
  • Hierarchical scoring — Three-tier constraints (hard/medium/soft) with clear priorities
  • Multiple planning variables — Optimizing both time and room for each meeting
  • Conflict resolution — Handling required vs preferred attendance gracefully
  • Customization patterns — Extending for your organization’s policies

Next Steps

  1. Run the application and experiment with the demo data
  2. Modify an existing constraint — change capacity limits or time preferences
  3. Add your own constraint — implement a rule from your organization
  4. Test thoroughly — write unit tests for your constraints
  5. Customize the data model — add departments, priorities, or other business fields
  6. Deploy with real data — integrate with your calendar system

Key Takeaways

Three-Tier Scoring:

  • Hard: Non-negotiable requirements
  • Medium: Strong preferences (degraded service acceptable)
  • Soft: Optimization goals and nice-to-haves

Multiple Planning Variables:

  • Each MeetingAssignment has two independent variables: time and room
  • Solver optimizes both simultaneously
  • Creates richer search space and better solutions

Discrete Time Grains:

  • Convert continuous time into 15-minute slots
  • Simplifies overlap detection and constraint evaluation
  • Matches real-world calendar behavior
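
As a quick illustration of the grain mapping (assuming grain 0 starts at midnight; the helper name is hypothetical):

```python
from datetime import time

GRAIN_MINUTES = 15

def grain_index(t: time) -> int:
    """Map a wall-clock time to its 15-minute grain index within one day."""
    return (t.hour * 60 + t.minute) // GRAIN_MINUTES

print(grain_index(time(8, 0)))   # 32
print(grain_index(time(8, 14)))  # 32 -- same grain as 08:00
print(grain_index(time(8, 15)))  # 33 -- next grain
```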

Attendance Hierarchy:

  • Required attendance: Hard constraint (must attend)
  • Preferred attendance: Soft constraint (should attend if possible)
  • Enables flexible scheduling when conflicts arise

The Power of Constraints:

  • Most business logic in one file (constraints.py)
  • Easy to add new scheduling policies
  • Declarative: describe what you want, solver finds how

Comparison to Other Quickstarts

vs. Employee Scheduling:

  • Employee: Single resource (employees assigned to shifts)
  • Meeting: Three resources (time + room + people)
  • Employee: Two-tier scoring (hard/soft)
  • Meeting: Three-tier scoring (hard/medium/soft)

vs. Vehicle Routing:

  • Routing: Spatial optimization (minimize distance)
  • Meeting: Temporal optimization (minimize conflicts, pack early)
  • Routing: List variables (route sequences)
  • Meeting: Multiple simple variables per entity (time + room)

Each quickstart teaches complementary optimization techniques.

Additional Resources


Questions? Start by solving the demo data and observing how meetings get assigned. Try modifying constraints to see how the schedule changes. The best way to learn scheduling optimization is to experiment and visualize the results.

Happy scheduling! 📅🗓️

6 - Vehicle Routing

A comprehensive quickstart guide to understanding and building intelligent vehicle routing with SolverForge

Legacy Implementation Guide

This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.

SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.


Table of Contents

  1. Introduction
  2. Getting Started
  3. The Problem We’re Solving
  4. Understanding the Data Model
  5. How Route Optimization Works
  6. Writing Constraints: The Business Rules
  7. The Solver Engine
  8. Web Interface and API
  9. Making Your First Customization
  10. Advanced Constraint Patterns
  11. Testing and Validation
  12. Production Considerations
  13. Quick Reference

Introduction

What You’ll Learn

This guide walks you through a complete vehicle routing application built with SolverForge, a constraint-based optimization framework. You’ll learn:

  • How to model real-world logistics problems as optimization problems
  • How to construct efficient delivery routes with time windows and capacity constraints
  • How optimization algorithms balance competing objectives automatically
  • How to customize the system for your specific routing needs

No optimization background required — we’ll explain concepts as we encounter them in the code.

Prerequisites

  • Basic Python knowledge (classes, functions, type annotations)
  • Familiarity with REST APIs
  • Comfort with command-line operations
  • Understanding of basic geographic concepts (latitude/longitude)

What is Vehicle Routing Optimization?

Traditional planning: Manually assign deliveries to drivers and plan routes using maps.

Vehicle routing optimization: You describe your vehicles, customers, and constraints — the solver automatically generates efficient routes that minimize travel time while satisfying all requirements.

Think of it like having an expert logistics planner who can evaluate millions of route combinations per second to find near-optimal solutions.

SolverForge Enhancements

This implementation includes several enhancements over the standard Timefold quickstart:

  • Adaptive time windows: Time windows dynamically scale based on problem area and visit count, ensuring feasible solutions
  • Haversine formula: Fast great-circle distances without external API dependencies (default mode)
  • Real Roads mode: Optional OSMnx integration for actual road network routing with visual route display
  • Real street addresses: Demo data uses actual locations in Philadelphia, Hartford, and Florence for realistic routing

These features give you more control over the performance/accuracy tradeoff during development and production.


Getting Started

Running the Application

  1. Navigate to the project directory:

    cd solverforge-quickstarts/legacy/vehicle-routing-fast
    
  2. Create and activate virtual environment:

    python -m venv .venv
    source .venv/bin/activate  # On Windows: .venv\Scripts\activate
    
  3. Install the package:

    pip install -e .
    
  4. Start the server:

    run-app
    
  5. Open your browser:

    http://localhost:8082
    

You’ll see a map interface with customer locations plotted. Click “Solve” and watch the solver automatically create delivery routes for multiple vehicles, respecting capacity limits and time windows.

File Structure Overview

src/vehicle_routing/
├── domain.py              # Data classes (Vehicle, Visit, Location)
├── constraints.py         # Business rules (capacity, time windows, distance)
├── solver.py              # Solver configuration
├── demo_data.py           # Sample datasets (Philadelphia, Hartford, Florence)
├── rest_api.py            # HTTP API endpoints
├── routing.py             # Distance matrix and OSMnx routing
├── converters.py          # REST ↔ Domain model conversion
├── json_serialization.py  # JSON helpers
└── score_analysis.py      # Score breakdown DTOs

static/
├── index.html             # Web UI
└── app.js                 # UI logic and map visualization

tests/
├── test_constraints.py    # Unit tests for constraints
├── test_routing.py        # Unit tests for routing module
└── test_feasible.py       # Integration tests

Key insight: Most business customization happens in constraints.py alone. The domain model defines what can be routed, but constraints define what makes a good route.


The Problem We’re Solving

The Vehicle Routing Challenge

You need to assign customer visits to vehicles and determine the order of visits for each vehicle while satisfying:

Hard constraints (must be satisfied):

  • Vehicle capacity limits (total customer demand ≤ vehicle capacity)
  • Time windows (arrive at customer before their deadline)

Soft constraints (objectives to minimize):

  • Total driving time across all vehicles

This is known as the Capacitated Vehicle Routing Problem with Time Windows (CVRPTW) — a classic optimization problem in logistics.

Why This is Hard

Even with just 10 customers and 3 vehicles, there are over 3.6 million possible route configurations. With 50 customers and 6 vehicles, the possibilities become astronomical.

The challenges:

  • Combinatorial explosion: Number of possibilities grows exponentially with problem size
  • Multiple objectives: Minimize distance while respecting capacity and time constraints
  • Interdependencies: Assigning one customer affects available capacity and time for others

Route optimization algorithms use sophisticated strategies to explore this space efficiently, finding high-quality solutions in seconds.
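
To get a feel for the numbers, even the ordering of a single tour grows factorially — a back-of-the-envelope illustration that ignores the vehicle-assignment dimension entirely:

```python
import math

# Permutations of 10 customers in one tour:
print(math.factorial(10))           # 3628800

# With 50 customers, ordering alone exceeds 3 * 10^64:
print(f"{math.factorial(50):.2e}")  # 3.04e+64
```

Exhaustive search is hopeless at this scale, which is why the solver relies on metaheuristics instead.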


Understanding the Data Model

Let’s examine the core classes that model our routing problem. Open src/vehicle_routing/domain.py:

The Location Class

@dataclass
class Location:
    latitude: float
    longitude: float

    # Earth radius in meters
    _EARTH_RADIUS_M = 6371000
    _TWICE_EARTH_RADIUS_M = 2 * _EARTH_RADIUS_M
    # Average driving speed assumption: 50 km/h
    _AVERAGE_SPEED_KMPH = 50

    def driving_time_to(self, other: "Location") -> int:
        """
        Get driving time in seconds to another location using Haversine formula.
        """
        return self._calculate_driving_time_haversine(other)

What it represents: A geographic coordinate (latitude/longitude).

Key method:

  • driving_time_to(): Calculates driving time using the Haversine formula

Haversine formula details:

  • Accounts for Earth’s curvature using great-circle distance
  • Assumes 50 km/h average driving speed
  • Example: Philadelphia to New York (~130 km) → ~9,400 seconds (~2.6 hours)

Optimization concept: The Haversine formula provides realistic geographic distances without external API dependencies. For production with real road networks, you can replace the distance calculation with a routing API (Google Maps, OSRM, etc.).
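
Here is a self-contained sketch of that calculation, using the same constants as the class above (the standalone function name and signature are illustrative):

```python
import math

EARTH_RADIUS_M = 6_371_000
AVERAGE_SPEED_KMPH = 50

def driving_time_seconds(lat1: float, lon1: float, lat2: float, lon2: float) -> int:
    """Haversine great-circle distance, converted to driving seconds
    at an assumed 50 km/h average speed."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    distance_m = 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))
    speed_mps = AVERAGE_SPEED_KMPH * 1000 / 3600
    return round(distance_m / speed_mps)

# Philadelphia to New York: ~130 km great-circle, roughly 9,300 seconds (~2.6 hours)
print(driving_time_seconds(39.9526, -75.1652, 40.7128, -74.0060))
```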

The Visit Class (Planning Entity)

@planning_entity
@dataclass
class Visit:
    id: Annotated[str, PlanningId]
    name: str                                          # Customer name
    location: Location                                # Where to visit
    demand: int                                       # Capacity units required
    min_start_time: datetime                          # Earliest service start
    max_end_time: datetime                            # Latest service end (deadline)
    service_duration: timedelta                       # How long service takes
    
    # Shadow variables (automatically updated by solver)
    vehicle: Annotated[
        Optional['Vehicle'],
        InverseRelationShadowVariable(source_variable_name="visits"),
    ] = None
    
    previous_visit: Annotated[
        Optional['Visit'], 
        PreviousElementShadowVariable(source_variable_name="visits")
    ] = None
    
    next_visit: Annotated[
        Optional['Visit'], 
        NextElementShadowVariable(source_variable_name="visits")
    ] = None
    
    arrival_time: Annotated[
        Optional[datetime],
        CascadingUpdateShadowVariable(target_method_name="update_arrival_time"),
    ] = None

What it represents: A customer location that needs a delivery/service visit.

Key fields:

  • demand: How much vehicle capacity this visit consumes (e.g., number of packages, weight)
  • min_start_time: Earliest acceptable arrival (customer opens at 8 AM)
  • max_end_time: Latest acceptable service completion (customer closes at 6 PM)
  • service_duration: Time spent at customer location (unloading, paperwork, etc.)

Shadow variable annotations explained:

  • InverseRelationShadowVariable(source_variable_name="visits"): Automatically set to the Vehicle when this Visit is added to a vehicle’s visits list
  • PreviousElementShadowVariable(source_variable_name="visits"): Points to the previous visit in the route chain (or None if first)
  • NextElementShadowVariable(source_variable_name="visits"): Points to the next visit in the route chain (or None if last)
  • CascadingUpdateShadowVariable(target_method_name="update_arrival_time"): Triggers the update_arrival_time() method when dependencies change, cascading arrival time calculations through the route

Shadow variables (automatically maintained by solver):

  • vehicle: Which vehicle is assigned to this visit (inverse relationship)
  • previous_visit/next_visit: Links forming the route chain
  • arrival_time: When vehicle arrives at this location (cascades through chain)

Optimization concept: Shadow variables implement derived data — values that depend on planning decisions but aren’t directly decided by the solver. They update automatically when the solver modifies routes.

Important methods:

def calculate_departure_time(self) -> datetime:
    """When vehicle leaves after service."""
    return max(self.arrival_time, self.min_start_time) + self.service_duration

def is_service_finished_after_max_end_time(self) -> bool:
    """Check if time window violated."""
    return self.calculate_departure_time() > self.max_end_time

def service_finished_delay_in_minutes(self) -> int:
    """How many minutes late (for penalty calculation)."""
    if not self.is_service_finished_after_max_end_time():
        return 0
    return int((self.calculate_departure_time() - self.max_end_time).total_seconds() / 60)

These methods support constraint evaluation without duplicating logic.

The Vehicle Class (Planning Entity)

@planning_entity
@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    name: str                                          # Vehicle name (e.g., "Alpha", "Bravo")
    capacity: int                                      # Maximum demand it can handle
    home_location: Location                            # Depot location
    departure_time: datetime                           # When vehicle leaves depot
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)

What it represents: A delivery vehicle that starts from a depot, visits customers, and returns.

Key fields:

  • id: Unique identifier for the vehicle
  • name: Human-readable name (e.g., “Alpha”, “Bravo” from phonetic alphabet)
  • capacity: Total demand the vehicle can carry (e.g., 100 packages, 1000 kg)
  • home_location: Depot where vehicle starts and ends its route
  • departure_time: When vehicle begins its route
  • visits: Ordered list of customer visits — this is what the solver optimizes!

Annotations:

  • @planning_entity: Tells SolverForge this class contains decisions
  • PlanningListVariable: Marks visits as a list variable — the solver assigns visits to vehicles AND determines their order

Important properties:

@property
def arrival_time(self) -> datetime:
    """When vehicle returns to depot."""
    if not self.visits:
        return self.departure_time
    return self.visits[-1].calculate_departure_time() + timedelta(
        seconds=self.visits[-1].location.driving_time_to(self.home_location)
    )

@property
def total_demand(self) -> int:
    """Sum of all visit demands."""
    return sum(visit.demand for visit in self.visits)

@property
def total_driving_time_seconds(self) -> int:
    """Total travel time including return to depot."""
    # Includes depot → first visit, between visits, and last visit → depot

These properties enable constraints to easily evaluate route feasibility and quality.

Optimization concept: Using a list variable for the route means the solver simultaneously solves:

  1. Assignment: Which vehicle serves which customers?
  2. Sequencing: In what order should each vehicle visit its customers?

This is more powerful than separate assignment and routing phases.

The VehicleRoutePlan Class (Planning Solution)

@planning_solution
@dataclass
class VehicleRoutePlan:
    name: str
    south_west_corner: Location                        # Map bounds
    north_east_corner: Location                        # Map bounds
    vehicles: Annotated[list[Vehicle], PlanningEntityCollectionProperty]
    visits: Annotated[list[Visit], PlanningEntityCollectionProperty, ValueRangeProvider]
    score: Annotated[Optional[HardSoftScore], PlanningScore] = None
    solver_status: SolverStatus = SolverStatus.NOT_SOLVING

What it represents: The complete routing problem and its solution.

Key fields:

  • vehicles: All available vehicles (planning entities)
  • visits: All customer visits to be routed (planning entities + value range)
  • score: Solution quality metric
  • Map bounds: Used for visualization

Annotations explained:

  • @planning_solution: Marks this as the top-level problem definition
  • PlanningEntityCollectionProperty: Collections of entities being optimized
  • ValueRangeProvider: The visits list provides possible values for vehicle assignments
  • PlanningScore: Where the solver stores calculated quality

Optimization concept: Unlike employee scheduling where only shifts are entities, here both vehicles and visits are planning entities. Vehicles have list variables (routes), and visits have shadow variables tracking their position in routes.


How Route Optimization Works

Before diving into constraints, let’s understand route construction.

The Route Chaining Mechanism

Routes are built using shadow variable chaining:

  1. Solver modifies: vehicle.visits = [visit_A, visit_B, visit_C]

  2. Shadow variables automatically update:

    • Each visit’s vehicle points to the vehicle
    • Each visit’s previous_visit and next_visit link the chain
    • Each visit’s arrival_time cascades through the chain
  3. Arrival time cascade:

    visit_A.arrival_time = vehicle.departure_time + travel(depot → A)
    visit_B.arrival_time = visit_A.departure_time + travel(A → B)
    visit_C.arrival_time = visit_B.departure_time + travel(B → C)
    
  4. Constraints evaluate: Check capacity, time windows, and distance

Optimization concept: This is incremental score calculation. When the solver moves one visit, only affected arrival times recalculate — not the entire solution. This enables evaluating millions of route modifications per second.

Why this matters for performance: Shadow variables enable efficient incremental updates. With Pydantic models, validation overhead would occur on every update—compounding across millions of moves per second. The dataclass approach avoids this overhead entirely. See benchmark analysis for details on this architectural choice.

The Search Process

  1. Initial solution: Often all visits unassigned or randomly assigned
  2. Evaluate score: Calculate capacity violations, time window violations, and total distance
  3. Make a move:
    • Assign visit to different vehicle
    • Change visit order in a route
    • Swap visits between routes
  4. Re-evaluate score (incrementally)
  5. Accept if improvement (with controlled randomness to escape local optima)
  6. Repeat millions of times
  7. Return best solution found

Metaheuristics used:

  • Variable Neighborhood Descent: Tries different types of moves
  • Late Acceptance: Accepts solutions close to recent best
  • Strategic Oscillation: Temporarily allows infeasibility to explore more space

The Score: Measuring Route Quality

Every solution gets a score with two parts:

0hard/-45657soft

  • Hard score: Counts capacity overages and time window violations (must be 0)
  • Soft score: Total driving time in seconds (lower magnitude is better)

Examples:

  • -120hard/-50000soft: Infeasible (120 minutes late or 120 units over capacity)
  • 0hard/-45657soft: Feasible route with 45,657 seconds (12.7 hours) driving
  • 0hard/-30000soft: Better route with only 30,000 seconds (8.3 hours) driving

Optimization concept: The scoring system implements constraint prioritization. We absolutely require capacity and time window compliance before optimizing distance.


Writing Constraints: The Business Rules

Now the heart of the system. Open src/vehicle_routing/constraints.py.

The Constraint Provider Pattern

All constraints are registered in one function:

@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
    return [
        # Hard constraints (must satisfy)
        vehicle_capacity(constraint_factory),
        service_finished_after_max_end_time(constraint_factory),
        # Soft constraints (minimize)
        minimize_travel_time(constraint_factory),
    ]

Let’s examine each constraint in detail.

Hard Constraint: Vehicle Capacity

Business rule: “A vehicle’s total customer demand cannot exceed its capacity.”

def vehicle_capacity(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Vehicle)
        .filter(lambda vehicle: vehicle.calculate_total_demand() > vehicle.capacity)
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda vehicle: vehicle.calculate_total_demand() - vehicle.capacity
        )
        .as_constraint("vehicleCapacity")
    )

How to read this:

  1. for_each(Vehicle): Consider every vehicle
  2. .filter(...): Keep only vehicles exceeding capacity
  3. .penalize(ONE_HARD, ...): Penalize by the amount of excess demand (overage)
  4. .as_constraint(...): Name it for debugging

Why penalize by overage amount?

Example scenarios:

  • Vehicle capacity: 100 units
  • Assigned demand: 80 units → No penalty (feasible)
  • Assigned demand: 105 units → Penalty of 5 hard points (5 units over)
  • Assigned demand: 120 units → Penalty of 20 hard points (20 units over)

Optimization concept: This creates graded penalties that guide the solver. Being slightly over capacity (penalty 5) is “less wrong” than being very over (penalty 20), helping the solver navigate toward feasibility incrementally.

Implementation detail: The calculate_total_demand() method in domain.py:

def calculate_total_demand(self) -> int:
    return sum(visit.demand for visit in self.visits)

Hard Constraint: Time Window Compliance

Business rule: “Service at each customer must finish before their deadline (max_end_time).”

def service_finished_after_max_end_time(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Visit)
        .filter(lambda visit: visit.is_service_finished_after_max_end_time())
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda visit: visit.service_finished_delay_in_minutes()
        )
        .as_constraint("serviceFinishedAfterMaxEndTime")
    )

How to read this:

  1. for_each(Visit): Consider every customer visit
  2. .filter(...): Keep only visits finishing after their deadline
  3. .penalize(ONE_HARD, ...): Penalize by minutes late

Example scenario:

Customer has max_end_time = 18:00 (6 PM deadline):

  • Arrive at 17:00, 30-minute service → Finish at 17:30 → No penalty (on time)
  • Arrive at 17:50, 30-minute service → Finish at 18:20 → Penalty of 20 minutes
  • Arrive at 19:00, 30-minute service → Finish at 19:30 → Penalty of 90 minutes

Wait time handling:

If vehicle arrives before min_start_time, it waits:

def calculate_departure_time(self) -> datetime:
    # If arrive at 7:00 but min_start_time is 8:00, wait until 8:00
    return max(self.arrival_time, self.min_start_time) + self.service_duration

Optimization concept: Time windows create temporal dependencies in routes. The order of visits affects whether deadlines can be met. The solver must balance early visits (to respect time windows) with short distances (to minimize travel).

Helper methods in domain.py:

def is_service_finished_after_max_end_time(self) -> bool:
    """Check deadline violation."""
    if self.arrival_time is None:
        return False
    return self.calculate_departure_time() > self.max_end_time

def service_finished_delay_in_minutes(self) -> int:
    """Calculate penalty magnitude."""
    if not self.is_service_finished_after_max_end_time():
        return 0
    delay = self.calculate_departure_time() - self.max_end_time
    return int(delay.total_seconds() / 60)

Soft Constraint: Minimize Total Distance

Business rule: “Minimize the total driving time across all vehicles.”

def minimize_travel_time(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Vehicle)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda vehicle: vehicle.calculate_total_driving_time_seconds()
        )
        .as_constraint("minimizeTravelTime")
    )

How to read this:

  1. for_each(Vehicle): Consider every vehicle
  2. .penalize(ONE_SOFT, ...): Penalize by total driving seconds for that vehicle
  3. No filter: Every vehicle contributes to the soft score

Why penalize all vehicles?

Each vehicle’s driving time adds to the penalty:

  • Vehicle 1: 10,000 seconds → -10,000 soft score
  • Vehicle 2: 15,000 seconds → -15,000 soft score
  • Vehicle 3: 8,000 seconds → -8,000 soft score
  • Total soft score: -33,000

The solver tries different route configurations to reduce this total.

Optimization concept: This constraint implements the routing objective function. After ensuring feasibility (hard constraints), the solver focuses on minimizing this distance measure.

Implementation detail in domain.py:

def calculate_total_driving_time_seconds(self) -> int:
    """Total travel time including depot → first → ... → last → depot."""
    if not self.visits:
        return 0
    
    total = 0
    
    # Depot to first visit
    total += self.home_location.driving_time_to(self.visits[0].location)
    
    # Between consecutive visits
    for i in range(len(self.visits) - 1):
        total += self.visits[i].location.driving_time_to(self.visits[i + 1].location)
    
    # Last visit back to depot
    total += self.visits[-1].location.driving_time_to(self.home_location)
    
    return total

Why include depot return? Real routes must return vehicles to the depot. Not including this would incentivize ending routes far from the depot.


The Solver Engine

Now let’s see how the solver is configured. Open src/vehicle_routing/solver.py:

solver_config = SolverConfig(
    solution_class=VehicleRoutePlan,
    entity_class_list=[Vehicle, Visit],
    score_director_factory_config=ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    ),
    termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)

solver_manager = SolverManager.create(solver_config)
solution_manager = SolutionManager.create(solver_manager)

Configuration Breakdown

solution_class: Your planning solution class (VehicleRoutePlan)

entity_class_list: Both Vehicle and Visit are planning entities

  • Vehicle has a planning variable (visits list)
  • Visit has shadow variables that depend on vehicle assignments

score_director_factory_config: Contains the constraint provider function

  • This is where your business rules are registered

termination_config: When to stop solving

  • spent_limit=Duration(seconds=30): Stop after 30 seconds
  • Could also use: unimproved_spent_limit (stop if no improvement for X seconds)
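
Both criteria can be combined so solving stops at whichever limit is hit first. A config fragment, under the assumption that unimproved_spent_limit accepts the same Duration type shown above — adjust names to your version:

```python
termination_config = TerminationConfig(
    spent_limit=Duration(minutes=5),              # absolute cap on solving time
    unimproved_spent_limit=Duration(seconds=30),  # stop early if no improvement
)
```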

SolverManager: Asynchronous Solving

Routing problems can take time to solve well. SolverManager handles solving in the background:

# Start solving (non-blocking)
solver_manager.solve_and_listen(job_id, route_plan, callback_function)

# Check status
status = solver_manager.get_solver_status(job_id)

# Get current best solution (updates in real-time)
solution = solver_manager.get_final_best_solution(job_id)

# Stop early if satisfied
solver_manager.terminate_early(job_id)

Optimization concept: Vehicle routing uses anytime algorithms that continuously improve solutions. You get a valid answer quickly, then progressively better answers as solving continues. You can stop whenever the solution is “good enough.”

SolutionManager: Score Analysis

The solution_manager helps explain scores:

solution_manager = SolutionManager.create(solver_manager)

# Analyze which constraints fired and why
analysis = solution_manager.analyze(route_plan)

# Shows breakdown like:
# - Vehicle capacity: -50 hard (Vehicle 1: 20 over, Vehicle 2: 30 over)
# - Time windows: -15 hard (Visit 42: 15 minutes late)
# - Total distance: -45657 soft

This is invaluable for debugging infeasible solutions or understanding score composition.

Solving Timeline

Small problems (20-30 visits, 2-3 vehicles):

  • Initial valid solution: < 1 second
  • Good solution: 5-10 seconds
  • High-quality: 30 seconds

Medium problems (50-100 visits, 5-8 vehicles):

  • Initial valid solution: 1-5 seconds
  • Good solution: 30-60 seconds
  • High-quality: 2-5 minutes

Large problems (200+ visits, 10+ vehicles):

  • Initial valid solution: 5-30 seconds
  • Good solution: 5-10 minutes
  • High-quality: 30-60 minutes

Factors affecting speed:

  • Number of visits (main factor)
  • Number of vehicles (less impact than visits)
  • How tight time windows are (tighter = harder)
  • How tight capacity constraints are (fuller vehicles = less flexibility)

Web Interface and API

REST API Endpoints

Open src/vehicle_routing/rest_api.py to see the API. It runs on port 8082 (different from employee scheduling’s 8080).

GET /demo-data

Returns available demo datasets:

["PHILADELPHIA", "HARTFORT", "FIRENZE"]

Each dataset uses real city coordinates with different problem sizes.

GET /demo-data/{demo_name}

Returns a specific demo dataset:

Parameters:

  • demo_name: Name of the demo dataset (PHILADELPHIA, HARTFORT, FIRENZE)
  • routing (query, optional): Routing mode - haversine (default) or real_roads

Request:

GET /demo-data/PHILADELPHIA?routing=haversine

Response:

{
  "name": "demo",
  "southWestCorner": [39.7656, -76.8378],
  "northEastCorner": [40.7764, -74.9301],
  "vehicles": [
    {
      "id": "0",
      "name": "Alpha",
      "capacity": 25,
      "homeLocation": [40.5154, -75.3721],
      "departureTime": "2025-12-10T06:00:00",
      "visits": [],
      "totalDemand": 0,
      "totalDrivingTimeSeconds": 0
    }
  ],
  "visits": [
    {
      "id": "0",
      "name": "Amy Cole",
      "location": [40.7831, -74.9376],
      "demand": 1,
      "minStartTime": "2025-12-10T17:00:00",
      "maxEndTime": "2025-12-10T20:00:00",
      "serviceDuration": 420,
      "vehicle": null,
      "arrivalTime": null
    }
  ],
  "score": null,
  "solverStatus": null,
  "totalDrivingTimeSeconds": 0
}

Field notes:

  • Coordinates are [latitude, longitude] arrays
  • Times use ISO format strings
  • serviceDuration is in seconds (420 = 7 minutes)
  • Initially vehicle is null (unassigned) and visits lists are empty
  • Vehicles have names from the phonetic alphabet (Alpha, Bravo, Charlie, etc.)
  • Customer names are randomly generated from first/last name combinations

Demo datasets:

  • PHILADELPHIA: 55 visits, 6 vehicles, moderate capacity (15-30)
  • HARTFORT: 50 visits, 6 vehicles, tighter capacity (20-30)
  • FIRENZE: 77 visits (largest), 6 vehicles, varied capacity (20-40)

GET /demo-data/{demo_name}/stream

Server-Sent Events (SSE) endpoint for loading demo data with progress updates. Use this when routing=real_roads to show download/computation progress.

Parameters:

  • demo_name: Name of the demo dataset
  • routing (query, optional): haversine (default) or real_roads

Request:

GET /demo-data/PHILADELPHIA/stream?routing=real_roads

SSE Events:

Progress event (during computation):

{"event": "progress", "phase": "network", "message": "Downloading OpenStreetMap road network...", "percent": 10, "detail": "Area: 0.08° × 0.12°"}

Complete event (when ready):

{"event": "complete", "solution": {...}, "geometries": {"0": ["encodedPolyline1", "encodedPolyline2"], "1": [...]}}

Error event (on failure):

{"event": "error", "message": "Demo data not found: INVALID"}

Geometry format: Each vehicle’s geometries are an array of encoded polylines (Google polyline format), one per route segment:

  • First: depot → first visit
  • Middle: visit → visit
  • Last: last visit → depot

GET /route-plans/{problem_id}/geometry

Get route geometries for displaying actual road paths:

Response:

{
  "geometries": {
    "0": ["_p~iF~ps|U_ulLnnqC_mqNvxq`@", "afvkFnps|U~hbE~reK"],
    "1": ["_izlFnps|U_ulLnnqC"]
  }
}

Decode polylines on the frontend to display actual road routes instead of straight lines.
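
Decoding is easy to sketch server-side as well (the standard Google polyline algorithm at 1e-5 precision; handy for tests or backend processing):

```python
def decode_polyline(encoded: str, precision: int = 5) -> list[tuple[float, float]]:
    """Decode a Google-encoded polyline into (latitude, longitude) pairs."""
    factor = 10 ** precision
    coords: list[tuple[float, float]] = []
    index = lat = lng = 0
    while index < len(encoded):
        for is_lng in (False, True):
            result = shift = 0
            while True:
                byte = ord(encoded[index]) - 63
                index += 1
                result |= (byte & 0x1F) << shift
                shift += 5
                if byte < 0x20:  # continuation bit clear: last chunk of this value
                    break
            # Zig-zag decode the signed delta
            delta = ~(result >> 1) if result & 1 else result >> 1
            if is_lng:
                lng += delta
            else:
                lat += delta
        coords.append((lat / factor, lng / factor))
    return coords

# Google's reference example string:
print(decode_polyline("_p~iF~ps|U_ulLnnqC_mqNvxq`@"))
# [(38.5, -120.2), (40.7, -120.95), (43.252, -126.453)]
```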

POST /route-plans

Submit a routing problem to solve:

Request body: Same format as demo data response

Response: Job ID as plain text

"a1b2c3d4-e5f6-7890-abcd-ef1234567890"

Implementation:

@app.post("/route-plans")
async def solve(problem: VehicleRoutePlanModel) -> str:
    job_id = str(uuid4())
    route_plan = model_to_plan(problem)
    data_sets[job_id] = route_plan
    
    solver_manager.solve_and_listen(
        job_id,
        route_plan,
        lambda solution: update_solution(job_id, solution)
    )
    
    return job_id

Key detail: Uses solve_and_listen() with a callback that updates the stored solution in real-time. This enables live progress tracking in the UI.
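The stored-solution pattern the callback relies on can be sketched as follows (a hypothetical simplification of what rest_api.py does; the real module also tracks errors and solver status):

```python
# In-memory store keyed by job ID.
data_sets: dict[str, object] = {}

def update_solution(job_id: str, solution) -> None:
    """Callback for solve_and_listen: each improved solution overwrites
    the stored plan, so polling clients immediately see the latest state."""
    data_sets[job_id] = solution
```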

GET /route-plans/{problem_id}

Get current best solution:

Request:

GET /route-plans/a1b2c3d4-e5f6-7890-abcd-ef1234567890

Response (while solving):

{
  "vehicles": [
    {
      "id": "vehicle_0",
      "visits": ["5", "12", "23", "7"],
      "totalDemand": 28,
      "totalDrivingTimeSeconds": 15420
    }
  ],
  "visits": [
    {
      "id": "5",
      "vehicle": "vehicle_0",
      "arrivalTime": "2025-11-27T08:15:30"
    }
  ],
  "score": "-15hard/-187594soft",
  "solverStatus": "SOLVING_ACTIVE"
}

Response (finished):

{
  "score": "0hard/-45657soft",
  "solverStatus": "NOT_SOLVING"
}

Important: The response updates in real-time as solving progresses. Clients should poll this endpoint (e.g., every 2 seconds) to show live progress.
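The polling loop is easy to encapsulate. A sketch with the HTTP fetch injected as a callable, so the loop itself needs no network and is trivially testable (helper name is ours):

```python
import time

def poll_until_solved(fetch, interval: float = 2.0, timeout: float = 60.0) -> dict:
    """Poll fetch() -- any callable returning the GET /route-plans/{id}
    JSON -- until solverStatus reports NOT_SOLVING or the timeout elapses.
    Returns the last payload seen either way."""
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch()
        if payload.get("solverStatus") == "NOT_SOLVING":
            return payload
        if time.monotonic() >= deadline:
            return payload
        time.sleep(interval)
```

In a real client, `fetch` would wrap an HTTP GET to `/route-plans/{problem_id}`.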

GET /route-plans

List all active job IDs:

Response:

["a1b2c3d4-e5f6-7890-abcd-ef1234567890", "b2c3d4e5-f6a7-8901-bcde-f23456789012"]

GET /route-plans/{problem_id}/status

Lightweight status check (score and solver status only):

Response:

{
  "name": "PHILADELPHIA",
  "score": "0hard/-45657soft",
  "solverStatus": "SOLVING_ACTIVE"
}

DELETE /route-plans/{problem_id}

Terminate solving early:

@app.delete("/route-plans/{problem_id}")
async def stop_solving(problem_id: str) -> VehicleRoutePlanModel:
    solver_manager.terminate_early(problem_id)
    return plan_to_model(data_sets.get(problem_id))

Returns the best solution found so far. Useful if the user is satisfied with current quality and doesn’t want to wait for the full 30 seconds.

POST /route-plans/recommendation

Request recommendations for assigning a new visit to vehicles:

Request body:

{
  "solution": { /* complete route plan */ },
  "visitId": "new_visit_42"
}

Response:

[
  {
    "proposition": {
      "vehicleId": "vehicle_2",
      "index": 3
    },
    "scoreDiff": "0hard/-1234soft"
  },
  {
    "proposition": {
      "vehicleId": "vehicle_0",
      "index": 5
    },
    "scoreDiff": "0hard/-2345soft"
  }
]

Returns up to 5 recommendations sorted by score impact. The first recommendation is the best option.

POST /route-plans/recommendation/apply

Apply a selected recommendation:

Request body:

{
  "solution": { /* complete route plan */ },
  "visitId": "new_visit_42",
  "vehicleId": "vehicle_2",
  "index": 3
}

Response: Updated route plan with the visit inserted at the specified position.
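A client-side sketch tying the two recommendation endpoints together: take the best (first) recommendation and build the apply request body (the helper name is ours; field names match the endpoint documentation above):

```python
def build_apply_request(solution: dict, visit_id: str, recommendations: list) -> dict:
    """Build the /route-plans/recommendation/apply body from the top
    recommendation. The endpoint returns the list sorted best-first."""
    best = recommendations[0]["proposition"]
    return {
        "solution": solution,
        "visitId": visit_id,
        "vehicleId": best["vehicleId"],
        "index": best["index"],
    }
```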

PUT /route-plans/analyze

Analyze a solution’s score breakdown:

Request body: Complete route plan (assigned solution)

Response:

{
  "score": "-20hard/-45657soft",
  "constraints": [
    {
      "name": "Vehicle capacity",
      "score": "-20hard/0soft",
      "matches": [
        {
          "justification": "Vehicle vehicle_2 exceeds capacity by 20 units",
          "score": "-20hard/0soft"
        }
      ]
    },
    {
      "name": "Minimize travel time", 
      "score": "0hard/-45657soft",
      "matches": [
        {
          "justification": "Vehicle vehicle_0 drives 12345 seconds",
          "score": "0hard/-12345soft"
        }
      ]
    }
  ]
}

This is invaluable for understanding why a solution has a particular score and which constraints are violated.

Web UI Flow

The static/app.js implements this workflow:

  1. User opens page → Load demo data (GET /demo-data/PHILADELPHIA)
  2. Display map with:
    • Depot marked as home icon
    • Customer locations as numbered markers
    • Sidebar showing visit details
  3. User clicks “Solve” → POST /route-plans (get job ID)
  4. Poll GET /route-plans/{id} every 2 seconds
  5. Update UI with:
    • Routes drawn as colored lines connecting visits
    • Each vehicle’s route in different color
    • Stats: total distance, capacity used, time windows status
  6. When solverStatus === "NOT_SOLVING" → Stop polling
  7. Display final score and route statistics

Visual features:

  • Color-coded routes (one color per vehicle)
  • Depot-to-depot complete routes visualized
  • Visit details on hover (arrival time, demand, time window status)
  • Stats panel showing capacity utilization and total distance per vehicle

Making Your First Customization

Let’s add a new constraint step-by-step.

Scenario: Limit Maximum Route Duration

New business rule: “No vehicle can be out for more than 8 hours (from depot departure to depot return).”

This is a hard constraint (must be satisfied).

Step 1: Open constraints.py

Navigate to src/vehicle_routing/constraints.py.

Step 2: Write the Constraint Function

Add this function:

def max_route_duration(constraint_factory: ConstraintFactory):
    """
    Hard constraint: Vehicle routes cannot exceed 8 hours total duration.
    """
    MAX_DURATION_SECONDS = 8 * 60 * 60  # 8 hours
    
    return (
        constraint_factory.for_each(Vehicle)
        .filter(lambda vehicle: len(vehicle.visits) > 0)  # Skip empty vehicles
        .filter(lambda vehicle: 
            (vehicle.arrival_time - vehicle.departure_time).total_seconds() 
            > MAX_DURATION_SECONDS
        )
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda vehicle: int(
                ((vehicle.arrival_time - vehicle.departure_time).total_seconds() 
                 - MAX_DURATION_SECONDS) / 60
            )
        )
        .as_constraint("Max route duration 8 hours")
    )

How this works:

  1. Examine each vehicle with visits
  2. Calculate total time: depot departure → visit customers → depot return
  3. If exceeds 8 hours, penalize by excess minutes
  4. Example: 9-hour route → 60-minute penalty

Why penalize by excess? An 8.5-hour route (penalty 30 minutes) is closer to feasible than a 12-hour route (penalty 240 minutes), guiding the solver incrementally.

Step 3: Register the Constraint

Add it to the define_constraints function:

@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
    return [
        # Hard constraints
        vehicle_capacity(constraint_factory),
        service_finished_after_max_end_time(constraint_factory),
        max_route_duration(constraint_factory),  # ← Add this line
        # Soft constraints
        minimize_travel_time(constraint_factory),
    ]

Step 4: Test It

  1. Restart the server:

    run-app
    
  2. Load demo data and solve:

    • Open http://localhost:8082
    • Load PHILADELPHIA dataset
    • Click “Solve”
  3. Verify constraint:

    • Check vehicle stats in the UI
    • Each vehicle’s total time should be ≤ 8 hours
    • If infeasible, some vehicles may still exceed (hard score < 0)

Testing tip: Temporarily lower the limit to 4 hours to see the constraint actively preventing long routes. You might get an infeasible solution (negative hard score) showing the constraint is working but can’t be satisfied with current vehicles.

Step 5: Make It Configurable

For production use, make the limit configurable per vehicle:

Modify domain.py:

@dataclass
class Vehicle:
    id: Annotated[str, PlanningId]
    name: str
    capacity: int
    home_location: Location
    departure_time: datetime
    max_duration_seconds: int = 8 * 60 * 60  # New field with default
    visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)

Update constraint:

def max_route_duration(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Vehicle)
        .filter(lambda vehicle: len(vehicle.visits) > 0)
        .filter(lambda vehicle: 
            (vehicle.arrival_time - vehicle.departure_time).total_seconds() 
            > vehicle.max_duration_seconds  # Use vehicle-specific limit
        )
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda vehicle: int(
                ((vehicle.arrival_time - vehicle.departure_time).total_seconds() 
                 - vehicle.max_duration_seconds) / 60
            )
        )
        .as_constraint("Max route duration")
    )

Now each vehicle can have a different maximum duration.

Understanding What You Did

You just implemented a temporal constraint — limiting time-based aspects of routes. This pattern is common in routing:

  • Driver shift limits (8-hour, 10-hour, etc.)
  • Maximum distance per route
  • Required breaks after X hours
  • Service windows for depot operations

The pattern is always:

  1. Calculate the temporal/distance measure
  2. Compare to limit
  3. Penalize by the excess amount (for graded guidance)
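Steps 1–3 share one arithmetic core. A tiny helper makes the graded-guidance idea concrete (helper name is ours):

```python
def excess_minutes(actual_seconds: float, limit_seconds: float) -> int:
    """Graded penalty for the temporal pattern above: whole minutes over
    the limit, 0 when the measure is within it."""
    return max(0, int((actual_seconds - limit_seconds) // 60))
```

For the 8-hour rule, a 9-hour route yields a penalty of 60, matching the worked example above.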

Advanced Constraint Patterns

Pattern 1: Priority Customers

Scenario: Some customers are high-priority and should be served earlier in their vehicle’s route.

def priority_customers_early(constraint_factory: ConstraintFactory):
    """
    Soft constraint: High-priority customers should be visited early in route.
    """
    HIGH_PRIORITY_CUSTOMERS = {"Customer A", "Customer B"}
    
    return (
        constraint_factory.for_each(Visit)
        .filter(lambda visit: visit.name in HIGH_PRIORITY_CUSTOMERS)
        .filter(lambda visit: visit.vehicle is not None)  # Assigned visits only
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda visit: visit.vehicle.visits.index(visit) * 100
        )
        .as_constraint("Priority customers early")
    )

How it works: Penalize by position in route × weight:

  • 1st position: penalty 0
  • 2nd position: penalty 100
  • 5th position: penalty 400

This incentivizes placing priority customers earlier without making it a hard requirement.

Optimization concept: This implements soft sequencing preferences. Unlike hard sequencing (e.g., “A must come before B”), this just prefers certain orders.

Pattern 2: Vehicle-Customer Compatibility

Scenario: Certain vehicles cannot serve certain customers (e.g., refrigerated trucks for frozen goods, size restrictions).

First, add compatibility data to domain:

@dataclass
class Vehicle:
    # ... existing fields ...
    vehicle_type: str = "standard"  # e.g., "refrigerated", "large", "standard"

@dataclass
class Visit:
    # ... existing fields ...
    required_vehicle_type: Optional[str] = None  # None = any vehicle OK

Then the constraint:

def vehicle_customer_compatibility(constraint_factory: ConstraintFactory):
    """
    Hard constraint: Only compatible vehicles can serve customers.
    """
    return (
        constraint_factory.for_each(Visit)
        .filter(lambda visit: visit.required_vehicle_type is not None)
        .filter(lambda visit: visit.vehicle is not None)
        .filter(lambda visit: visit.vehicle.vehicle_type != visit.required_vehicle_type)
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Vehicle customer compatibility")
    )

Pattern 3: Balanced Workload Across Vehicles

Scenario: Distribute visits evenly across vehicles (avoid one vehicle doing everything).

def balance_vehicle_workload(constraint_factory: ConstraintFactory):
    """
    Soft constraint: Balance number of visits across vehicles.
    """
    return (
        constraint_factory.for_each(Vehicle)  # includes vehicles with no visits
        .group_by(
            ConstraintCollectors.load_balance(
                lambda vehicle: vehicle,
                lambda vehicle: len(vehicle.visits)
            )
        )
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda load_balance: int(load_balance.unfairness() * 100)
        )
        .as_constraint("Balance workload")
    )

Optimization concept: This uses the load balancing collector which calculates variance in workload distribution. It’s more sophisticated than simple quadratic penalties.
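As a rough intuition for what the collector measures — not its exact formula — unfairness behaves like the spread of per-vehicle workloads:

```python
import statistics

def unfairness(visit_counts: list) -> float:
    """Rough stand-in for the load-balance collector's unfairness metric:
    the population standard deviation of per-vehicle visit counts.
    (The real collector's formula may differ in detail.)"""
    return statistics.pstdev(visit_counts) if visit_counts else 0.0
```

A perfectly balanced fleet scores 0; piling all visits onto one vehicle scores much higher, so the penalty pushes toward even distribution.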

Pattern 4: Break Requirements

Scenario: Drivers must take a 30-minute break after 4 hours of driving.

This requires modeling breaks explicitly:

@dataclass
class Break:
    """Represents a required break in a vehicle's route."""
    id: str
    duration: timedelta = timedelta(minutes=30)
    min_driving_before: timedelta = timedelta(hours=4)

@dataclass
class Vehicle:
    # ... existing fields ...
    required_breaks: list[Break] = field(default_factory=list)

Then a complex constraint checking cumulative driving time:

def break_enforcement(constraint_factory: ConstraintFactory):
    """
    Hard constraint: Breaks must occur within required intervals.
    """
    # Implementation would track cumulative driving time and verify
    # breaks are inserted appropriately in the route
    # This is advanced and requires careful handling of route chains
    pass

Note: Break enforcement is complex and often requires custom move selectors to ensure breaks are positioned correctly. This is beyond basic constraint writing.

Pattern 5: Distance Limits

Scenario: Each vehicle has a maximum total distance it can travel (fuel constraint).

def max_vehicle_distance(constraint_factory: ConstraintFactory):
    """
    Hard constraint: Vehicle cannot exceed maximum distance.
    """
    return (
        constraint_factory.for_each(Vehicle)
        .filter(lambda vehicle: len(vehicle.visits) > 0)
        .filter(lambda vehicle: 
            vehicle.calculate_total_driving_time_seconds() 
            > vehicle.max_driving_seconds  # New field needed
        )
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda vehicle: int(
                (vehicle.calculate_total_driving_time_seconds() 
                 - vehicle.max_driving_seconds) / 60
            )
        )
        .as_constraint("Max vehicle distance")
    )

You’d need to add max_driving_seconds to the Vehicle class in domain.py.


Testing and Validation

Unit Testing Constraints

Best practice: Test each constraint in isolation without running full solver.

Open tests/test_constraints.py to see examples:

from vehicle_routing.domain import Vehicle, Visit, Location, VehicleRoutePlan
from vehicle_routing.constraints import define_constraints
from solverforge_legacy.solver.test import ConstraintVerifier

# Create verifier with your constraints
constraint_verifier = ConstraintVerifier.build(
    define_constraints,
    VehicleRoutePlan,
    Vehicle,
    Visit
)

Example: Test Capacity Constraint

def test_vehicle_capacity_unpenalized():
    """Capacity within limit should not penalize."""
    vehicle = Vehicle(
        id="v1",
        name="Alpha",
        capacity=100,
        home_location=Location(0.0, 0.0),
        departure_time=datetime(2025, 11, 27, 8, 0)
    )

    visit = Visit(
        id="visit1",
        name="Customer A",
        location=Location(1.0, 1.0),
        demand=80,  # Within capacity
        min_start_time=datetime(2025, 11, 27, 9, 0),
        max_end_time=datetime(2025, 11, 27, 18, 0),
        service_duration=timedelta(minutes=30)
    )

    # Connect visit to vehicle (helper from tests)
    connect(vehicle, visit)

    # Verify no penalty
    constraint_verifier.verify_that(vehicle_capacity) \
        .given(vehicle, visit) \
        .penalizes_by(0)

def test_vehicle_capacity_penalized():
    """Exceeding capacity should penalize by overage amount."""
    vehicle = Vehicle(id="v1", name="Alpha", capacity=100, ...)

    visit1 = Visit(id="visit1", demand=80, ...)
    visit2 = Visit(id="visit2", demand=40, ...)  # Total 120 > 100

    connect(vehicle, visit1, visit2)

    # Should penalize by 20 (overage amount)
    constraint_verifier.verify_that(vehicle_capacity) \
        .given(vehicle, visit1, visit2) \
        .penalizes_by(20)

Helper function for tests:

def connect(vehicle: Vehicle, *visits: Visit):
    """Helper to set up vehicle-visit relationships."""
    vehicle.visits = list(visits)
    
    for i, visit in enumerate(visits):
        visit.vehicle = vehicle
        visit.previous_visit = visits[i - 1] if i > 0 else None
        visit.next_visit = visits[i + 1] if i < len(visits) - 1 else None
        
        # Calculate arrival times
        if i == 0:
            travel_time = vehicle.home_location.driving_time_to(visit.location)
            visit.arrival_time = vehicle.departure_time + timedelta(seconds=travel_time)
        else:
            travel_time = visits[i-1].location.driving_time_to(visit.location)
            visit.arrival_time = visits[i-1].calculate_departure_time() + timedelta(seconds=travel_time)

Example: Test Time Window Constraint

def test_time_window_violation():
    """Finishing after deadline should penalize by delay minutes."""
    vehicle = Vehicle(
        id="v1",
        name="Alpha",
        departure_time=datetime(2025, 11, 27, 7, 0),
        ...
    )

    visit = Visit(
        id="visit1",
        max_end_time=datetime(2025, 11, 27, 12, 0),  # Noon deadline
        service_duration=timedelta(minutes=30),
        ...
    )

    # Set arrival causing late finish
    visit.arrival_time = datetime(2025, 11, 27, 11, 45)  # Arrive 11:45
    # Service ends at 12:15 (30 min service) → 15 minutes late

    constraint_verifier.verify_that(service_finished_after_max_end_time) \
        .given(visit) \
        .penalizes_by(15)
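The delay arithmetic in this test (arrive 11:45, 30-minute service, noon deadline → 15 minutes late) can be captured in a small standalone helper (names are ours, for illustration):

```python
from datetime import datetime, timedelta

def service_finished_delay_minutes(arrival: datetime,
                                   service_duration: timedelta,
                                   max_end_time: datetime) -> int:
    """Minutes the service finishes past its deadline, 0 when on time."""
    finish = arrival + service_duration
    return max(0, int((finish - max_end_time).total_seconds() // 60))
```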

Run tests:

pytest tests/test_constraints.py -v

Integration Testing: Full Solve

Test the complete solving cycle in tests/test_feasible.py:

import time
from vehicle_routing.demo_data import DemoData, generate_demo_data
from vehicle_routing.solver import solver_manager

def test_solve_philadelphia():
    """Test that solver finds feasible solution for Philadelphia dataset."""
    
    # Get demo problem
    route_plan = generate_demo_data(DemoData.PHILADELPHIA)
    
    # Verify initially unassigned
    assert all(visit.vehicle is None for visit in route_plan.visits)
    assert all(len(vehicle.visits) == 0 for vehicle in route_plan.vehicles)
    
    # Start solving
    job_id = "test-philadelphia"
    solver_manager.solve(job_id, route_plan)
    
    # Wait for completion (with timeout)
    max_wait = 120  # 2 minutes
    start = time.time()
    
    while solver_manager.get_solver_status(job_id) != "NOT_SOLVING":
        if time.time() - start > max_wait:
            solver_manager.terminate_early(job_id)
            break
        time.sleep(2)
    
    # Get solution
    solution = solver_manager.get_final_best_solution(job_id)
    
    # Verify all visits assigned
    unassigned = [v for v in solution.visits if v.vehicle is None]
    assert len(unassigned) == 0, f"{len(unassigned)} visits remain unassigned"
    
    # Verify feasible (hard score = 0)
    assert solution.score is not None
    assert solution.score.hard_score == 0, \
        f"Solution infeasible with hard score {solution.score.hard_score}"
    
    # Report quality
    print(f"Final score: {solution.score}")
    print(f"Total driving time: {-solution.score.soft_score} seconds")
    
    # Verify route integrity
    for vehicle in solution.vehicles:
        if vehicle.visits:
            # Check capacity
            total_demand = sum(v.demand for v in vehicle.visits)
            assert total_demand <= vehicle.capacity, \
                f"Vehicle {vehicle.id} over capacity: {total_demand}/{vehicle.capacity}"
            
            # Check time windows
            for visit in vehicle.visits:
                assert not visit.is_service_finished_after_max_end_time(), \
                    f"Visit {visit.id} finishes late"

Run integration tests:

pytest tests/test_feasible.py -v

Manual Testing via UI

  1. Start the application:

    run-app
    
  2. Open browser console (F12) to see API calls and responses

  3. Load demo data:

    • Select “PHILADELPHIA” from dropdown
    • Verify map shows 55 customer markers + 1 depot
    • Check visit list shows all customers unassigned
  4. Solve and observe:

    • Click “Solve”
    • Watch score improve in real-time
    • See routes appear on map (colored lines)
    • Monitor stats panel for capacity and time info
  5. Verify solution quality:

    • Final hard score should be 0 (feasible)
    • All visits should be assigned (no gray markers)
    • Routes should form complete loops (depot → customers → depot)
    • Check vehicle stats: demand ≤ capacity for each
  6. Test different datasets:

    • Try HARTFORT (tighter capacity constraints)
    • Try FIRENZE (more visits, harder problem)
    • Compare solve times and final scores
  7. Test early termination:

    • Start solving FIRENZE
    • Click “Stop” after 10 seconds
    • Verify you get a partial solution (may be infeasible)

Production Considerations

Performance: Constraint Evaluation Speed

Constraints are evaluated millions of times per second during solving. Performance is critical.

❌ DON’T: Expensive operations in constraints

def bad_constraint(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Visit)
        .filter(lambda visit: 
            fetch_customer_credit_score(visit.name) < 500)  # API call!
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Bad")
    )

✅ DO: Pre-compute before solving

# Before solving, once
blocked_customers = {
    name for name, score in fetch_all_credit_scores().items()
    if score < 500
}

def good_constraint(constraint_factory: ConstraintFactory):
    return (
        constraint_factory.for_each(Visit)
        .filter(lambda visit: visit.name in blocked_customers)  # Fast set lookup
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Good")
    )

Performance tips:

  • Pre-compute expensive calculations (distances, lookup tables)
  • Cache property calculations in domain objects if safe
  • Avoid loops and complex logic in lambda functions
  • Use efficient data structures (sets for membership, dicts for lookup)

Distance Calculation: Two Modes

This quickstart supports two routing modes, selectable via the UI toggle or API parameter:

Haversine Mode (Default)

Fast great-circle distance calculation using the Haversine formula:

  • No external dependencies or network calls
  • Assumes 50 km/h average driving speed
  • Routes display as straight lines on the map
  • Best for: development, testing, and quick iterations
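For reference, the formula and the 50 km/h conversion can be sketched as follows (the quickstart's actual implementation lives in src/vehicle_routing/routing.py and may differ in detail):

```python
import math

AVERAGE_SPEED_KMH = 50  # the quickstart's stated assumption for this mode

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometres between two coordinates."""
    earth_radius_km = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * earth_radius_km * math.asin(math.sqrt(a))

def driving_time_seconds(lat1, lon1, lat2, lon2) -> int:
    """Convert the great-circle distance to seconds at the assumed speed."""
    return int(haversine_km(lat1, lon1, lat2, lon2) / AVERAGE_SPEED_KMH * 3600)
```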

Real Roads Mode

Actual road network routing using OpenStreetMap data via OSMnx:

  • Downloads and caches road network data locally
  • Computes shortest paths using Dijkstra’s algorithm
  • Routes display as actual road paths on the map
  • Progress streaming via Server-Sent Events (SSE)

First-time use: The initial load downloads ~5-15 MB of road network data for the demo area (cached for subsequent runs).

How it works:

  1. Enable “Real Roads” toggle in the UI before loading demo data
  2. The system downloads/loads the OSM road network for the bounding box
  3. A distance matrix is precomputed for all location pairs
  4. The solver uses real driving times; the UI displays actual road routes

# The Location class automatically uses the distance matrix when set
Location.set_distance_matrix(matrix)

# Solver calls driving_time_to() which checks for matrix first
time = loc1.driving_time_to(loc2)  # Uses matrix if available, else haversine

Custom Routing APIs

For production with proprietary routing (Google Maps, Mapbox, OSRM), pre-compute a distance matrix before solving:

def build_real_distance_matrix(locations):
    """Fetch actual driving times from routing API (run once, before solving)."""
    matrix = {}
    for loc1 in locations:
        for loc2 in locations:
            if loc1 != loc2:
                # Call Google Maps / Mapbox / OSRM once per pair
                matrix[(loc1, loc2)] = call_routing_api(loc1, loc2)
    return matrix

Never call external APIs during solving — pre-compute everything.

Scaling Strategies

Problem size guidelines (30 second solve):

  • Up to 100 visits × 5 vehicles: Good solutions
  • 100-200 visits: Increase solve time to 2-5 minutes
  • 200-500 visits: Increase to 10-30 minutes
  • 500+ visits: Consider decomposition strategies

Decomposition approaches:

Geographic clustering:

# Split large problem into regions
north_region_visits = [v for v in visits if v.location.latitude > 40.5]
south_region_visits = [v for v in visits if v.location.latitude <= 40.5]

# Solve each region separately
north_solution = solve(north_vehicles, north_region_visits)
south_solution = solve(south_vehicles, south_region_visits)

Time-based decomposition:

# Solve AM deliveries, then PM deliveries
am_visits = [v for v in visits if v.max_end_time.hour < 13]
pm_visits = [v for v in visits if v.min_start_time.hour >= 13]

Multi-day routing:

# Route planning for weekly schedule
for day in ["Monday", "Tuesday", "Wednesday", ...]:
    day_visits = get_visits_for_day(day)
    solution = solve(vehicles, day_visits)
    save_solution(day, solution)

Handling Infeasible Problems

Sometimes no feasible solution exists (e.g., time windows impossible to meet, insufficient vehicle capacity).

Detect and report:

solution = solver_manager.get_final_best_solution(job_id)

if solution.score.hard_score < 0:
    # Analyze what's infeasible
    unassigned = [v for v in solution.visits if v.vehicle is None]
    over_capacity = [
        v for v in solution.vehicles 
        if v.calculate_total_demand() > v.capacity
    ]
    late_visits = [
        v for v in solution.visits 
        if v.is_service_finished_after_max_end_time()
    ]
    
    return {
        "status": "infeasible",
        "hard_score": solution.score.hard_score,
        "issues": {
            "unassigned_visits": len(unassigned),
            "capacity_violations": len(over_capacity),
            "time_window_violations": len(late_visits)
        },
        "suggestions": [
            "Add more vehicles" if unassigned else None,
            "Increase vehicle capacity" if over_capacity else None,
            "Relax time windows" if late_visits else None
        ]
    }

Relaxation strategies:

  1. Soft capacity violations:

    # Change capacity from hard to soft constraint
    .penalize(HardSoftScore.ONE_SOFT, lambda v: v.total_demand - v.capacity)
    
  2. Penalized unassigned visits:

    # Allow unassigned visits with large penalty
    factory.for_each(Visit)
        .filter(lambda v: v.vehicle is None)
        .penalize(HardSoftScore.of_soft(100000))
    
  3. Flexible time windows:

    # Allow late arrivals with graduated penalty
    .penalize(HardSoftScore.ONE_SOFT, lambda v: v.service_finished_delay_in_minutes())
    

Real-Time Routing Adjustments

Scenario: Need to re-route due to:

  • New urgent orders received
  • Vehicle breakdown
  • Traffic delays

Dynamic re-routing:

def add_urgent_visit(current_solution: VehicleRoutePlan, new_visit: Visit):
    """Add urgent visit and re-optimize."""
    
    # Add to problem
    current_solution.visits.append(new_visit)
    
    # Use current solution as warm start
    job_id = f"urgent-{uuid4()}"
    solver_manager.solve_and_listen(
        job_id,
        current_solution,  # Starts from current routes
        callback,
        problem_change=ProblemChange.add_entity(new_visit)
    )
    
    return job_id

def handle_vehicle_breakdown(solution: VehicleRoutePlan, broken_vehicle_id: str):
    """Re-assign visits from broken vehicle."""
    
    broken_vehicle = next(v for v in solution.vehicles if v.id == broken_vehicle_id)
    
    # Unassign all visits from this vehicle
    for visit in broken_vehicle.visits:
        visit.vehicle = None
    broken_vehicle.visits = []
    
    # Mark vehicle unavailable
    broken_vehicle.capacity = 0
    
    # Re-solve
    solver_manager.solve_and_listen("emergency-replan", solution, callback)

Optimization concept: Warm starting from current solution makes re-routing much faster than solving from scratch. The solver starts with current routes and only modifies what’s necessary.

Monitoring and Logging

Track key metrics:

import logging
import time

logger = logging.getLogger(__name__)

start_time = time.time()
solver_manager.solve_and_listen(job_id, route_plan, callback)

# ... wait for completion ...

solution = solver_manager.get_final_best_solution(job_id)
duration = time.time() - start_time

# Calculate metrics
total_visits = len(solution.visits)
total_vehicles = len(solution.vehicles)
assigned_visits = sum(1 for v in solution.visits if v.vehicle is not None)
total_driving_seconds = -solution.score.soft_score if solution.score else 0

logger.info(
    f"Solved route plan {job_id}: "
    f"duration={duration:.1f}s, "
    f"score={solution.score}, "
    f"visits={assigned_visits}/{total_visits}, "
    f"vehicles={total_vehicles}, "
    f"driving_time={total_driving_seconds}s"
)

# Alert if infeasible
if solution.score and solution.score.hard_score < 0:
    logger.warning(
        f"Infeasible solution for {job_id}: "
        f"hard_score={solution.score.hard_score}"
    )

Production monitoring:

  • Solve duration: Alert if suddenly increases (data quality issue?)
  • Infeasibility rate: Track percentage of infeasible solutions
  • Score trends: Monitor if soft scores degrading over time
  • Capacity utilization: Are vehicles underutilized? (might need fewer vehicles)
  • Time window tightness: Frequent time violations? (might need more vehicles)
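The capacity-utilization metric above can be computed from any solution snapshot. A hypothetical helper (attribute names follow the domain model used throughout this guide):

```python
def fleet_capacity_utilization(vehicles) -> float:
    """Fleet-wide fraction of capacity in use (0.0 to 1.0).

    Each vehicle is expected to expose .capacity and a .visits list
    whose items carry a .demand attribute.
    """
    used = sum(visit.demand for vehicle in vehicles for visit in vehicle.visits)
    total = sum(vehicle.capacity for vehicle in vehicles)
    return used / total if total else 0.0
```

Persistently low values suggest the fleet is oversized; values near 1.0 combined with time-window violations suggest it is undersized.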

Quick Reference

File Locations

| Need to… | Edit this file |
| --- | --- |
| Add/change business rule | src/vehicle_routing/constraints.py |
| Add field to Vehicle | src/vehicle_routing/domain.py + converters.py |
| Add field to Visit | src/vehicle_routing/domain.py + converters.py |
| Change solve time | src/vehicle_routing/solver.py |
| Change distance calculation | src/vehicle_routing/routing.py |
| Configure routing mode | src/vehicle_routing/routing.py |
| Add REST endpoint | src/vehicle_routing/rest_api.py |
| Change demo data | src/vehicle_routing/demo_data.py |
| Change UI/map | static/index.html, static/app.js |

Common Constraint Patterns

Unary constraint (single entity):

constraint_factory.for_each(Vehicle)
    .filter(lambda vehicle: ...)  # your condition here
    .penalize(HardSoftScore.ONE_HARD)

Filtering by route property:

constraint_factory.for_each(Vehicle)
    .filter(lambda vehicle: len(vehicle.visits) > 0)  # Has visits
    .filter(lambda vehicle: vehicle.calculate_total_demand() > vehicle.capacity)
    .penalize(...)

Visit constraints:

constraint_factory.for_each(Visit)
    .filter(lambda visit: visit.vehicle is not None)  # Assigned only
    .filter(lambda visit: ...)  # your condition here
    .penalize(...)

Summing over vehicles:

constraint_factory.for_each(Vehicle)
    .penalize(
        HardSoftScore.ONE_SOFT,
        lambda vehicle: vehicle.calculate_total_driving_time_seconds()
    )

Variable penalty amount:

.penalize(
    HardSoftScore.ONE_HARD,
    lambda entity: calculate_penalty_amount(entity)
)

Common Domain Patterns

Check if visit assigned:

if visit.vehicle is not None:
    # Visit is assigned to a vehicle

Iterate through route:

for i, visit in enumerate(vehicle.visits):
    print(f"Stop {i+1}: {visit.name}")

Calculate route metrics:

total_demand = sum(v.demand for v in vehicle.visits)
total_time = (vehicle.arrival_time - vehicle.departure_time).total_seconds()
avg_demand = total_demand / len(vehicle.visits) if vehicle.visits else 0

Time calculations:

arrival = visit.arrival_time
service_start = max(arrival, visit.min_start_time)  # Wait if arrived early
service_end = service_start + visit.service_duration
is_late = service_end > visit.max_end_time

Debugging Tips

Enable verbose logging:

import logging
logging.basicConfig(level=logging.DEBUG)

Analyze solution score:

from vehicle_routing.solver import solution_manager

analysis = solution_manager.analyze(route_plan)
print(analysis.summary())

# See detailed constraint breakdown
for constraint in analysis.constraint_analyses:
    print(f"{constraint.name}: {constraint.score}")
    for match in constraint.matches:
        print(f"  - {match.justification}")

Test constraint in isolation:

from vehicle_routing.constraints import define_constraints
from solverforge_legacy.test import ConstraintVerifier

verifier = ConstraintVerifier.build(
    define_constraints,
    VehicleRoutePlan,
    Vehicle,
    Visit
)

# Test specific scenario
verifier.verify_that(vehicle_capacity) \
    .given(test_vehicle, test_visit) \
    .penalizes_by(expected_penalty)

Visualize route in tests:

def print_route(vehicle: Vehicle):
    """Debug helper to print route."""
    print(f"Vehicle {vehicle.id}:")
    print(f"  Capacity: {vehicle.total_demand}/{vehicle.capacity}")
    print(f"  Route:")
    print(f"    Depot → {vehicle.departure_time}")
    for i, visit in enumerate(vehicle.visits):
        print(f"    {visit.name} → arrive {visit.arrival_time}, "
              f"depart {visit.calculate_departure_time()}")
    print(f"    Depot → {vehicle.arrival_time}")

Common Gotchas

  1. Forgot to handle empty routes

    • Check len(vehicle.visits) > 0 before accessing route properties
    • Symptom: IndexError or None errors
  2. Shadow variables not updated

    • Use the connect() helper in tests to properly link visits
    • In production, solver maintains these automatically
    • Symptom: arrival_time is None or incorrect
  3. Distance calculation too slow

    • Pre-compute distance matrices before solving
    • Never call external APIs during constraint evaluation
    • Symptom: Solving extremely slow (< 100 evaluations/second)
  4. Forgot to register constraint

    • Add to define_constraints() return list
    • Symptom: Constraint not enforced
  5. Time zone issues

    • Use timezone-aware datetime objects consistently
    • Or use naive datetime (no timezone) consistently
    • Symptom: Time calculations off by hours
  6. Capacity violations not penalized

    • Ensure calculate_total_demand() is used, not manual sum
    • Check filter logic: should penalize when demand > capacity
    • Symptom: Solutions with impossible loads
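
For gotcha 5, the naive/aware mismatch is easy to demonstrate with the stdlib — Python refuses to subtract one style from the other, which is the loud failure you want instead of silently wrong arithmetic:

```python
from datetime import datetime, timezone

aware = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)  # timezone-aware
naive = datetime(2024, 1, 1, 12, 0)                      # no timezone

try:
    _ = naive - aware  # mixing the two styles raises TypeError
    mixed_ok = True
except TypeError:
    mixed_ok = False
```

Pick one style for the whole domain model; subtraction, comparison, and max() all fail (or mislead) when the styles are mixed.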

Performance Benchmarks

Typical evaluation speeds (on modern hardware):

| Problem Size | Evaluations/Second | 30-Second Results |
|--------------|--------------------|-------------------|
| 20 visits, 2 vehicles | 10,000+ | Near-optimal |
| 50 visits, 5 vehicles | 5,000+ | High quality |
| 100 visits, 8 vehicles | 2,000+ | Good quality |
| 200 visits, 10 vehicles | 500-1000 | Decent quality |

If your speeds are significantly lower, review constraint complexity and pre-compute expensive operations.
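
One way to pre-compute distances is a pairwise matrix built once before solving starts — a hedged sketch using haversine great-circle distance (your project may instead call a routing service once up front):

```python
import math

def haversine_km(a: tuple[float, float], b: tuple[float, float]) -> float:
    """Great-circle distance in kilometres between two (lat, lon) pairs."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(h))

def build_distance_matrix(locations):
    """Pre-compute every pairwise distance once, before solving starts."""
    return {(i, j): haversine_km(a, b)
            for i, a in enumerate(locations)
            for j, b in enumerate(locations)}
```

Constraint lambdas then do a dictionary lookup instead of trigonometry (or worse, a network call) on every evaluation.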


Conclusion

You now have a complete understanding of constraint-based vehicle routing:

  • Problem modeling — How to represent routing problems with vehicles, visits, and locations
  • Constraint logic — How to express capacity, time windows, and distance minimization
  • Route construction — How list variables and shadow variables build efficient routes
  • Customization patterns — How to extend for your routing needs
  • Production readiness — Performance, scaling, and infeasibility handling

Next Steps

  1. Run the application and experiment with the three demo datasets
  2. Modify an existing constraint — change capacity limits or time windows
  3. Add your own constraint — implement a rule from your domain (max distance, breaks, priorities)
  4. Test thoroughly — write unit tests for your constraints
  5. Customize the data model — add vehicle types, visit priorities, or other business fields
  6. Deploy with real data — integrate with your customer database and mapping service

Key Takeaways

List Variables for Routing:

  • PlanningListVariable on vehicle.visits handles both assignment and sequencing
  • Shadow variables automatically maintain route chain integrity
  • Arrival times cascade through the chain for efficient time calculations
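
The cascade can be pictured with a plain-Python sketch (names are illustrative; in the real model, shadow variable listeners update this incrementally as the solver reorders the list):

```python
from datetime import datetime, timedelta

def cascade_arrival_times(departure, visits, travel_time, service_duration):
    """Walk the route in list order, carrying the clock forward the way
    shadow variable listeners do: travel to each visit, then serve it."""
    clock, previous, arrivals = departure, None, []
    for visit in visits:
        clock += travel_time(previous, visit)   # None means "from the depot"
        arrivals.append(clock)
        clock += service_duration(visit)
        previous = visit
    return arrivals
```

Because each arrival depends only on the previous stop, a change mid-route invalidates only the tail of the chain — which is what makes incremental recomputation cheap.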

Hard vs Soft Constraints:

  • Hard: Capacity and time windows (must satisfy for valid routes)
  • Soft: Total distance (optimize after ensuring validity)

Graded Penalties:

  • Penalize by excess amount (not just binary yes/no)
  • Helps solver navigate incrementally toward feasibility
  • Example: 20 units over capacity is “less wrong” than 50 units over

Metaheuristics for Routing:

  • Efficiently explore massive solution spaces (millions of possibilities)
  • Anytime algorithms: improve continuously, stop when satisfied
  • No guarantee of global optimum, but high-quality solutions in practical time

The Power of Constraints:

  • Most business logic lives in one file (constraints.py)
  • Easy to add new rules without changing core routing logic
  • Declarative: describe what you want, solver figures out how

Comparison to Other Quickstarts

vs. Employee Scheduling:

  • Scheduling: Temporal (when to schedule shifts)
  • Routing: Spatial + temporal (where to go and when to arrive)
  • Scheduling: Simple planning variable (which employee)
  • Routing: List variable (which visits, in what order)

vs. Other Routing Variants:

  • CVRP (Capacitated Vehicle Routing): Just capacity, no time windows
  • VRPTW (this quickstart): Capacity + time windows
  • VRPPD (Pickup & Delivery): Precedence constraints between pickup/delivery pairs
  • MDVRP (Multi-Depot): Multiple starting locations

This quickstart teaches core concepts applicable to all routing variants.


Questions? Start by solving the demo datasets and observing how the routes are constructed. Try modifying capacity or time windows to see how the solver adapts. The best way to learn routing optimization is to experiment and visualize the results.

Happy routing! 🚚📦