Getting Started
Quickstart Guides — repository layout, prerequisites, and how to run examples locally.
Legacy Implementation Guides
These quickstart guides use solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.
SolverForge has been completely rewritten as a native constraint solver in Rust, with its own solving engine built from scratch. These guides are preserved as:
- Reference material for understanding constraint solving concepts
- Educational examples of constraint modeling patterns
- Demonstration of optimization problem domains
The JPype bridge and Timefold-based architecture described in these guides do not apply to current SolverForge.
Native Python bindings for the Rust implementation are under active development.
Choose a Quickstart
Assign staff to shifts based on skills and availability.
Perfect for learning core optimization concepts.
[Start Tutorial →](employee-scheduling/)
Find optimal times and rooms for meetings while avoiding conflicts.
[Start Tutorial →](meeting-scheduling/)
Plan delivery routes that minimize travel time with capacity constraints.
[Start Tutorial →](vehicle-routing/)
Schedule lessons to rooms and timeslots without teacher or room conflicts.
[Start Tutorial →](school-timetabling/)
Select stocks for a diversified portfolio while maximizing expected returns.
[Start Tutorial →](portfolio-optimization/)
Place virtual machines on servers respecting capacity, affinity, and consolidation goals.
[Start Tutorial →](vm-placement/)
Build a solver using the core Rust library directly. For advanced users interested in the internals.
[Start Tutorial →](rust-quickstart/)
This page covers:
- Repository layout and quickstart variants
- Prerequisites and installation notes
- How to run an example locally
- Where to find benchmarks, technical notes and individual quickstart READMEs
Repository layout
The repository is organised so you can choose between pedagogical, reference implementations and optimized, performance-minded variants:
- legacy/ — Refactored quickstarts that minimize runtime overhead by constraining Pydantic to the API boundary and using lighter-weight models during solver moves.
- benchmarks/ — Benchmarks, results, and a short performance report comparing implementations and use cases.
Common quickstarts available now:
- legacy/meeting-scheduling-fast
- legacy/vehicle-routing-fast
- legacy/employee-scheduling-fast
- legacy/portfolio-optimization-fast
- legacy/vm-placement-fast
Each use case folder includes a README describing how to run the example, expected inputs, and any implementation-specific details.
Prerequisites
Typical requirements (may vary per quickstart):
- Python 3.8+ (use a virtual environment)
- pip to install dependencies
- Optional: Docker if you prefer containerised execution
Some examples expose a small FastAPI UI or HTTP API and will list FastAPI and related packages in their requirements.txt or pyproject.toml.
Installation
Clone or download the SolverForge quickstarts repository.
Create and activate a virtual environment:
- Unix/macOS:
python -m venv .venv
source .venv/bin/activate
- Windows:
python -m venv .venv
.\.venv\Scripts\activate
Install dependencies from the chosen quickstart directory:
pip install -r requirements.txt
- Or follow the quickstart’s pyproject.toml instructions if provided.
Each quickstart README documents any extra dependencies or optional tooling.
Setup
- Inspect the quickstart folder for example data, configuration files, and environment variables.
- If the quickstart includes Docker assets, follow the README for Docker or docker-compose instructions.
- Confirm any required ports or external resources before starting the example.
Try it out!
Most quickstarts offer one or both run modes:
- A minimal FastAPI service that serves a tiny UI and HTTP endpoints.
- A CLI script that runs the solver on example data and outputs results.
To try a quick example:
- Open the quickstart folder of interest (for example legacy/meeting-scheduling-fast).
- Follow the run instructions in that folder’s README. Common commands are:
  - python -m <module> or uvicorn for FastAPI-based examples.
  - python run_demo.py or similar CLI entrypoints described in the README.
Check these README files for concrete run commands:
- legacy/vehicle-routing/README.MD
- legacy/vehicle-routing-fast/README.MD
- legacy/meeting-scheduling-fast/README.adoc
- legacy/employee-scheduling-fast/README.MD
Performance-focused work and benchmark artifacts live in the benchmarks/ folder:
- benchmarks/results_meeting-scheduling.md
- benchmarks/results_vehicle-routing.md
- benchmarks/report.md
Where to read more
- Start at the repository top-level README for an overview and the full use-case list.
- Read the individual quickstart READMEs for run instructions, configuration and design notes.
- Consult benchmarks/ for performance comparisons and technical rationale.
Legal note
This repository derives from prior quickstarts and carries permissive licensing details documented in the top-level README and LICENSE files. Refer to those files for full copyright and licensing information.
1 - Employee Scheduling (Rust)
Build efficient employee scheduling with SolverForge’s native Rust constraint solver
Native Rust Implementation
This guide uses SolverForge’s native Rust constraint solver — a fully monomorphized, zero-erasure implementation with no JVM bridge. All constraints compile to concrete types at build time, enabling aggressive compiler optimizations and true native performance.
If you’re looking for the Python implementation (legacy JPype bridge), see Employee Scheduling (Python).
Table of Contents
- Introduction
- Getting Started
- The Problem We’re Solving
- Understanding the Data Model
- How Optimization Works
- Writing Constraints: The Business Rules
- The Solver Engine
- Web Interface and API
- Making Your First Customization
- Advanced Constraint Patterns
- Quick Reference
Introduction
What You’ll Learn
This guide walks you through a complete employee scheduling application built with SolverForge, a native Rust constraint-based optimization framework. You’ll learn:
- How to model real-world scheduling problems as optimization problems using Rust’s type system
- How to express business rules as constraints using a fluent API
- How SolverForge’s zero-erasure architecture enables native performance
- How to customize the system for your specific needs
No optimization background required — we’ll explain concepts as we encounter them in the code.
Prerequisites
- Rust knowledge (structs, traits, closures, derive macros)
- Familiarity with REST APIs
- Comfort with command-line operations
What is Constraint-Based Optimization?
Traditional programming: You write explicit logic that says “do this, then that.”
Constraint-based optimization: You describe what a good solution looks like and the solver figures out how to achieve it.
Think of it like describing what puzzle pieces you have and what rules they must follow — then having a computer try millions of arrangements per second to find the best fit.
Why Native Rust?
SolverForge’s Rust implementation offers key advantages:
- Zero-erasure architecture: All generic types resolve at compile time — no Box<dyn Trait>, no Arc, no runtime dispatch
- Full monomorphization: Each constraint compiles to specialized machine code
- Memory efficiency: Index-based references instead of string cloning
- True parallelism: No GIL, no JVM pause, native threading
Getting Started
Running the Application
Clone the quickstarts repository:
git clone https://github.com/SolverForge/solverforge-quickstarts
cd ./solverforge-quickstarts/rust/employee-scheduling
Build the project:
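cargo build --release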
Run the server:
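cargo run --release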
Open your browser:
http://localhost:7860
You’ll see a scheduling interface with employees, shifts and a “Solve” button. Click it and watch the solver automatically assign employees to shifts while respecting business rules.
File Structure Overview
rust/employee-scheduling/
├── src/
│ ├── main.rs # Axum server entry point
│ ├── lib.rs # Library crate root
│ ├── domain.rs # Data models (Employee, Shift, EmployeeSchedule)
│ ├── constraints.rs # Business rules (90% of customization happens here)
│ ├── api.rs # REST endpoint handlers
│ ├── dto.rs # JSON serialization types
│ └── demo_data.rs # Sample data generation
├── static/
│ ├── index.html # Web UI
│ └── app.js # UI logic and visualization
└── Cargo.toml # Dependencies
Key insight: Most business customization happens in constraints.rs alone. You rarely need to modify other files.
The Problem We’re Solving
The Scheduling Challenge
You need to assign employees to shifts while satisfying rules like:
Hard constraints (must be satisfied):
- Employee must have the required skill for the shift
- Employee can’t work overlapping shifts
- Employee needs 10 hours rest between shifts
- Employee can’t work more than one shift per day
- Employee can’t work on days they’re unavailable
Soft constraints (preferences to optimize):
- Avoid scheduling on days the employee marked as “undesired”
- Prefer scheduling on days the employee marked as “desired”
- Balance workload fairly across all employees
Why This is Hard
For even 20 shifts and 10 employees, there are 10^20 possible assignments (100 quintillion). A human can’t evaluate them all. Even a computer trying random assignments would take years.
Optimization algorithms use smart strategies to explore this space efficiently, finding high-quality solutions in seconds.
Understanding the Data Model
Let’s examine the three core structs that model our problem. Open src/domain.rs:
The Employee Struct (Problem Fact)
#[problem_fact]
#[derive(Serialize, Deserialize)]
pub struct Employee {
/// Index of this employee in `EmployeeSchedule.employees` for O(1) join matching.
pub index: usize,
pub name: String,
pub skills: HashSet<String>,
#[serde(rename = "unavailableDates", default)]
pub unavailable_dates: HashSet<NaiveDate>,
#[serde(rename = "undesiredDates", default)]
pub undesired_dates: HashSet<NaiveDate>,
#[serde(rename = "desiredDates", default)]
pub desired_dates: HashSet<NaiveDate>,
/// Sorted unavailable dates for `flatten_last` compatibility.
#[serde(skip)]
pub unavailable_days: Vec<NaiveDate>,
#[serde(skip)]
pub undesired_days: Vec<NaiveDate>,
#[serde(skip)]
pub desired_days: Vec<NaiveDate>,
}
What it represents: A person who can be assigned to shifts.
Key fields:
- index: Position in the employees array — enables O(1) lookups without string comparison
- name: Human-readable identifier
- skills: What skills this employee possesses (e.g., {"Doctor", "Cardiology"})
- unavailable_dates: Days the employee absolutely cannot work (hard constraint)
- undesired_dates / desired_dates: Soft preference fields
Rust-specific design:
- #[problem_fact]: Derive macro that marks this as immutable solver data
- HashSet<NaiveDate> for O(1) membership testing during JSON deserialization
- Vec<NaiveDate> sorted copies for flatten_last stream compatibility
The Builder Pattern with finalize()
The Employee struct uses a builder pattern with explicit finalization:
impl Employee {
pub fn new(index: usize, name: impl Into<String>) -> Self {
Self {
index,
name: name.into(),
skills: HashSet::new(),
// ... fields initialized empty
}
}
/// Populates derived Vec fields from HashSets for zero-erasure stream compatibility.
/// Must be called after all dates have been added to HashSets.
pub fn finalize(&mut self) {
self.unavailable_days = self.unavailable_dates.iter().copied().collect();
self.unavailable_days.sort();
// ... same for undesired_days, desired_days
}
pub fn with_skill(mut self, skill: impl Into<String>) -> Self {
self.skills.insert(skill.into());
self
}
}
Why finalize()? The constraint stream API’s flatten_last operation requires sorted slices for O(1) index-based lookups. After JSON deserialization or programmatic construction, finalize() converts HashSets to sorted Vecs.
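For illustration, here is a minimal construction sketch using the builder methods above; the names list is a hypothetical input, and the skill and date values mirror the demo data:
// `names: Vec<String>` is a hypothetical list of employee names.
let mut employees: Vec<Employee> = names
    .iter()
    .enumerate()
    .map(|(i, name)| Employee::new(i, name.clone()).with_skill("Doctor"))
    .collect();
for emp in &mut employees {
    // Dates are inserted into the HashSet fields first...
    emp.unavailable_dates.insert(NaiveDate::from_ymd_opt(2026, 1, 25).unwrap());
    // ...then finalize() populates and sorts the Vec<NaiveDate> fields used by flatten_last.
    emp.finalize();
}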
The Shift Struct (Planning Entity)
#[planning_entity]
#[derive(Serialize, Deserialize)]
pub struct Shift {
#[planning_id]
pub id: String,
pub start: NaiveDateTime,
pub end: NaiveDateTime,
pub location: String,
#[serde(rename = "requiredSkill")]
pub required_skill: String,
/// Index into `EmployeeSchedule.employees` (O(1) lookup, no String cloning).
#[planning_variable(allows_unassigned = true)]
pub employee_idx: Option<usize>,
}
What it represents: A time slot that needs an employee assignment.
Key annotations:
- #[planning_entity]: Derive macro marking this as containing decisions to optimize
- #[planning_id]: Marks id as the unique identifier
- #[planning_variable(allows_unassigned = true)]: The decision variable — what the solver assigns
Critical design choice — index-based references:
pub employee_idx: Option<usize> // ✓ O(1) lookup, no allocation
// NOT: pub employee: Option<String> // ✗ String clone on every comparison
Using usize indices instead of employee names provides:
- O(1) lookups via schedule.employees[idx]
- Zero allocations during constraint evaluation
- Direct equality comparison (integer vs string)
The EmployeeSchedule Struct (Planning Solution)
#[planning_solution]
#[basic_variable_config(
entity_collection = "shifts",
variable_field = "employee_idx",
variable_type = "usize",
value_range = "employees"
)]
#[solverforge_constraints_path = "crate::constraints::create_fluent_constraints"]
#[derive(Serialize, Deserialize)]
pub struct EmployeeSchedule {
#[problem_fact_collection]
pub employees: Vec<Employee>,
#[planning_entity_collection]
pub shifts: Vec<Shift>,
#[planning_score]
pub score: Option<HardSoftDecimalScore>,
#[serde(rename = "solverStatus", skip_serializing_if = "Option::is_none")]
pub solver_status: Option<String>,
}
What it represents: The complete problem and its solution.
Annotations explained:
- #[planning_solution]: Top-level problem definition
- #[basic_variable_config(...)]: Declarative configuration specifying:
  - Which collection contains planning entities (shifts)
  - Which field is the planning variable (employee_idx)
  - The variable’s type (usize)
  - Where valid values come from (employees)
- #[solverforge_constraints_path]: Points to the constraint factory function
- #[problem_fact_collection]: Immutable data (doesn’t change during solving)
- #[planning_entity_collection]: Entities being optimized
- #[planning_score]: Where the solver stores the calculated score
How Optimization Works
Before diving into constraints, let’s understand how the solver finds solutions.
The Search Process (Simplified)
- Start with an initial solution (often random or all unassigned)
- Evaluate the score using your constraint functions
- Make a small change (assign a different employee to one shift)
- Evaluate the new score
- Keep the change if it improves the score (with some controlled randomness)
- Repeat millions of times in seconds
- Return the best solution found
SolverForge uses sophisticated metaheuristic algorithms like:
- Tabu Search: Remembers recent moves to avoid cycling
- Simulated Annealing: Occasionally accepts worse solutions to escape local optima
- Late Acceptance: Compares the current solution to one from recent history, not just the immediately preceding solution
These techniques efficiently explore the massive solution space without getting stuck.
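As a mental model only, here is a tiny hill-climbing loop in plain Rust that mirrors the steps above. It is not SolverForge's actual engine, which layers tabu search, simulated annealing, and late acceptance on top of this basic idea:
fn local_search<S: Clone>(
    mut current: S,
    score: impl Fn(&S) -> i64,
    random_move: impl Fn(&S) -> S,
    iterations: usize,
) -> S {
    let mut best = current.clone();
    let mut best_score = score(&best);
    for _ in 0..iterations {
        let candidate = random_move(&current);     // make a small change
        if score(&candidate) >= score(&current) {  // keep non-worsening changes
            current = candidate;
            let s = score(&current);
            if s > best_score {
                best = current.clone();            // remember the best solution so far
                best_score = s;
            }
        }
    }
    best                                           // return the best solution found
}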
The Score: How “Good” is a Solution?
Every solution gets a score with two parts:
0hard/-45soft
- Hard score: Counts hard constraint violations (must be 0 for a valid solution)
- Soft score: Counts soft constraint violations/rewards (higher is better)
Scoring rules:
- Hard score must be 0 or positive (negative = invalid/infeasible)
- Among valid solutions (hard score = 0), highest soft score wins
- Hard score always takes priority over soft score
Writing Constraints: The Business Rules
Now the heart of the system. Open src/constraints.rs.
The Constraint Factory Pattern
All constraints are created in one function:
pub fn create_fluent_constraints() -> impl ConstraintSet<EmployeeSchedule, HardSoftDecimalScore> {
let factory = ConstraintFactory::<EmployeeSchedule, HardSoftDecimalScore>::new();
// Build each constraint...
let required_skill = factory.clone()
.for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
// ...
// Return all constraints as a tuple
(
required_skill,
no_overlap,
at_least_10_hours,
one_per_day,
unavailable,
undesired,
desired,
balanced,
)
}
The function returns impl ConstraintSet — a trait implemented for tuples of constraints. Each constraint is fully typed with no runtime dispatch.
Hard Constraint: Required Skill
Business rule: “An employee assigned to a shift must have the required skill.”
let required_skill = factory
.clone()
.for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
.join(
|s: &EmployeeSchedule| s.employees.as_slice(),
equal_bi(
|shift: &Shift| shift.employee_idx,
|emp: &Employee| Some(emp.index),
),
)
.filter(|shift: &Shift, emp: &Employee| {
shift.employee_idx.is_some() && !emp.skills.contains(&shift.required_skill)
})
.penalize(HardSoftDecimalScore::ONE_HARD)
.as_constraint("Required skill");
How to read this:
- for_each(|s| s.shifts.as_slice()): Stream over all shifts
- .join(..., equal_bi(...)): Join with employees where shift.employee_idx == Some(emp.index)
- .filter(...): Keep only pairs where the employee lacks the required skill
- .penalize(ONE_HARD): Each violation subtracts 1 from the hard score
- .as_constraint(...): Name for debugging
Key Rust adaptation — equal_bi joiner:
equal_bi(
|shift: &Shift| shift.employee_idx, // Option<usize>
|emp: &Employee| Some(emp.index), // Option<usize>
)
The equal_bi joiner takes two closures — one for each side of the join. This enables joining different types with potentially different key extraction logic.
Hard Constraint: No Overlapping Shifts
Business rule: “An employee can’t work two shifts that overlap in time.”
let no_overlap = factory
.clone()
.for_each_unique_pair(
|s: &EmployeeSchedule| s.shifts.as_slice(),
joiner::equal(|shift: &Shift| shift.employee_idx),
)
.filter(|a: &Shift, b: &Shift| {
a.employee_idx.is_some() && a.start < b.end && b.start < a.end
})
.penalize_hard_with(|a: &Shift, b: &Shift| {
HardSoftDecimalScore::of_hard_scaled(overlap_minutes(a, b) * 100000)
})
.as_constraint("Overlapping shift");
How to read this:
- for_each_unique_pair(...): Create pairs of shifts from the same collection
- joiner::equal(|shift| shift.employee_idx): Only pair shifts with the same employee
- .filter(...): Check time overlap with interval comparison
- .penalize_hard_with(...): Variable penalty based on overlap duration
Optimization concept: for_each_unique_pair ensures we don’t count violations twice (A,B vs B,A). The joiner uses hash indexing for O(1) pair matching.
Hard Constraint: Rest Between Shifts
Business rule: “Employees need at least 10 hours rest between shifts.”
let at_least_10_hours = factory
.clone()
.for_each_unique_pair(
|s: &EmployeeSchedule| s.shifts.as_slice(),
joiner::equal(|shift: &Shift| shift.employee_idx),
)
.filter(|a: &Shift, b: &Shift| a.employee_idx.is_some() && gap_penalty_minutes(a, b) > 0)
.penalize_hard_with(|a: &Shift, b: &Shift| {
HardSoftDecimalScore::of_hard_scaled(gap_penalty_minutes(a, b) * 100000)
})
.as_constraint("At least 10 hours between 2 shifts");
Helper function:
fn gap_penalty_minutes(a: &Shift, b: &Shift) -> i64 {
const MIN_GAP_MINUTES: i64 = 600; // 10 hours
let (earlier, later) = if a.end <= b.start {
(a, b)
} else if b.end <= a.start {
(b, a)
} else {
return 0; // Overlapping, handled by different constraint
};
let gap = (later.start - earlier.end).num_minutes();
if (0..MIN_GAP_MINUTES).contains(&gap) {
MIN_GAP_MINUTES - gap
} else {
0
}
}
Optimization concept: The penalty 600 - actual_gap creates incremental guidance. 9 hours rest (penalty 60) is better than 5 hours rest (penalty 300).
Hard Constraint: One Shift Per Day
Business rule: “Employees can work at most one shift per calendar day.”
let one_per_day = factory
.clone()
.for_each_unique_pair(
|s: &EmployeeSchedule| s.shifts.as_slice(),
joiner::equal(|shift: &Shift| (shift.employee_idx, shift.date())),
)
.filter(|a: &Shift, b: &Shift| a.employee_idx.is_some() && b.employee_idx.is_some())
.penalize(HardSoftDecimalScore::ONE_HARD)
.as_constraint("One shift per day");
Key pattern — tuple joiner:
joiner::equal(|shift: &Shift| (shift.employee_idx, shift.date()))
The joiner matches on a tuple (Option<usize>, NaiveDate) — same employee AND same date.
Hard Constraint: Unavailable Employee
Business rule: “Employees cannot work on days they marked as unavailable.”
let unavailable = factory
.clone()
.for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
.join(
|s: &EmployeeSchedule| s.employees.as_slice(),
equal_bi(
|shift: &Shift| shift.employee_idx,
|emp: &Employee| Some(emp.index),
),
)
.flatten_last(
|emp: &Employee| emp.unavailable_days.as_slice(),
|date: &NaiveDate| *date, // C → index key
|shift: &Shift| shift.date(), // A → lookup key
)
.filter(|shift: &Shift, date: &NaiveDate| {
shift.employee_idx.is_some() && shift_date_overlap_minutes(shift, *date) > 0
})
.penalize_hard_with(|shift: &Shift, date: &NaiveDate| {
HardSoftDecimalScore::of_hard_scaled(shift_date_overlap_minutes(shift, *date) * 100000)
})
.as_constraint("Unavailable employee");
The flatten_last Operation:
.flatten_last(
|emp: &Employee| emp.unavailable_days.as_slice(), // Collection to flatten
|date: &NaiveDate| *date, // Index key extractor
|shift: &Shift| shift.date(), // Lookup key extractor
)
This is a powerful pattern unique to SolverForge’s fluent API:
- Takes the last element of the current tuple (the Employee)
- Flattens their unavailable_days collection
- Pre-indexes by the date key
- On lookup, finds matching dates in O(1) using the shift’s date
Why sorted Vecs? The flatten_last operation uses binary search internally, requiring sorted input. That’s why Employee::finalize() sorts the date vectors.
Soft Constraint: Undesired Days
Business rule: “Prefer not to schedule employees on days they marked as undesired.”
let undesired = factory
.clone()
.for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
.join(
|s: &EmployeeSchedule| s.employees.as_slice(),
equal_bi(
|shift: &Shift| shift.employee_idx,
|emp: &Employee| Some(emp.index),
),
)
.flatten_last(
|emp: &Employee| emp.undesired_days.as_slice(),
|date: &NaiveDate| *date,
|shift: &Shift| shift.date(),
)
.filter(|shift: &Shift, _date: &NaiveDate| shift.employee_idx.is_some())
.penalize(HardSoftDecimalScore::ONE_SOFT)
.as_constraint("Undesired day for employee");
Key difference: Uses ONE_SOFT instead of ONE_HARD. The solver will try to avoid undesired days but may violate this if necessary.
Soft Constraint: Desired Days (Reward)
Business rule: “Prefer to schedule employees on days they marked as desired.”
let desired = factory
.clone()
.for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
.join(
|s: &EmployeeSchedule| s.employees.as_slice(),
equal_bi(
|shift: &Shift| shift.employee_idx,
|emp: &Employee| Some(emp.index),
),
)
.flatten_last(
|emp: &Employee| emp.desired_days.as_slice(),
|date: &NaiveDate| *date,
|shift: &Shift| shift.date(),
)
.filter(|shift: &Shift, _date: &NaiveDate| shift.employee_idx.is_some())
.reward(HardSoftDecimalScore::ONE_SOFT)
.as_constraint("Desired day for employee");
Key difference: Uses .reward() instead of .penalize(). Rewards increase the score.
Soft Constraint: Load Balancing
Business rule: “Distribute shifts fairly across employees.”
let balanced = factory
.for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
.balance(|shift: &Shift| shift.employee_idx)
.penalize(HardSoftDecimalScore::of_soft(1))
.as_constraint("Balance employee assignments");
The balance() Operation:
This is the simplest and most powerful load balancing pattern:
- Groups shifts by the grouping key (employee index)
- Calculates standard deviation incrementally
- Penalizes based on unfairness metric
Unlike manual group_by + count + math, the balance() operation:
- Maintains O(1) incremental updates during solving
- Handles edge cases (empty groups, single element)
- Provides mathematically sound fairness calculation
The Solver Engine
Configuration via Derive Macros
Unlike configuration files, SolverForge uses compile-time configuration through derive macros:
#[planning_solution]
#[basic_variable_config(
entity_collection = "shifts",
variable_field = "employee_idx",
variable_type = "usize",
value_range = "employees"
)]
#[solverforge_constraints_path = "crate::constraints::create_fluent_constraints"]
pub struct EmployeeSchedule { ... }
This generates:
- The Solvable trait implementation
- Variable descriptor metadata
- Move selector configuration
- Constraint factory wiring
The Solvable Trait
The derive macro implements Solvable:
// Generated by #[planning_solution]
impl Solvable for EmployeeSchedule {
type Score = HardSoftDecimalScore;
fn solve(self, config: Option<SolverConfig>, callback: Sender<...>) {
// Metaheuristic search loop
}
}
Starting the Solver
In src/api.rs:
async fn create_schedule(
State(state): State<Arc<AppState>>,
Json(dto): Json<ScheduleDto>,
) -> String {
let id = Uuid::new_v4().to_string();
let schedule = dto.to_domain();
// Store initial state
{
let mut jobs = state.jobs.write();
jobs.insert(id.clone(), SolveJob {
solution: schedule.clone(),
solver_status: "SOLVING".to_string(),
});
}
// Create channel for solution updates
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
// Spawn async task to receive updates
tokio::spawn(async move {
while let Some((solution, _score)) = rx.recv().await {
// Update stored solution
}
});
// Spawn solver on rayon thread pool
rayon::spawn(move || {
schedule.solve(None, tx);
});
id
}
Architecture notes:
- Solving runs on the rayon thread pool (CPU-bound work)
- Updates are sent via a tokio::sync::mpsc channel
- Async Axum handler for non-blocking HTTP
- parking_lot::RwLock for thread-safe state access
TypedScoreDirector for Analysis
For score breakdown without solving:
async fn analyze_schedule(Json(dto): Json<ScheduleDto>) -> Json<AnalyzeResponse> {
let schedule = dto.to_domain();
let constraints = create_fluent_constraints();
let director = TypedScoreDirector::new(schedule, constraints);
let score = director.get_score();
let analyses = director
.constraints()
.evaluate_detailed(director.working_solution());
// Convert to DTO...
}
The TypedScoreDirector:
- Evaluates all constraints against a solution
- Returns detailed match information per constraint
- No actual solving — just score calculation
Web Interface and API
REST API Endpoints
The API is built with Axum (src/api.rs):
pub fn router(state: Arc<AppState>) -> Router {
Router::new()
.route("/health", get(health))
.route("/healthz", get(health))
.route("/info", get(info))
.route("/demo-data", get(list_demo_data))
.route("/demo-data/{id}", get(get_demo_data))
.route("/schedules", post(create_schedule))
.route("/schedules", get(list_schedules))
.route("/schedules/analyze", put(analyze_schedule))
.route("/schedules/{id}", get(get_schedule))
.route("/schedules/{id}/status", get(get_schedule_status))
.route("/schedules/{id}", delete(stop_solving))
.with_state(state)
}
GET /demo-data
Returns available demo datasets:
GET /demo-data/{id}
Generates and returns sample data:
{
"employees": [
{
"name": "Amy Cole",
"skills": ["Doctor", "Cardiology"],
"unavailableDates": ["2026-01-25"],
"undesiredDates": ["2026-01-26"],
"desiredDates": ["2026-01-27"]
}
],
"shifts": [
{
"id": "0",
"start": "2026-01-25T06:00:00",
"end": "2026-01-25T14:00:00",
"location": "Ambulatory care",
"requiredSkill": "Doctor",
"employee": null
}
]
}
POST /schedules
Submit a schedule to solve. Returns job ID:
"a1b2c3d4-e5f6-7890-abcd-ef1234567890"
GET /schedules/{id}
Get current solution state:
{
"employees": [...],
"shifts": [...],
"score": "0hard/-12soft",
"solverStatus": "SOLVING"
}
DELETE /schedules/{id}
Stop solving early. Returns 204 No Content.
PUT /schedules/analyze
Analyze a schedule without solving:
{
"score": "-2hard/-45soft",
"constraints": [
{
"name": "Required skill",
"constraintType": "hard",
"weight": "1hard",
"score": "-2hard",
"matches": [
{
"score": "-1hard",
"justification": "Shift 5 assigned to Amy Cole without required skill"
}
]
}
]
}
Server Entry Point
src/main.rs:
#[tokio::main]
async fn main() {
solverforge::console::init();
let state = Arc::new(api::AppState::new());
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any);
let static_path = if PathBuf::from("examples/employee-scheduling/static").exists() {
"examples/employee-scheduling/static"
} else {
"static"
};
let app = api::router(state)
.fallback_service(ServeDir::new(static_path))
.layer(cors);
let addr = SocketAddr::from(([0, 0, 0, 0], 7860));
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
println!("Server running at http://localhost:{}", addr.port());
axum::serve(listener, app).await.unwrap();
}
Making Your First Customization
Let’s modify an existing constraint to understand the pattern.
Adjusting Constraint Weights
The balancing constraint currently uses a weight of 1:
let balanced = factory
.for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
.balance(|shift: &Shift| shift.employee_idx)
.penalize(HardSoftDecimalScore::of_soft(1)) // Weight: 1
.as_constraint("Balance employee assignments");
To make fairness more important relative to other soft constraints:
.penalize(HardSoftDecimalScore::of_soft(10)) // Weight: 10
Now each unit of imbalance costs 10 soft points instead of 1, making the solver prioritize fair distribution over other soft preferences.
Adding a New Hard Constraint
Let’s add: “No employee can work more than 5 shifts total.”
In src/constraints.rs, add the constraint:
use solverforge::stream::collector::count;
// Inside create_fluent_constraints()
let max_shifts = factory
.clone()
.for_each(|s: &EmployeeSchedule| s.shifts.as_slice())
.filter(|shift: &Shift| shift.employee_idx.is_some())
.group_by(|shift: &Shift| shift.employee_idx, count())
.penalize_hard_with(|shift_count: &usize| {
if *shift_count > 5 {
HardSoftDecimalScore::of_hard((*shift_count - 5) as i64)
} else {
HardSoftDecimalScore::ZERO
}
})
.as_constraint("Max 5 shifts per employee");
Then add it to the return tuple:
(
required_skill,
no_overlap,
at_least_10_hours,
one_per_day,
unavailable,
undesired,
desired,
balanced,
max_shifts, // Add here
)
Rebuild and test:
cargo build --release
cargo run --release
Advanced Constraint Patterns
Zero-Erasure Architecture
SolverForge’s constraints are fully monomorphized — every generic parameter resolves to concrete types at compile time:
// This type is FULLY concrete at compile time:
ConstraintStream<
EmployeeSchedule,
HardSoftDecimalScore,
(Shift, Employee), // Tuple of concrete types
JoinedMatcher<...> // Concrete matcher type
>
No Box<dyn Constraint>, no Arc<dyn Stream>, no vtable dispatch. The compiler sees the entire constraint graph and can inline, vectorize, and optimize aggressively.
Index-Based References
The pattern of using indices instead of cloned values:
// In Shift
pub employee_idx: Option<usize> // Index into employees array
// In constraint - O(1) lookup
.filter(|shift: &Shift, emp: &Employee| {
shift.employee_idx == Some(emp.index)
})
Benefits:
- Integer comparison vs string comparison
- No allocation during constraint evaluation
- Cache-friendly memory access patterns
The flatten_last Pattern
For constraints involving collections on joined entities:
.join(employees, equal_bi(...))
.flatten_last(
|emp| emp.unavailable_days.as_slice(), // What to flatten
|date| *date, // Index key
|shift| shift.date(), // Lookup key
)
This creates an indexed structure during stream setup, enabling O(1) lookups during constraint evaluation.
Custom Penalty Functions
Variable penalties guide the solver more effectively:
.penalize_hard_with(|a: &Shift, b: &Shift| {
let overlap = overlap_minutes(a, b);
HardSoftDecimalScore::of_hard_scaled(overlap * 100000)
})
The 100000 multiplier ensures minute-level granularity affects the score meaningfully compared to unit penalties.
Quick Reference
File Locations
| Need to… | Edit this file |
|---|---|
| Add/change business rule | src/constraints.rs |
| Add field to Employee/Shift | src/domain.rs + src/dto.rs |
| Change API endpoints | src/api.rs |
| Change demo data | src/demo_data.rs |
| Change UI | static/index.html, static/app.js |
Common Constraint Patterns
Unary constraint (examine one entity):
factory.for_each(|s| s.shifts.as_slice())
.filter(|shift| /* condition */)
.penalize(HardSoftDecimalScore::ONE_HARD)
Binary constraint with join:
factory.for_each(|s| s.shifts.as_slice())
.join(|s| s.employees.as_slice(), equal_bi(...))
.filter(|shift, emp| /* condition */)
.penalize(...)
Unique pairs (same collection):
factory.for_each_unique_pair(
|s| s.shifts.as_slice(),
joiner::equal(|shift| shift.employee_idx),
)
Flatten collections:
.flatten_last(
|emp| emp.dates.as_slice(),
|date| *date,
|shift| shift.date(),
)
Load balancing:
factory.for_each(|s| s.shifts.as_slice())
.balance(|shift| shift.employee_idx)
.penalize(HardSoftDecimalScore::of_soft(1))
Python → Rust Translation
| Python | Rust |
|---|---|
| @dataclass | #[derive(...)] struct |
| @planning_entity decorator | #[planning_entity] derive macro |
| PlanningId annotation | #[planning_id] attribute |
| PlanningVariable annotation | #[planning_variable] attribute |
| constraint_factory.for_each(Shift) | factory.for_each(\|s\| s.shifts.as_slice()) |
| Joiners.equal(lambda: ...) | joiner::equal(\|x\| x.field) |
| lambda shift: shift.employee | \|shift: &Shift\| shift.employee_idx |
| FastAPI server | Axum server |
| pip install | cargo build |
Debugging Tips
Enable verbose logging:
# In Cargo.toml
solverforge = { ..., features = ["verbose-logging"] }
Print in constraints (debug only):
.filter(|shift: &Shift, emp: &Employee| {
eprintln!("Checking shift {} with {}", shift.id, emp.name);
!emp.skills.contains(&shift.required_skill)
})
Use the analyze endpoint:
curl -X PUT http://localhost:7860/schedules/analyze \
-H "Content-Type: application/json" \
-d @schedule.json
Common Gotchas
Forgot to call finalize() on employees after construction
- Symptom: flatten_last constraints don’t match anything
Index out of sync — employee indices don’t match array positions
- Always use enumerate() when constructing employees
Missing factory.clone() — factory is consumed by each constraint
- Clone before each constraint chain
Forgot to add constraint to return tuple
- Constraint silently not evaluated
Using String instead of usize for references
- Performance degradation and allocation overhead
Additional Resources
2 - Employee Scheduling
A comprehensive quickstart guide to understanding and building intelligent employee scheduling with SolverForge
Legacy Implementation Guide
This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.
SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.
Table of Contents
- Introduction
- Getting Started
- The Problem We’re Solving
- Understanding the Data Model
- How Optimization Works
- Writing Constraints: The Business Rules
- The Solver Engine
- Web Interface and API
- Making Your First Customization
- Advanced Constraint Patterns
- Testing and Validation
- Quick Reference
Introduction
What You’ll Learn
This guide walks you through a complete employee scheduling application built with SolverForge, a constraint-based optimization framework. You’ll learn:
- How to model real-world scheduling problems as optimization problems
- How to express business rules as constraints that guide the solution
- How optimization algorithms find high-quality solutions automatically
- How to customize the system for your specific needs
No optimization background required — we’ll explain concepts as we encounter them in the code.
Architecture Note: This guide uses the “fast” implementation pattern with dataclass domain models and Pydantic only at API boundaries. For the architectural reasoning behind this design, see Dataclasses vs Pydantic in Constraint Solvers.
Prerequisites
- Basic Python knowledge (classes, functions, type annotations)
- Familiarity with REST APIs
- Comfort with command-line operations
What is Constraint-Based Optimization?
Traditional programming: You write explicit logic that says “do this, then that.”
Constraint-based optimization: You describe what a good solution looks like and the solver figures out how to achieve it.
Think of it like describing what puzzle pieces you have and what rules they must follow — then having a computer try millions of arrangements per second to find the best fit.
Getting Started
Running the Application
Download and navigate to the project directory:
git clone https://github.com/SolverForge/solverforge-quickstarts
cd ./solverforge-quickstarts/legacy/employee-scheduling-fast
Create and activate virtual environment:
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
Install the package:
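pip install -e .    # typical editable install; check the quickstart README if it differs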
Start the server:
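uvicorn employee_scheduling.rest_api:app --port 8080    # one possible entry point; the quickstart README documents the exact run command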
Open your browser:
http://localhost:8080
You’ll see a scheduling interface with employees, shifts and a “Solve” button. Click it and watch the solver automatically assign employees to shifts while respecting business rules.
File Structure Overview
legacy/employee-scheduling-fast/
├── src/employee_scheduling/
│ ├── domain.py # Data classes (Employee, Shift, Schedule)
│ ├── constraints.py # Business rules (90% of customization happens here)
│ ├── solver.py # Solver configuration
│ ├── demo_data.py # Sample data generation
│ ├── rest_api.py # HTTP API endpoints
│ ├── converters.py # REST ↔ Domain model conversion
│ ├── json_serialization.py # JSON helpers
│ └── score_analysis.py # Score breakdown DTOs
├── static/
│ ├── index.html # Web UI
│ └── app.js # UI logic and visualization
└── tests/
├── test_constraints.py # Unit tests for constraints
└── test_feasible.py # Integration tests
Key insight: Most business customization happens in constraints.py alone. You rarely need to modify other files.
The Problem We’re Solving
The Scheduling Challenge
You need to assign employees to shifts while satisfying rules like:
Hard constraints (must be satisfied):
- Employee must have the required skill for the shift
- Employee can’t work overlapping shifts
- Employee needs 10 hours rest between shifts
- Employee can’t work more than one shift per day
- Employee can’t work on days they’re unavailable
Soft constraints (preferences to optimize):
- Avoid scheduling on days the employee marked as “undesired”
- Prefer scheduling on days the employee marked as “desired”
- Balance workload fairly across all employees
Why This is Hard
For even 20 shifts and 10 employees, there are 10^20 possible assignments (100 quintillion). A human can’t evaluate them all. Even a computer trying random assignments would take years.
Optimization algorithms use smart strategies to explore this space efficiently, finding high-quality solutions in seconds.
Understanding the Data Model
Let’s examine the three core classes that model our problem. Open src/employee_scheduling/domain.py:
Domain Model Architecture
This quickstart separates domain models (dataclasses) from API models (Pydantic):
- Domain layer (domain.py lines 17-39): Pure @dataclass models for solver operations
- API layer (domain.py lines 46-75): Pydantic BaseModel classes for REST endpoints
- Converters (converters.py): Translate between the two layers
This separation provides better performance during solving — Pydantic’s validation overhead becomes expensive when constraints are evaluated millions of times per second. See the architecture article for benchmark comparisons. Note that while benchmarks on small problems show comparable iteration counts between Python and Java, the JPype bridge overhead may compound at larger scales.
The Employee Class
@dataclass
class Employee:
name: Annotated[str, PlanningId]
skills: set[str] = field(default_factory=set)
unavailable_dates: set[date] = field(default_factory=set)
undesired_dates: set[date] = field(default_factory=set)
desired_dates: set[date] = field(default_factory=set)
What it represents: A person who can be assigned to shifts.
Key fields:
- name: Unique identifier (the PlanningId annotation tells SolverForge this is the primary key)
- skills: What skills this employee possesses (e.g., {"Doctor", "Cardiology"})
- unavailable_dates: Days the employee absolutely cannot work (hard constraint)
- undesired_dates: Days the employee prefers not to work (soft constraint)
- desired_dates: Days the employee wants to work (soft constraint)
Optimization concept: These availability fields demonstrate hard vs soft constraints. Unavailable is non-negotiable; undesired is a preference the solver will try to honor but may violate if necessary.
The Shift Class (Planning Entity)
@planning_entity
@dataclass
class Shift:
id: Annotated[str, PlanningId]
start: datetime
end: datetime
location: str
required_skill: str
employee: Annotated[Employee | None, PlanningVariable] = None
What it represents: A time slot that needs an employee assignment.
Key fields:
- id: Unique identifier
- start / end: When the shift occurs
- location: Where the work happens
- required_skill: What skill is needed (must match the employee’s skills)
- employee: The assignment decision — this is what the solver optimizes!
Important annotations:
- @planning_entity: Tells SolverForge this class contains decisions to make
- PlanningVariable: Marks employee as the decision variable
Optimization concept: This is a planning variable — the value the solver assigns. Each shift starts with employee=None (unassigned). The solver tries different employee assignments, evaluating each according to your constraints.
The EmployeeSchedule Class (Planning Solution)
@planning_solution
@dataclass
class EmployeeSchedule:
employees: Annotated[list[Employee], ProblemFactCollectionProperty, ValueRangeProvider]
shifts: Annotated[list[Shift], PlanningEntityCollectionProperty]
score: Annotated[HardSoftDecimalScore | None, PlanningScore] = None
solver_status: SolverStatus = SolverStatus.NOT_SOLVING
What it represents: The complete problem and its solution.
Key fields:
- employees: All available employees (these are the possible values for assignments)
- shifts: All shifts that need assignment (the planning entities)
- score: Solution quality metric (calculated by constraints)
- solver_status: Whether solving is active
Annotations explained:
- @planning_solution: Marks this as the top-level problem definition
- ProblemFactCollectionProperty: Immutable data (doesn’t change during solving)
- PlanningEntityCollectionProperty: The entities being optimized
- ValueRangeProvider: Tells the solver which employees can be assigned to shifts
- PlanningScore: Where the solver stores the calculated score
Optimization concept: This demonstrates the declarative modeling approach. You describe the problem structure (what can be assigned to what) and the solver handles the search process.
How Optimization Works
Before diving into constraints, let’s understand how the solver finds solutions.
The Search Process (Simplified)
- Start with an initial solution (often random or all unassigned)
- Evaluate the score using your constraint functions
- Make a small change (assign a different employee to one shift)
- Evaluate the new score
- Keep the change if it improves the score (with some controlled randomness)
- Repeat millions of times in seconds
- Return the best solution found
Timefold (the engine that powers SolverForge) uses sophisticated metaheuristic algorithms like:
- Tabu Search: Remembers recent moves to avoid cycling
- Simulated Annealing: Occasionally accepts worse solutions to escape local optima
- Late Acceptance: Compares the current solution to one from recent history, not just the immediately preceding solution
These techniques efficiently explore the massive solution space without getting stuck.
The Score: How “Good” is a Solution?
Every solution gets a score with two parts:
0hard/-45soft
- Hard score: Counts hard constraint violations (must be 0 for a valid solution)
- Soft score: Counts soft constraint violations/rewards (higher is better)
Scoring rules:
- Hard score must be 0 or positive (negative = invalid/infeasible)
- Among valid solutions (hard score = 0), highest soft score wins
- Hard score always takes priority over soft score
Optimization concept: This is multi-objective optimization with a lexicographic ordering. We absolutely prioritize hard constraints, then optimize soft ones.
Writing Constraints: The Business Rules
Now the heart of the system. Open src/employee_scheduling/constraints.py.
The Constraint Provider Pattern
All constraints are registered in one function:
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
return [
# Hard constraints
required_skill(constraint_factory),
no_overlapping_shifts(constraint_factory),
at_least_10_hours_between_two_shifts(constraint_factory),
one_shift_per_day(constraint_factory),
unavailable_employee(constraint_factory),
# Soft constraints
undesired_day_for_employee(constraint_factory),
desired_day_for_employee(constraint_factory),
balance_employee_shift_assignments(constraint_factory),
]
Each constraint is a function returning a Constraint object. Let’s examine them from simple to complex.
Domain Model Methods for Constraints
The Shift class in domain.py includes helper methods that support datetime calculations used by multiple constraints. Following object-oriented design principles, these methods are part of the domain model rather than standalone functions:
def has_required_skill(self) -> bool:
"""Check if assigned employee has the required skill."""
if self.employee is None:
return False
return self.required_skill in self.employee.skills
def is_overlapping_with_date(self, dt: date) -> bool:
"""Check if shift overlaps with a specific date."""
return self.start.date() == dt or self.end.date() == dt
These methods encapsulate shift-related logic within the domain model, making constraints more readable and maintainable.
Implementation Note: For datetime overlap calculations in constraint penalty lambdas, the codebase uses inline calculations with explicit time(0, 0, 0) and time(23, 59, 59) rather than calling domain methods. This avoids transpilation issues with datetime.min.time() and datetime.max.time() in the constraint stream API.
Hard Constraint: Required Skill
Business rule: “An employee assigned to a shift must have the required skill.”
def required_skill(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Shift)
.filter(lambda shift: not shift.has_required_skill())
.penalize(HardSoftDecimalScore.ONE_HARD)
.as_constraint("Missing required skill")
)
How to read this:
- for_each(Shift): Consider every shift in the schedule
- .filter(...): Keep only shifts where the employee lacks the required skill
- .penalize(ONE_HARD): Each violation subtracts 1 from the hard score
- .as_constraint(...): Give it a name for debugging
Optimization concept: This is a unary constraint — it examines one entity at a time. The filter identifies violations and the penalty quantifies the impact.
Note: There’s no null check for shift.employee because constraints are only evaluated on complete assignments during the scoring phase.
Hard Constraint: No Overlapping Shifts
Business rule: “An employee can’t work two shifts that overlap in time.”
def no_overlapping_shifts(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each_unique_pair(
Shift,
Joiners.equal(lambda shift: shift.employee.name),
Joiners.overlapping(lambda shift: shift.start, lambda shift: shift.end),
)
.penalize(HardSoftDecimalScore.ONE_HARD, get_minute_overlap)
.as_constraint("Overlapping shift")
)
How to read this:
- for_each_unique_pair(Shift, ...): Create pairs of shifts
- Joiners.equal(lambda shift: shift.employee.name): Only pair shifts assigned to the same employee
- Joiners.overlapping(...): Only pair shifts that overlap in time
- .penalize(ONE_HARD, get_minute_overlap): Penalize by the number of overlapping minutes
Optimization concept: This is a binary constraint — it examines pairs of entities. The for_each_unique_pair ensures we don’t count each violation twice (e.g., comparing shift A to B and B to A).
Helper function:
def get_minute_overlap(shift1: Shift, shift2: Shift) -> int:
return (
min(shift1.end, shift2.end) - max(shift1.start, shift2.start)
).total_seconds() // 60
Why penalize by minutes? This creates a graded penalty. A 5-minute overlap is less bad than a 5-hour overlap, giving the solver better guidance.
Hard Constraint: Rest Between Shifts
Business rule: “Employees need at least 10 hours rest between shifts.”
def at_least_10_hours_between_two_shifts(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Shift)
.join(
Shift,
Joiners.equal(lambda shift: shift.employee.name),
Joiners.less_than_or_equal(
lambda shift: shift.end, lambda shift: shift.start
),
)
.filter(
lambda first_shift, second_shift: (
second_shift.start - first_shift.end
).total_seconds() // (60 * 60) < 10
)
.penalize(
HardSoftDecimalScore.ONE_HARD,
lambda first_shift, second_shift: 600 - (
(second_shift.start - first_shift.end).total_seconds() // 60
),
)
.as_constraint("At least 10 hours between 2 shifts")
)
How to read this:
- for_each(Shift): Start with all shifts
- .join(Shift, ...): Pair with other shifts
- Joiners.equal(...): Same employee
- Joiners.less_than_or_equal(...): First shift ends before or when the second starts (ensures ordering)
- .filter(...): Keep only pairs with less than a 10-hour gap
- .penalize(...): Penalize by 600 - actual_minutes (the deficit from the required 10 hours)
Optimization concept: The penalty function 600 - actual_minutes creates incremental guidance. 9 hours rest (penalty 60) is better than 5 hours rest (penalty 300), helping the solver navigate toward feasibility.
Hard Constraint: One Shift Per Day
Business rule: “Employees can work at most one shift per calendar day.”
def one_shift_per_day(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each_unique_pair(
Shift,
Joiners.equal(lambda shift: shift.employee.name),
Joiners.equal(lambda shift: shift.start.date()),
)
.penalize(HardSoftDecimalScore.ONE_HARD)
.as_constraint("Max one shift per day")
)
How to read this:
- for_each_unique_pair(Shift, ...): Create pairs of shifts
- First joiner: Same employee
- Second joiner: Same date (shift.start.date() extracts the calendar day)
- Each pair found is a violation
Hard Constraint: Unavailable Dates
Business rule: “Employees cannot work on days they marked as unavailable.”
from datetime import time
def unavailable_employee(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Shift)
.join(
Employee,
Joiners.equal(lambda shift: shift.employee, lambda employee: employee),
)
.flatten_last(lambda employee: employee.unavailable_dates)
.filter(lambda shift, unavailable_date: shift.is_overlapping_with_date(unavailable_date))
.penalize(
HardSoftDecimalScore.ONE_HARD,
lambda shift, unavailable_date: int((
min(shift.end, datetime.combine(unavailable_date, time(23, 59, 59)))
- max(shift.start, datetime.combine(unavailable_date, time(0, 0, 0)))
).total_seconds() / 60),
)
.as_constraint("Unavailable employee")
)
How to read this:
- for_each(Shift): All shifts
- .join(Employee, ...): Join with the assigned employee
- .flatten_last(lambda employee: employee.unavailable_dates): Expand each employee’s unavailable_dates set
- .filter(...): Keep only pairs where the shift overlaps the unavailable date
- .penalize(...): Penalize by the overlapping duration in minutes (calculated inline)
Optimization concept: The flatten_last operation demonstrates constraint streaming with collections. We iterate over each date in the employee’s unavailable set, creating (shift, date) pairs to check.
Why inline calculation? The penalty lambda uses explicit time(0, 0, 0) and time(23, 59, 59) rather than datetime.min.time() or calling domain methods. This is required because certain datetime methods don’t transpile correctly to the constraint stream engine.
Soft Constraint: Undesired Days
Business rule: “Prefer not to schedule employees on days they marked as undesired.”
def undesired_day_for_employee(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Shift)
.join(
Employee,
Joiners.equal(lambda shift: shift.employee, lambda employee: employee),
)
.flatten_last(lambda employee: employee.undesired_dates)
.filter(lambda shift, undesired_date: shift.is_overlapping_with_date(undesired_date))
.penalize(
HardSoftDecimalScore.ONE_SOFT,
lambda shift, undesired_date: int((
min(shift.end, datetime.combine(undesired_date, time(23, 59, 59)))
- max(shift.start, datetime.combine(undesired_date, time(0, 0, 0)))
).total_seconds() / 60),
)
.as_constraint("Undesired day for employee")
)
Key difference from hard constraints: Uses ONE_SOFT instead of ONE_HARD.
Optimization concept: The solver will try to avoid undesired days but may violate this if necessary to satisfy hard constraints or achieve better overall soft score.
Soft Constraint: Desired Days (Reward)
Business rule: “Prefer to schedule employees on days they marked as desired.”
def desired_day_for_employee(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Shift)
.join(
Employee,
Joiners.equal(lambda shift: shift.employee, lambda employee: employee),
)
.flatten_last(lambda employee: employee.desired_dates)
.filter(lambda shift, desired_date: shift.is_overlapping_with_date(desired_date))
.reward(
HardSoftDecimalScore.ONE_SOFT,
lambda shift, desired_date: int((
min(shift.end, datetime.combine(desired_date, time(23, 59, 59)))
- max(shift.start, datetime.combine(desired_date, time(0, 0, 0)))
).total_seconds() / 60),
)
.as_constraint("Desired day for employee")
)
Key difference: Uses .reward() instead of .penalize().
Optimization concept: Rewards increase the score instead of decreasing it. This constraint actively pulls the solution toward desired assignments.
Soft Constraint: Load Balancing
Business rule: “Distribute shifts fairly across employees.”
def balance_employee_shift_assignments(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Shift)
.group_by(lambda shift: shift.employee, ConstraintCollectors.count())
.complement(Employee, lambda e: 0)
.group_by(
ConstraintCollectors.load_balance(
lambda employee, shift_count: employee,
lambda employee, shift_count: shift_count,
)
)
.penalize_decimal(
HardSoftDecimalScore.ONE_SOFT,
lambda load_balance: load_balance.unfairness(),
)
.as_constraint("Balance employee shift assignments")
)
How to read this:
- for_each(Shift): All shifts
- .group_by(..., ConstraintCollectors.count()): Count shifts per employee
- .complement(Employee, lambda e: 0): Include employees with 0 shifts
- .group_by(ConstraintCollectors.load_balance(...)): Calculate the fairness metric
- .penalize_decimal(..., unfairness()): Penalize by the unfairness amount
Optimization concept: This uses a sophisticated load balancing collector that calculates variance/unfairness in workload distribution. It’s more nuanced than simple quadratic penalties — it measures how far the distribution is from perfectly balanced.
Why complement? Without it, employees with zero shifts wouldn’t appear in the grouping, skewing the fairness calculation.
The Solver Engine
Now let’s see how the solver is configured. Open src/employee_scheduling/solver.py:
solver_config = SolverConfig(
solution_class=EmployeeSchedule,
entity_class_list=[Shift],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)
solver_manager = SolverManager.create(SolverFactory.create(solver_config))
solution_manager = SolutionManager.create(solver_manager)
Configuration Breakdown
solution_class: Your planning solution class (EmployeeSchedule)
entity_class_list: Planning entities to optimize ([Shift])
score_director_factory_config: Contains the constraint provider function
- Note: This is nested inside ScoreDirectorFactoryConfig, not directly in SolverConfig
termination_config: When to stop solving
spent_limit=Duration(seconds=30): Stop after 30 seconds
SolverManager: Asynchronous Solving
SolverManager handles solving in the background without blocking your API:
# Start solving (non-blocking)
solver_manager.solve_and_listen(job_id, schedule, callback_function)
# Check status
status = solver_manager.get_status(job_id)
# Get current best solution
solution = solver_manager.get_solution(job_id)
# Stop early
solver_manager.terminate_early(job_id)
Optimization concept: Real-world problems may take minutes to hours. Anytime algorithms like metaheuristics continuously improve solutions over time, so you can stop whenever you’re satisfied with the quality.
Solving Timeline
Small problems (10-20 shifts, 5-10 employees):
- Initial valid solution: < 1 second
- Good solution: 5-10 seconds
- High-quality: 30 seconds
Medium problems (50-100 shifts, 20-30 employees):
- Initial valid solution: 1-5 seconds
- Good solution: 30-60 seconds
- High-quality: 5-10 minutes
Factors affecting speed:
- Number of employees × shifts (search space size)
- Constraint complexity
- How “tight” constraints are (fewer valid solutions = harder)
Web Interface and API
REST API Endpoints
Open src/employee_scheduling/rest_api.py to see the API:
GET /demo-data
Returns available demo datasets:
GET /demo-data/{dataset_id}
Generates and returns sample data:
{
"employees": [
{
"name": "Amy Cole",
"skills": ["Doctor", "Cardiology"],
"unavailableDates": ["2025-11-25"],
"undesiredDates": ["2025-11-26"],
"desiredDates": ["2025-11-27"]
}
],
"shifts": [
{
"id": "0",
"start": "2025-11-25T06:00:00",
"end": "2025-11-25T14:00:00",
"location": "Ambulatory care",
"requiredSkill": "Doctor",
"employee": null
}
]
}
Note: Field names use camelCase in JSON (a REST API convention) but snake_case in Python (the domain model); converters.py handles this translation.
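One common way to get that camelCase/snake_case mapping with Pydantic is an alias generator. The sketch below is illustrative only (the quickstart's converters.py and json_serialization.py may do it differently, and the EmployeeModel name here is hypothetical):
from pydantic import BaseModel, ConfigDict
from pydantic.alias_generators import to_camel

class EmployeeModel(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

    name: str
    unavailable_dates: list[str] = []  # serialized as "unavailableDates"
    undesired_dates: list[str] = []    # serialized as "undesiredDates"
    desired_dates: list[str] = []      # serialized as "desiredDates"

print(EmployeeModel(name="Amy Cole").model_dump_json(by_alias=True))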
POST /schedules
Submit a schedule to solve:
Request body: Same format as demo-data response
Response: Job ID as plain text
"a1b2c3d4-e5f6-7890-abcd-ef1234567890"
Implementation:
@app.post("/schedules")
async def solve_timetable(schedule_model: EmployeeScheduleModel) -> str:
job_id = str(uuid4())
schedule = model_to_schedule(schedule_model)
data_sets[job_id] = schedule
solver_manager.solve_and_listen(
job_id,
schedule,
lambda solution: update_schedule(job_id, solution)
)
return job_id
Key detail: Uses solve_and_listen() with a callback that updates the stored solution in real-time as solving progresses.
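The update_schedule callback itself is not shown here; conceptually it only needs to store the newest best solution so GET /schedules/{problem_id} can return it. A minimal sketch (the quickstart's version may also track errors or timestamps):
def update_schedule(problem_id: str, solution: EmployeeSchedule) -> None:
    # Overwrite the stored schedule with the latest best solution from the solver
    data_sets[problem_id] = solution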
GET /schedules/{problem_id}
Check solving status and get current solution:
Response (while solving):
{
"employees": [...],
"shifts": [...],
"score": "0hard/-45soft",
"solverStatus": "SOLVING_ACTIVE"
}
Response (finished):
{
"employees": [...],
"shifts": [...],
"score": "0hard/-12soft",
"solverStatus": "NOT_SOLVING"
}
GET /schedules
List all active job IDs:
Response:
["a1b2c3d4-e5f6-7890-abcd-ef1234567890", "b2c3d4e5-f6a7-8901-bcde-f23456789012"]
GET /schedules/{problem_id}/status
Lightweight status check (score and solver status only):
Response:
{
"score": {
"hardScore": 0,
"softScore": -12
},
"solverStatus": "SOLVING_ACTIVE"
}
DELETE /schedules/{problem_id}
Stop solving early and return best solution found so far:
@app.delete("/schedules/{problem_id}")
async def stop_solving(problem_id: str) -> EmployeeScheduleModel:
solver_manager.terminate_early(problem_id)
return await get_timetable(problem_id)
PUT /schedules/analyze
Analyze a schedule without solving to understand constraint violations:
Request body: Same format as demo-data response
Response:
{
"constraints": [
{
"name": "Required skill",
"weight": "1hard",
"score": "-2hard",
"matches": [
{
"name": "Required skill",
"score": "-1hard",
"justification": "Shift(id=5) assigned to Employee(Amy Cole)"
}
]
}
]
}
Web UI Flow
The static/app.js implements this polling workflow:
- User opens page → Load demo data (GET /demo-data/SMALL)
- Display employees and shifts in timeline visualization
- User clicks “Solve” → POST /schedules (get job ID back)
- Poll GET /schedules/{id} every 2 seconds
- Update UI with latest assignments in real-time
- When solverStatus === "NOT_SOLVING" → Stop polling
- Display final score and solution
Visual feedback: The UI uses vis-timeline library to show:
- Shifts color-coded by availability (red=unavailable, orange=undesired, green=desired, blue=normal)
- Skills color-coded (red=missing skill, green=has skill)
- Two views: by employee and by location
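The same flow can be scripted outside the browser. A minimal Python polling client against the endpoints documented above (assumes the server is running on localhost:8080 and that the third-party requests package is installed):
import time
import requests

BASE = "http://localhost:8080"

schedule = requests.get(f"{BASE}/demo-data/SMALL").json()          # load demo data
job_id = requests.post(f"{BASE}/schedules", json=schedule).json()  # start solving

while True:
    current = requests.get(f"{BASE}/schedules/{job_id}").json()
    print(current["score"], current["solverStatus"])
    if current["solverStatus"] == "NOT_SOLVING":  # solver finished
        break
    time.sleep(2)                                 # poll every 2 seconds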
Making Your First Customization
The quickstart includes an optional cardinality constraint that demonstrates a common pattern. Let’s understand how it works and then learn how to create similar constraints.
Understanding the Max Shifts Constraint
The codebase includes max_shifts_per_employee which limits workload imbalance. This constraint is disabled by default (commented out in define_constraints()) but serves as a useful example:
Business rule: “No employee can work more than 12 shifts in the schedule period.”
This is a hard constraint (must be satisfied when enabled).
The Constraint Implementation
This constraint is already in src/employee_scheduling/constraints.py:
def max_shifts_per_employee(constraint_factory: ConstraintFactory):
"""
Hard constraint: No employee can have more than 12 shifts.
The limit of 12 is chosen based on the demo data dimensions:
- SMALL dataset: 139 shifts / 15 employees = ~9.3 average
- This provides headroom while preventing extreme imbalance
Note: A limit that's too low (e.g., 5) would make the problem infeasible.
Always ensure your constraints are compatible with your data dimensions.
"""
return (
constraint_factory.for_each(Shift)
.group_by(lambda shift: shift.employee, ConstraintCollectors.count())
.filter(lambda employee, shift_count: shift_count > 12)
.penalize(
HardSoftDecimalScore.ONE_HARD,
lambda employee, shift_count: shift_count - 12,
)
.as_constraint("Max 12 shifts per employee")
)
How this works:
- Group shifts by employee and count them
- Filter to employees with more than 12 shifts
- Penalize by the excess amount (13 shifts = penalty 1, 14 shifts = penalty 2, etc.)
Why 12? The demo data has 139 shifts and 15 employees (~9.3 shifts per employee on average). A limit that’s too low (e.g., 5) would make the problem infeasible — there simply aren’t enough employees to cover all shifts. Always ensure your constraints are compatible with your problem’s dimensions.
How It’s Registered
To enable this constraint, uncomment it in define_constraints():
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
return [
# Hard constraints
required_skill(constraint_factory),
no_overlapping_shifts(constraint_factory),
at_least_10_hours_between_two_shifts(constraint_factory),
one_shift_per_day(constraint_factory),
unavailable_employee(constraint_factory),
# max_shifts_per_employee(constraint_factory), # ← Uncomment to enable
# Soft constraints
undesired_day_for_employee(constraint_factory),
desired_day_for_employee(constraint_factory),
balance_employee_shift_assignments(constraint_factory),
]
Experimenting With It
Try enabling and modifying the constraint to see its effect:
- Uncomment max_shifts_per_employee(constraint_factory), in constraints.py
- Change the limit from 12 to 8 if desired
- Restart the server: python -m employee_scheduling.rest_api
- Load demo data and click “Solve”
- Observe how the constraint affects the solution
Note: A very low limit (e.g., 5) will make the problem infeasible.
Why Unit Testing Constraints Matters
The quickstart includes unit tests in tests/test_constraints.py using ConstraintVerifier. Run them with:
pytest tests/test_constraints.py -v
Testing catches critical issues early. When we initially implemented this constraint with a limit of 5, the feasibility test (test_feasible.py) failed — the solver couldn’t find a valid solution because there weren’t enough employees to cover all shifts within that limit. Without tests, this would have silently broken the scheduling system. Always test new constraints — a typo in a filter or an overly restrictive limit can make your problem unsolvable.
Understanding What You Did
You just worked with a cardinality constraint: one that limits the count of something. This pattern is extremely common in scheduling:
- Maximum hours per week
- Minimum shifts per employee
- Exact number of nurses per shift
The pattern is always:
- Group by what you’re counting
- Collect the count
- Filter by your limit
- Penalize/reward appropriately
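Applied to the “minimum shifts per employee” case, the same four steps look like the sketch below. This is illustrative only: MIN_SHIFTS and the soft weight are assumptions, and it reuses the .complement() trick from the load-balancing constraint so that employees with zero shifts are counted:
def min_shifts_per_employee(constraint_factory: ConstraintFactory):
    MIN_SHIFTS = 2  # illustrative value; pick one compatible with your data
    return (
        constraint_factory.for_each(Shift)
        # Group by what you're counting, collect the count
        .group_by(lambda shift: shift.employee, ConstraintCollectors.count())
        # Include employees with zero shifts, who would otherwise be missing
        .complement(Employee, lambda e: 0)
        # Filter by your limit
        .filter(lambda employee, shift_count: shift_count < MIN_SHIFTS)
        # Penalize by how far below the minimum the employee is
        .penalize(
            HardSoftDecimalScore.ONE_SOFT,
            lambda employee, shift_count: MIN_SHIFTS - shift_count,
        )
        .as_constraint("Min shifts per employee")
    )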
Advanced Constraint Patterns
Pattern 1: Weighted Penalties
Scenario: Some skills are harder to staff — penalize their absence more heavily.
def preferred_skill_coverage(constraint_factory: ConstraintFactory):
"""
Soft constraint: Prefer specialized skills when available.
"""
SPECIALTY_SKILLS = {"Cardiology", "Anaesthetics", "Radiology"}
return (
constraint_factory.for_each(Shift)
.filter(lambda shift: shift.required_skill in SPECIALTY_SKILLS)
.filter(lambda shift: shift.required_skill in shift.employee.skills)
.reward(
HardSoftDecimalScore.of_soft(10), # 10x normal reward
)
.as_constraint("Preferred specialty coverage")
)
Optimization concept: Weighted constraints let you express relative importance. This rewards specialty matches 10 times more than standard matches.
Pattern 2: Conditional Constraints
Scenario: Night shifts (after 6 PM) require two employees at the same location.
def night_shift_minimum_staff(constraint_factory: ConstraintFactory):
"""
Hard constraint: Night shifts need at least 2 employees per location.
"""
def is_night_shift(shift: Shift) -> bool:
return shift.start.hour >= 18 # 6 PM or later
return (
constraint_factory.for_each(Shift)
.filter(is_night_shift)
.group_by(
lambda shift: (shift.start, shift.location),
ConstraintCollectors.count()
)
.filter(lambda timeslot_location, count: count < 2)
.penalize(
HardSoftDecimalScore.ONE_HARD,
lambda timeslot_location, count: 2 - count
)
.as_constraint("Night shift minimum 2 staff")
)
Pattern 3: Employee Pairing (Incompatibility)
Scenario: Certain employees shouldn’t work the same shift.
First, add the field to domain.py:
@dataclass
class Employee:
name: Annotated[str, PlanningId]
skills: set[str] = field(default_factory=set)
# ... existing fields ...
incompatible_with: set[str] = field(default_factory=set) # employee names
Then the constraint:
def avoid_incompatible_pairs(constraint_factory: ConstraintFactory):
"""
Hard constraint: Incompatible employees can't work overlapping shifts.
"""
return (
constraint_factory.for_each(Shift)
.join(
Shift,
Joiners.equal(lambda shift: shift.location),
Joiners.overlapping(lambda shift: shift.start, lambda shift: shift.end),
)
.filter(
lambda shift1, shift2:
shift2.employee.name in shift1.employee.incompatible_with
)
.penalize(HardSoftDecimalScore.ONE_HARD)
.as_constraint("Avoid incompatible pairs")
)
Pattern 4: Time-Based Accumulation
Scenario: Limit total hours worked per week.
def max_hours_per_week(constraint_factory: ConstraintFactory):
"""
Hard constraint: Maximum 40 hours per employee per week.
"""
def get_shift_hours(shift: Shift) -> float:
return (shift.end - shift.start).total_seconds() / 3600
def get_week(shift: Shift) -> int:
return shift.start.isocalendar()[1] # ISO week number
return (
constraint_factory.for_each(Shift)
.group_by(
lambda shift: (shift.employee, get_week(shift)),
ConstraintCollectors.sum(get_shift_hours)
)
.filter(lambda employee_week, total_hours: total_hours > 40)
.penalize(
HardSoftDecimalScore.ONE_HARD,
lambda employee_week, total_hours: int(total_hours - 40)
)
.as_constraint("Max 40 hours per week")
)
Optimization concept: This uses temporal aggregation — grouping by time periods (weeks) and summing durations. Common in workforce scheduling.
Testing and Validation
Unit Testing Constraints
Best practice: Test each constraint in isolation.
Create tests/test_my_constraints.py:
from datetime import datetime, date
from employee_scheduling.domain import Employee, Shift, EmployeeSchedule
from employee_scheduling.solver import solver_config
from solverforge_legacy.solver import SolverFactory
def test_max_shifts_constraint_violation():
"""Test that exceeding 5 shifts creates a hard constraint violation."""
employee = Employee(
name="Test Employee",
skills={"Doctor"}
)
    # Create 13 shifts assigned to the same employee (one over the limit of 12)
    shifts = []
    for i in range(13):
shifts.append(Shift(
id=str(i),
            start=datetime(2025, 11, 1 + i, 9, 0),
            end=datetime(2025, 11, 1 + i, 17, 0),
location="Test Location",
required_skill="Doctor",
employee=employee
))
schedule = EmployeeSchedule(
employees=[employee],
shifts=shifts
)
# Score the solution
solver_factory = SolverFactory.create(solver_config)
score_director = solver_factory.get_score_director_factory().build_score_director()
score_director.set_working_solution(schedule)
score = score_director.calculate_score()
# Verify hard constraint violation
assert score.hard_score == -1, f"Expected -1 hard score, got {score.hard_score}"
def test_max_shifts_constraint_satisfied():
"""Test that 5 or fewer shifts doesn't violate constraint."""
employee = Employee(
name="Test Employee",
skills={"Doctor"}
)
    # Create exactly 12 shifts (at the limit)
    shifts = []
    for i in range(12):
shifts.append(Shift(
id=str(i),
            start=datetime(2025, 11, 1 + i, 9, 0),
            end=datetime(2025, 11, 1 + i, 17, 0),
location="Test Location",
required_skill="Doctor",
employee=employee
))
schedule = EmployeeSchedule(
employees=[employee],
shifts=shifts
)
solver_factory = SolverFactory.create(solver_config)
score_director = solver_factory.get_score_director_factory().build_score_director()
score_director.set_working_solution(schedule)
score = score_director.calculate_score()
    # No hard violation from this constraint (soft penalties from balancing may still apply)
    assert score.hard_score >= 0, f"Expected non-negative hard score, got {score.hard_score}"
Run with:
pytest tests/test_my_constraints.py -v
Integration Testing: Full Solve
Test the complete solving cycle in tests/test_feasible.py:
import time
from employee_scheduling.demo_data import DemoData, generate_demo_data
from employee_scheduling.solver import solver_manager
from solverforge_legacy.solver import SolverStatus
def test_solve_small_dataset():
"""Test that solver finds a feasible solution for small dataset."""
# Generate problem
schedule = generate_demo_data(DemoData.SMALL)
# Verify initially unassigned
assert all(shift.employee is None for shift in schedule.shifts)
# Solve
job_id = "test-job"
solver_manager.solve(job_id, schedule)
# Wait for completion (with timeout)
timeout_seconds = 60
start_time = time.time()
while solver_manager.get_solver_status(job_id) != SolverStatus.NOT_SOLVING:
if time.time() - start_time > timeout_seconds:
solver_manager.terminate_early(job_id)
break
time.sleep(1)
# Get solution
solution = solver_manager.get_solution(job_id)
# Verify all shifts assigned
assert all(shift.employee is not None for shift in solution.shifts), \
"Not all shifts were assigned"
# Verify feasible (hard score = 0)
assert solution.score.hard_score == 0, \
f"Solution is infeasible with hard score {solution.score.hard_score}"
print(f"Final score: {solution.score}")
Manual Testing via UI
- Start the application: python -m employee_scheduling.rest_api
- Open browser console (F12) to see API calls
- Load “SMALL” demo data
- Verify data displays correctly (employees with skills, shifts unassigned)
- Click “Solve” and watch:
- Score improving in real-time
- Shifts getting assigned (colored by availability)
- Final hard score reaches 0
- Manually verify constraint satisfaction:
- Check that assigned employees have required skills (green badges)
- Verify no overlapping shifts (timeline shouldn’t show overlaps)
- Confirm unavailable days are respected (no shifts on red-highlighted dates)
Quick Reference
File Locations
| Need to… | Edit this file |
|---|---|
| Add/change business rule | src/employee_scheduling/constraints.py |
| Add field to Employee | src/employee_scheduling/domain.py + converters.py |
| Add field to Shift | src/employee_scheduling/domain.py + converters.py |
| Change solve time | src/employee_scheduling/solver.py |
| Add REST endpoint | src/employee_scheduling/rest_api.py |
| Change demo data | src/employee_scheduling/demo_data.py |
| Change UI | static/index.html, static/app.js |
Common Constraint Patterns
Unary constraint (examine one entity):
constraint_factory.for_each(Shift)
    .filter(lambda shift: ...)  # your condition here
.penalize(HardSoftDecimalScore.ONE_HARD)
Binary constraint (examine pairs):
constraint_factory.for_each_unique_pair(
Shift,
Joiners.equal(lambda shift: shift.employee.name)
)
.penalize(HardSoftDecimalScore.ONE_HARD)
Grouping and counting:
constraint_factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.count()
)
.filter(lambda employee, count: count > MAX)
.penalize(...)
Reward instead of penalize:
.reward(HardSoftDecimalScore.ONE_SOFT)
Variable penalty:
.penalize(
HardSoftDecimalScore.ONE_HARD,
lambda shift: calculate_penalty_amount(shift)
)
Working with collections (flatten):
constraint_factory.for_each(Shift)
.join(Employee, Joiners.equal(...))
.flatten_last(lambda employee: employee.unavailable_dates)
.filter(...)
Common Joiners
| Joiner | Purpose |
|---|---|
| Joiners.equal(lambda x: x.field) | Match entities with same field value |
| Joiners.less_than(lambda x: x.field) | First entity’s field < second’s (ensures ordering) |
| Joiners.overlapping(start, end) | Time intervals overlap |
Debugging Tips
Enable verbose logging:
import logging
logging.basicConfig(level=logging.DEBUG)
Test constraint in isolation:
# Create minimal test case with just the constraint you're debugging
schedule = EmployeeSchedule(
employees=[test_employee],
shifts=[test_shift]
)
solver_factory = SolverFactory.create(solver_config)
score_director = solver_factory.get_score_director_factory().build_score_director()
score_director.set_working_solution(schedule)
score = score_director.calculate_score()
print(f"Score: {score}")
Check constraint matches:
Add print statements (remove in production):
.filter(lambda shift: (
print(f"Checking shift {shift.id}") or # Debug print
shift.required_skill not in shift.employee.skills
))
Common Gotchas
Forgot to register constraint in define_constraints() return list
- Symptom: Constraint not enforced
Using wrong Joiner
- Joiners.equal when you need Joiners.less_than
- Symptom: Pairs counted twice or constraint not working
Expensive operations in constraint functions
- Database/API calls in filters
- Symptom: Solving extremely slow
Score sign confusion
- Higher soft score is better (not worse!)
- Hard score must be ≥ 0 for feasible solution
Field name mismatch
- Guide said skill_set, actual is skills
- Guide said employee_list, actual is employees
Additional Resources
3 - Portfolio Optimization
A comprehensive quickstart guide to understanding and building intelligent stock portfolio optimization with SolverForge
Legacy Implementation Guide
This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.
SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.
Table of Contents
- Introduction
- Getting Started
- The Problem We’re Solving
- Understanding the Data Model
- How Optimization Works
- Writing Constraints: The Business Rules
- The Solver Engine
- Web Interface and API
- Making Your First Customization
- Advanced Constraint Patterns
- Testing and Validation
- Quick Reference
Introduction
What You’ll Learn
This guide walks you through a complete stock portfolio optimization application built with SolverForge, a constraint-based optimization framework. You’ll learn:
- How to model investment decisions as optimization problems
- How to express diversification rules as constraints that guide the solution
- How optimization algorithms find high-quality portfolios automatically
- How to customize the system for your specific investment strategies
No optimization or finance background required — we’ll explain both optimization and finance concepts as we encounter them in the code.
Architecture Note: This guide uses the “fast” implementation pattern with dataclass domain models and Pydantic only at API boundaries. For the architectural reasoning behind this design, see Dataclasses vs Pydantic in Constraint Solvers.
Prerequisites
- Basic Python knowledge (classes, functions, type annotations)
- Familiarity with REST APIs
- Comfort with command-line operations
What is Portfolio Optimization?
Traditional portfolio selection: You write explicit rules like “pick the 20 stocks with highest predicted returns.”
Constraint-based portfolio optimization: You describe what a good portfolio looks like (diversified, high-return, exactly 20 stocks) and the solver figures out which specific stocks to select.
Think of it like describing the ideal portfolio characteristics and having a computer try millions of combinations per second to find the best fit.
Finance Concepts (Quick Primer)
| Term | Definition | Example |
|---|---|---|
| Portfolio | Collection of investments you own | 20 stocks |
| Weight | Percentage of money in each investment | 5% per stock (equal weight) |
| Sector | Industry category | Technology, Healthcare, Finance, Energy |
| Predicted Return | Expected profit/loss percentage | 12% means $12 profit per $100 invested |
| Diversification | Spreading risk across sectors | Don’t put all eggs in one basket |
Getting Started
Running the Application
Download and navigate to the project directory:
git clone https://github.com/SolverForge/solverforge-quickstarts
cd ./solverforge-quickstarts/legacy/portfolio-optimization-fast
Create and activate virtual environment:
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
Install the package:
Start the server:
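The exact commands are omitted above. Assuming this quickstart follows the same conventions as the employee scheduling quickstart (editable install, REST API served as a module), they would look like:
pip install -e .
python -m portfolio_optimization.rest_api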
Open your browser:
http://localhost:8080
You’ll see a portfolio optimization interface with stocks, sectors, and a “Solve” button. Click it and watch the solver automatically select the optimal stocks while respecting diversification rules.
File Structure Overview
legacy/portfolio-optimization-fast/
├── src/portfolio_optimization/
│ ├── domain.py # Data classes (StockSelection, PortfolioOptimizationPlan)
│ ├── constraints.py # Business rules (90% of customization happens here)
│ ├── solver.py # Solver configuration
│ ├── demo_data.py # Sample stock data with ML predictions
│ ├── rest_api.py # HTTP API endpoints
│ ├── converters.py # REST ↔ Domain model conversion
│ ├── json_serialization.py # JSON helpers
│ └── score_analysis.py # Score breakdown DTOs
├── static/
│ ├── index.html # Web UI
│ └── app.js # UI logic and visualization
├── scripts/
│ └── comparison.py # Greedy vs Solver comparison
└── tests/
├── test_constraints.py # Unit tests for constraints
└── test_feasible.py # Integration tests
Key insight: Most business customization happens in constraints.py alone. You rarely need to modify other files.
The Problem We’re Solving
The Investment Challenge
You have $100,000 to invest and must select 20 stocks from a pool of candidates. Each stock has an ML-predicted return (e.g., “Apple is expected to return 12%”).
Hard constraints (must be satisfied):
- Select exactly 20 stocks
- No sector can exceed 25% of the portfolio (max 5 stocks per sector)
Soft constraints (preferences to optimize):
- Maximize total expected return (pick stocks with highest predictions)
Why Use a Constraint Solver?
For this simplified quickstart (Boolean selection with sector limits), a well-implemented greedy algorithm can find near-optimal solutions. So why use a constraint solver?
1. Declarative vs Imperative: With constraints, you describe what you want, not how to achieve it. Adding a new rule is one function, not a rewrite of your algorithm.
2. Constraint Interactions: As constraints multiply, greedy logic becomes brittle. Consider adding:
- Minimum 2 stocks per sector (diversification floor)
- No more than 3 correlated stocks together
- ESG score requirements
Each new constraint in greedy code means more if/else branches and edge cases. In a constraint solver, you just add another constraint function.
3. Real-World Complexity: Production portfolios have weight optimization (not just in/out), correlation matrices, risk budgets, and regulatory requirements. These create solution spaces where greedy approaches fail.
Understanding the Data Model
Let’s examine the core classes that model our problem. Open src/portfolio_optimization/domain.py:
Domain Model Architecture
This quickstart separates domain models (dataclasses) from API models (Pydantic):
- Domain layer (domain.py lines 32-169): Pure @dataclass models for solver operations
- API layer (domain.py lines 268-307): Pydantic BaseModel classes for REST endpoints
- Converters (converters.py): Translate between the two layers
The StockSelection Class (Planning Entity)
@planning_entity
@dataclass
class StockSelection:
stock_id: Annotated[str, PlanningId] # "AAPL", "GOOGL", etc.
stock_name: str # "Apple Inc."
sector: str # "Technology"
predicted_return: float # 0.12 = 12%
selection: Annotated[SelectionValue | None, PlanningVariable] = None
What it represents: A stock that could be included in the portfolio.
Key fields:
- stock_id: Unique identifier (ticker symbol)
- stock_name: Human-readable company name
- sector: Industry classification for diversification
- predicted_return: ML model’s expected return (decimal: 0.12 = 12%)
- selection: The decision — should this stock be in the portfolio?
Important annotations:
- @planning_entity: Tells SolverForge this class contains decisions to make
- PlanningVariable: Marks selection as the decision variable
Optimization concept: This is a planning variable — the value the solver assigns. Each stock starts with selection=None (undecided). The solver tries SELECTED vs NOT_SELECTED for each stock, evaluating according to your constraints.
The SelectionValue Pattern
Unlike employee scheduling where the planning variable is a reference to another entity, portfolio optimization uses a Boolean selection pattern:
@dataclass
class SelectionValue:
"""Wrapper for True/False selection state."""
value: bool
SELECTED = SelectionValue(True)
NOT_SELECTED = SelectionValue(False)
Why a wrapper? SolverForge needs reference types for value ranges. We wrap the boolean in a dataclass so the solver can work with it.
Convenience property:
@property
def selected(self) -> bool | None:
"""Check if stock is selected."""
if self.selection is None:
return None
return self.selection.value
The PortfolioConfig Class (Problem Fact)
@dataclass
class PortfolioConfig:
target_count: int = 20 # How many stocks to select
max_per_sector: int = 5 # Max stocks in any sector
unselected_penalty: int = 10000 # Soft penalty per unselected stock
What it represents: Configurable parameters that constraints read.
Optimization concept: This is a problem fact — immutable data that doesn’t change during solving but influences constraint behavior. Making these configurable allows users to adjust the optimization without modifying constraint code.
The PortfolioOptimizationPlan Class (Planning Solution)
@planning_solution
@dataclass
class PortfolioOptimizationPlan:
stocks: Annotated[list[StockSelection], PlanningEntityCollectionProperty, ValueRangeProvider]
    portfolio_config: Annotated[PortfolioConfig, ProblemFactProperty]
    selection_range: Annotated[list[SelectionValue], ValueRangeProvider(id="selection_range")]
    target_position_count: int = 20
    max_sector_percentage: float = 0.25
score: Annotated[HardSoftScore | None, PlanningScore] = None
solver_status: SolverStatus = SolverStatus.NOT_SOLVING
What it represents: The complete problem and its solution.
Key fields:
- stocks: All candidate stocks (planning entities)
- portfolio_config: Configuration parameters (problem fact)
- selection_range: [SELECTED, NOT_SELECTED] — possible values for each stock
- score: Solution quality metric (calculated by constraints)
- solver_status: Whether solving is active
Annotations explained:
- @planning_solution: Marks this as the top-level problem definition
- PlanningEntityCollectionProperty: The entities being optimized
- ValueRangeProvider: Tells the solver what values can be assigned
- ProblemFactProperty: Immutable configuration data
- PlanningScore: Where the solver stores the calculated score
Helper Methods for Business Metrics
The PortfolioOptimizationPlan class includes useful analytics:
def get_selected_stocks(self) -> list[StockSelection]:
"""Return only stocks that are selected."""
return [s for s in self.stocks if s.selected is True]
def get_expected_return(self) -> float:
"""Calculate total expected portfolio return."""
weight = self.get_weight_per_stock()
return sum(s.predicted_return * weight for s in self.get_selected_stocks())
def get_sector_weights(self) -> dict[str, float]:
"""Calculate weight per sector."""
weight = self.get_weight_per_stock()
sector_weights = {}
for stock in self.get_selected_stocks():
sector_weights[stock.sector] = sector_weights.get(stock.sector, 0) + weight
return sector_weights
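These helpers rely on get_weight_per_stock(), which is not shown in the excerpt. Under the equal-weight assumption used throughout this guide, it presumably looks something like this hypothetical sketch:
def get_weight_per_stock(self) -> float:
    """Equal weight per selected position (illustrative; the actual helper may differ)."""
    selected = self.get_selected_stocks()
    return 1.0 / len(selected) if selected else 0.0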
How Optimization Works
Before diving into constraints, let’s understand how the solver finds solutions.
The Search Process (Simplified)
- Start with an initial solution (often random selections)
- Evaluate the score using your constraint functions
- Make a small change (toggle one stock’s selection)
- Evaluate the new score
- Keep the change if it improves the score (with some controlled randomness)
- Repeat millions of times in seconds
- Return the best solution found
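To make those steps concrete, here is a deliberately naive, self-contained toy version of the loop for the Boolean selection model. The real engine uses construction heuristics plus metaheuristics (tabu search, late acceptance, and similar), not this hill climber, and score_fn stands in for the constraint-based score calculation:
import random

def toy_local_search(stocks, score_fn, iterations=100_000):
    # Step 1: start from a random solution (stock_id -> selected?)
    best = {s.stock_id: random.choice([True, False]) for s in stocks}
    best_score = score_fn(best)                     # Step 2: evaluate
    for _ in range(iterations):                     # Step 6: repeat many times
        candidate = dict(best)
        flipped = random.choice(stocks).stock_id    # Step 3: toggle one stock
        candidate[flipped] = not candidate[flipped]
        candidate_score = score_fn(candidate)       # Step 4: evaluate the change
        if candidate_score >= best_score:           # Step 5: keep improvements
            best, best_score = candidate, candidate_score
    return best, best_score                         # Step 7: best solution found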
The Search Space
For a portfolio problem with 50 candidate stocks selecting exactly 20, there are trillions of valid combinations. The solver explores this space using smart heuristics, not brute force.
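You can check the size of that search space directly:
from math import comb

# Ways to choose exactly 20 of 50 candidate stocks
print(comb(50, 20))  # roughly 4.7 × 10^13 combinations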
The Score: How “Good” is a Portfolio?
Every solution gets a score with two parts:
0hard/-45000soft
- Hard score: Counts hard constraint violations (must be 0 for a valid portfolio)
- Soft score: Reflects optimization quality (higher/less negative is better)
Scoring rules:
- Hard score must be 0 or positive (negative = invalid portfolio)
- Among valid portfolios (hard score = 0), highest soft score wins
- Hard score always takes priority over soft score
Portfolio example:
-2hard/-35000soft → Invalid: 2 constraint violations
0hard/-50000soft → Valid but low quality
0hard/-25000soft → Valid and better quality
Writing Constraints: The Business Rules
Now the heart of the system. Open src/portfolio_optimization/constraints.py.
The Constraint Provider Pattern
All constraints are registered in one function:
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory) -> list[Constraint]:
return [
# Hard constraints
must_select_target_count(constraint_factory),
sector_exposure_limit(constraint_factory),
# Soft constraints
penalize_unselected_stock(constraint_factory),
maximize_expected_return(constraint_factory),
]
Each constraint is a function returning a Constraint object. Let’s examine them.
Hard Constraint: Must Select Target Count
Business rule: “Don’t select more than N stocks” (default 20)
def must_select_target_count(constraint_factory: ConstraintFactory) -> Constraint:
return (
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.group_by(ConstraintCollectors.count())
.join(PortfolioConfig)
.filter(lambda count, config: count > config.target_count)
.penalize(
HardSoftScore.ONE_HARD,
lambda count, config: count - config.target_count
)
.as_constraint("Must select target count")
)
How to read this:
- for_each(StockSelection): Consider every stock
- .filter(...): Keep only selected stocks
- .group_by(count()): Count how many are selected
- .join(PortfolioConfig): Access the configuration
- .filter(...): Keep only if count exceeds target
- .penalize(ONE_HARD, ...): Each extra stock adds 1 hard penalty
Why only “not more than”? We use a separate soft constraint to drive selection toward the target. This approach handles edge cases better than counting both over and under.
Soft Constraint: Penalize Unselected Stock
Business rule: “Strongly prefer selecting stocks to meet the target”
def penalize_unselected_stock(constraint_factory: ConstraintFactory) -> Constraint:
return (
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is False)
.join(PortfolioConfig)
.penalize(
HardSoftScore.ONE_SOFT,
lambda stock, config: config.unselected_penalty # Default 10000
)
.as_constraint("Penalize unselected stock")
)
How to read this:
- for_each(StockSelection): Consider every stock
- .filter(...): Keep only unselected stocks
- .join(PortfolioConfig): Access the penalty value
- .penalize(ONE_SOFT, 10000): Each unselected stock costs 10000 soft points
Why 10000? This penalty is higher than the maximum return reward (~2000 per stock). This ensures the solver prioritizes reaching 20 stocks before optimizing returns.
Example with 25 stocks:
- Optimal: 20 selected + 5 unselected = -50000 soft penalty
- If returns reward is ~30000, final soft score is around -20000soft
Hard Constraint: Sector Exposure Limit
Business rule: “No sector can exceed N stocks” (default 5 = 25% of 20)
def sector_exposure_limit(constraint_factory: ConstraintFactory) -> Constraint:
return (
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.group_by(
lambda stock: stock.sector, # Group by sector name
ConstraintCollectors.count() # Count stocks per sector
)
.join(PortfolioConfig)
.filter(lambda sector, count, config: count > config.max_per_sector)
.penalize(
HardSoftScore.ONE_HARD,
lambda sector, count, config: count - config.max_per_sector
)
.as_constraint("Max stocks per sector")
)
How to read this:
- for_each(StockSelection): All stocks
- .filter(...): Keep only selected stocks
- .group_by(sector, count()): Count selected stocks in each sector
- .join(PortfolioConfig): Access the sector limit
- .filter(...): Keep sectors exceeding the limit
- .penalize(...): Penalty = stocks over limit per sector
Finance concept: This enforces diversification. If Tech crashes 50%, you only lose 25% × 50% = 12.5% of your portfolio, not 40% × 50% = 20%.
Soft Constraint: Maximize Expected Return
Business rule: “Among valid portfolios, prefer stocks with higher predicted returns”
def maximize_expected_return(constraint_factory: ConstraintFactory) -> Constraint:
return (
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.reward(
HardSoftScore.ONE_SOFT,
lambda stock: int(stock.predicted_return * 10000)
)
.as_constraint("Maximize expected return")
)
How to read this:
- for_each(StockSelection): All stocks
- .filter(...): Keep only selected stocks
- .reward(...): Add points based on predicted return
Why multiply by 10000? Converts decimal returns (0.12) to integer scores (1200). A stock with 12% predicted return adds 1200 soft points.
Example calculation:
| Stock | Return | Score Contribution |
|---|---|---|
| NVDA | 20% | +2000 |
| AAPL | 12% | +1200 |
| JPM | 8% | +800 |
| XOM | 4% | +400 |
The Solver Engine
Now let’s see how the solver is configured. Open src/portfolio_optimization/solver.py:
def create_solver_config(termination_seconds: int = 30) -> SolverConfig:
return SolverConfig(
solution_class=PortfolioOptimizationPlan,
entity_class_list=[StockSelection],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(
spent_limit=Duration(seconds=termination_seconds)
),
)
solver_config = create_solver_config()
solver_manager = SolverManager.create(SolverFactory.create(solver_config))
solution_manager = SolutionManager.create(solver_manager)
Configuration Breakdown
solution_class: Your planning solution class (PortfolioOptimizationPlan)
entity_class_list: Planning entities to optimize ([StockSelection])
score_director_factory_config: Contains the constraint provider function
- Note: Nested inside ScoreDirectorFactoryConfig, not directly in SolverConfig
termination_config: When to stop solving
spent_limit=Duration(seconds=30): Stop after 30 seconds
SolverManager: Asynchronous Solving
SolverManager handles solving in the background without blocking your API:
# Start solving (non-blocking)
solver_manager.solve_and_listen(job_id, portfolio, callback_function)
# Check status
status = solver_manager.get_solver_status(job_id)
# Get current best solution
solution = solver_manager.get_solution(job_id)
# Stop early
solver_manager.terminate_early(job_id)
Solving Timeline
Small problems (25 stocks, select 20):
- Initial valid solution: < 1 second
- Good solution: 5-10 seconds
- Optimal or near-optimal: 30 seconds
Large problems (50+ stocks, select 20):
- Initial valid solution: 1-3 seconds
- Good solution: 15-30 seconds
- High-quality: 60-120 seconds
Factors affecting speed:
- Number of candidate stocks (search space size)
- Sector distribution (tighter constraints = harder)
- How many stocks to select vs available
Web Interface and API
REST API Endpoints
Open src/portfolio_optimization/rest_api.py to see the API:
GET /demo-data
Returns available demo datasets:
GET /demo-data/{dataset_id}
Generates and returns sample stock data:
{
"stocks": [
{
"stockId": "AAPL",
"stockName": "Apple Inc.",
"sector": "Technology",
"predictedReturn": 0.12,
"selected": null
}
],
"targetPositionCount": 20,
"maxSectorPercentage": 0.25
}
POST /portfolios
Submit a portfolio for optimization:
Request body: Same format as demo-data response
Response: Job ID as plain text
"a1b2c3d4-e5f6-7890-abcd-ef1234567890"
Implementation highlights:
@app.post("/portfolios")
async def solve_portfolio(plan_model: PortfolioOptimizationPlanModel) -> str:
job_id = str(uuid4())
plan = model_to_plan(plan_model)
data_sets[job_id] = plan
# Support custom termination time
termination_seconds = 30
if plan_model.solver_config:
termination_seconds = plan_model.solver_config.termination_seconds
config = create_solver_config(termination_seconds)
manager = SolverManager.create(SolverFactory.create(config))
solver_managers[job_id] = manager
manager.solve_and_listen(job_id, plan, lambda sol: update_portfolio(job_id, sol))
return job_id
GET /portfolios/{problem_id}
Get current solution:
{
"stocks": [...],
"targetPositionCount": 20,
"maxSectorPercentage": 0.25,
"score": "0hard/-25000soft",
"solverStatus": "SOLVING_ACTIVE"
}
GET /portfolios/{problem_id}/status
Lightweight status check with metrics:
{
"score": {
"hardScore": 0,
"softScore": -25000
},
"solverStatus": "SOLVING_ACTIVE",
"selectedCount": 20,
"expectedReturn": 0.1125,
"sectorWeights": {
"Technology": 0.25,
"Healthcare": 0.25,
"Finance": 0.25,
"Energy": 0.25
}
}
DELETE /portfolios/{problem_id}
Stop solving early and return best solution found.
PUT /portfolios/analyze
Analyze a portfolio’s constraint violations in detail:
{
"constraints": [
{
"name": "Max stocks per sector",
"weight": "1hard",
"score": "-1hard",
"matches": [
{
"name": "Max stocks per sector",
"score": "-1hard",
"justification": "Technology: 6 stocks (limit 5)"
}
]
}
]
}
Web UI Flow
The static/app.js implements this polling workflow:
- User opens page → Load demo data (GET /demo-data/SMALL)
- Display stocks grouped by sector with predicted returns
- User clicks “Solve” → POST /portfolios (get job ID back)
- Poll GET /portfolios/{id}/status every 500ms
- Update UI with latest selections and score in real-time
- When solverStatus === "NOT_SOLVING" → Stop polling
- Display final score, selected stocks, and sector allocation chart
Making Your First Customization
The quickstart includes a tutorial constraint that demonstrates a common pattern. Let’s understand how it works and then learn how to create similar constraints.
Understanding the Preferred Sector Bonus
The codebase includes preferred_sector_bonus which gives a small bonus to Technology and Healthcare stocks. This constraint is disabled by default (commented out in define_constraints()) but serves as a useful example.
Business rule: “Slightly favor Technology and Healthcare stocks (higher growth sectors)”
The Constraint Implementation
Find this in src/portfolio_optimization/constraints.py around line 200:
# TUTORIAL: Uncomment this constraint to add sector preference
# def preferred_sector_bonus(constraint_factory: ConstraintFactory):
# """Soft constraint: Give a small bonus to stocks from preferred sectors."""
# PREFERRED_SECTORS = {"Technology", "Healthcare"}
# BONUS_POINTS = 50 # Small bonus per preferred stock
#
# return (
# constraint_factory.for_each(StockSelection)
# .filter(lambda stock: stock.selected is True)
# .filter(lambda stock: stock.sector in PREFERRED_SECTORS)
# .reward(
# HardSoftScore.ONE_SOFT,
# lambda stock: BONUS_POINTS
# )
# .as_constraint("Preferred sector bonus")
# )
How this works:
- Find all selected stocks
- Keep only those in preferred sectors
- Reward each with 50 bonus points
Enabling the Constraint
Uncomment the function (remove the # comment markers)
Register it in define_constraints():
return [
must_select_target_count(constraint_factory),
sector_exposure_limit(constraint_factory),
penalize_unselected_stock(constraint_factory),
maximize_expected_return(constraint_factory),
preferred_sector_bonus(constraint_factory), # ADD THIS LINE
]
Restart the server and solve again
Observe the portfolio now slightly favors Tech and Healthcare stocks
Why 50 Points?
The bonus is intentionally small (50) compared to return rewards (1000-2000 per stock). This makes it a tiebreaker rather than an override:
- If two stocks have similar predicted returns, prefer the one in a preferred sector
- Don’t sacrifice significant returns just to pick a preferred sector
Adding a Test
Add a test class to tests/test_constraints.py:
from portfolio_optimization.constraints import preferred_sector_bonus
class TestPreferredSectorBonus:
def test_technology_stock_rewarded(self) -> None:
"""Technology stocks should receive bonus."""
stock = create_stock("TECH1", sector="Technology", selected=True)
constraint_verifier.verify_that(preferred_sector_bonus).given(
stock
).rewards_with(50)
def test_energy_stock_not_rewarded(self) -> None:
"""Energy stocks should not receive bonus."""
stock = create_stock("ENGY1", sector="Energy", selected=True)
constraint_verifier.verify_that(preferred_sector_bonus).given(
stock
).rewards(0)
def test_unselected_tech_not_rewarded(self) -> None:
"""Unselected stocks don't receive bonus even if in preferred sector."""
stock = create_stock("TECH1", sector="Technology", selected=False)
constraint_verifier.verify_that(preferred_sector_bonus).given(
stock
).rewards(0)
Run with:
pytest tests/test_constraints.py::TestPreferredSectorBonus -v
Advanced Constraint Patterns
Pattern 1: Volatility Risk Penalty
Scenario: Penalize portfolios with high variance in predicted returns (higher risk).
def penalize_high_volatility(constraint_factory: ConstraintFactory) -> Constraint:
"""
Soft constraint: Penalize portfolios with high return variance.
Risk-averse investors prefer consistent returns over volatile ones.
"""
return (
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.group_by(
ConstraintCollectors.to_list(lambda stock: stock.predicted_return)
)
.filter(lambda returns: len(returns) >= 2)
.penalize(
HardSoftScore.ONE_SOFT,
lambda returns: int(calculate_variance(returns) * 10000)
)
.as_constraint("High volatility penalty")
)
def calculate_variance(returns: list[float]) -> float:
mean = sum(returns) / len(returns)
return sum((r - mean) ** 2 for r in returns) / len(returns)
Pattern 2: Minimum Sector Representation
Scenario: Ensure each sector has at least 2 stocks (broader diversification).
def minimum_sector_representation(constraint_factory: ConstraintFactory) -> Constraint:
"""
Hard constraint: Each sector must have at least 2 stocks.
Ensures broader diversification beyond just max limits.
"""
MIN_PER_SECTOR = 2
KNOWN_SECTORS = {"Technology", "Healthcare", "Finance", "Energy"}
return (
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.group_by(lambda stock: stock.sector, ConstraintCollectors.count())
.filter(lambda sector, count: sector in KNOWN_SECTORS and count < MIN_PER_SECTOR)
.penalize(
HardSoftScore.ONE_HARD,
lambda sector, count: MIN_PER_SECTOR - count
)
.as_constraint("Minimum sector representation")
)
Pattern 3: Exclude Specific Stocks
Scenario: Some stocks are on a “do not buy” list (regulatory, ethical, etc.).
def exclude_blacklisted_stocks(constraint_factory: ConstraintFactory) -> Constraint:
"""
Hard constraint: Never select blacklisted stocks.
Useful for regulatory compliance or ethical investing.
"""
BLACKLIST = {"TOBACCO1", "GAMBLING2", "WEAPONS3"}
return (
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.filter(lambda stock: stock.stock_id in BLACKLIST)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Exclude blacklisted stocks")
)
Pattern 4: Sector Weight Balance
Scenario: Prefer portfolios where no sector is significantly larger than others.
def balance_sector_weights(constraint_factory: ConstraintFactory) -> Constraint:
"""
Soft constraint: Prefer balanced sector allocation.
Uses load balancing to penalize uneven distribution.
"""
return (
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.group_by(lambda stock: stock.sector, ConstraintCollectors.count())
.group_by(
ConstraintCollectors.load_balance(
lambda sector, count: sector,
lambda sector, count: count
)
)
.penalize(
HardSoftScore.ONE_SOFT,
lambda balance: int(balance.unfairness() * 100)
)
.as_constraint("Balance sector weights")
)
Testing and Validation
Unit Testing Constraints
The quickstart uses ConstraintVerifier for isolated constraint testing. See tests/test_constraints.py:
from solverforge_legacy.solver.test import ConstraintVerifier
constraint_verifier = ConstraintVerifier.build(
define_constraints, PortfolioOptimizationPlan, StockSelection
)
DEFAULT_CONFIG = PortfolioConfig(target_count=20, max_per_sector=5, unselected_penalty=10000)
def create_stock(stock_id, sector="Technology", predicted_return=0.10, selected=True):
selection_value = SELECTED if selected else NOT_SELECTED
return StockSelection(
stock_id=stock_id,
stock_name=f"{stock_id} Corp",
sector=sector,
predicted_return=predicted_return,
selection=selection_value,
)
Test patterns:
Verify no penalty:
def test_at_limit_no_penalty(self):
stocks = [create_stock(f"TECH{i}", sector="Technology") for i in range(5)]
constraint_verifier.verify_that(sector_exposure_limit).given(
*stocks, DEFAULT_CONFIG
).penalizes(0)
Verify exact penalty amount:
def test_one_over_limit_penalizes_1(self):
stocks = [create_stock(f"TECH{i}", sector="Technology") for i in range(6)]
constraint_verifier.verify_that(sector_exposure_limit).given(
*stocks, DEFAULT_CONFIG
).penalizes_by(1)
Verify reward amount:
def test_high_return_stock_rewarded(self):
stock = create_stock("AAPL", predicted_return=0.12, selected=True)
constraint_verifier.verify_that(maximize_expected_return).given(
stock
).rewards_with(1200) # 0.12 * 10000
Running Tests
# All tests
pytest
# Verbose output
pytest -v
# Specific test file
pytest tests/test_constraints.py
# Specific test class
pytest tests/test_constraints.py::TestSectorExposureLimit
# With coverage
pytest --cov=portfolio_optimization
Integration Testing: Full Solve
Test the complete solving cycle in tests/test_feasible.py:
def test_small_dataset_feasible():
"""Solver should find a feasible solution for small dataset."""
plan = generate_demo_data(DemoData.SMALL)
# All stocks start unselected
assert all(s.selection is None for s in plan.stocks)
# Solve for 10 seconds
solution = solve_for_seconds(plan, 10)
# Should select exactly 20 stocks
assert solution.get_selected_count() == 20
# Should have no sector over 25%
for sector, weight in solution.get_sector_weights().items():
assert weight <= 0.26, f"{sector} at {weight*100}%"
# Should have 0 hard score (feasible)
assert solution.score.hard_score == 0
Quick Reference
File Locations
| Need to… | Edit this file |
|---|---|
| Add/change business rule | src/portfolio_optimization/constraints.py |
| Add field to StockSelection | src/portfolio_optimization/domain.py + converters.py |
| Change default config | src/portfolio_optimization/domain.py (PortfolioConfig) |
| Change solve time | src/portfolio_optimization/solver.py or API parameter |
| Add REST endpoint | src/portfolio_optimization/rest_api.py |
| Change demo data | src/portfolio_optimization/demo_data.py |
| Change UI | static/index.html, static/app.js |
Common Constraint Patterns
Count selected stocks:
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.group_by(ConstraintCollectors.count())
.filter(lambda count: count > MAX)
.penalize(...)
Group by sector and count:
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.group_by(lambda stock: stock.sector, ConstraintCollectors.count())
.filter(lambda sector, count: count > MAX)
.penalize(...)
Reward based on attribute:
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.reward(HardSoftScore.ONE_SOFT, lambda stock: int(stock.attribute * 10000))
Filter by set membership:
constraint_factory.for_each(StockSelection)
.filter(lambda stock: stock.selected is True)
.filter(lambda stock: stock.sector in PREFERRED_SECTORS)
.reward(...)
Access configurable parameters:
constraint_factory.for_each(StockSelection)
.filter(...)
.join(PortfolioConfig)
.filter(lambda stock, config: some_condition(stock, config))
.penalize(HardSoftScore.ONE_HARD, lambda stock, config: penalty(config))
Common Gotchas
Forgot to register constraint in define_constraints() return list
- Symptom: Constraint not enforced
Using wrong score type
- HardSoftScore.ONE_HARD for must-satisfy rules
- HardSoftScore.ONE_SOFT for preferences
Boolean vs SelectionValue confusion
- Use stock.selected is True (the property)
- Not stock.selection == True (would compare the wrong type)
Empty stream returns nothing, not 0
- If no stocks are selected, group_by(count()) produces nothing
- Can’t use a “must have at least N” pattern naively
Score sign confusion
- Higher soft score is better (less negative)
- Use .reward() to add points, .penalize() to subtract
Forgetting to pass config to constraint tests
- Parameterized constraints need PortfolioConfig as a problem fact
- constraint_verifier.verify_that(...).given(*stocks, DEFAULT_CONFIG)
Debugging Tips
Enable verbose logging:
import logging
logging.basicConfig(level=logging.DEBUG)
Use the /analyze endpoint:
curl -X PUT http://localhost:8080/portfolios/analyze \
-H "Content-Type: application/json" \
-d @my_portfolio.json
Print in constraints (temporary debugging):
.filter(lambda stock: (
print(f"Checking {stock.stock_id}: {stock.sector}") or
stock.sector in PREFERRED_SECTORS
))
Additional Resources
4 - VM Placement
A comprehensive quickstart guide to understanding and building intelligent virtual machine placement optimization with SolverForge
Legacy Implementation Guide
This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.
SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.
Table of Contents
- Introduction
- Getting Started
- The Problem We’re Solving
- Understanding the Data Model
- How Optimization Works
- Writing Constraints: The Business Rules
- The Solver Engine
- Web Interface and API
- Making Your First Customization
- Advanced Constraint Patterns
- Testing and Validation
- Quick Reference
Introduction
What You’ll Learn
This guide walks you through a complete virtual machine placement application built with SolverForge, a constraint-based optimization framework. You’ll learn:
- How to model resource allocation decisions as optimization problems
- How to express capacity limits and placement rules as constraints
- How optimization algorithms find efficient placements automatically
- How to customize the system for your specific infrastructure requirements
No optimization or cloud infrastructure background required — we’ll explain both optimization and datacenter concepts as we encounter them in the code.
Architecture Note: This guide uses the “fast” implementation pattern with dataclass domain models and Pydantic only at API boundaries. For the architectural reasoning behind this design, see Dataclasses vs Pydantic in Constraint Solvers.
Prerequisites
- Basic Python knowledge (classes, functions, type annotations)
- Familiarity with REST APIs
- Comfort with command-line operations
What is VM Placement Optimization?
Traditional VM placement: You write explicit rules like “sort VMs by size and pack them onto servers using first-fit decreasing.”
Constraint-based VM placement: You describe what a valid placement looks like (capacity respected, replicas separated, load balanced) and the solver figures out which VM goes where.
Think of it like describing the ideal datacenter state and having a computer try millions of placement combinations per second to find the best fit.
Datacenter Concepts (Quick Primer)
| Term | Definition | Example |
|---|---|---|
| Server | Physical machine with CPU, memory, and storage | 32 cores, 128 GB RAM, 2 TB storage |
| VM | Virtual machine requiring resources from a server | 4 cores, 16 GB RAM, 100 GB storage |
| Rack | Physical grouping of servers in a datacenter | Rack A contains 8 servers |
| Affinity | VMs that should run on the same server | Web app and its cache |
| Anti-Affinity | VMs that must run on different servers | Database primary and replica |
| Consolidation | Using fewer servers to reduce power/cooling costs | Pack VMs tightly |
Getting Started
Running the Application
Download and navigate to the project directory:
git clone https://github.com/SolverForge/solverforge-quickstarts
cd ./solverforge-quickstarts/legacy/vm-placement-fast
Create and activate virtual environment:
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
Install the package:
Start the server:
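The exact commands are omitted above. Assuming the same conventions as the other fast quickstarts, they would look like:
pip install -e .
python -m vm_placement.rest_api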
Open your browser:
http://localhost:8080
You’ll see a VM placement interface with server racks, VMs, and a “Solve” button. Click it and watch the solver automatically assign VMs to servers while respecting capacity limits and placement rules.
File Structure Overview
legacy/vm-placement-fast/
├── src/vm_placement/
│ ├── domain.py # Data classes (Server, VM, VMPlacementPlan)
│ ├── constraints.py # Business rules (90% of customization happens here)
│ ├── solver.py # Solver configuration
│ ├── demo_data.py # Sample infrastructure and VMs
│ ├── rest_api.py # HTTP API endpoints
│ ├── converters.py # REST ↔ Domain model conversion
│ └── json_serialization.py # JSON helpers
├── static/
│ ├── index.html # Web UI with rack visualization
│ ├── app.js # UI logic and visualization
│ └── config.js # Advanced settings sliders
└── tests/
└── test_constraints.py # Unit tests for constraints
Key insight: Most business customization happens in constraints.py alone. You rarely need to modify other files.
The Problem We’re Solving
The Infrastructure Challenge
You manage a datacenter with physical servers organized in racks, and must place virtual machines onto those servers. Each server has limited CPU cores, memory, and storage capacity.
Hard constraints (must be satisfied):
- Never exceed a server’s CPU, memory, or storage capacity
- Keep database replicas on different servers (anti-affinity)
Soft constraints (preferences to optimize):
- Place related services together when possible (affinity)
- Minimize the number of active servers (consolidation)
- Balance load across active servers
- Place higher-priority VMs first
Why Use a Constraint Solver?
For simple bin-packing (fit VMs into servers by size), a well-implemented first-fit-decreasing algorithm works. So why use a constraint solver?
1. Declarative vs Imperative: With constraints, you describe what you want, not how to achieve it. Adding a new rule is one function, not a rewrite of your algorithm.
2. Constraint Interactions: As constraints multiply, greedy logic becomes brittle. Consider adding:
- Anti-affinity for database replicas
- Affinity for microservice tiers
- GPU requirements for ML workloads
- Rack-aware fault tolerance
Each new constraint in greedy code means more if/else branches and edge cases. In a constraint solver, you just add another constraint function.
3. Real-World Complexity: Production datacenters have migration costs, maintenance windows, SLA requirements, and live traffic patterns. These create solution spaces where greedy approaches fail.
Understanding the Data Model
Let’s examine the core classes that model our problem. Open src/vm_placement/domain.py:
Domain Model Architecture
This quickstart separates domain models (dataclasses) from API models (Pydantic):
- Domain layer (domain.py lines 26-156): Pure @dataclass models for solver operations
- API layer (domain.py lines 159-207): Pydantic BaseModel classes for REST endpoints
- Converters (converters.py): Translate between the two layers
The Server Class (Problem Fact)
@dataclass
class Server:
id: Annotated[str, PlanningId]
name: str
cpu_cores: int
memory_gb: int
storage_gb: int
rack: Optional[str] = None
What it represents: A physical server that can host virtual machines.
Key fields:
- id: Unique identifier for the server
- name: Human-readable server name
- cpu_cores: Available CPU cores
- memory_gb: Available memory in gigabytes
- storage_gb: Available storage in gigabytes
- rack: Which physical rack contains this server
Optimization concept: This is a problem fact — immutable data that doesn’t change during solving. Servers are the targets for VM placement, not the decisions themselves.
The VM Class (Planning Entity)
@planning_entity
@dataclass
class VM:
id: Annotated[str, PlanningId]
name: str
cpu_cores: int
memory_gb: int
storage_gb: int
priority: int = 1
affinity_group: Optional[str] = None
anti_affinity_group: Optional[str] = None
server: Annotated[Optional[Server], PlanningVariable] = None
What it represents: A virtual machine that needs to be placed on a server.
Key fields:
- id: Unique identifier (VM ID)
- name: Human-readable VM name
- cpu_cores, memory_gb, storage_gb: Resource requirements
- priority: Importance level (1-5, higher = more important)
- affinity_group: Group name for VMs that should be together
- anti_affinity_group: Group name for VMs that must be separated
- server: The assignment decision — which server hosts this VM?
Important annotations:
- @planning_entity: Tells SolverForge this class contains decisions to make
- PlanningVariable: Marks server as the decision variable
Optimization concept: This is a planning variable — the value the solver assigns. Each VM starts with server=None (unassigned). The solver tries different server assignments, evaluating according to your constraints.
The Assignment Pattern
Unlike portfolio optimization where the planning variable is a Boolean (SELECTED/NOT_SELECTED), VM placement uses a reference assignment pattern:
server: Annotated[Optional[Server], PlanningVariable] = None
Why a reference? Each VM can be assigned to any server from the value range. The solver picks from the list of available servers, or leaves the VM unassigned (None).
The VMPlacementPlan Class (Planning Solution)
@planning_solution
@dataclass
class VMPlacementPlan:
name: str
servers: Annotated[list[Server], ProblemFactCollectionProperty, ValueRangeProvider]
vms: Annotated[list[VM], PlanningEntityCollectionProperty]
score: Annotated[Optional[HardSoftScore], PlanningScore] = None
solver_status: SolverStatus = SolverStatus.NOT_SOLVING
What it represents: The complete problem and its solution.
Key fields:
- name: Problem instance name (e.g., “Datacenter Alpha”)
- servers: All physical servers (problem facts + value range)
- vms: All VMs to place (planning entities)
- score: Solution quality metric (calculated by constraints)
- solver_status: Whether solving is active
Annotations explained:
- @planning_solution: Marks this as the top-level problem definition
- ProblemFactCollectionProperty: Immutable problem data
- ValueRangeProvider: Servers are valid values for VM.server
- PlanningEntityCollectionProperty: The entities being optimized
- PlanningScore: Where the solver stores the calculated score
Helper Methods for Business Metrics
The VMPlacementPlan class includes useful analytics:
def get_vms_on_server(self, server: Server) -> list:
"""Get all VMs assigned to a specific server."""
return [vm for vm in self.vms if vm.server == server]
def get_server_used_cpu(self, server: Server) -> int:
"""Get total CPU cores used on a server."""
return sum(vm.cpu_cores for vm in self.vms if vm.server == server)
@property
def active_servers(self) -> int:
"""Count servers that have at least one VM assigned."""
active_server_ids = set(vm.server.id for vm in self.vms if vm.server is not None)
return len(active_server_ids)
@property
def unassigned_vms(self) -> int:
"""Count VMs without a server assignment."""
return sum(1 for vm in self.vms if vm.server is None)
How Optimization Works
Before diving into constraints, let’s understand how the solver finds solutions.
The Search Process (Simplified)
- Start with an initial solution (often all VMs unassigned)
- Evaluate the score using your constraint functions
- Make a small change (assign one VM to a server, or move it)
- Evaluate the new score
- Keep the change if it improves the score (with some controlled randomness)
- Repeat millions of times in seconds
- Return the best solution found
The Search Space
For a placement problem with 12 servers and 30 VMs, each VM can go on any of 12 servers (or stay unassigned). That’s 13^30 possible combinations — far too many to enumerate. The solver explores this space using smart heuristics, not brute force.
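As a quick back-of-the-envelope check on that figure, each of the 30 VMs independently picks one of 13 options (any of the 12 servers, or unassigned):

```python
# 30 VMs, each with 13 choices: one of 12 servers, or unassigned
print(f"{13 ** 30:.2e}")  # -> 2.62e+33 candidate assignments
```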
The Score: How “Good” is a Placement?
Every solution gets a score with two parts:
0hard/-2500soft
- Hard score: Counts hard constraint violations (must be 0 for a valid placement)
- Soft score: Reflects optimization quality (higher/less negative is better)
Scoring rules:
- Hard score must be 0 or positive (negative = invalid placement)
- Among valid placements (hard score = 0), highest soft score wins
- Hard score always takes priority over soft score
Placement example:
-4hard/-1200soft → Invalid: 4 capacity violations
0hard/-3000soft → Valid but many servers active
0hard/-1500soft → Valid with better consolidation
Writing Constraints: The Business Rules
Now the heart of the system. Open src/vm_placement/constraints.py.
The Constraint Provider Pattern
All constraints are registered in one function:
@constraint_provider
def define_constraints(factory: ConstraintFactory):
return [
# Hard constraints
cpu_capacity(factory),
memory_capacity(factory),
storage_capacity(factory),
anti_affinity(factory),
# Soft constraints
affinity(factory),
minimize_servers_used(factory),
balance_utilization(factory),
prioritize_placement(factory),
]
Each constraint is a function returning a Constraint object. Let’s examine them.
Hard Constraint: CPU Capacity
Business rule: “Server CPU capacity cannot be exceeded”
def cpu_capacity(factory: ConstraintFactory):
return (
factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.cpu_cores))
.filter(lambda server, total_cpu: total_cpu > server.cpu_cores)
.penalize(
HardSoftScore.ONE_HARD,
lambda server, total_cpu: total_cpu - server.cpu_cores,
)
.as_constraint("cpuCapacity")
)
How to read this:
- for_each(VM): Consider every VM
- .filter(...): Keep only assigned VMs (server is not None)
- .group_by(server, sum(cpu_cores)): Sum CPU cores per server
- .filter(...): Keep only overloaded servers
- .penalize(ONE_HARD, excess): Each excess core adds 1 hard penalty
Example: Server with 16 cores hosting VMs totaling 20 cores = penalty of 4
Hard Constraint: Memory Capacity
Business rule: “Server memory capacity cannot be exceeded”
def memory_capacity(factory: ConstraintFactory):
return (
factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.memory_gb))
.filter(lambda server, total_memory: total_memory > server.memory_gb)
.penalize(
HardSoftScore.ONE_HARD,
lambda server, total_memory: total_memory - server.memory_gb,
)
.as_constraint("memoryCapacity")
)
Same pattern as CPU capacity, applied to memory.
Hard Constraint: Storage Capacity
Business rule: “Server storage capacity cannot be exceeded”
def storage_capacity(factory: ConstraintFactory):
return (
factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.storage_gb))
.filter(lambda server, total_storage: total_storage > server.storage_gb)
.penalize(
HardSoftScore.ONE_HARD,
lambda server, total_storage: total_storage - server.storage_gb,
)
.as_constraint("storageCapacity")
)
Same pattern as CPU capacity, applied to storage.
Hard Constraint: Anti-Affinity
Business rule: “VMs in the same anti-affinity group must be on different servers”
def anti_affinity(factory: ConstraintFactory):
return (
factory.for_each_unique_pair(
VM,
Joiners.equal(lambda vm: vm.anti_affinity_group),
Joiners.equal(lambda vm: vm.server),
)
.filter(lambda vm1, vm2: vm1.anti_affinity_group is not None)
.filter(lambda vm1, vm2: vm1.server is not None)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("antiAffinity")
)
How to read this:
- for_each_unique_pair(VM, ...): Find pairs of VMs
- Joiners.equal(anti_affinity_group): Same anti-affinity group
- Joiners.equal(server): On the same server
- .filter(...): Group must be set (not None)
- .filter(...): Both must be assigned
- .penalize(ONE_HARD): Each violating pair adds 1 hard penalty
Use case: Database replicas should never be on the same physical server. If one server fails, the other replica survives.
Soft Constraint: Affinity
Business rule: “VMs in the same affinity group should be on the same server”
def affinity(factory: ConstraintFactory):
return (
factory.for_each_unique_pair(
VM,
Joiners.equal(lambda vm: vm.affinity_group),
)
.filter(lambda vm1, vm2: vm1.affinity_group is not None)
.filter(lambda vm1, vm2: vm1.server is not None and vm2.server is not None)
.filter(lambda vm1, vm2: vm1.server != vm2.server)
.penalize(HardSoftScore.ONE_SOFT, lambda vm1, vm2: 100)
.as_constraint("affinity")
)
How to read this:
- Find pairs of VMs in the same affinity group
- Both must be assigned
- Penalize if they’re on different servers
- Each separated pair costs 100 soft points
Use case: Web servers and their cache should be together for low latency.
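If you want to lock this behaviour in, a unit test along these lines can verify it. This is only a sketch: it reuses the constraint_verifier and assign() helpers from tests/test_constraints.py (shown in the Testing and Validation section later), and the test name is illustrative.

```python
from vm_placement.constraints import affinity

def test_affinity_split_pair_penalized():
    """Two VMs in the same affinity group on different servers -> 100 soft penalty."""
    server1 = Server(id="s1", name="Server1", cpu_cores=32, memory_gb=128, storage_gb=2000)
    server2 = Server(id="s2", name="Server2", cpu_cores=32, memory_gb=128, storage_gb=2000)
    web = VM(id="vm1", name="Web", cpu_cores=4, memory_gb=8, storage_gb=50, affinity_group="web-tier")
    cache = VM(id="vm2", name="Cache", cpu_cores=4, memory_gb=8, storage_gb=50, affinity_group="web-tier")
    assign(server1, web)
    assign(server2, cache)  # split across two servers -> one separated pair
    (
        constraint_verifier.verify_that(affinity)
        .given(server1, server2, web, cache)
        .penalizes_by(100)
    )
```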
Soft Constraint: Minimize Servers Used
Business rule: “Use fewer servers to reduce power and cooling costs”
def minimize_servers_used(factory: ConstraintFactory):
return (
factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.group_by(lambda vm: vm.server, ConstraintCollectors.count())
.penalize(HardSoftScore.ONE_SOFT, lambda server, count: 100)
.as_constraint("minimizeServersUsed")
)
How to read this:
- Find all assigned VMs
- Group by server and count VMs
- Each active server (with at least 1 VM) costs 100 soft points
Business concept: Server consolidation. An idle server still consumes power for cooling, lighting, and baseline operations. Packing VMs onto fewer servers reduces operational costs.
Soft Constraint: Balance Utilization
Business rule: “Distribute load evenly across active servers”
def balance_utilization(factory: ConstraintFactory):
return (
factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.cpu_cores))
.penalize(
HardSoftScore.ONE_SOFT,
lambda server, total_cpu: int((total_cpu / server.cpu_cores) ** 2 * 10) if server.cpu_cores > 0 else 0,
)
.as_constraint("balanceUtilization")
)
How to read this:
- Sum CPU usage per server
- Calculate utilization ratio (used/capacity)
- Apply squared penalty — heavily loaded servers cost more
Why squared? A server at 90% utilization is riskier than two servers at 45%. Squaring creates a “fairness” preference that spreads load.
Example:
| Scenario | Server A | Server B | Total Penalty |
|---|---|---|---|
| Imbalanced | 90% = 8.1 | 10% = 0.1 | 8.2 |
| Balanced | 50% = 2.5 | 50% = 2.5 | 5.0 |

(The values shown are the squared-ratio penalties before the int() truncation in the code; the relative ordering is what matters. The arithmetic is sketched below.)
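Here is the same arithmetic, written out with the truncation the constraint actually applies:

```python
def utilization_penalty(used_cores: int, capacity: int) -> int:
    # Mirrors the balance_utilization penalty: squared utilization ratio, scaled by 10
    return int((used_cores / capacity) ** 2 * 10) if capacity > 0 else 0

# Imbalanced: one busy server (~90%), one nearly idle (~10%)
print(utilization_penalty(29, 32) + utilization_penalty(3, 32))    # 8 + 0 = 8
# Balanced: two servers at 50%
print(utilization_penalty(16, 32) + utilization_penalty(16, 32))   # 2 + 2 = 4
```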
Soft Constraint: Prioritize Placement
Business rule: “Higher-priority VMs should be placed first”
def prioritize_placement(factory: ConstraintFactory):
return (
factory.for_each(VM)
.filter(lambda vm: vm.server is None)
.penalize(HardSoftScore.ONE_SOFT, lambda vm: 10000 + vm.priority * 1000)
.as_constraint("prioritizePlacement")
)
How to read this:
- Find unassigned VMs
- Penalize each based on priority
- Higher priority = larger penalty when unassigned
Why these weights? The base penalty (10000) ensures VMs get placed before other soft constraints are optimized. The priority multiplier (1000) makes high-priority VMs more “expensive” to leave unassigned.
Example penalties:
| Priority | Unassigned Penalty |
|---|---|
| 1 (low) | 11000 |
| 3 (medium) | 13000 |
| 5 (critical) | 15000 |
The Solver Engine
Now let’s see how the solver is configured. Open src/vm_placement/solver.py:
from solverforge_legacy.solver import (
SolverManager,
SolverConfig,
SolverFactory,
SolutionManager,
)
from solverforge_legacy.solver.config import (
ScoreDirectorFactoryConfig,
TerminationConfig,
Duration,
)
from .constraints import define_constraints
from .domain import VMPlacementPlan, VM
solver_config = SolverConfig(
solution_class=VMPlacementPlan,
entity_class_list=[VM],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(
spent_limit=Duration(seconds=30)
),
)
solver_manager = SolverManager.create(SolverFactory.create(solver_config))
solution_manager = SolutionManager.create(solver_manager)
Configuration Breakdown
- solution_class: Your planning solution class (VMPlacementPlan)
- entity_class_list: Planning entities to optimize ([VM])
  - Note: Only VM is listed, not Server — servers are problem facts
- score_director_factory_config: Contains the constraint provider function
- termination_config: When to stop solving
  - spent_limit=Duration(seconds=30): Stop after 30 seconds (an alternative termination setup is sketched below)
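If 30 seconds is too short or too long for your data, the termination config is the place to change it. As a sketch, and assuming the solverforge_legacy fork keeps Timefold's standard TerminationConfig options such as unimproved_spent_limit (check your installed version), you could combine an absolute cap with an early stop:

```python
# Hypothetical alternative termination: hard cap plus "stop once the score stops improving".
termination_config = TerminationConfig(
    spent_limit=Duration(seconds=120),            # never run longer than 2 minutes
    unimproved_spent_limit=Duration(seconds=15),  # stop once 15s pass without a better score
)
```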
SolverManager: Asynchronous Solving
SolverManager handles solving in the background without blocking your API:
# Start solving (non-blocking)
solver_manager.solve_and_listen(job_id, placement, callback_function)
# Check status
status = solver_manager.get_solver_status(job_id)
# Stop early
solver_manager.terminate_early(job_id)
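A minimal sketch of the listener pattern behind solve_and_listen(); best_solutions is a hypothetical in-memory store standing in for whatever rest_api.py keeps:

```python
# best_solutions is a hypothetical in-memory store keyed by job ID.
best_solutions: dict[str, VMPlacementPlan] = {}

def make_listener(job_id: str):
    def on_best_solution_changed(solution: VMPlacementPlan) -> None:
        # Called each time the solver finds a new best solution for this job.
        best_solutions[job_id] = solution
        print(f"{job_id}: new best score {solution.score}")
    return on_best_solution_changed

solver_manager.solve_and_listen(job_id, placement, make_listener(job_id))
```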
Solving Timeline
Small problems (12 servers, 30 VMs):
- Initial valid placement: < 1 second
- Good placement: 5-10 seconds
- Near-optimal: 30 seconds
Large problems (50+ servers, 200 VMs):
- Initial valid placement: 2-5 seconds
- Good placement: 30-60 seconds
- High-quality: 2-5 minutes
Factors affecting speed:
- Number of servers and VMs (search space size)
- Constraint tightness (capacity headroom)
- Anti-affinity groups (placement restrictions)
Web Interface and API
REST API Endpoints
Open src/vm_placement/rest_api.py to see the API:
GET /demo-data
Returns available demo datasets:
["SMALL", "MEDIUM", "LARGE"]
GET /demo-data/{dataset_id}
Generates and returns sample infrastructure data:
{
"name": "Small Datacenter",
"servers": [
{
"id": "server-1",
"name": "Rack-A-Server-1",
"cpuCores": 32,
"memoryGb": 128,
"storageGb": 2000,
"rack": "Rack-A"
}
],
"vms": [
{
"id": "vm-1",
"name": "DB-Primary",
"cpuCores": 8,
"memoryGb": 32,
"storageGb": 500,
"priority": 5,
"antiAffinityGroup": "db-replicas",
"server": null
}
]
}
POST /demo-data/generate
Generate custom infrastructure with configurable parameters:
Request body:
{
"rack_count": 3,
"servers_per_rack": 4,
"vm_count": 30
}
Response: Same format as demo-data response
POST /placements
Submit a placement problem for optimization:
Request body: Same format as demo-data response
Response: Job ID as plain text
"a1b2c3d4-e5f6-7890-abcd-ef1234567890"
GET /placements/{problem_id}
Get current solution:
{
"name": "Small Datacenter",
"servers": [...],
"vms": [...],
"score": "0hard/-2500soft",
"solverStatus": "SOLVING_ACTIVE",
"totalServers": 12,
"activeServers": 6,
"unassignedVms": 0
}
GET /placements/{problem_id}/status
Lightweight status check with metrics:
{
"name": "Small Datacenter",
"score": "0hard/-2500soft",
"solverStatus": "SOLVING_ACTIVE",
"activeServers": 6,
"unassignedVms": 0
}
DELETE /placements/{problem_id}
Stop solving early and return best solution found.
PUT /placements/analyze
Analyze a placement’s constraint violations in detail:
{
"constraints": [
{
"name": "cpuCapacity",
"weight": "1hard",
"score": "-2hard",
"matches": [
{
"name": "cpuCapacity",
"score": "-2hard",
"justification": "Server-1: 34 cores used, 32 available"
}
]
}
]
}
Web UI Flow
The static/app.js implements this polling workflow:
- User opens page → Load demo data (GET /demo-data/SMALL)
- Display servers organized by rack with utilization bars
- User clicks “Solve” → POST /placements (get job ID back)
- Poll GET /placements/{id} every 2 seconds
- Update UI with latest assignments and score in real-time
- When solverStatus === "NOT_SOLVING" → Stop polling
- Display final score, server utilization, and VM assignments (a scripted version of this workflow is sketched below)
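The same workflow can be driven from a script instead of the browser. This sketch assumes the server is running locally on port 8080 and that the requests package is installed; it is not part of the quickstart:

```python
import time
import requests

BASE = "http://localhost:8080"

problem = requests.get(f"{BASE}/demo-data/SMALL").json()                    # 1. load demo data
job_id = requests.post(f"{BASE}/placements", json=problem).text.strip('"')  # 2. submit, keep job ID

while True:                                                                  # 3. poll until done
    status = requests.get(f"{BASE}/placements/{job_id}/status").json()
    print(status["score"], status["solverStatus"])
    if status["solverStatus"] == "NOT_SOLVING":
        break
    time.sleep(2)

solution = requests.get(f"{BASE}/placements/{job_id}").json()                # 4. fetch final placement
print("Active servers:", solution["activeServers"])
```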
Advanced Settings
The web UI includes configurable sliders (in static/config.js):
- Racks: Number of server racks (1-8)
- Servers per Rack: Servers in each rack (2-10)
- VMs: Number of VMs to place (5-200)
- Solver Time: How long to optimize (5s-2min)
Click “Generate New Data” to create custom scenarios.
Making Your First Customization
Let’s add a new constraint that demonstrates a common datacenter pattern: rack-aware fault tolerance.
The Rack Diversity Constraint
Business rule: “VMs in the same anti-affinity group should be spread across different racks, not just different servers”
Why This Matters
If two database replicas are on different servers but the same rack, a rack-level failure (power, networking, cooling) takes out both. True high availability requires rack diversity.
Implementation
Add this to src/vm_placement/constraints.py:
def rack_diversity(factory: ConstraintFactory):
"""
Soft constraint: Anti-affinity VMs should be on different racks.
Provides rack-level fault tolerance beyond just server separation.
"""
return (
factory.for_each_unique_pair(
VM,
Joiners.equal(lambda vm: vm.anti_affinity_group),
)
.filter(lambda vm1, vm2: vm1.anti_affinity_group is not None)
.filter(lambda vm1, vm2: vm1.server is not None and vm2.server is not None)
.filter(lambda vm1, vm2: vm1.server.rack == vm2.server.rack)
.penalize(HardSoftScore.ONE_SOFT, lambda vm1, vm2: 50)
.as_constraint("rackDiversity")
)
How to read this:
- Find pairs of VMs in the same anti-affinity group
- Both must be assigned
- Penalize if they’re on the same rack (even if different servers)
- Each same-rack pair costs 50 soft points
Registering the Constraint
Add it to define_constraints():
@constraint_provider
def define_constraints(factory: ConstraintFactory):
return [
# Hard constraints
cpu_capacity(factory),
memory_capacity(factory),
storage_capacity(factory),
anti_affinity(factory),
# Soft constraints
affinity(factory),
minimize_servers_used(factory),
balance_utilization(factory),
prioritize_placement(factory),
rack_diversity(factory), # ADD THIS LINE
]
Adding Tests
Add to tests/test_constraints.py:
from vm_placement.constraints import rack_diversity
def test_rack_diversity_same_rack():
"""Anti-affinity VMs on same rack should be penalized."""
server1 = Server(id="s1", name="Server1", cpu_cores=32, memory_gb=128, storage_gb=2000, rack="Rack-A")
server2 = Server(id="s2", name="Server2", cpu_cores=32, memory_gb=128, storage_gb=2000, rack="Rack-A")
vm1 = VM(id="vm1", name="DB-Primary", cpu_cores=8, memory_gb=32, storage_gb=500, anti_affinity_group="db-replicas")
vm2 = VM(id="vm2", name="DB-Replica", cpu_cores=8, memory_gb=32, storage_gb=500, anti_affinity_group="db-replicas")
assign(server1, vm1)
assign(server2, vm2)
(
constraint_verifier.verify_that(rack_diversity)
.given(server1, server2, vm1, vm2)
.penalizes_by(50)
)
def test_rack_diversity_different_racks():
"""Anti-affinity VMs on different racks should not be penalized."""
server1 = Server(id="s1", name="Server1", cpu_cores=32, memory_gb=128, storage_gb=2000, rack="Rack-A")
server2 = Server(id="s2", name="Server2", cpu_cores=32, memory_gb=128, storage_gb=2000, rack="Rack-B")
vm1 = VM(id="vm1", name="DB-Primary", cpu_cores=8, memory_gb=32, storage_gb=500, anti_affinity_group="db-replicas")
vm2 = VM(id="vm2", name="DB-Replica", cpu_cores=8, memory_gb=32, storage_gb=500, anti_affinity_group="db-replicas")
assign(server1, vm1)
assign(server2, vm2)
(
constraint_verifier.verify_that(rack_diversity)
.given(server1, server2, vm1, vm2)
.penalizes_by(0)
)
Run with:
pytest tests/test_constraints.py -v -k "rack_diversity"
Advanced Constraint Patterns
Pattern 1: GPU Requirement
Scenario: Some VMs need GPU-equipped servers.
First, add a has_gpu field to Server and requires_gpu to VM in domain.py:
@dataclass
class Server:
# ... existing fields ...
has_gpu: bool = False
@planning_entity
@dataclass
class VM:
# ... existing fields ...
requires_gpu: bool = False
Then add the constraint:
def gpu_requirement(factory: ConstraintFactory):
"""
Hard constraint: GPU VMs must be placed on GPU servers.
"""
return (
factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.filter(lambda vm: vm.requires_gpu)
.filter(lambda vm: not vm.server.has_gpu)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("gpuRequirement")
)
Pattern 2: Maintenance Window Avoidance
Scenario: Some servers are scheduled for maintenance and shouldn’t receive new VMs.
@dataclass
class Server:
# ... existing fields ...
in_maintenance: bool = False
def avoid_maintenance_servers(factory: ConstraintFactory):
"""
Soft constraint: Prefer servers not in maintenance window.
VMs can still be placed there if necessary, but it's discouraged.
"""
return (
factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.filter(lambda vm: vm.server.in_maintenance)
.penalize(HardSoftScore.ONE_SOFT, lambda vm: 500)
.as_constraint("avoidMaintenanceServers")
)
Pattern 3: Memory Overcommit Limit
Scenario: Allow memory overcommit up to 120%, but heavily penalize beyond that.
def memory_overcommit_limit(factory: ConstraintFactory):
"""
Soft constraint: Penalize memory overcommit beyond 120%.
Many hypervisors support memory overcommit, but excessive overcommit
causes performance degradation.
"""
OVERCOMMIT_RATIO = 1.2
return (
factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.memory_gb))
.filter(lambda server, total_mem: total_mem > server.memory_gb * OVERCOMMIT_RATIO)
.penalize(
HardSoftScore.ONE_SOFT,
lambda server, total_mem: int((total_mem - server.memory_gb * OVERCOMMIT_RATIO) * 100)
)
.as_constraint("memoryOvercommitLimit")
)
Pattern 4: Prefer Same Rack for Affinity
Scenario: When VMs can’t be on the same server, prefer the same rack.
def affinity_same_rack_preference(factory: ConstraintFactory):
"""
Soft constraint: Affinity VMs on different servers should prefer same rack.
Provides lower latency than cross-rack communication.
"""
return (
factory.for_each_unique_pair(
VM,
Joiners.equal(lambda vm: vm.affinity_group),
)
.filter(lambda vm1, vm2: vm1.affinity_group is not None)
.filter(lambda vm1, vm2: vm1.server is not None and vm2.server is not None)
.filter(lambda vm1, vm2: vm1.server != vm2.server)
.filter(lambda vm1, vm2: vm1.server.rack != vm2.server.rack)
.penalize(HardSoftScore.ONE_SOFT, lambda vm1, vm2: 25)
.as_constraint("affinitySameRackPreference")
)
Testing and Validation
Unit Testing Constraints
The quickstart uses ConstraintVerifier for isolated constraint testing. See tests/test_constraints.py:
from solverforge_legacy.solver.test import ConstraintVerifier
from vm_placement.domain import Server, VM, VMPlacementPlan
from vm_placement.constraints import define_constraints
# VM is the only planning entity (Server is a problem fact)
constraint_verifier = ConstraintVerifier.build(
define_constraints, VMPlacementPlan, VM
)
def assign(server: Server, *vms: VM):
"""Helper to assign VMs to a server."""
for vm in vms:
vm.server = server
Test patterns:
Verify no penalty:
def test_cpu_capacity_not_exceeded():
server = Server(id="s1", name="Server1", cpu_cores=16, memory_gb=64, storage_gb=500)
vm1 = VM(id="vm1", name="VM1", cpu_cores=4, memory_gb=8, storage_gb=50)
vm2 = VM(id="vm2", name="VM2", cpu_cores=8, memory_gb=16, storage_gb=100)
assign(server, vm1, vm2)
(
constraint_verifier.verify_that(cpu_capacity)
.given(server, vm1, vm2)
.penalizes_by(0)
)
Verify exact penalty amount:
def test_cpu_capacity_exceeded():
server = Server(id="s1", name="Server1", cpu_cores=16, memory_gb=64, storage_gb=500)
vm1 = VM(id="vm1", name="VM1", cpu_cores=12, memory_gb=8, storage_gb=50)
vm2 = VM(id="vm2", name="VM2", cpu_cores=8, memory_gb=16, storage_gb=100)
assign(server, vm1, vm2)
# 12 + 8 = 20 cores, capacity = 16, excess = 4
(
constraint_verifier.verify_that(cpu_capacity)
.given(server, vm1, vm2)
.penalizes_by(4)
)
Verify anti-affinity violation:
def test_anti_affinity_violated():
server = Server(id="s1", name="Server1", cpu_cores=16, memory_gb=64, storage_gb=500)
vm1 = VM(id="vm1", name="DB-Primary", cpu_cores=4, memory_gb=8, storage_gb=50, anti_affinity_group="db-replicas")
vm2 = VM(id="vm2", name="DB-Replica", cpu_cores=4, memory_gb=8, storage_gb=50, anti_affinity_group="db-replicas")
assign(server, vm1, vm2)
# Both VMs on same server with same anti-affinity group = 1 violation
(
constraint_verifier.verify_that(anti_affinity)
.given(server, vm1, vm2)
.penalizes_by(1)
)
Running Tests
# All tests
pytest
# Verbose output
pytest -v
# Specific test file
pytest tests/test_constraints.py
# Specific test function
pytest tests/test_constraints.py::test_cpu_capacity_exceeded
# With coverage
pytest --cov=vm_placement
Quick Reference
File Locations
| Need to… | Edit this file |
|---|---|
| Add/change business rule | src/vm_placement/constraints.py |
| Add field to VM or Server | src/vm_placement/domain.py + converters.py |
| Change solve time | src/vm_placement/solver.py |
| Add REST endpoint | src/vm_placement/rest_api.py |
| Change demo data | src/vm_placement/demo_data.py |
| Change UI | static/index.html, static/app.js |
Common Constraint Patterns
Sum resource usage per server:
constraint_factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.group_by(lambda vm: vm.server, ConstraintCollectors.sum(lambda vm: vm.cpu_cores))
.filter(lambda server, total: total > server.cpu_cores)
.penalize(...)
Find pairs with same group:
constraint_factory.for_each_unique_pair(
VM,
Joiners.equal(lambda vm: vm.some_group),
Joiners.equal(lambda vm: vm.server), # On same server
)
.filter(lambda vm1, vm2: vm1.some_group is not None)
.penalize(...)
Count active servers:
constraint_factory.for_each(VM)
.filter(lambda vm: vm.server is not None)
.group_by(lambda vm: vm.server, ConstraintCollectors.count())
.penalize(...) # Each active server incurs cost
Penalize unassigned entities:
constraint_factory.for_each(VM)
.filter(lambda vm: vm.server is None)
.penalize(HardSoftScore.ONE_SOFT, lambda vm: 10000 + vm.priority * 1000)
Common Gotchas
Forgot to register constraint in define_constraints() return list
- Symptom: Constraint not enforced
Using wrong score type
- HardSoftScore.ONE_HARD for must-satisfy rules
- HardSoftScore.ONE_SOFT for preferences
Server is a problem fact, not an entity
- Don’t add Server to entity_class_list
- Don’t add Server to ConstraintVerifier.build()
Forgetting to check for None
- Always filter vm.server is not None before accessing server properties
Score sign confusion
- Higher soft score is better (less negative)
- Use .reward() to add points, .penalize() to subtract
Forgetting to include problem facts in tests
constraint_verifier.verify_that(...).given(server, vm1, vm2) — servers must be included
Debugging Tips
Enable verbose logging:
import logging
logging.basicConfig(level=logging.DEBUG)
Use the /analyze endpoint:
curl -X PUT http://localhost:8080/placements/analyze \
-H "Content-Type: application/json" \
-d @my_placement.json
Print in constraints (temporary debugging):
.filter(lambda vm: (
print(f"Checking {vm.name}: server={vm.server}") or
vm.server is not None
))
Additional Resources
5 - Meeting Scheduling
A comprehensive quickstart guide to understanding and building intelligent meeting scheduling with SolverForge
Legacy Implementation Guide
This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.
SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.
Table of Contents
- Introduction
- Getting Started
- The Problem We’re Solving
- Understanding the Data Model
- How Scheduling Optimization Works
- Writing Constraints: The Business Rules
- The Solver Engine
- Web Interface and API
- Making Your First Customization
- Advanced Constraint Patterns
- Testing and Validation
- Production Considerations
- Quick Reference
Introduction
What You’ll Learn
This guide walks you through a complete meeting scheduling application built with SolverForge, a constraint-based optimization framework. You’ll learn:
- How to model complex scheduling problems with multiple resource types (time slots, rooms, people)
- How to handle hierarchical constraints with different priority levels
- How to balance competing objectives (minimize conflicts, pack meetings early, encourage breaks)
- How to customize the system for your organization’s meeting policies
No optimization background required — we’ll explain concepts as we encounter them in the code.
Architecture Note
This implementation uses dataclass domain models for optimal solver performance. See benchmark results showing this approach completes 60/60 optimization iterations while Pydantic-based alternatives complete only 46-58. Note: benchmarks were run on small test problems; JPype bridge overhead may increase at larger scales.
Prerequisites
- Basic Python knowledge (classes, functions, type annotations)
- Familiarity with REST APIs
- Comfort with command-line operations
- Understanding of calendar/scheduling concepts
What is Meeting Scheduling Optimization?
Traditional approach: Manually coordinate calendars, send emails back and forth, book rooms one by one.
Meeting scheduling optimization: You describe your meetings, attendees, available rooms, and constraints — the solver automatically finds a schedule that satisfies requirements while minimizing conflicts.
Think of it like having an executive assistant who can evaluate millions of scheduling combinations per second to find arrangements that work for everyone.
Getting Started
Running the Application
Navigate to the project directory:
cd /srv/lab/dev/solverforge/solverforge-quickstarts/legacy/meeting-scheduling-fast
Create and activate virtual environment:
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
Install the package:
Start the server:
Open your browser:
http://localhost:8080
You’ll see a calendar interface with meetings, rooms, and people. Click “Solve” and watch the solver automatically schedule meetings into time slots and rooms while avoiding conflicts.
File Structure Overview
src/meeting_scheduling/
├── domain.py # Data classes (Meeting, Person, Room, TimeGrain)
├── constraints.py # Business rules (conflicts, capacity, timing)
├── solver.py # Solver configuration
├── demo_data.py # Sample data generation
├── rest_api.py # HTTP API endpoints
├── converters.py # REST ↔ Domain model conversion
├── json_serialization.py # JSON serialization helpers
└── score_analysis.py # Score breakdown DTOs
static/
├── index.html # Web UI
└── app.js # UI logic and visualization
tests/
├── test_constraints.py # Unit tests for constraints
└── test_feasible.py # Integration tests
Key insight: Most business customization happens in constraints.py. The domain model defines what can be scheduled, but constraints define what makes a good schedule.
The Problem We’re Solving
The Meeting Scheduling Challenge
You need to assign meetings to time slots and rooms while satisfying rules like:
Hard constraints (must be satisfied):
- Rooms cannot be double-booked (no overlapping meetings in same room)
- Required attendees cannot be double-booked
- Meetings must fit within available time slots (no overtime)
- Rooms must have sufficient capacity for all attendees
- Meetings cannot span multiple days (must start and end same day)
Medium constraints (strong preferences):
- Avoid conflicts where a person is required at one meeting and preferred at another
- Avoid conflicts for preferred attendees
Soft constraints (optimization goals):
- Schedule meetings as early as possible
- Encourage breaks between consecutive meetings
- Minimize general overlapping meetings
- Use larger rooms first (efficient room utilization)
- Minimize room switches for attendees
Why This is Hard
For even 10 meetings, 5 rooms, and 20 time slots per day, there are over 10 trillion possible schedules. With 24 meetings like in the demo, the possibilities become astronomical.
The challenges:
- Resource coordination: Must simultaneously allocate time, rooms, and people
- Conflict resolution: One assignment affects availability of all resources
- Priority balancing: Hard constraints must hold while optimizing preferences
- Temporal dependencies: Meeting times affect break patterns and room switch distances
Scheduling optimization algorithms use sophisticated strategies to explore this space efficiently, finding high-quality schedules in seconds.
Understanding the Data Model
Let’s examine the core classes that model our scheduling problem. Open src/meeting_scheduling/domain.py:
The Person Class
@dataclass
class Person:
id: Annotated[str, PlanningId]
full_name: str
What it represents: An attendee who can be required or preferred at meetings.
Key fields:
- id: Unique identifier (the PlanningId annotation tells SolverForge this is the primary key)
- full_name: Display name (e.g., “Amy Cole”)
Optimization concept: People are resources that can be allocated to meetings. Unlike employee scheduling where employees are assigned to shifts, here people’s attendance at meetings creates constraints but isn’t directly optimized.
The TimeGrain Class
# Time slot granularity (configurable)
GRAIN_LENGTH_IN_MINUTES = 15
@dataclass
class TimeGrain:
grain_index: Annotated[int, PlanningId]
day_of_year: int
starting_minute_of_day: int
@property
def ending_minute_of_day(self) -> int:
return self.starting_minute_of_day + GRAIN_LENGTH_IN_MINUTES
What it represents: A discrete time slot (default: 15 minutes).
Time grain granularity: The GRAIN_LENGTH_IN_MINUTES constant (defined at the top of domain.py) controls the scheduling precision. The default 15-minute grain balances precision with search space size. Smaller grains (5 minutes) offer more flexibility but slower solving. Larger grains (30 minutes) solve faster but with less scheduling flexibility.
Key fields:
- grain_index: Sequential index across all days (0, 1, 2, … for the entire planning horizon)
- day_of_year: Which day (1-365)
- starting_minute_of_day: Time within the day (e.g., 480 = 8:00 AM, 540 = 9:00 AM)
Why discrete time slots?
Instead of continuous time, meetings snap to 15-minute intervals. This:
- Simplifies conflict detection (integer comparisons)
- Matches real-world calendar behavior
- Reduces search space (finite number of start times)
Optimization concept: This is time discretization — converting continuous time into discrete slots. It’s a common technique in scheduling to make problems tractable.
Example time grains:
grain_index=0 → Day 1, 8:00-8:15 AM (starting_minute_of_day=480)
grain_index=1 → Day 1, 8:15-8:30 AM (starting_minute_of_day=495)
grain_index=39 → Day 1, 5:45-6:00 PM (starting_minute_of_day=1065)
grain_index=40 → Day 2, 8:00-8:15 AM (starting_minute_of_day=480)
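If it helps to double-check such examples, here is a small, illustrative helper (not part of the quickstart) that renders a starting_minute_of_day value as a clock time:

```python
def format_minute_of_day(minute_of_day: int) -> str:
    """Render e.g. 480 -> '8:00 AM', 1065 -> '5:45 PM'."""
    hours, minutes = divmod(minute_of_day, 60)
    suffix = "AM" if hours < 12 else "PM"
    hours = hours % 12 or 12
    return f"{hours}:{minutes:02d} {suffix}"

print(format_minute_of_day(480))   # 8:00 AM
print(format_minute_of_day(1065))  # 5:45 PM
```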
The Room Class
@dataclass
class Room:
id: Annotated[str, PlanningId]
name: str
capacity: int
What it represents: A physical meeting room.
Key fields:
- name: Display name (e.g., “Room A”, “Conference Room”)
- capacity: Maximum number of people it can hold
Optimization concept: Rooms are constrained resources. Each room can host at most one meeting at a time, and must be large enough for attendees.
The Meeting Class
@dataclass
class Meeting:
id: Annotated[str, PlanningId]
topic: str
duration_in_grains: int
required_attendances: list[RequiredAttendance]
preferred_attendances: list[PreferredAttendance]
speakers: list[Person]
entire_group_meeting: bool
What it represents: A meeting that needs to be scheduled.
Key fields:
- topic: Meeting subject (e.g., “Sprint Planning”, “Budget Review”)
- duration_in_grains: Length in 15-minute slots (e.g., 8 = 2 hours)
- required_attendances: People who must attend (hard constraint)
- preferred_attendances: People who should attend if possible (soft constraint)
- speakers: Optional presenter list
- entire_group_meeting: Flag for all-hands meetings
Attendance types:
Required attendance (hard constraint):
@dataclass
class RequiredAttendance:
id: Annotated[str, PlanningId]
person: Person
meeting: Meeting
The person must be available at the meeting time. Conflicts are hard constraint violations.
Preferred attendance (soft constraint):
@dataclass
class PreferredAttendance:
id: Annotated[str, PlanningId]
person: Person
meeting: Meeting
The person should attend if possible, but conflicts are only soft penalties.
Optimization concept: This is hierarchical attendance — distinguishing must-have from nice-to-have attendees. It allows flexible scheduling when not everyone can attend.
The MeetingAssignment Class (Planning Entity)
@planning_entity
@dataclass
class MeetingAssignment:
id: Annotated[str, PlanningId]
meeting: Meeting
starting_time_grain: Annotated[TimeGrain | None, PlanningVariable] = None
room: Annotated[Room | None, PlanningVariable] = None
pinned: bool = False
What it represents: A decision about when and where to hold a meeting.
Key fields:
- meeting: Reference to the Meeting object (immutable)
- starting_time_grain: When the meeting starts — this is a planning variable!
- room: Where the meeting is held — this is also a planning variable!
- pinned: If True, this assignment is fixed and won’t be changed by the solver
Annotations:
- @planning_entity: Tells SolverForge this class contains decisions to make
- PlanningVariable: Marks fields as decision variables
Optimization concept: Unlike employee scheduling (one variable: which employee) or vehicle routing (one list variable: which visits), this problem has two independent planning variables per entity. The solver must simultaneously decide both time and room.
Why multiple planning variables matter: Having two planning variables (time and room) per entity creates a larger search space but more flexibility. The dataclass-based domain model enables efficient evaluation of variable combinations. For architectural details on why dataclasses outperform Pydantic in constraint evaluation, see Dataclasses vs Pydantic in Constraint Solvers.
Important methods:
def get_last_time_grain_index(self) -> int:
"""Calculate when meeting ends."""
return self.starting_time_grain.grain_index + self.meeting.duration_in_grains - 1
def calculate_overlap(self, other: 'MeetingAssignment') -> int:
"""Calculate overlap in time grains with another meeting."""
if self.starting_time_grain is None or other.starting_time_grain is None:
return 0
start1 = self.starting_time_grain.grain_index
end1 = self.get_last_time_grain_index()
start2 = other.starting_time_grain.grain_index
end2 = other.get_last_time_grain_index()
# Interval intersection
overlap_start = max(start1, start2)
overlap_end = min(end1, end2)
return max(0, overlap_end - overlap_start + 1)
This helper enables efficient overlap detection in constraints.
The MeetingSchedule Class (Planning Solution)
@planning_solution
@dataclass
class MeetingSchedule:
day_list: Annotated[list[int], ProblemFactCollectionProperty, ValueRangeProvider]
time_grain_list: Annotated[list[TimeGrain], ProblemFactCollectionProperty, ValueRangeProvider]
room_list: Annotated[list[Room], ProblemFactCollectionProperty, ValueRangeProvider]
person_list: Annotated[list[Person], ProblemFactCollectionProperty]
meeting_list: Annotated[list[Meeting], ProblemFactCollectionProperty]
meeting_assignment_list: Annotated[list[MeetingAssignment], PlanningEntityCollectionProperty]
score: Annotated[HardMediumSoftScore | None, PlanningScore] = None
solver_status: SolverStatus = SolverStatus.NOT_SOLVING
What it represents: The complete scheduling problem and its solution.
Key fields:
- time_grain_list: All available time slots (value range for starting_time_grain)
- room_list: All available rooms (value range for room)
- person_list: All people who might attend meetings
- meeting_list: All meetings that need scheduling
- meeting_assignment_list: The planning entities (what the solver optimizes)
- score: Solution quality metric
- solver_status: Whether solving is active
Annotations explained:
- @planning_solution: Marks this as the top-level problem definition
- ProblemFactCollectionProperty: Immutable input data
- ValueRangeProvider: Collections that provide possible values for planning variables
- PlanningEntityCollectionProperty: The entities being optimized
- PlanningScore: Where the solver stores calculated quality
Optimization concept: The ValueRangeProvider annotations tell the solver: “When assigning starting_time_grain, choose from time_grain_list; when assigning room, choose from room_list.”
How Scheduling Optimization Works
Before diving into constraints, let’s understand the scheduling process.
The Three-Tier Scoring System
Unlike employee scheduling (Hard/Soft) or vehicle routing (Hard/Soft), meeting scheduling uses three levels:
Score format: "0hard/0medium/-1234soft"
Hard constraints (priority 1):
- Room conflicts
- Required attendee conflicts
- Room capacity
- Meetings within available time
- Same-day constraints
Medium constraints (priority 2):
- Required vs preferred attendee conflicts
- Preferred attendee conflicts
Soft constraints (priority 3):
- Schedule meetings early
- Breaks between meetings
- Minimize overlaps
- Room utilization
- Room stability
Why three tiers?
This creates a hierarchy: hard > medium > soft. A solution with 0hard/-50medium/-5000soft is better than 0hard/-100medium/-1000soft even though its soft score is worse, because the medium level takes priority.
Optimization concept: This is lexicographic scoring with three levels instead of two. It’s useful when you have multiple categories of preferences with clear priority relationships.
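Conceptually, three-tier scores compare level by level, just like Python tuples of (hard, medium, soft), which is an easy way to reason about which of two solutions wins:

```python
# Scores compare level by level, like Python tuples of (hard, medium, soft)
a = (0, -50, -5000)   # "0hard/-50medium/-5000soft"
b = (0, -100, -1000)  # "0hard/-100medium/-1000soft"
print(a > b)          # True: the medium level decides before soft is even considered
```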
The Search Process
- Initial solution: Often all meetings unassigned or randomly assigned
- Evaluate score: Calculate all constraint penalties across three tiers
- Make a move:
- Change a meeting’s time slot
- Change a meeting’s room
- Swap two meetings’ times or rooms
- Re-evaluate score (incrementally)
- Accept if improvement (considering all three score levels)
- Repeat millions of times
- Return best solution found
Move types specific to meeting scheduling:
- Change time: Move meeting to different time slot
- Change room: Move meeting to different room
- Change both: Simultaneously change time and room
- Swap times: Exchange times of two meetings
- Swap rooms: Exchange rooms of two meetings
Why Multiple Planning Variables Matter
Having two planning variables (time and room) per entity creates interesting dynamics:
Independent optimization: The solver can change time without changing room, or vice versa.
Coordinated moves: Sometimes changing both together is better than changing separately.
Search space: With T time slots and R rooms, each meeting has T × R possible assignments (much larger than T or R alone).
Writing Constraints: The Business Rules
Now the heart of the system. Open src/meeting_scheduling/constraints.py.
The Constraint Provider Pattern
All constraints are registered in one function:
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
return [
# Hard constraints
room_conflict(constraint_factory),
avoid_overtime(constraint_factory),
required_attendance_conflict(constraint_factory),
required_room_capacity(constraint_factory),
start_and_end_on_same_day(constraint_factory),
# Medium constraints
required_and_preferred_attendance_conflict(constraint_factory),
preferred_attendance_conflict(constraint_factory),
# Soft constraints
do_meetings_as_soon_as_possible(constraint_factory),
one_break_between_consecutive_meetings(constraint_factory),
overlapping_meetings(constraint_factory),
assign_larger_rooms_first(constraint_factory),
room_stability(constraint_factory),
]
Let’s examine each constraint category.
Hard Constraints
Hard Constraint: Room Conflict
Business rule: “No two meetings can use the same room at overlapping times.”
def room_conflict(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each_unique_pair(
MeetingAssignment,
Joiners.equal(lambda meeting_assignment: meeting_assignment.room),
)
.filter(lambda meeting1, meeting2: meeting1.calculate_overlap(meeting2) > 0)
.penalize(
HardMediumSoftScore.ONE_HARD,
lambda meeting1, meeting2: meeting1.calculate_overlap(meeting2)
)
.as_constraint("Room conflict")
)
How to read this:
- for_each_unique_pair(MeetingAssignment, ...): Create pairs of meeting assignments
- Joiners.equal(...): Only pair meetings assigned to the same room
- .filter(...): Keep only pairs that overlap in time
- .penalize(ONE_HARD, ...): Penalize by number of overlapping time grains
Example scenario:
Room A, Time grains 0-11 (8:00 AM - 11:00 AM):
- Meeting 1: Time grains 0-7 (8:00-10:00 AM, 2 hours)
- Meeting 2: Time grains 4-11 (9:00-11:00 AM, 2 hours)
- Overlap: Time grains 4-7 (9:00-10:00 AM) = 4 grains → Penalty: 4 hard points
Optimization concept: This is a resource conflict constraint. The resource (room) has limited capacity (one meeting at a time), and the penalty is proportional to the conflict severity (overlap duration).
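The overlap figure is just the inclusive interval intersection that calculate_overlap() computes. Plugging in the example numbers:

```python
# Inclusive interval intersection, as in calculate_overlap()
start1, end1 = 0, 7    # Meeting 1: grains 0-7
start2, end2 = 4, 11   # Meeting 2: grains 4-11
overlap = max(0, min(end1, end2) - max(start1, start2) + 1)
print(overlap)         # 4 -> 4 hard penalty points
```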
Hard Constraint: Avoid Overtime
Business rule: “Meetings cannot extend beyond available time slots.”
def avoid_overtime(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(MeetingAssignment)
.filter(lambda meeting_assignment:
meeting_assignment.get_last_time_grain_index() >= len(time_grain_list))
.penalize(
HardMediumSoftScore.ONE_HARD,
lambda meeting_assignment:
meeting_assignment.get_last_time_grain_index() - len(time_grain_list) + 1
)
.as_constraint("Don't go in overtime")
)
How to read this:
- for_each(MeetingAssignment): Consider every meeting
- .filter(...): Keep meetings that end beyond the last available time grain
- .penalize(...): Penalize by how far past the boundary
Example scenario:
Time grains available: 0-155 (156 total grains = 4 days × 39 grains/day)
- Meeting starts at grain 150, duration 8 grains
- Ends at grain 157 (150 + 8 - 1 = 157)
- Overtime: 157 - 155 = 2 grains → Penalty: 2 hard points
Note: This constraint assumes time_grain_list is available in the scope. In practice, it’s passed via closure or accessed from the solution object.
Hard Constraint: Required Attendance Conflict
Business rule: “Required attendees cannot be double-booked.”
def required_attendance_conflict(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(RequiredAttendance)
.join(
RequiredAttendance,
Joiners.equal(lambda attendance: attendance.person),
Joiners.less_than(lambda attendance: attendance.id)
)
.filter(
lambda attendance1, attendance2:
attendance1.meeting.meeting_assignment.calculate_overlap(
attendance2.meeting.meeting_assignment
) > 0
)
.penalize(
HardMediumSoftScore.ONE_HARD,
lambda attendance1, attendance2:
attendance1.meeting.meeting_assignment.calculate_overlap(
attendance2.meeting.meeting_assignment
)
)
.as_constraint("Required attendance conflict")
)
How to read this:
- for_each(RequiredAttendance): Consider every required attendance
- .join(RequiredAttendance, ...): Pair with other required attendances
- Joiners.equal(...): Only pair attendances for the same person
- Joiners.less_than(...): Ensure each pair counted once (ordering by ID)
- .filter(...): Keep pairs where meetings overlap in time
- .penalize(...): Penalize by overlap duration
Example scenario:
Person “Amy Cole” is required at:
- Meeting A: Time grains 0-7 (8:00-10:00 AM)
- Meeting B: Time grains 6-13 (9:30-11:30 AM)
- Overlap: Time grains 6-7 (9:30-10:00 AM) = 2 grains → Penalty: 2 hard points
Optimization concept: This is a person-centric conflict constraint. Unlike room conflicts (resource conflict), this is about a person’s availability (capacity of 1 meeting at a time).
Why join on RequiredAttendance instead of MeetingAssignment?
By joining on attendance records, we automatically filter to only the meetings each person is required at. This is more efficient than checking all meeting pairs and then filtering by attendees.
Hard Constraint: Required Room Capacity
Business rule: “Rooms must have enough capacity for all attendees (required + preferred).”
def required_room_capacity(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(MeetingAssignment)
.filter(
lambda meeting_assignment:
len(meeting_assignment.meeting.required_attendances) +
len(meeting_assignment.meeting.preferred_attendances) >
meeting_assignment.room.capacity
)
.penalize(
HardMediumSoftScore.ONE_HARD,
lambda meeting_assignment:
len(meeting_assignment.meeting.required_attendances) +
len(meeting_assignment.meeting.preferred_attendances) -
meeting_assignment.room.capacity
)
.as_constraint("Required room capacity")
)
How to read this:
- for_each(MeetingAssignment): Consider every assigned meeting
- .filter(...): Keep meetings where attendee count exceeds room capacity
- .penalize(...): Penalize by the capacity shortage
Example scenario:
Meeting has:
- Required attendees: 8 people
- Preferred attendees: 4 people
- Total: 12 people
Assigned to Room B (capacity 10):
- Shortage: 12 - 10 = 2 people → Penalty: 2 hard points
Design choice: Count both required and preferred attendees. You could alternatively only count required attendees (and make preferred overflow a soft constraint).
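A sketch of that alternative using the same constraint API shown above; the function and constraint names here are illustrative, not part of the quickstart:

```python
def required_room_capacity_strict(constraint_factory: ConstraintFactory):
    """Hard: only required attendees must fit in the room."""
    return (
        constraint_factory.for_each(MeetingAssignment)
        .filter(lambda ma: len(ma.meeting.required_attendances) > ma.room.capacity)
        .penalize(
            HardMediumSoftScore.ONE_HARD,
            lambda ma: len(ma.meeting.required_attendances) - ma.room.capacity,
        )
        .as_constraint("Required attendees room capacity")
    )

def preferred_attendee_overflow(constraint_factory: ConstraintFactory):
    """Soft: discourage rooms too small for required + preferred attendees."""
    return (
        constraint_factory.for_each(MeetingAssignment)
        .filter(
            lambda ma: len(ma.meeting.required_attendances)
            + len(ma.meeting.preferred_attendances) > ma.room.capacity
        )
        .penalize(
            HardMediumSoftScore.ONE_SOFT,
            lambda ma: len(ma.meeting.required_attendances)
            + len(ma.meeting.preferred_attendances) - ma.room.capacity,
        )
        .as_constraint("Preferred attendee overflow")
    )
```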
Hard Constraint: Start and End on Same Day
Business rule: “Meetings cannot span multiple days.”
def start_and_end_on_same_day(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(MeetingAssignment)
.join(
TimeGrain,
Joiners.equal(
lambda meeting_assignment: meeting_assignment.get_last_time_grain_index(),
lambda time_grain: time_grain.grain_index
)
)
.filter(
lambda meeting_assignment, last_time_grain:
meeting_assignment.starting_time_grain.day_of_year !=
last_time_grain.day_of_year
)
.penalize(HardMediumSoftScore.ONE_HARD)
.as_constraint("Start and end on same day")
)
How to read this:
- for_each(MeetingAssignment): All meetings
- .join(TimeGrain, ...): Join with the time grain where meeting ends
- .filter(...): Keep meetings where start day ≠ end day
- .penalize(ONE_HARD): Simple binary penalty
Example scenario:
Meeting starts at:
- Time grain 35 (day 1, 5:15 PM, starting_minute_of_day = 1035)
- Duration: 8 grains (2 hours)
- Ends at grain 42 (day 2, 8:00 AM)
- Start day (1) ≠ End day (2) → Penalty: 1 hard point
Optimization concept: This enforces a temporal boundary constraint. Meetings respect day boundaries, which is realistic for most organizations.
Medium Constraints
Medium constraints sit between hard (must satisfy) and soft (nice to have). They represent strong preferences that should rarely be violated.
Medium Constraint: Required and Preferred Attendance Conflict
Business rule: “Strongly discourage conflicts where a person is required at one meeting and preferred at another.”
def required_and_preferred_attendance_conflict(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(RequiredAttendance)
.join(
PreferredAttendance,
Joiners.equal(
lambda required: required.person,
lambda preferred: preferred.person
)
)
.filter(
lambda required, preferred:
required.meeting.meeting_assignment.calculate_overlap(
preferred.meeting.meeting_assignment
) > 0
)
.penalize(
HardMediumSoftScore.ONE_MEDIUM,
lambda required, preferred:
required.meeting.meeting_assignment.calculate_overlap(
preferred.meeting.meeting_assignment
)
)
.as_constraint("Required and preferred attendance conflict")
)
How to read this:
- for_each(RequiredAttendance): All required attendances
- .join(PreferredAttendance, ...): Pair with preferred attendances for same person
- .filter(...): Keep pairs where meetings overlap
- .penalize(ONE_MEDIUM, ...): Medium-level penalty by overlap
Example scenario:
Person “Bob Smith”:
- Required at Meeting A: Time grains 0-7
- Preferred at Meeting B: Time grains 4-11
- Overlap: 4 grains → Penalty: 4 medium points
Why medium instead of hard?
This is a degraded service scenario. The person can’t attend the preferred meeting (unfortunate) but can fulfill their required attendance (essential). It’s not ideal but acceptable.
Medium Constraint: Preferred Attendance Conflict
Business rule: “Discourage conflicts between two preferred attendances for the same person.”
def preferred_attendance_conflict(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(PreferredAttendance)
.join(
PreferredAttendance,
Joiners.equal(lambda attendance: attendance.person),
Joiners.less_than(lambda attendance: attendance.id)
)
.filter(
lambda attendance1, attendance2:
attendance1.meeting.meeting_assignment.calculate_overlap(
attendance2.meeting.meeting_assignment
) > 0
)
.penalize(
HardMediumSoftScore.ONE_MEDIUM,
lambda attendance1, attendance2:
attendance1.meeting.meeting_assignment.calculate_overlap(
attendance2.meeting.meeting_assignment
)
)
.as_constraint("Preferred attendance conflict")
)
Similar to required attendance conflict but for preferred attendees.
Why medium instead of soft?
Preferred attendees are still important — just not critical. Medium priority expresses “try hard to avoid this” without making it a hard requirement.
Soft Constraints
Soft constraints represent optimization goals and nice-to-have preferences.
Soft Constraint: Schedule Meetings Early
Business rule: “Prefer scheduling meetings earlier in the day/week rather than later.”
def do_meetings_as_soon_as_possible(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(MeetingAssignment)
.penalize(
HardMediumSoftScore.ONE_SOFT,
lambda meeting_assignment: meeting_assignment.get_last_time_grain_index()
)
.as_constraint("Do all meetings as soon as possible")
)
How to read this:
- for_each(MeetingAssignment): All meetings
- .penalize(ONE_SOFT, ...): Penalize by the ending time grain index
Why penalize by end time?
The later a meeting ends, the higher the penalty. This naturally pushes meetings toward earlier time slots.
Example scenarios:
- Meeting ends at grain 10 → Penalty: 10 soft points
- Meeting ends at grain 50 → Penalty: 50 soft points
- Meeting ends at grain 100 → Penalty: 100 soft points
The solver will prefer the first meeting’s timing.
Alternative formulation:
You could penalize by start time instead:
.penalize(ONE_SOFT, lambda ma: ma.starting_time_grain.grain_index)
Penalizing by end time accounts for both start time and duration, which can be more balanced.
Soft Constraint: Breaks Between Meetings
Business rule: “Encourage at least one 15-minute break between consecutive meetings.”
def one_break_between_consecutive_meetings(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(MeetingAssignment)
.join(
MeetingAssignment,
Joiners.less_than(
lambda meeting: meeting.get_last_time_grain_index(),
lambda meeting: meeting.starting_time_grain.grain_index
)
)
.filter(
lambda meeting1, meeting2:
meeting1.get_last_time_grain_index() + 1 ==
meeting2.starting_time_grain.grain_index
)
.penalize(HardMediumSoftScore.of_soft(100))
.as_constraint("One time grain break between two consecutive meetings")
)
How to read this:
- for_each(MeetingAssignment): All meetings
- .join(MeetingAssignment, ...): Pair with meetings that start after this one ends
- .filter(...): Keep pairs that are back-to-back (no gap)
- .penalize(100 soft): Fixed penalty for consecutive meetings
Example scenario:
- Meeting A: Time grains 0-7 (ends at grain 7)
- Meeting B: Time grains 8-15 (starts at grain 8)
- Back-to-back: 7 + 1 == 8 → Penalty: 100 soft points
Why not check for shared attendees?
This constraint applies globally — any consecutive meetings are discouraged. You could enhance it to only penalize when attendees overlap:
.filter(lambda m1, m2: has_shared_attendees(m1, m2))
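has_shared_attendees() does not exist in the quickstart; a sketch of what such a helper could look like, based on the Meeting fields shown earlier:

```python
def has_shared_attendees(ma1: MeetingAssignment, ma2: MeetingAssignment) -> bool:
    """True if the two meetings share any required or preferred attendee."""
    def people(ma: MeetingAssignment) -> set[str]:
        meeting = ma.meeting
        return (
            {a.person.id for a in meeting.required_attendances}
            | {a.person.id for a in meeting.preferred_attendances}
        )
    return bool(people(ma1) & people(ma2))
```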
Soft Constraint: Minimize Overlapping Meetings
Business rule: “Generally discourage overlapping meetings, even in different rooms.”
def overlapping_meetings(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each_unique_pair(
MeetingAssignment,
Joiners.less_than(lambda meeting: meeting.id)
)
.filter(lambda meeting1, meeting2: meeting1.calculate_overlap(meeting2) > 0)
.penalize(
HardMediumSoftScore.of_soft(10),
lambda meeting1, meeting2: meeting1.calculate_overlap(meeting2)
)
.as_constraint("Overlapping meetings")
)
How to read this:
- for_each_unique_pair(MeetingAssignment, ...): All pairs of meetings
- .filter(...): Keep overlapping pairs
- .penalize(10 soft, ...): Penalize by overlap × 10
Why discourage overlaps in different rooms?
This creates a temporal spread of meetings. Benefits:
- Reduces hallway congestion
- Easier to find substitute attendees
- Better utilization of time slots
Weight tuning: The penalty weight (10) can be adjusted based on preference. Higher values more strongly discourage overlaps.
Soft Constraint: Assign Larger Rooms First
Business rule: “Prefer using larger rooms over smaller rooms.”
def assign_larger_rooms_first(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(MeetingAssignment)
.join(
Room,
Joiners.greater_than(
lambda meeting_assignment: meeting_assignment.room.capacity,
lambda room: room.capacity
)
)
.penalize(
HardMediumSoftScore.ONE_SOFT,
lambda meeting_assignment, room:
room.capacity - meeting_assignment.room.capacity
)
.as_constraint("Assign larger rooms first")
)
How to read this:
- for_each(MeetingAssignment): All meetings
- .join(Room, ...): Join with rooms larger than the assigned room
- .penalize(ONE_SOFT, ...): Penalize by capacity difference
Example scenario:
Available rooms: 30, 20, 16 capacity
Meeting assigned to Room B (capacity 20):
- Room A (capacity 30) exists and is larger
- Penalty: 30 - 20 = 10 soft points
If the meeting is already in the largest room, no larger room joins with it, so there is no penalty.
Why prefer larger rooms?
This implements conservative resource allocation — use larger rooms by default, save smaller rooms for when larger ones are occupied. This maximizes flexibility.
Alternative approach: You could prefer smaller rooms that fit (minimize waste):
# Prefer smallest room that fits
.filter(lambda ma: ma.room.capacity >= required_capacity)
.reward(ONE_SOFT, lambda ma: ma.room.capacity)
The choice depends on your organization’s room utilization patterns.
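For comparison, a fuller sketch of that alternative (hypothetical; it derives the required capacity from the attendance lists shown in the data model instead of a separate required_capacity field):
def prefer_smallest_fitting_room(constraint_factory: ConstraintFactory):
    """Hypothetical soft constraint: penalize unused capacity in the assigned room."""
    return (
        constraint_factory.for_each(MeetingAssignment)
        .filter(lambda ma: ma.room is not None)
        .penalize(
            HardMediumSoftScore.ONE_SOFT,
            lambda ma: max(
                0,
                ma.room.capacity
                - len(ma.meeting.required_attendances)
                - len(ma.meeting.preferred_attendances),
            ),
        )
        .as_constraint("Prefer smallest fitting room")
    )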
Soft Constraint: Room Stability
Business rule: “Encourage attendees to stay in the same room for nearby meetings.”
This constraint handles both required and preferred attendees using concat() to combine both attendance types into a single stream:
def room_stability(constraint_factory: ConstraintFactory):
"""
Soft constraint: Encourages room stability for people attending multiple meetings.
Penalizes when a person attends meetings in different rooms that are close in time,
encouraging room stability. This handles both required and preferred attendees by
creating separate constraint streams that are combined.
Since Python doesn't have a common Attendance base class for RequiredAttendance
and PreferredAttendance, we use concat() to combine both attendance types into
a single stream.
"""
# Create a stream that combines both required and preferred attendances
return (
constraint_factory.for_each(RequiredAttendance)
.map(lambda ra: (ra.person, ra.meeting_id))
.concat(
constraint_factory.for_each(PreferredAttendance)
.map(lambda pa: (pa.person, pa.meeting_id))
)
.join(
constraint_factory.for_each(RequiredAttendance)
.map(lambda ra: (ra.person, ra.meeting_id))
.concat(
constraint_factory.for_each(PreferredAttendance)
.map(lambda pa: (pa.person, pa.meeting_id))
),
Joiners.equal(
lambda left: left[0], # person
lambda right: right[0], # person
),
Joiners.filtering(
lambda left, right: left[1] != right[1] # different meeting_id
),
)
.join(
MeetingAssignment,
Joiners.equal(
lambda left, right: left[1], # left.meeting_id
lambda assignment: assignment.meeting.id,
),
)
.join(
MeetingAssignment,
Joiners.equal(
lambda left, right, left_assignment: right[1], # right.meeting_id
lambda assignment: assignment.meeting.id,
),
Joiners.less_than(
lambda left, right, left_assignment: left_assignment.get_grain_index(),
lambda assignment: assignment.get_grain_index(),
),
Joiners.filtering(
lambda left, right, left_assignment, right_assignment:
left_assignment.room != right_assignment.room
),
Joiners.filtering(
lambda left, right, left_assignment, right_assignment:
right_assignment.get_grain_index()
- left_assignment.meeting.duration_in_grains
- left_assignment.get_grain_index()
<= 2
),
)
.penalize(HardMediumSoftScore.ONE_SOFT)
.as_constraint("Room stability")
)
How to read this:
- Create a combined stream of (person, meeting_id) tuples from both RequiredAttendance and PreferredAttendance using concat()
- Self-join on same person, different meetings
- Join to MeetingAssignment to get time and room for the left meeting
- Join to MeetingAssignment to get time and room for the right meeting
- Filter: different rooms and close in time (within a 2-grain gap)
- .penalize(ONE_SOFT): Simple penalty for room switches
Why use concat()?
Python doesn’t have a common base class for RequiredAttendance and PreferredAttendance. The concat() method combines two constraint streams into one, allowing us to treat both attendance types uniformly.
Example scenario:
Person “Carol Johnson”:
- Required at Meeting A at 9:00 AM in Room A
- Preferred at Meeting B at 9:30 AM in Room B (different room, close in time)
- Penalty: 1 soft point (room switch)
If Meeting B were also in Room A, no penalty.
Optimization concept: This is a locality constraint — encouraging spatial proximity for temporally close activities. It reduces attendee movement for both required and preferred attendees.
Time threshold: The <= 2 filter means the second meeting starts within 2 time grains (30 minutes at the default grain size) of the first meeting ending. Adjust this based on building size and walking times.
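If you tune this threshold, pulling it into a named constant keeps the intent visible; a small rewrite of the last Joiners.filtering shown above:
MAX_ROOM_SWITCH_GAP_GRAINS = 2  # ~30 minutes with 15-minute grains
Joiners.filtering(
    lambda left, right, left_assignment, right_assignment:
        right_assignment.get_grain_index()
        - left_assignment.meeting.duration_in_grains
        - left_assignment.get_grain_index()
        <= MAX_ROOM_SWITCH_GAP_GRAINS
)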
The Solver Engine
Now let’s see how the solver is configured. Open src/meeting_scheduling/solver.py:
solver_config = SolverConfig(
solution_class=MeetingSchedule,
entity_class_list=[MeetingAssignment],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)
solver_manager = SolverManager.create(solver_config)
solution_manager = SolutionManager.create(solver_manager)
Configuration Breakdown
solution_class: MeetingSchedule (the top-level planning solution)
entity_class_list: [MeetingAssignment] (the planning entities with variables)
score_director_factory_config: Links to define_constraints function
termination_config: Stops after 30 seconds
Multiple Planning Variables
Unlike simpler problems with one variable per entity, MeetingAssignment has two planning variables:
- starting_time_grain (when)
- room (where)
The solver must optimize both simultaneously. This creates:
More complex search space: T × R possible combinations per meeting
More move types: Can change time, change room, or change both
Better flexibility: Can optimize time and room independently
Optimization concept: This is multi-variable planning. The solver uses specialized move selectors that understand how to efficiently explore both variables.
SolverManager: Asynchronous Solving
Meeting scheduling can take time for large problems. SolverManager enables non-blocking solving:
# Start solving (returns immediately)
solver_manager.solve_and_listen(job_id, schedule, callback_function)
# Check status
status = solver_manager.get_solver_status(job_id)
# Get current best solution (updates live)
solution = solver_manager.get_final_best_solution(job_id)
# Stop early
solver_manager.terminate_early(job_id)
SolutionManager: Score Analysis
The solution_manager provides detailed score breakdowns:
# Analyze solution
analysis = solution_manager.analyze(schedule)
# See which constraints fired
for constraint_analysis in analysis.constraint_analyses:
print(f"{constraint_analysis.name}: {constraint_analysis.score}")
for match in constraint_analysis.matches:
print(f" {match.justification}")
This shows exactly which constraints are violated and by how much — invaluable for debugging.
Solving Timeline
Small problems (10-15 meetings, 2-3 rooms, 2 days):
- Initial feasible solution: < 1 second
- Good solution: 5-10 seconds
- High-quality: 30 seconds
Medium problems (20-30 meetings, 3-5 rooms, 4 days):
- Initial feasible solution: 1-5 seconds
- Good solution: 30-60 seconds
- High-quality: 2-5 minutes
Large problems (50+ meetings, 5+ rooms, 5+ days):
- Initial feasible solution: 5-30 seconds
- Good solution: 5-10 minutes
- High-quality: 15-30 minutes
Factors affecting speed:
- Number of meetings (primary factor)
- Number of attendees per meeting (affects conflict constraints)
- Time grain granularity (finer = more options = slower)
- Constraint complexity
Web Interface and API
REST API Endpoints
Open src/meeting_scheduling/rest_api.py to see the API. It runs on port 8080.
GET /demo-data
Returns generated demo data:
Response:
{
"dayList": [1, 2, 3, 4],
"timeGrainList": [
{"grainIndex": 0, "dayOfYear": 1, "startingMinuteOfDay": 480},
{"grainIndex": 1, "dayOfYear": 1, "startingMinuteOfDay": 495},
...
],
"roomList": [
{"id": "room_0", "name": "Room 0", "capacity": 30},
{"id": "room_1", "name": "Room 1", "capacity": 20},
{"id": "room_2", "name": "Room 2", "capacity": 16}
],
"personList": [
{"id": "person_0", "fullName": "Amy Cole"},
...
],
"meetingList": [
{
"id": "meeting_0",
"topic": "Strategize B2B",
"durationInGrains": 8,
"requiredAttendances": [...],
"preferredAttendances": [...],
"entireGroupMeeting": false
},
...
],
"meetingAssignmentList": [
{
"id": "assignment_0",
"meeting": "meeting_0",
"startingTimeGrain": null,
"room": null,
"pinned": false
},
...
]
}
Demo data specs:
- 4 days
- 156 time grains (39 per day, 8 AM - 6 PM)
- 3 rooms (capacities 30, 20, 16)
- 20 people
- 24 meetings
POST /schedules
Submit a schedule for solving:
Request body: Same format as demo data
Response: Job ID
"a1b2c3d4-e5f6-7890-abcd-ef1234567890"
Implementation:
@app.post("/schedules")
async def solve(schedule: MeetingScheduleModel) -> str:
job_id = str(uuid4())
meeting_schedule = model_to_schedule(schedule)
data_sets[job_id] = meeting_schedule
solver_manager.solve_and_listen(
job_id,
meeting_schedule,
lambda solution: update_solution(job_id, solution)
)
return job_id
The solver runs in the background, continuously updating the best solution.
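The update_solution callback passed to solve_and_listen is not shown in this excerpt; a minimal sketch, assuming the module-level data_sets dict used by the endpoint above, simply stores the latest best solution:
def update_solution(job_id: str, solution: MeetingSchedule) -> None:
    # Keep the most recent best solution so GET /schedules/{schedule_id} can return it
    data_sets[job_id] = solution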
GET /schedules
List all active job IDs:
Response:
["a1b2c3d4-e5f6-7890-abcd-ef1234567890", "b2c3d4e5-f6a7-8901-bcde-f23456789012"]
GET /schedules/{schedule_id}
Get current solution:
Response (while solving):
{
"meetingAssignmentList": [
{
"id": "assignment_0",
"meeting": "meeting_0",
"startingTimeGrain": {"grainIndex": 5, ...},
"room": {"id": "room_1", ...}
},
...
],
"score": "0hard/-150medium/-8945soft",
"solverStatus": "SOLVING_ACTIVE"
}
Response (finished):
{
"score": "0hard/0medium/-6234soft",
"solverStatus": "NOT_SOLVING"
}
GET /schedules/{problem_id}/status
Lightweight status check (doesn’t return full solution):
Response:
{
"score": "0hard/0medium/-6234soft",
"solverStatus": "NOT_SOLVING"
}
DELETE /schedules/{problem_id}
Stop solving early:
@app.delete("/schedules/{problem_id}")
async def stop_solving(problem_id: str) -> None:
solver_manager.terminate_early(problem_id)
Solving stops, and the best solution found so far remains available via GET /schedules/{problem_id}.
PUT /schedules/analyze
Analyze a solution’s score:
Request body: Complete schedule with assignments
Response:
{
"score": "-2hard/-50medium/-8945soft",
"constraints": [
{
"name": "Room conflict",
"score": "-2hard/0medium/0soft",
"matches": [
{
"justification": "Room room_1: Meeting A (grains 5-12) overlaps Meeting B (grains 10-17) by 2 grains",
"indictedObjects": ["assignment_5", "assignment_12"]
}
]
},
{
"name": "Required attendance conflict",
"score": "0hard/-50medium/0soft",
"matches": [...]
}
]
}
This endpoint is extremely useful for understanding why a solution has a particular score.
Web UI Flow
The static/app.js implements this workflow:
- Load demo data → GET /demo-data
- Display unscheduled meetings and available resources
- User clicks “Solve” → POST /schedules (get job ID)
- Poll GET /schedules/{id}/status every 2 seconds (see the polling sketch below)
- Update visualization with current assignments
- When solverStatus === "NOT_SOLVING" → stop polling
- Display final schedule in timeline view
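To exercise the same workflow without the browser, a minimal polling client might look like this (a hypothetical script using the requests library against the endpoints listed above):
import time
import requests
BASE = "http://localhost:8080"
demo = requests.get(f"{BASE}/demo-data").json()                # load demo data
job_id = requests.post(f"{BASE}/schedules", json=demo).json()  # start solving, returns job ID
while True:                                                    # poll status every 2 seconds
    status = requests.get(f"{BASE}/schedules/{job_id}/status").json()
    print(status["score"], status["solverStatus"])
    if status["solverStatus"] == "NOT_SOLVING":
        break
    time.sleep(2)
schedule = requests.get(f"{BASE}/schedules/{job_id}").json()   # fetch the final schedule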
Visualization features:
- Timeline view by room or by person
- Color-coded meetings (by topic or priority)
- Hover details (attendees, time, room)
- Unassigned meetings highlighted
- Score breakdown panel
- Constraint analysis tab
Making Your First Customization
Let’s add a new constraint step-by-step.
Scenario: Limit Meetings Per Day
New business rule: “No more than 5 meetings should be scheduled on any single day.”
This is a soft constraint (preference, not requirement).
Step 1: Open constraints.py
Navigate to src/meeting_scheduling/constraints.py.
Step 2: Write the Constraint Function
Add this function:
def max_meetings_per_day(constraint_factory: ConstraintFactory):
"""
Soft constraint: Discourage having more than 5 meetings on the same day.
"""
MAX_MEETINGS_PER_DAY = 5
return (
constraint_factory.for_each(MeetingAssignment)
.group_by(
lambda meeting: meeting.starting_time_grain.day_of_year,
ConstraintCollectors.count()
)
.filter(lambda day, count: count > MAX_MEETINGS_PER_DAY)
.penalize(
HardMediumSoftScore.ONE_SOFT,
lambda day, count: (count - MAX_MEETINGS_PER_DAY) * 100
)
.as_constraint("Max meetings per day")
)
How this works:
- Group meetings by day
- Count meetings per day
- Filter to days exceeding 5 meetings
- Penalize by excess × 100
Example:
- Day 1: 7 meetings → Excess: 2 → Penalty: 200 soft points
- Day 2: 4 meetings → No penalty
- Day 3: 6 meetings → Excess: 1 → Penalty: 100 soft points
Step 3: Register the Constraint
Add to define_constraints:
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
return [
# ... existing constraints ...
# Soft constraints
do_meetings_as_soon_as_possible(constraint_factory),
one_break_between_consecutive_meetings(constraint_factory),
overlapping_meetings(constraint_factory),
assign_larger_rooms_first(constraint_factory),
room_stability(constraint_factory),
max_meetings_per_day(constraint_factory), # ← Add here
]
Step 4: Test It
Restart the server:
Load demo data and solve:
- Open http://localhost:8080
- Click “Solve”
- Check meeting distribution across days
Verify meetings are more evenly spread across days
Testing tip: To see the effect more clearly, increase the penalty weight (e.g., × 1000 instead of × 100) or lower the threshold to 3 meetings per day.
Step 5: Add Unit Test
Create a test in tests/test_constraints.py:
def test_max_meetings_per_day():
"""Test that exceeding 5 meetings per day creates penalty."""
from meeting_scheduling.constraints import max_meetings_per_day
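# Note: constraint_verifier and test_room are assumed to be defined elsewhere in the
# test module (see the ConstraintVerifier setup under Testing and Validation below)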
# Create 6 meetings on the same day
time_grain_day1 = TimeGrain(grain_index=0, day_of_year=1, starting_minute_of_day=480)
meetings = []
for i in range(6):
meeting = Meeting(
id=f"meeting_{i}",
topic=f"Meeting {i}",
duration_in_grains=4,
required_attendances=[],
preferred_attendances=[],
speakers=[],
entire_group_meeting=False
)
assignment = MeetingAssignment(
id=f"assignment_{i}",
meeting=meeting,
starting_time_grain=time_grain_day1,
room=test_room
)
meetings.append(assignment)
# Should penalize by (6 - 5) × 100 = 100
constraint_verifier.verify_that(max_meetings_per_day) \
.given(*meetings) \
.penalizes_by(100)
Run with:
pytest tests/test_constraints.py::test_max_meetings_per_day -v
Advanced Constraint Patterns
Pattern 1: Minimum Meeting Spacing
Scenario: Require at least 30 minutes between any two meetings (not just consecutive ones).
def minimum_meeting_spacing(constraint_factory: ConstraintFactory):
"""
Soft constraint: Encourage 30-minute spacing between all meetings.
"""
MIN_SPACING_GRAINS = 2 # 30 minutes
return (
constraint_factory.for_each_unique_pair(
MeetingAssignment,
Joiners.less_than(lambda m: m.id)
)
.filter(lambda m1, m2:
abs(m1.starting_time_grain.grain_index -
m2.starting_time_grain.grain_index) < MIN_SPACING_GRAINS and
m1.calculate_overlap(m2) == 0 # Not overlapping
)
.penalize(
HardMediumSoftScore.ONE_SOFT,
lambda m1, m2: MIN_SPACING_GRAINS -
abs(m1.starting_time_grain.grain_index -
m2.starting_time_grain.grain_index)
)
.as_constraint("Minimum meeting spacing")
)
Pattern 2: Preferred Time Slots
Scenario: Some meetings should preferably be in the morning (before noon).
First, add a field to Meeting:
@dataclass
class Meeting:
# ... existing fields ...
preferred_time: str = "anytime" # "morning", "afternoon", "anytime"
Then the constraint:
def preferred_time_slot(constraint_factory: ConstraintFactory):
"""
Soft constraint: Honor preferred time slots.
"""
NOON_MINUTE = 12 * 60 # 720 minutes
return (
constraint_factory.for_each(MeetingAssignment)
.filter(lambda ma: ma.meeting.preferred_time == "morning")
.filter(lambda ma:
ma.starting_time_grain.starting_minute_of_day >= NOON_MINUTE)
.penalize(
HardMediumSoftScore.of_soft(500),
lambda ma:
ma.starting_time_grain.starting_minute_of_day - NOON_MINUTE
)
.as_constraint("Preferred time slot")
)
Penalty increases with how far past noon the meeting is scheduled.
Pattern 3: VIP Attendee Priority
Scenario: Meetings with executives should get preferred time slots and rooms.
Add a field to Person:
@dataclass
class Person:
id: str
full_name: str
is_vip: bool = False
Then prioritize their meetings:
def vip_meeting_priority(constraint_factory: ConstraintFactory):
"""
Soft constraint: VIP meetings scheduled early with best rooms.
"""
return (
constraint_factory.for_each(MeetingAssignment)
.join(
RequiredAttendance,
Joiners.equal(
lambda ma: ma.meeting,
lambda att: att.meeting
)
)
.filter(lambda ma, att: att.person.is_vip)
.penalize(
HardMediumSoftScore.of_soft(10),
lambda ma, att: ma.starting_time_grain.grain_index
)
.as_constraint("VIP meeting priority")
)
This penalizes later times more for VIP meetings, pushing them earlier.
Pattern 4: Recurring Meeting Consistency
Scenario: Recurring meetings should be at the same time each day.
Add a field to identify recurring meetings:
@dataclass
class Meeting:
# ... existing fields ...
recurrence_group: Optional[str] = None # "weekly-standup", "daily-sync", etc.
Then enforce consistency:
def recurring_meeting_consistency(constraint_factory: ConstraintFactory):
"""
Soft constraint: Recurring meetings at same time each occurrence.
"""
return (
constraint_factory.for_each(MeetingAssignment)
.filter(lambda ma: ma.meeting.recurrence_group is not None)
.join(
MeetingAssignment,
Joiners.equal(
lambda ma1: ma1.meeting.recurrence_group,
lambda ma2: ma2.meeting.recurrence_group
),
Joiners.less_than(lambda ma: ma.id)
)
.filter(lambda ma1, ma2:
ma1.starting_time_grain.starting_minute_of_day !=
ma2.starting_time_grain.starting_minute_of_day
)
.penalize(
HardMediumSoftScore.ONE_SOFT,
lambda ma1, ma2:
abs(ma1.starting_time_grain.starting_minute_of_day -
ma2.starting_time_grain.starting_minute_of_day)
)
.as_constraint("Recurring meeting consistency")
)
Pattern 5: Department Room Preference
Scenario: Departments prefer certain rooms (closer to their area).
Add department info:
@dataclass
class Person:
# ... existing fields ...
department: str = "General"
@dataclass
class Room:
# ... existing fields ...
preferred_department: Optional[str] = None
Then reward matches:
def department_room_preference(constraint_factory: ConstraintFactory):
"""
Soft constraint: Assign rooms preferred by attendees' departments.
"""
return (
constraint_factory.for_each(MeetingAssignment)
.join(
RequiredAttendance,
Joiners.equal(
lambda ma: ma.meeting,
lambda att: att.meeting
)
)
.filter(lambda ma, att:
ma.room.preferred_department is not None and
ma.room.preferred_department == att.person.department
)
.reward(HardMediumSoftScore.of_soft(50))
.as_constraint("Department room preference")
)
Each department match adds 50 soft points (reward).
Testing and Validation
Unit Testing Constraints
Best practice: Test constraints in isolation.
Open tests/test_constraints.py to see examples:
from meeting_scheduling.domain import *
from meeting_scheduling.constraints import define_constraints
from solverforge_legacy.test import ConstraintVerifier
# Create verifier
constraint_verifier = ConstraintVerifier.build(
define_constraints,
MeetingSchedule,
MeetingAssignment
)
Example: Test Room Conflict
def test_room_conflict_penalized():
"""Two meetings in same room at overlapping times should penalize."""
room = Room(id="room1", name="Room 1", capacity=20)
# Meeting 1: Grains 0-7 (2 hours)
meeting1 = create_test_meeting(id="m1", duration=8)
assignment1 = MeetingAssignment(
id="a1",
meeting=meeting1,
starting_time_grain=TimeGrain(0, 1, 480),
room=room
)
# Meeting 2: Grains 5-12 (overlaps grains 5-7 = 3 grains)
meeting2 = create_test_meeting(id="m2", duration=8)
assignment2 = MeetingAssignment(
id="a2",
meeting=meeting2,
starting_time_grain=TimeGrain(5, 1, 555),
room=room
)
# Verify penalty of 3 hard points (overlap duration)
constraint_verifier.verify_that(room_conflict) \
.given(assignment1, assignment2) \
.penalizes_by(3)
Example: Test No Conflict
def test_room_conflict_not_penalized():
"""Meetings in same room without overlap should not penalize."""
room = Room(id="room1", name="Room 1", capacity=20)
# Meeting 1: Grains 0-7
assignment1 = MeetingAssignment(
id="a1",
meeting=create_test_meeting(id="m1", duration=8),
starting_time_grain=TimeGrain(0, 1, 480),
room=room
)
# Meeting 2: Grains 10-17 (no overlap)
assignment2 = MeetingAssignment(
id="a2",
meeting=create_test_meeting(id="m2", duration=8),
starting_time_grain=TimeGrain(10, 1, 630),
room=room
)
# No penalty
constraint_verifier.verify_that(room_conflict) \
.given(assignment1, assignment2) \
.penalizes_by(0)
Helper function:
def create_test_meeting(id: str, duration: int) -> Meeting:
"""Create a minimal meeting for testing."""
return Meeting(
id=id,
topic=f"Test Meeting {id}",
duration_in_grains=duration,
required_attendances=[],
preferred_attendances=[],
speakers=[],
entire_group_meeting=False
)
Run tests:
pytest tests/test_constraints.py -v
Integration Testing: Full Solve
Test the complete solving cycle in tests/test_feasible.py:
def test_feasible():
"""Test that solver finds feasible solution for demo data."""
# Get demo problem
schedule = generate_demo_data()
# Verify initially unassigned
assert all(ma.starting_time_grain is None for ma in schedule.meeting_assignment_list)
assert all(ma.room is None for ma in schedule.meeting_assignment_list)
# Solve
job_id = "test-feasible"
solver_manager.solve(job_id, schedule)
# Wait for completion
timeout = 120 # 2 minutes
start = time.time()
while solver_manager.get_solver_status(job_id) == "SOLVING_ACTIVE":
if time.time() - start > timeout:
solver_manager.terminate_early(job_id)
break
time.sleep(2)
# Get solution
solution = solver_manager.get_final_best_solution(job_id)
# Verify all assigned
unassigned = [ma for ma in solution.meeting_assignment_list
if ma.starting_time_grain is None or ma.room is None]
assert len(unassigned) == 0, f"{len(unassigned)} meetings unassigned"
# Verify feasible
assert solution.score is not None
assert solution.score.hard_score == 0, \
f"Solution infeasible: {solution.score}"
print(f"Final score: {solution.score}")
Manual Testing via UI
Start application:
Open browser console (F12) to monitor API calls
Load and inspect data:
- Verify 24 meetings, 20 people, 3 rooms displayed
- Check time grains span 4 days
Solve and observe:
- Click “Solve”
- Watch score improve in real-time
- See meetings get assigned to rooms and times
- Monitor constraint violations decrease
Verify solution quality:
- Hard score should be 0 (feasible)
- All meetings assigned (no unassigned list)
- Room capacity respected (check stats)
- No double-bookings (visual timeline check)
Test constraint analysis:
- Click “Analyze” tab
- Review constraint breakdown
- Verify matches make sense
Test early termination:
- Start solving
- Click “Stop solving” after 5 seconds
- Verify partial solution returned
Production Considerations
Constraints are evaluated millions of times during solving. Performance matters.
❌ DON’T: Complex calculations in constraints
def bad_constraint(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(MeetingAssignment)
.filter(lambda ma:
expensive_api_call(ma.meeting.topic)) # SLOW!
.penalize(HardMediumSoftScore.ONE_SOFT)
.as_constraint("Bad")
)
✅ DO: Pre-compute before solving
# Before solving, once
blocked_topics = fetch_blocked_topics_from_api()
def good_constraint(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(MeetingAssignment)
.filter(lambda ma: ma.meeting.topic in blocked_topics) # Fast set lookup
.penalize(HardMediumSoftScore.ONE_SOFT)
.as_constraint("Good")
)
Time Grain Granularity
The default is 15-minute grains. Consider trade-offs:
Finer granularity (5 minutes):
- ✅ More scheduling flexibility
- ✅ Better fits meeting durations
- ❌ 3× more time slots → larger search space → slower
Coarser granularity (30 minutes):
- ✅ Fewer time slots → faster solving
- ❌ Less flexibility (all meetings snap to 30-min intervals)
Recommendation: 15 minutes is a good balance for most organizations.
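To experiment with granularity, adjust the grain-length constant in the domain module (the constant name matches the Quick Reference table later in this guide; the derived value is just an illustration):
# src/meeting_scheduling/domain.py
GRAIN_LENGTH_IN_MINUTES = 15                      # try 30 for faster solves, 5 for finer schedules
GRAINS_PER_HOUR = 60 // GRAIN_LENGTH_IN_MINUTES   # derive grain counts instead of hard-coding them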
Scaling Strategies
Problem size guidelines (30 second solve):
- Up to 30 meetings, 5 rooms, 5 days: Good solutions
- 30-50 meetings: Increase solve time to 2-5 minutes
- 50-100 meetings: Consider decomposition
Decomposition approaches:
By time period:
# Schedule week 1, then week 2
week1_meetings = [m for m in meetings if m.week == 1]
week2_meetings = [m for m in meetings if m.week == 2]
solution_week1 = solve(week1_meetings, rooms)
solution_week2 = solve(week2_meetings, rooms)
By department:
# Schedule each department separately
for dept in ["Engineering", "Sales", "Marketing"]:
dept_meetings = [m for m in meetings if m.department == dept]
dept_solution = solve(dept_meetings, dept_rooms)
By priority:
# Schedule high-priority meetings first, then fill in rest
high_pri = [m for m in meetings if m.priority == "high"]
solution_high = solve(high_pri, rooms)
# Pin high-priority assignments
for assignment in solution_high.meeting_assignment_list:
assignment.pinned = True
# Add low-priority meetings and re-solve
all_meetings = high_pri + low_priority_meetings
final_solution = solve(all_meetings, rooms)
The pinned field prevents the solver from changing certain assignments.
Handling Infeasible Problems
Sometimes no feasible solution exists (e.g., too many meetings, insufficient rooms).
Detect and diagnose:
solution = solver_manager.get_final_best_solution(job_id)
if solution.score.hard_score < 0:
# Analyze what's infeasible
analysis = solution_manager.analyze(solution)
violations = {}
for constraint in analysis.constraint_analyses:
if constraint.score.hard_score < 0:
violations[constraint.name] = {
"score": constraint.score.hard_score,
"count": len(constraint.matches)
}
return {
"status": "infeasible",
"hard_score": solution.score.hard_score,
"violations": violations,
"suggestions": generate_suggestions(violations)
}
def generate_suggestions(violations):
suggestions = []
if "Room conflict" in violations:
suggestions.append("Add more rooms or reduce meeting durations")
if "Required attendance conflict" in violations:
suggestions.append("Mark some attendees as 'preferred' instead of 'required'")
if "Required room capacity" in violations:
suggestions.append("Use larger rooms or reduce attendee counts")
return suggestions
Real-Time Rescheduling
Scenario: Need to reschedule due to:
- Meeting canceled
- Room unavailable
- Attendee conflict added
Incremental re-solving:
def cancel_meeting(schedule: MeetingSchedule, meeting_id: str):
"""Remove a meeting and re-optimize."""
# Find and remove the assignment
schedule.meeting_assignment_list = [
ma for ma in schedule.meeting_assignment_list
if ma.meeting.id != meeting_id
]
# Re-solve (starting from current solution)
job_id = f"replan-{uuid4()}"
solver_manager.solve_and_listen(job_id, schedule, callback)
return job_id
def add_urgent_meeting(schedule: MeetingSchedule, new_meeting: Meeting):
"""Add urgent meeting and re-optimize."""
# Add meeting to schedule
schedule.meeting_list.append(new_meeting)
# Create assignment (initially unassigned)
new_assignment = MeetingAssignment(
id=f"assignment_{new_meeting.id}",
meeting=new_meeting,
starting_time_grain=None,
room=None
)
schedule.meeting_assignment_list.append(new_assignment)
# Re-solve
solver_manager.solve_and_listen(f"urgent-{uuid4()}", schedule, callback)
Optimization concept: Warm starting from the current solution makes re-scheduling fast — the solver only adjusts what’s necessary.
Monitoring and Logging
Track key metrics:
import logging
logger = logging.getLogger(__name__)
start_time = time.time()
solver_manager.solve_and_listen(job_id, schedule, callback)
# ... wait for completion ...
solution = solver_manager.get_final_best_solution(job_id)
duration = time.time() - start_time
# Metrics
total_meetings = len(solution.meeting_assignment_list)
assigned = sum(1 for ma in solution.meeting_assignment_list
if ma.starting_time_grain and ma.room)
logger.info(
f"Solved schedule {job_id}: "
f"duration={duration:.1f}s, "
f"score={solution.score}, "
f"assigned={assigned}/{total_meetings}, "
f"feasible={solution.score.hard_score == 0}"
)
# Alert if infeasible
if solution.score.hard_score < 0:
logger.warning(
f"Infeasible schedule {job_id}: "
f"hard_score={solution.score.hard_score}"
)
Quick Reference
File Locations
| Need to… | Edit this file |
|---|---|
| Add/change business rule | src/meeting_scheduling/constraints.py |
| Add field to Meeting | src/meeting_scheduling/domain.py + converters.py |
| Add field to Person/Room | src/meeting_scheduling/domain.py + converters.py |
| Change solve time | src/meeting_scheduling/solver.py |
| Change time grain size | src/meeting_scheduling/domain.py (GRAIN_LENGTH_IN_MINUTES) |
| Add REST endpoint | src/meeting_scheduling/rest_api.py |
| Change demo data | src/meeting_scheduling/demo_data.py |
| Change UI | static/index.html, static/app.js |
Common Constraint Patterns
Unary constraint (single meeting):
constraint_factory.for_each(MeetingAssignment)
.filter(lambda ma: ...)  # your condition here
.penalize(HardMediumSoftScore.ONE_HARD)
Binary constraint (pairs of meetings):
constraint_factory.for_each_unique_pair(
MeetingAssignment,
Joiners.equal(lambda ma: ma.room) # Same room
)
.filter(lambda ma1, ma2: ma1.calculate_overlap(ma2) > 0)
.penalize(HardMediumSoftScore.ONE_HARD,
lambda ma1, ma2: ma1.calculate_overlap(ma2))
Attendance-based constraint:
constraint_factory.for_each(RequiredAttendance)
.join(RequiredAttendance,
Joiners.equal(lambda att: att.person))
.filter(lambda att1, att2: ...)  # overlapping meetings
.penalize(...)
Grouping and counting:
constraint_factory.for_each(MeetingAssignment)
.group_by(
lambda ma: ma.starting_time_grain.day_of_year,
ConstraintCollectors.count()
)
.filter(lambda day, count: count > MAX)
.penalize(...)
Reward instead of penalize:
.reward(HardMediumSoftScore.ONE_SOFT)
Common Domain Patterns
Check if meeting assigned:
if ma.starting_time_grain is not None and ma.room is not None:
# Meeting is fully assigned
Calculate meeting end time:
end_grain_index = ma.get_last_time_grain_index()
# or
end_grain_index = ma.starting_time_grain.grain_index + ma.meeting.duration_in_grains - 1
Check overlap:
overlap_grains = meeting1.calculate_overlap(meeting2)
if overlap_grains > 0:
# Meetings overlap
Get attendee count:
total_attendees = (
len(meeting.required_attendances) +
len(meeting.preferred_attendances)
)
Time grain conversions:
# Grain index to time
hour = time_grain.starting_minute_of_day // 60
minute = time_grain.starting_minute_of_day % 60
# Check if morning
is_morning = time_grain.starting_minute_of_day < 12 * 60
Debugging Tips
Enable verbose logging:
import logging
logging.basicConfig(level=logging.DEBUG)
Analyze solution score:
from meeting_scheduling.solver import solution_manager
analysis = solution_manager.analyze(schedule)
for constraint in analysis.constraint_analyses:
print(f"{constraint.name}: {constraint.score}")
for match in constraint.matches[:5]: # First 5 matches
print(f" {match.justification}")
Test constraint in isolation:
from solverforge_legacy.test import ConstraintVerifier
verifier = ConstraintVerifier.build(
define_constraints,
MeetingSchedule,
MeetingAssignment
)
verifier.verify_that(room_conflict) \
.given(assignment1, assignment2) \
.penalizes_by(expected_penalty)
Print meeting details:
def print_schedule(schedule: MeetingSchedule):
"""Debug helper."""
for ma in schedule.meeting_assignment_list:
if ma.starting_time_grain and ma.room:
start_min = ma.starting_time_grain.starting_minute_of_day
hour = start_min // 60
minute = start_min % 60
print(f"{ma.meeting.topic}: Day {ma.starting_time_grain.day_of_year}, "
f"{hour:02d}:{minute:02d} in {ma.room.name}")
else:
print(f"{ma.meeting.topic}: UNASSIGNED")
Common Gotchas
Forgot to handle None values
- Check ma.starting_time_grain is not None before accessing properties
- Symptom: AttributeError: ‘NoneType’ object has no attribute ‘grain_index’
Time grain list not in scope
- The avoid_overtime constraint needs access to time_grain_list
- Solution: Pass via closure or access from solution object
- Symptom: NameError: name ‘time_grain_list’ is not defined
Overlapping vs touching meetings
- Meeting ends at grain 7, next starts at grain 8: not overlapping
- Use calculate_overlap() > 0 to check
- Symptom: False positives in conflict detection
Forgot to register constraint
- Add to define_constraints() return list
- Symptom: Constraint not enforced
Score level confusion
- Hard: HardMediumSoftScore.ONE_HARD
- Medium: HardMediumSoftScore.ONE_MEDIUM
- Soft: HardMediumSoftScore.ONE_SOFT
- Or: HardMediumSoftScore.of_soft(100)
- Symptom: Constraint at wrong priority level
Attendance navigation
- RequiredAttendance has .person and .meeting
- Meeting has .meeting_assignment
- Person doesn’t directly link to meetings
- Symptom: Can’t navigate relationship
Typical evaluation speeds (on modern hardware):
| Problem Size | Evaluations/Second | 30-Second Results |
|---|---|---|
| 10 meetings, 3 rooms, 2 days | 5,000+ | Near-optimal |
| 24 meetings, 3 rooms, 4 days | 2,000+ | High quality |
| 50 meetings, 5 rooms, 5 days | 500-1000 | Good quality |
| 100 meetings, 8 rooms, 10 days | 200-500 | Decent quality |
If significantly slower, review constraint complexity and look for expensive operations.
Conclusion
You now have a complete understanding of constraint-based meeting scheduling:
✅ Multi-resource modeling — Coordinating time slots, rooms, and people simultaneously
✅ Hierarchical scoring — Three-tier constraints (hard/medium/soft) with clear priorities
✅ Multiple planning variables — Optimizing both time and room for each meeting
✅ Conflict resolution — Handling required vs preferred attendance gracefully
✅ Customization patterns — Extending for your organization’s policies
Next Steps
- Run the application and experiment with the demo data
- Modify an existing constraint — change capacity limits or time preferences
- Add your own constraint — implement a rule from your organization
- Test thoroughly — write unit tests for your constraints
- Customize the data model — add departments, priorities, or other business fields
- Deploy with real data — integrate with your calendar system
Key Takeaways
Three-Tier Scoring:
- Hard: Non-negotiable requirements
- Medium: Strong preferences (degraded service acceptable)
- Soft: Optimization goals and nice-to-haves
Multiple Planning Variables:
- Each MeetingAssignment has two independent variables: time and room
- Solver optimizes both simultaneously
- Creates richer search space and better solutions
Discrete Time Grains:
- Convert continuous time into 15-minute slots
- Simplifies overlap detection and constraint evaluation
- Matches real-world calendar behavior
Attendance Hierarchy:
- Required attendance: Hard constraint (must attend)
- Preferred attendance: Soft constraint (should attend if possible)
- Enables flexible scheduling when conflicts arise
The Power of Constraints:
- Most business logic in one file (constraints.py)
- Easy to add new scheduling policies
- Declarative: describe what you want, solver finds how
Comparison to Other Quickstarts
vs. Employee Scheduling:
- Employee: Single resource (employees assigned to shifts)
- Meeting: Three resources (time + room + people)
- Employee: Two-tier scoring (hard/soft)
- Meeting: Three-tier scoring (hard/medium/soft)
vs. Vehicle Routing:
- Routing: Spatial optimization (minimize distance)
- Meeting: Temporal optimization (minimize conflicts, pack early)
- Routing: List variables (route sequences)
- Meeting: Multiple simple variables per entity (time + room)
Each quickstart teaches complementary optimization techniques.
Additional Resources
Questions? Start by solving the demo data and observing how meetings get assigned. Try modifying constraints to see how the schedule changes. The best way to learn scheduling optimization is to experiment and visualize the results.
Happy scheduling! 📅🗓️
6 - Vehicle Routing
A comprehensive quickstart guide to understanding and building intelligent vehicle routing with SolverForge
Legacy Implementation Guide
This guide uses solverforge-legacy, a fork of Timefold 1.24 that bridges Python to Java via JPype. This legacy implementation is already archived and will no longer be maintained once SolverForge’s native Python bindings are production-ready.
SolverForge has been completely rewritten as a native constraint solver in Rust. This guide is preserved for educational purposes and constraint modeling concepts.
Table of Contents
- Introduction
- Getting Started
- The Problem We’re Solving
- Understanding the Data Model
- How Route Optimization Works
- Writing Constraints: The Business Rules
- The Solver Engine
- Web Interface and API
- Making Your First Customization
- Advanced Constraint Patterns
- Testing and Validation
- Production Considerations
- Quick Reference
Introduction
What You’ll Learn
This guide walks you through a complete vehicle routing application built with SolverForge, a constraint-based optimization framework. You’ll learn:
- How to model real-world logistics problems as optimization problems
- How to construct efficient delivery routes with time windows and capacity constraints
- How optimization algorithms balance competing objectives automatically
- How to customize the system for your specific routing needs
No optimization background required — we’ll explain concepts as we encounter them in the code.
Performance Note
Vehicle routing is particularly sensitive to constraint evaluation performance, as the solver must recalculate distances and arrival times millions of times during optimization. This implementation uses the “fast” dataclass architecture—see benchmark results. Note: benchmarks were run on small test problems (25-77 customers); JPype bridge overhead may compound at larger scales.
Prerequisites
- Basic Python knowledge (classes, functions, type annotations)
- Familiarity with REST APIs
- Comfort with command-line operations
- Understanding of basic geographic concepts (latitude/longitude)
What is Vehicle Routing Optimization?
Traditional planning: Manually assign deliveries to drivers and plan routes using maps.
Vehicle routing optimization: You describe your vehicles, customers, and constraints — the solver automatically generates efficient routes that minimize travel time while satisfying all requirements.
Think of it like having an expert logistics planner who can evaluate millions of route combinations per second to find near-optimal solutions.
SolverForge Enhancements
This implementation includes several enhancements over the standard Timefold quickstart:
| Feature | Benefit |
|---|---|
| Adaptive time windows | Time windows dynamically scale based on problem area and visit count, ensuring feasible solutions |
| Haversine formula | Fast great-circle distances without external API dependencies (default mode) |
| Real Roads mode | Optional OSMnx integration for actual road network routing with visual route display |
| Real street addresses | Demo data uses actual locations in Philadelphia, Hartford, and Florence for realistic routing |
These features give you more control over the performance/accuracy tradeoff during development and production.
Getting Started
Running the Application
Navigate to the project directory:
cd solverforge-quickstarts/legacy/vehicle-routing-fast
Create and activate virtual environment:
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
Install the package:
Start the server:
Open your browser:
http://localhost:8082
You’ll see a map interface with customer locations plotted. Click “Solve” and watch the solver automatically create delivery routes for multiple vehicles, respecting capacity limits and time windows.
File Structure Overview
src/vehicle_routing/
├── domain.py # Data classes (Vehicle, Visit, Location)
├── constraints.py # Business rules (capacity, time windows, distance)
├── solver.py # Solver configuration
├── demo_data.py # Sample datasets (Philadelphia, Hartford, Florence)
├── rest_api.py # HTTP API endpoints
├── routing.py # Distance matrix and OSMnx routing
├── converters.py # REST ↔ Domain model conversion
├── json_serialization.py # JSON helpers
└── score_analysis.py # Score breakdown DTOs
static/
├── index.html # Web UI
└── app.js # UI logic and map visualization
tests/
├── test_constraints.py # Unit tests for constraints
├── test_routing.py # Unit tests for routing module
└── test_feasible.py # Integration tests
Key insight: Most business customization happens in constraints.py alone. The domain model defines what can be routed, but constraints define what makes a good route.
The Problem We’re Solving
The Vehicle Routing Challenge
You need to assign customer visits to vehicles and determine the order of visits for each vehicle while satisfying:
Hard constraints (must be satisfied):
- Vehicle capacity limits (total customer demand ≤ vehicle capacity)
- Time windows (arrive at customer before their deadline)
Soft constraints (objectives to minimize):
- Total driving time across all vehicles
This is known as the Capacitated Vehicle Routing Problem with Time Windows (CVRPTW) — a classic optimization problem in logistics.
Why This is Hard
Even with just 10 customers and 3 vehicles, there are over 3.6 million possible route configurations. With 50 customers and 6 vehicles, the possibilities become astronomical.
The challenges:
- Combinatorial explosion: Number of possibilities grows exponentially with problem size
- Multiple objectives: Minimize distance while respecting capacity and time constraints
- Interdependencies: Assigning one customer affects available capacity and time for others
Route optimization algorithms use sophisticated strategies to explore this space efficiently, finding high-quality solutions in seconds.
Understanding the Data Model
Let’s examine the core classes that model our routing problem. Open src/vehicle_routing/domain.py:
The Location Class
@dataclass
class Location:
latitude: float
longitude: float
# Earth radius in meters
_EARTH_RADIUS_M = 6371000
_TWICE_EARTH_RADIUS_M = 2 * _EARTH_RADIUS_M
# Average driving speed assumption: 50 km/h
_AVERAGE_SPEED_KMPH = 50
def driving_time_to(self, other: "Location") -> int:
"""
Get driving time in seconds to another location using Haversine formula.
"""
return self._calculate_driving_time_haversine(other)
What it represents: A geographic coordinate (latitude/longitude).
Key method:
driving_time_to(): Calculates driving time using the Haversine formula
Haversine formula details:
- Accounts for Earth’s curvature using great-circle distance
- Assumes 50 km/h average driving speed
- Example: Philadelphia to New York (~130 km) → ~9,400 seconds (~2.6 hours)
Optimization concept: The Haversine formula provides realistic geographic distances without external API dependencies. For production with real road networks, you can replace the distance calculation with a routing API (Google Maps, OSRM, etc.).
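For reference, the kind of calculation driving_time_to() performs is sketched below; the actual helper in domain.py may differ in detail, but the Earth radius and 50 km/h speed match the constants shown above:
import math
def haversine_driving_time_seconds(a: "Location", b: "Location") -> int:
    """Great-circle distance between two coordinates, converted to driving seconds at 50 km/h."""
    lat1, lon1 = math.radians(a.latitude), math.radians(a.longitude)
    lat2, lon2 = math.radians(b.latitude), math.radians(b.longitude)
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    distance_m = 2 * 6371000 * math.asin(math.sqrt(h))  # 2 * Earth radius * asin(sqrt(h))
    speed_m_per_s = 50 * 1000 / 3600                     # 50 km/h average driving speed
    return int(distance_m / speed_m_per_s)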
The Visit Class (Planning Entity)
@planning_entity
@dataclass
class Visit:
id: Annotated[str, PlanningId]
name: str # Customer name
location: Location # Where to visit
demand: int # Capacity units required
min_start_time: datetime # Earliest service start
max_end_time: datetime # Latest service end (deadline)
service_duration: timedelta # How long service takes
# Shadow variables (automatically updated by solver)
vehicle: Annotated[
Optional['Vehicle'],
InverseRelationShadowVariable(source_variable_name="visits"),
] = None
previous_visit: Annotated[
Optional['Visit'],
PreviousElementShadowVariable(source_variable_name="visits")
] = None
next_visit: Annotated[
Optional['Visit'],
NextElementShadowVariable(source_variable_name="visits")
] = None
arrival_time: Annotated[
Optional[datetime],
CascadingUpdateShadowVariable(target_method_name="update_arrival_time"),
] = None
What it represents: A customer location that needs a delivery/service visit.
Key fields:
- demand: How much vehicle capacity this visit consumes (e.g., number of packages, weight)
- min_start_time: Earliest acceptable arrival (customer opens at 8 AM)
- max_end_time: Latest acceptable service completion (customer closes at 6 PM)
- service_duration: Time spent at customer location (unloading, paperwork, etc.)
Shadow variable annotations explained:
- InverseRelationShadowVariable(source_variable_name="visits"): Automatically set to the Vehicle when this Visit is added to a vehicle’s visits list
- PreviousElementShadowVariable(source_variable_name="visits"): Points to the previous visit in the route chain (or None if first)
- NextElementShadowVariable(source_variable_name="visits"): Points to the next visit in the route chain (or None if last)
- CascadingUpdateShadowVariable(target_method_name="update_arrival_time"): Triggers the update_arrival_time() method when dependencies change, cascading arrival time calculations through the route
Shadow variables (automatically maintained by solver):
- vehicle: Which vehicle is assigned to this visit (inverse relationship)
- previous_visit / next_visit: Links forming the route chain
- arrival_time: When vehicle arrives at this location (cascades through chain)
Optimization concept: Shadow variables implement derived data — values that depend on planning decisions but aren’t directly decided by the solver. They update automatically when the solver modifies routes.
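The update_arrival_time() method named by CascadingUpdateShadowVariable is not shown above; a sketch of what such a cascade method typically does (the real implementation in domain.py may differ; it assumes timedelta is imported, as it already is for service_duration):
def update_arrival_time(self) -> None:
    """Recompute arrival_time from the previous visit, or from the depot if this is the first stop."""
    if self.vehicle is None:
        self.arrival_time = None
    elif self.previous_visit is None:
        # First visit on the route: depot departure plus driving time from the depot
        self.arrival_time = self.vehicle.departure_time + timedelta(
            seconds=self.vehicle.home_location.driving_time_to(self.location)
        )
    else:
        # Later visits: previous visit's departure plus driving time between the two stops
        self.arrival_time = self.previous_visit.calculate_departure_time() + timedelta(
            seconds=self.previous_visit.location.driving_time_to(self.location)
        )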
Important methods:
def calculate_departure_time(self) -> datetime:
"""When vehicle leaves after service."""
return max(self.arrival_time, self.min_start_time) + self.service_duration
def is_service_finished_after_max_end_time(self) -> bool:
"""Check if time window violated."""
return self.calculate_departure_time() > self.max_end_time
def service_finished_delay_in_minutes(self) -> int:
"""How many minutes late (for penalty calculation)."""
if not self.is_service_finished_after_max_end_time():
return 0
return int((self.calculate_departure_time() - self.max_end_time).total_seconds() / 60)
These methods support constraint evaluation without duplicating logic.
The Vehicle Class (Planning Entity)
@planning_entity
@dataclass
class Vehicle:
id: Annotated[str, PlanningId]
name: str # Vehicle name (e.g., "Alpha", "Bravo")
capacity: int # Maximum demand it can handle
home_location: Location # Depot location
departure_time: datetime # When vehicle leaves depot
visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)
What it represents: A delivery vehicle that starts from a depot, visits customers, and returns.
Key fields:
- id: Unique identifier for the vehicle
- name: Human-readable name (e.g., “Alpha”, “Bravo” from phonetic alphabet)
- capacity: Total demand the vehicle can carry (e.g., 100 packages, 1000 kg)
- home_location: Depot where vehicle starts and ends its route
- departure_time: When vehicle begins its route
- visits: Ordered list of customer visits — this is what the solver optimizes!
Annotations:
- @planning_entity: Tells SolverForge this class contains decisions
- PlanningListVariable: Marks visits as a list variable — the solver assigns visits to vehicles AND determines their order
Important properties:
@property
def arrival_time(self) -> datetime:
"""When vehicle returns to depot."""
if not self.visits:
return self.departure_time
return self.visits[-1].calculate_departure_time() + timedelta(
seconds=self.visits[-1].location.driving_time_to(self.home_location)
)
@property
def total_demand(self) -> int:
"""Sum of all visit demands."""
return sum(visit.demand for visit in self.visits)
@property
def total_driving_time_seconds(self) -> int:
"""Total travel time including return to depot."""
# Includes depot → first visit, between visits, and last visit → depot
These properties enable constraints to easily evaluate route feasibility and quality.
Optimization concept: Using a list variable for the route means the solver simultaneously solves:
- Assignment: Which vehicle serves which customers?
- Sequencing: In what order should each vehicle visit its customers?
This is more powerful than separate assignment and routing phases.
The VehicleRoutePlan Class (Planning Solution)
@planning_solution
@dataclass
class VehicleRoutePlan:
name: str
south_west_corner: Location # Map bounds
north_east_corner: Location # Map bounds
vehicles: Annotated[list[Vehicle], PlanningEntityCollectionProperty]
visits: Annotated[list[Visit], PlanningEntityCollectionProperty, ValueRangeProvider]
score: Annotated[Optional[HardSoftScore], PlanningScore] = None
solver_status: SolverStatus = SolverStatus.NOT_SOLVING
What it represents: The complete routing problem and its solution.
Key fields:
- vehicles: All available vehicles (planning entities)
- visits: All customer visits to be routed (planning entities + value range)
- score: Solution quality metric
- Map bounds: Used for visualization
Annotations explained:
- @planning_solution: Marks this as the top-level problem definition
- PlanningEntityCollectionProperty: Collections of entities being optimized
- ValueRangeProvider: The visits list provides possible values for vehicle assignments
- PlanningScore: Where the solver stores calculated quality
Optimization concept: Unlike employee scheduling where only shifts are entities, here both vehicles and visits are planning entities. Vehicles have list variables (routes), and visits have shadow variables tracking their position in routes.
How Route Optimization Works
Before diving into constraints, let’s understand route construction.
The Route Chaining Mechanism
Routes are built using shadow variable chaining:
Solver modifies: vehicle.visits = [visit_A, visit_B, visit_C]
Shadow variables automatically update:
- Each visit’s vehicle points to the vehicle
- Each visit’s previous_visit and next_visit link the chain
- Each visit’s arrival_time cascades through the chain
Arrival time cascade:
visit_A.arrival_time = vehicle.departure_time + travel(depot → A)
visit_B.arrival_time = visit_A.departure_time + travel(A → B)
visit_C.arrival_time = visit_B.departure_time + travel(B → C)
Constraints evaluate: Check capacity, time windows, and distance
Optimization concept: This is incremental score calculation. When the solver moves one visit, only affected arrival times recalculate — not the entire solution. This enables evaluating millions of route modifications per second.
Why this matters for performance: Shadow variables enable efficient incremental updates. With Pydantic models, validation overhead would occur on every update—compounding across millions of moves per second. The dataclass approach avoids this overhead entirely. See benchmark analysis for details on this architectural choice.
The Search Process
- Initial solution: Often all visits unassigned or randomly assigned
- Evaluate score: Calculate capacity violations, time window violations, and total distance
- Make a move:
- Assign visit to different vehicle
- Change visit order in a route
- Swap visits between routes
- Re-evaluate score (incrementally)
- Accept if improvement (with controlled randomness to escape local optima)
- Repeat millions of times
- Return best solution found
Metaheuristics used:
- Variable Neighborhood Descent: Tries different types of moves
- Late Acceptance: Accepts solutions close to recent best
- Strategic Oscillation: Temporarily allows infeasibility to explore more space
The Score: Measuring Route Quality
Every solution gets a score with two parts:
0hard/-45657soft
- Hard score: Counts capacity overages and time window violations (must be 0)
- Soft score: Total driving time in seconds (lower magnitude is better)
Examples:
- -120hard/-50000soft: Infeasible (120 minutes late or 120 units over capacity)
- 0hard/-45657soft: Feasible route with 45,657 seconds (12.7 hours) driving
- 0hard/-30000soft: Better route with only 30,000 seconds (8.3 hours) driving
Optimization concept: The scoring system implements constraint prioritization. We absolutely require capacity and time window compliance before optimizing distance.
Writing Constraints: The Business Rules
Now the heart of the system. Open src/vehicle_routing/constraints.py.
The Constraint Provider Pattern
All constraints are registered in one function:
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
return [
# Hard constraints (must satisfy)
vehicle_capacity(constraint_factory),
service_finished_after_max_end_time(constraint_factory),
# Soft constraints (minimize)
minimize_travel_time(constraint_factory),
]
Let’s examine each constraint in detail.
Hard Constraint: Vehicle Capacity
Business rule: “A vehicle’s total customer demand cannot exceed its capacity.”
def vehicle_capacity(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Vehicle)
.filter(lambda vehicle: vehicle.calculate_total_demand() > vehicle.capacity)
.penalize(
HardSoftScore.ONE_HARD,
lambda vehicle: vehicle.calculate_total_demand() - vehicle.capacity
)
.as_constraint("vehicleCapacity")
)
How to read this:
- for_each(Vehicle): Consider every vehicle
- .filter(...): Keep only vehicles exceeding capacity
- .penalize(ONE_HARD, ...): Penalize by the amount of excess demand (overage)
- .as_constraint(...): Name it for debugging
Why penalize by overage amount?
Example scenarios:
- Vehicle capacity: 100 units
- Assigned demand: 80 units → No penalty (feasible)
- Assigned demand: 105 units → Penalty of 5 hard points (5 units over)
- Assigned demand: 120 units → Penalty of 20 hard points (20 units over)
Optimization concept: This creates graded penalties that guide the solver. Being slightly over capacity (penalty 5) is “less wrong” than being very over (penalty 20), helping the solver navigate toward feasibility incrementally.
Implementation detail: The calculate_total_demand() method in domain.py:
def calculate_total_demand(self) -> int:
return sum(visit.demand for visit in self.visits)
Hard Constraint: Time Window Compliance
Business rule: “Service at each customer must finish before their deadline (max_end_time).”
def service_finished_after_max_end_time(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Visit)
.filter(lambda visit: visit.is_service_finished_after_max_end_time())
.penalize(
HardSoftScore.ONE_HARD,
lambda visit: visit.service_finished_delay_in_minutes()
)
.as_constraint("serviceFinishedAfterMaxEndTime")
)
How to read this:
- for_each(Visit): Consider every customer visit
- .filter(...): Keep only visits finishing after their deadline
- .penalize(ONE_HARD, ...): Penalize by minutes late
Example scenario:
Customer has max_end_time = 18:00 (6 PM deadline):
- Arrive at 17:00, 30-minute service → Finish at 17:30 → No penalty (on time)
- Arrive at 17:50, 30-minute service → Finish at 18:20 → Penalty of 20 minutes
- Arrive at 19:00, 30-minute service → Finish at 19:30 → Penalty of 90 minutes
Wait time handling:
If vehicle arrives before min_start_time, it waits:
def calculate_departure_time(self) -> datetime:
# If arrive at 7:00 but min_start_time is 8:00, wait until 8:00
return max(self.arrival_time, self.min_start_time) + self.service_duration
Optimization concept: Time windows create temporal dependencies in routes. The order of visits affects whether deadlines can be met. The solver must balance early visits (to respect time windows) with short distances (to minimize travel).
Helper methods in domain.py:
def is_service_finished_after_max_end_time(self) -> bool:
"""Check deadline violation."""
if self.arrival_time is None:
return False
return self.calculate_departure_time() > self.max_end_time
def service_finished_delay_in_minutes(self) -> int:
"""Calculate penalty magnitude."""
if not self.is_service_finished_after_max_end_time():
return 0
delay = self.calculate_departure_time() - self.max_end_time
return int(delay.total_seconds() / 60)
Soft Constraint: Minimize Total Distance
Business rule: “Minimize the total driving time across all vehicles.”
def minimize_travel_time(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Vehicle)
.penalize(
HardSoftScore.ONE_SOFT,
lambda vehicle: vehicle.calculate_total_driving_time_seconds()
)
.as_constraint("minimizeTravelTime")
)
How to read this:
- for_each(Vehicle): Consider every vehicle
- .penalize(ONE_SOFT, ...): Penalize by total driving seconds for that vehicle
- No filter: Every vehicle contributes to the soft score
Why penalize all vehicles?
Each vehicle’s driving time adds to the penalty:
- Vehicle 1: 10,000 seconds → -10,000 soft score
- Vehicle 2: 15,000 seconds → -15,000 soft score
- Vehicle 3: 8,000 seconds → -8,000 soft score
- Total soft score: -33,000
The solver tries different route configurations to reduce this total.
Optimization concept: This constraint implements the routing objective function. After ensuring feasibility (hard constraints), the solver focuses on minimizing this distance measure.
Implementation detail in domain.py:
def calculate_total_driving_time_seconds(self) -> int:
"""Total travel time including depot → first → ... → last → depot."""
if not self.visits:
return 0
total = 0
# Depot to first visit
total += self.home_location.driving_time_to(self.visits[0].location)
# Between consecutive visits
for i in range(len(self.visits) - 1):
total += self.visits[i].location.driving_time_to(self.visits[i + 1].location)
# Last visit back to depot
total += self.visits[-1].location.driving_time_to(self.home_location)
return total
Why include depot return? Real routes must return vehicles to the depot. Not including this would incentivize ending routes far from the depot.
The Solver Engine
Now let’s see how the solver is configured. Open src/vehicle_routing/solver.py:
solver_config = SolverConfig(
solution_class=VehicleRoutePlan,
entity_class_list=[Vehicle, Visit],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(seconds=30)),
)
solver_manager = SolverManager.create(solver_config)
solution_manager = SolutionManager.create(solver_manager)
Configuration Breakdown
- solution_class: Your planning solution class (VehicleRoutePlan)
- entity_class_list: Both Vehicle and Visit are planning entities
  - Vehicle has a planning variable (visits list)
  - Visit has shadow variables that depend on vehicle assignments
- score_director_factory_config: Contains the constraint provider function
  - This is where your business rules are registered
- termination_config: When to stop solving
  - spent_limit=Duration(seconds=30): Stop after 30 seconds
  - Could also use unimproved_spent_limit (stop if no improvement for X seconds); see the sketch below
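For example, you can combine both limits so easy instances finish early while hard ones still respect the 30-second cap. A minimal sketch, assuming the fork keeps Timefold-style TerminationConfig fields, to drop into the SolverConfig above:
termination_config=TerminationConfig(
    spent_limit=Duration(seconds=30),              # hard cap: always stop after 30 seconds
    unimproved_spent_limit=Duration(seconds=10),   # or earlier, if no improvement for 10 seconds
),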
SolverManager: Asynchronous Solving
Routing problems can take time to solve well. SolverManager handles solving in the background:
# Start solving (non-blocking)
solver_manager.solve_and_listen(job_id, route_plan, callback_function)
# Check status
status = solver_manager.get_solver_status(job_id)
# Get current best solution (updates in real-time)
solution = solver_manager.get_final_best_solution(job_id)
# Stop early if satisfied
solver_manager.terminate_early(job_id)
Optimization concept: Vehicle routing uses anytime algorithms that continuously improve solutions. You get a valid answer quickly, then progressively better answers as solving continues. You can stop whenever the solution is “good enough.”
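A minimal sketch of that "stop when good enough" pattern, built only from the SolverManager calls shown above (the 10-second threshold and the feasibility test are illustrative choices, not part of the quickstart):
import time

solver_manager.solve_and_listen(job_id, route_plan, callback_function)

start = time.time()
while solver_manager.get_solver_status(job_id) != "NOT_SOLVING":
    best = solver_manager.get_final_best_solution(job_id)
    # Feasible (hard score 0) and at least 10 seconds spent? Good enough: stop early.
    if best is not None and best.score is not None \
            and best.score.hard_score == 0 and time.time() - start > 10:
        solver_manager.terminate_early(job_id)
        break
    time.sleep(1)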
SolutionManager: Score Analysis
The solution_manager helps explain scores:
solution_manager = SolutionManager.create(solver_manager)
# Analyze which constraints fired and why
analysis = solution_manager.analyze(route_plan)
# Shows breakdown like:
# - Vehicle capacity: -50 hard (Vehicle 1: 20 over, Vehicle 2: 30 over)
# - Time windows: -15 hard (Visit 42: 15 minutes late)
# - Total distance: -45657 soft
This is invaluable for debugging infeasible solutions or understanding score composition.
Solving Timeline
Small problems (20-30 visits, 2-3 vehicles):
- Initial valid solution: < 1 second
- Good solution: 5-10 seconds
- High-quality: 30 seconds
Medium problems (50-100 visits, 5-8 vehicles):
- Initial valid solution: 1-5 seconds
- Good solution: 30-60 seconds
- High-quality: 2-5 minutes
Large problems (200+ visits, 10+ vehicles):
- Initial valid solution: 5-30 seconds
- Good solution: 5-10 minutes
- High-quality: 30-60 minutes
Factors affecting speed:
- Number of visits (main factor)
- Number of vehicles (less impact than visits)
- How tight time windows are (tighter = harder)
- How tight capacity constraints are (fuller vehicles = less flexibility)
Web Interface and API
REST API Endpoints
Open src/vehicle_routing/rest_api.py to see the API. It runs on port 8082 (different from employee scheduling’s 8080).
GET /demo-data
Returns available demo datasets:
["PHILADELPHIA", "HARTFORT", "FIRENZE"]
Each dataset uses real city coordinates with different problem sizes.
GET /demo-data/{demo_name}
Returns a specific demo dataset:
Parameters:
- demo_name: Name of the demo dataset (PHILADELPHIA, HARTFORT, FIRENZE)
- routing (query, optional): Routing mode - haversine (default) or real_roads
Request:
GET /demo-data/PHILADELPHIA?routing=haversine
Response:
{
"name": "demo",
"southWestCorner": [39.7656, -76.8378],
"northEastCorner": [40.7764, -74.9301],
"vehicles": [
{
"id": "0",
"name": "Alpha",
"capacity": 25,
"homeLocation": [40.5154, -75.3721],
"departureTime": "2025-12-10T06:00:00",
"visits": [],
"totalDemand": 0,
"totalDrivingTimeSeconds": 0
}
],
"visits": [
{
"id": "0",
"name": "Amy Cole",
"location": [40.7831, -74.9376],
"demand": 1,
"minStartTime": "2025-12-10T17:00:00",
"maxEndTime": "2025-12-10T20:00:00",
"serviceDuration": 420,
"vehicle": null,
"arrivalTime": null
}
],
"score": null,
"solverStatus": null,
"totalDrivingTimeSeconds": 0
}
Field notes:
- Coordinates are [latitude, longitude] arrays
- Times use ISO format strings
- serviceDuration is in seconds (420 = 7 minutes)
- Initially vehicle is null (unassigned) and visits lists are empty
- Vehicles have names from the phonetic alphabet (Alpha, Bravo, Charlie, etc.)
- Customer names are randomly generated from first/last name combinations
Demo datasets:
- PHILADELPHIA: 55 visits, 6 vehicles, moderate capacity (15-30)
- HARTFORT: 50 visits, 6 vehicles, tighter capacity (20-30)
- FIRENZE: 77 visits (largest), 6 vehicles, varied capacity (20-40)
GET /demo-data/{demo_name}/stream
Server-Sent Events (SSE) endpoint for loading demo data with progress updates. Use this when routing=real_roads to show download/computation progress.
Parameters:
- demo_name: Name of the demo dataset
- routing (query, optional): haversine (default) or real_roads
Request:
GET /demo-data/PHILADELPHIA/stream?routing=real_roads
SSE Events:
Progress event (during computation):
{"event": "progress", "phase": "network", "message": "Downloading OpenStreetMap road network...", "percent": 10, "detail": "Area: 0.08° × 0.12°"}
Complete event (when ready):
{"event": "complete", "solution": {...}, "geometries": {"0": ["encodedPolyline1", "encodedPolyline2"], "1": [...]}}
Error event (on failure):
{"event": "error", "message": "Demo data not found: INVALID"}
Geometry format: Each vehicle’s geometries are an array of encoded polylines (Google polyline format), one per route segment:
- First: depot → first visit
- Middle: visit → visit
- Last: last visit → depot
GET /route-plans/{problem_id}/geometry
Get route geometries for displaying actual road paths:
Response:
{
"geometries": {
"0": ["_p~iF~ps|U_ulLnnqC_mqNvxq`@", "afvkFnps|U~hbE~reK"],
"1": ["_izlFnps|U_ulLnnqC"]
}
}
Decode polylines on the frontend to display actual road routes instead of straight lines.
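If you want to inspect a geometry outside the browser, a decoder library can read the same format. A sketch using the third-party polyline package, which is an assumption here and not part of this quickstart's requirements:
# pip install polyline   (hypothetical extra dependency)
import polyline

encoded = "_p~iF~ps|U_ulLnnqC_mqNvxq`@"   # one encoded route segment
coords = polyline.decode(encoded)          # list of (latitude, longitude) tuples
print(coords)                              # e.g. [(38.5, -120.2), (40.7, -120.95), (43.252, -126.453)]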
POST /route-plans
Submit a routing problem to solve:
Request body: Same format as demo data response
Response: Job ID as plain text
"a1b2c3d4-e5f6-7890-abcd-ef1234567890"
Implementation:
@app.post("/route-plans")
async def solve(problem: VehicleRoutePlanModel) -> str:
job_id = str(uuid4())
route_plan = model_to_plan(problem)
data_sets[job_id] = route_plan
solver_manager.solve_and_listen(
job_id,
route_plan,
lambda solution: update_solution(job_id, solution)
)
return job_id
Key detail: Uses solve_and_listen() with a callback that updates the stored solution in real-time. This enables live progress tracking in the UI.
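The callback itself can be as small as storing the latest best solution. A sketch (data_sets is the in-memory dict used in the endpoint above):
def update_solution(problem_id: str, solution: VehicleRoutePlan) -> None:
    # Called by the solver each time a new best solution is found;
    # GET /route-plans/{problem_id} then serves this latest snapshot.
    data_sets[problem_id] = solution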
GET /route-plans/{problem_id}
Get current best solution:
Request:
GET /route-plans/a1b2c3d4-e5f6-7890-abcd-ef1234567890
Response (while solving):
{
"vehicles": [
{
"id": "vehicle_0",
"visits": ["5", "12", "23", "7"],
"totalDemand": 28,
"totalDrivingTimeSeconds": 15420
}
],
"visits": [
{
"id": "5",
"vehicle": "vehicle_0",
"arrivalTime": "2025-11-27T08:15:30"
}
],
"score": "-15hard/-187594soft",
"solverStatus": "SOLVING_ACTIVE"
}
Response (finished):
{
"score": "0hard/-45657soft",
"solverStatus": "NOT_SOLVING"
}
Important: The response updates in real-time as solving progresses. Clients should poll this endpoint (e.g., every 2 seconds) to show live progress.
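A minimal polling client, sketched with the requests library (requests is an assumption here; a demo dataset is used as the problem payload):
import time
import requests

BASE = "http://localhost:8082"

# Use a demo dataset as the problem payload, then submit it
route_plan_dict = requests.get(f"{BASE}/demo-data/PHILADELPHIA").json()
job_id = requests.post(f"{BASE}/route-plans", json=route_plan_dict).json()

# Poll every 2 seconds until the solver stops
while True:
    plan = requests.get(f"{BASE}/route-plans/{job_id}").json()
    print(plan["score"], plan["solverStatus"])
    if plan["solverStatus"] == "NOT_SOLVING":
        break
    time.sleep(2)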
GET /route-plans
List all active job IDs:
Response:
["a1b2c3d4-e5f6-7890-abcd-ef1234567890", "b2c3d4e5-f6a7-8901-bcde-f23456789012"]
GET /route-plans/{problem_id}/status
Lightweight status check (score and solver status only):
Response:
{
"name": "PHILADELPHIA",
"score": "0hard/-45657soft",
"solverStatus": "SOLVING_ACTIVE"
}
DELETE /route-plans/{problem_id}
Terminate solving early:
@app.delete("/route-plans/{problem_id}")
async def stop_solving(problem_id: str) -> VehicleRoutePlanModel:
solver_manager.terminate_early(problem_id)
return plan_to_model(data_sets.get(problem_id))
Returns the best solution found so far. Useful if the user is satisfied with current quality and doesn’t want to wait for the full 30 seconds.
POST /route-plans/recommendation
Request recommendations for assigning a new visit to vehicles:
Request body:
{
"solution": { /* complete route plan */ },
"visitId": "new_visit_42"
}
Response:
[
{
"proposition": {
"vehicleId": "vehicle_2",
"index": 3
},
"scoreDiff": "0hard/-1234soft"
},
{
"proposition": {
"vehicleId": "vehicle_0",
"index": 5
},
"scoreDiff": "0hard/-2345soft"
}
]
Returns up to 5 recommendations sorted by score impact. The first recommendation is the best option.
POST /route-plans/recommendation/apply
Apply a selected recommendation:
Request body:
{
"solution": { /* complete route plan */ },
"visitId": "new_visit_42",
"vehicleId": "vehicle_2",
"index": 3
}
Response: Updated route plan with the visit inserted at the specified position.
PUT /route-plans/analyze
Analyze a solution’s score breakdown:
Request body: Complete route plan (assigned solution)
Response:
{
"score": "-20hard/-45657soft",
"constraints": [
{
"name": "Vehicle capacity",
"score": "-20hard/0soft",
"matches": [
{
"justification": "Vehicle vehicle_2 exceeds capacity by 20 units",
"score": "-20hard/0soft"
}
]
},
{
"name": "Minimize travel time",
"score": "0hard/-45657soft",
"matches": [
{
"justification": "Vehicle vehicle_0 drives 12345 seconds",
"score": "0hard/-12345soft"
}
]
}
]
}
This is invaluable for understanding why a solution has a particular score and which constraints are violated.
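The same breakdown is easy to script against the running service. A sketch with the requests library (an assumption; the job ID is the example value from above, and the GET response is reused as the analyze payload since both endpoints share the route-plan model):
import requests

BASE = "http://localhost:8082"
job_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"   # from POST /route-plans

plan = requests.get(f"{BASE}/route-plans/{job_id}").json()
analysis = requests.put(f"{BASE}/route-plans/analyze", json=plan).json()

print(analysis["score"])
for constraint in analysis["constraints"]:
    print(constraint["name"], constraint["score"])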
Web UI Flow
The static/app.js implements this workflow:
- User opens page → Load demo data (GET /demo-data/PHILADELPHIA)
- Display map with:
  - Depot marked as home icon
  - Customer locations as numbered markers
  - Sidebar showing visit details
- User clicks “Solve” → POST /route-plans (get job ID)
- Poll GET /route-plans/{id} every 2 seconds
- Update UI with:
  - Routes drawn as colored lines connecting visits
  - Each vehicle’s route in a different color
  - Stats: total distance, capacity used, time window status
- When solverStatus === "NOT_SOLVING" → Stop polling
- Display final score and route statistics
Visual features:
- Color-coded routes (one color per vehicle)
- Depot-to-depot complete routes visualized
- Visit details on hover (arrival time, demand, time window status)
- Stats panel showing capacity utilization and total distance per vehicle
Making Your First Customization
Let’s add a new constraint step-by-step.
Scenario: Limit Maximum Route Duration
New business rule: “No vehicle can be out for more than 8 hours (from depot departure to depot return).”
This is a hard constraint (must be satisfied).
Step 1: Open constraints.py
Navigate to src/vehicle_routing/constraints.py.
Step 2: Write the Constraint Function
Add this function:
def max_route_duration(constraint_factory: ConstraintFactory):
"""
Hard constraint: Vehicle routes cannot exceed 8 hours total duration.
"""
MAX_DURATION_SECONDS = 8 * 60 * 60 # 8 hours
return (
constraint_factory.for_each(Vehicle)
.filter(lambda vehicle: len(vehicle.visits) > 0) # Skip empty vehicles
.filter(lambda vehicle:
(vehicle.arrival_time - vehicle.departure_time).total_seconds()
> MAX_DURATION_SECONDS
)
.penalize(
HardSoftScore.ONE_HARD,
lambda vehicle: int(
((vehicle.arrival_time - vehicle.departure_time).total_seconds()
- MAX_DURATION_SECONDS) / 60
)
)
.as_constraint("Max route duration 8 hours")
)
How this works:
- Examine each vehicle with visits
- Calculate total time: depot departure → visit customers → depot return
- If exceeds 8 hours, penalize by excess minutes
- Example: 9-hour route → 60-minute penalty
Why penalize by excess? An 8.5-hour route (penalty 30 minutes) is closer to feasible than a 12-hour route (penalty 240 minutes), guiding the solver incrementally.
Step 3: Register the Constraint
Add it to the define_constraints function:
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory):
return [
# Hard constraints
vehicle_capacity(constraint_factory),
service_finished_after_max_end_time(constraint_factory),
max_route_duration(constraint_factory), # ← Add this line
# Soft constraints
minimize_travel_time(constraint_factory),
]
Step 4: Test It
Restart the server so the new constraint is loaded (start it the same way you normally run this quickstart).
Load demo data and solve:
- Open http://localhost:8082
- Load PHILADELPHIA dataset
- Click “Solve”
Verify constraint:
- Check vehicle stats in the UI
- Each vehicle’s total time should be ≤ 8 hours
- If infeasible, some vehicles may still exceed (hard score < 0)
Testing tip: Temporarily lower the limit to 4 hours to see the constraint actively preventing long routes. You might get an infeasible solution (negative hard score) showing the constraint is working but can’t be satisfied with current vehicles.
Step 5: Make It Configurable
For production use, make the limit configurable per vehicle:
Modify domain.py:
@dataclass
class Vehicle:
id: Annotated[str, PlanningId]
name: str
capacity: int
home_location: Location
departure_time: datetime
max_duration_seconds: int = 8 * 60 * 60 # New field with default
visits: Annotated[list[Visit], PlanningListVariable] = field(default_factory=list)
Update constraint:
def max_route_duration(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Vehicle)
.filter(lambda vehicle: len(vehicle.visits) > 0)
.filter(lambda vehicle:
(vehicle.arrival_time - vehicle.departure_time).total_seconds()
> vehicle.max_duration_seconds # Use vehicle-specific limit
)
.penalize(
HardSoftScore.ONE_HARD,
lambda vehicle: int(
((vehicle.arrival_time - vehicle.departure_time).total_seconds()
- vehicle.max_duration_seconds) / 60
)
)
.as_constraint("Max route duration")
)
Now each vehicle can have a different maximum duration.
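A quick construction sketch with two different limits (the coordinates, times, and values below are illustrative only):
from datetime import datetime
from vehicle_routing.domain import Location, Vehicle

depot = Location(40.5154, -75.3721)        # illustrative depot coordinates
departure = datetime(2025, 12, 10, 6, 0)

long_shift = Vehicle(
    id="1", name="Alpha", capacity=30,
    home_location=depot, departure_time=departure,
    max_duration_seconds=10 * 60 * 60,     # 10-hour duty limit
)
short_shift = Vehicle(
    id="2", name="Bravo", capacity=30,
    home_location=depot, departure_time=departure,
    max_duration_seconds=6 * 60 * 60,      # 6-hour duty limit
)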
Understanding What You Did
You just implemented a temporal constraint — limiting time-based aspects of routes. This pattern is common in routing:
- Driver shift limits (8-hour, 10-hour, etc.)
- Maximum distance per route
- Required breaks after X hours
- Service windows for depot operations
The pattern is always:
- Calculate the temporal/distance measure
- Compare to limit
- Penalize by the excess amount (for graded guidance)
Advanced Constraint Patterns
Pattern 1: Priority Customers
Scenario: Some customers are high-priority and should be served earlier in their vehicle’s route.
def priority_customers_early(constraint_factory: ConstraintFactory):
"""
Soft constraint: High-priority customers should be visited early in route.
"""
HIGH_PRIORITY_CUSTOMERS = {"Customer A", "Customer B"}
return (
constraint_factory.for_each(Visit)
.filter(lambda visit: visit.name in HIGH_PRIORITY_CUSTOMERS)
.filter(lambda visit: visit.vehicle is not None) # Assigned visits only
.penalize(
HardSoftScore.ONE_SOFT,
lambda visit: visit.vehicle.visits.index(visit) * 100
)
.as_constraint("Priority customers early")
)
How it works: Penalize by position in route × weight:
- 1st position: penalty 0
- 2nd position: penalty 100
- 5th position: penalty 400
This incentivizes placing priority customers earlier without making it a hard requirement.
Optimization concept: This implements soft sequencing preferences. Unlike hard sequencing (e.g., “A must come before B”), this just prefers certain orders.
Pattern 2: Vehicle-Customer Compatibility
Scenario: Certain vehicles cannot serve certain customers (e.g., refrigerated trucks for frozen goods, size restrictions).
First, add compatibility data to domain:
@dataclass
class Vehicle:
# ... existing fields ...
vehicle_type: str = "standard" # e.g., "refrigerated", "large", "standard"
@dataclass
class Visit:
# ... existing fields ...
required_vehicle_type: Optional[str] = None # None = any vehicle OK
Then the constraint:
def vehicle_customer_compatibility(constraint_factory: ConstraintFactory):
"""
Hard constraint: Only compatible vehicles can serve customers.
"""
return (
constraint_factory.for_each(Visit)
.filter(lambda visit: visit.required_vehicle_type is not None)
.filter(lambda visit: visit.vehicle is not None)
.filter(lambda visit: visit.vehicle.vehicle_type != visit.required_vehicle_type)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Vehicle customer compatibility")
)
Pattern 3: Balanced Workload Across Vehicles
Scenario: Distribute visits evenly across vehicles (avoid one vehicle doing everything).
def balance_vehicle_workload(constraint_factory: ConstraintFactory):
    """
    Soft constraint: Balance number of visits across vehicles.
    """
    return (
        constraint_factory.for_each(Vehicle)
        # Every vehicle is included here, so an empty vehicle contributes a load of 0
        .group_by(
            ConstraintCollectors.load_balance(
                lambda vehicle: vehicle,
                lambda vehicle: len(vehicle.visits)
            )
        )
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda load_balance: int(load_balance.unfairness() * 100)
        )
        .as_constraint("Balance workload")
    )
Optimization concept: This uses the load balancing collector which calculates variance in workload distribution. It’s more sophisticated than simple quadratic penalties.
Pattern 4: Break Requirements
Scenario: Drivers must take a 30-minute break after 4 hours of driving.
This requires modeling breaks explicitly:
@dataclass
class Break:
"""Represents a required break in a vehicle's route."""
id: str
duration: timedelta = timedelta(minutes=30)
min_driving_before: timedelta = timedelta(hours=4)
@dataclass
class Vehicle:
# ... existing fields ...
required_breaks: list[Break] = field(default_factory=list)
Then a complex constraint checking cumulative driving time:
def break_enforcement(constraint_factory: ConstraintFactory):
"""
Hard constraint: Breaks must occur within required intervals.
"""
# Implementation would track cumulative driving time and verify
# breaks are inserted appropriately in the route
# This is advanced and requires careful handling of route chains
pass
Note: Break enforcement is complex and often requires custom move selectors to ensure breaks are positioned correctly. This is beyond basic constraint writing.
Pattern 5: Distance Limits
Scenario: Each vehicle has a maximum total distance it can travel (fuel constraint).
def max_vehicle_distance(constraint_factory: ConstraintFactory):
"""
Hard constraint: Vehicle cannot exceed maximum distance.
"""
return (
constraint_factory.for_each(Vehicle)
.filter(lambda vehicle: len(vehicle.visits) > 0)
.filter(lambda vehicle:
vehicle.calculate_total_driving_time_seconds()
> vehicle.max_driving_seconds # New field needed
)
.penalize(
HardSoftScore.ONE_HARD,
lambda vehicle: int(
(vehicle.calculate_total_driving_time_seconds()
- vehicle.max_driving_seconds) / 60
)
)
.as_constraint("Max vehicle distance")
)
You’d need to add max_driving_seconds to the Vehicle class in domain.py.
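For reference, that field addition mirrors the max_duration_seconds change from earlier (the 4-hour default is just an illustrative value):
@dataclass
class Vehicle:
    # ... existing fields ...
    max_driving_seconds: int = 4 * 60 * 60   # illustrative default: 4 hours of driving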
Testing and Validation
Unit Testing Constraints
Best practice: Test each constraint in isolation without running the full solver.
Open tests/test_constraints.py to see examples:
from vehicle_routing.domain import Vehicle, Visit, Location, VehicleRoutePlan
from vehicle_routing.constraints import define_constraints
from solverforge_legacy.solver.test import ConstraintVerifier
# Create verifier with your constraints
constraint_verifier = ConstraintVerifier.build(
define_constraints,
VehicleRoutePlan,
Vehicle,
Visit
)
Example: Test Capacity Constraint
def test_vehicle_capacity_unpenalized():
"""Capacity within limit should not penalize."""
vehicle = Vehicle(
id="v1",
name="Alpha",
capacity=100,
home_location=Location(0.0, 0.0),
departure_time=datetime(2025, 11, 27, 8, 0)
)
visit = Visit(
id="visit1",
name="Customer A",
location=Location(1.0, 1.0),
demand=80, # Within capacity
min_start_time=datetime(2025, 11, 27, 9, 0),
max_end_time=datetime(2025, 11, 27, 18, 0),
service_duration=timedelta(minutes=30)
)
# Connect visit to vehicle (helper from tests)
connect(vehicle, visit)
# Verify no penalty
constraint_verifier.verify_that(vehicle_capacity) \
.given(vehicle, visit) \
.penalizes_by(0)
def test_vehicle_capacity_penalized():
"""Exceeding capacity should penalize by overage amount."""
vehicle = Vehicle(id="v1", name="Alpha", capacity=100, ...)
visit1 = Visit(id="visit1", demand=80, ...)
visit2 = Visit(id="visit2", demand=40, ...) # Total 120 > 100
connect(vehicle, visit1, visit2)
# Should penalize by 20 (overage amount)
constraint_verifier.verify_that(vehicle_capacity) \
.given(vehicle, visit1, visit2) \
.penalizes_by(20)
Helper function for tests:
def connect(vehicle: Vehicle, *visits: Visit):
"""Helper to set up vehicle-visit relationships."""
vehicle.visits = list(visits)
for i, visit in enumerate(visits):
visit.vehicle = vehicle
visit.previous_visit = visits[i - 1] if i > 0 else None
visit.next_visit = visits[i + 1] if i < len(visits) - 1 else None
# Calculate arrival times
if i == 0:
travel_time = vehicle.home_location.driving_time_to(visit.location)
visit.arrival_time = vehicle.departure_time + timedelta(seconds=travel_time)
else:
travel_time = visits[i-1].location.driving_time_to(visit.location)
visit.arrival_time = visits[i-1].calculate_departure_time() + timedelta(seconds=travel_time)
Example: Test Time Window Constraint
def test_time_window_violation():
"""Finishing after deadline should penalize by delay minutes."""
vehicle = Vehicle(
id="v1",
name="Alpha",
departure_time=datetime(2025, 11, 27, 7, 0),
...
)
visit = Visit(
id="visit1",
max_end_time=datetime(2025, 11, 27, 12, 0), # Noon deadline
service_duration=timedelta(minutes=30),
...
)
# Set arrival causing late finish
visit.arrival_time = datetime(2025, 11, 27, 11, 45) # Arrive 11:45
# Service ends at 12:15 (30 min service) → 15 minutes late
constraint_verifier.verify_that(service_finished_after_max_end_time) \
.given(visit) \
.penalizes_by(15)
Run tests:
pytest tests/test_constraints.py -v
Integration Testing: Full Solve
Test the complete solving cycle in tests/test_feasible.py:
import time
from vehicle_routing.demo_data import DemoData, generate_demo_data
from vehicle_routing.solver import solver_manager
def test_solve_philadelphia():
"""Test that solver finds feasible solution for Philadelphia dataset."""
# Get demo problem
route_plan = generate_demo_data(DemoData.PHILADELPHIA)
# Verify initially unassigned
assert all(visit.vehicle is None for visit in route_plan.visits)
assert all(len(vehicle.visits) == 0 for vehicle in route_plan.vehicles)
# Start solving
job_id = "test-philadelphia"
solver_manager.solve(job_id, route_plan)
# Wait for completion (with timeout)
max_wait = 120 # 2 minutes
start = time.time()
while solver_manager.get_solver_status(job_id) != "NOT_SOLVING":
if time.time() - start > max_wait:
solver_manager.terminate_early(job_id)
break
time.sleep(2)
# Get solution
solution = solver_manager.get_final_best_solution(job_id)
# Verify all visits assigned
unassigned = [v for v in solution.visits if v.vehicle is None]
assert len(unassigned) == 0, f"{len(unassigned)} visits remain unassigned"
# Verify feasible (hard score = 0)
assert solution.score is not None
assert solution.score.hard_score == 0, \
f"Solution infeasible with hard score {solution.score.hard_score}"
# Report quality
print(f"Final score: {solution.score}")
print(f"Total driving time: {solution.score.soft_score / -1} seconds")
# Verify route integrity
for vehicle in solution.vehicles:
if vehicle.visits:
# Check capacity
total_demand = sum(v.demand for v in vehicle.visits)
assert total_demand <= vehicle.capacity, \
f"Vehicle {vehicle.id} over capacity: {total_demand}/{vehicle.capacity}"
# Check time windows
for visit in vehicle.visits:
assert not visit.is_service_finished_after_max_end_time(), \
f"Visit {visit.id} finishes late"
Run integration tests:
pytest tests/test_feasible.py -v
Manual Testing via UI
Start the application (the same way you normally run this quickstart) and open http://localhost:8082 in a browser.
Open browser console (F12) to see API calls and responses
Load demo data:
- Select “PHILADELPHIA” from dropdown
- Verify map shows 55 customer markers + 1 depot
- Check visit list shows all customers unassigned
Solve and observe:
- Click “Solve”
- Watch score improve in real-time
- See routes appear on map (colored lines)
- Monitor stats panel for capacity and time info
Verify solution quality:
- Final hard score should be 0 (feasible)
- All visits should be assigned (no gray markers)
- Routes should form complete loops (depot → customers → depot)
- Check vehicle stats: demand ≤ capacity for each
Test different datasets:
- Try HARTFORT (50 visits, tighter capacity constraints)
- Try FIRENZE (77 visits, the largest and hardest problem)
- Compare solve times and final scores against PHILADELPHIA (55 visits); larger datasets take longer to solve but demonstrate scalability
Test early termination:
- Start solving FIRENZE
- Click “Stop” after 10 seconds
- Verify you get a partial solution (may be infeasible)
Production Considerations
Constraints are evaluated millions of times per second during solving. Performance is critical.
❌ DON’T: Expensive operations in constraints
def bad_constraint(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Visit)
.filter(lambda visit:
fetch_customer_credit_score(visit.name) < 500) # API call!
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Bad")
)
✅ DO: Pre-compute before solving
# Before solving, once
blocked_customers = {
name for name, score in fetch_all_credit_scores().items()
if score < 500
}
def good_constraint(constraint_factory: ConstraintFactory):
return (
constraint_factory.for_each(Visit)
.filter(lambda visit: visit.name in blocked_customers) # Fast set lookup
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Good")
)
Performance tips:
- Pre-compute expensive calculations (distances, lookup tables)
- Cache property calculations in domain objects if safe
- Avoid loops and complex logic in lambda functions
- Use efficient data structures (sets for membership, dicts for lookup)
Distance Calculation: Two Modes
This quickstart supports two routing modes, selectable via the UI toggle or API parameter:
Haversine Mode (Default)
Fast great-circle distance calculation using the Haversine formula:
- No external dependencies or network calls
- Assumes 50 km/h average driving speed
- Routes display as straight lines on the map
- Best for: development, testing, and quick iterations
Real Roads Mode
Actual road network routing using OpenStreetMap data via OSMnx:
- Downloads and caches road network data locally
- Computes shortest paths using Dijkstra’s algorithm
- Routes display as actual road paths on the map
- Progress streaming via Server-Sent Events (SSE)
First-time use: The initial load downloads ~5-15 MB of road network data for the demo area (cached for subsequent runs).
How it works:
- Enable “Real Roads” toggle in the UI before loading demo data
- The system downloads/loads the OSM road network for the bounding box
- A distance matrix is precomputed for all location pairs
- The solver uses real driving times; the UI displays actual road routes
# The Location class automatically uses the distance matrix when set
Location.set_distance_matrix(matrix)
# Solver calls driving_time_to() which checks for matrix first
time = loc1.driving_time_to(loc2) # Uses matrix if available, else haversine
Custom Routing APIs
For production with proprietary routing (Google Maps, Mapbox, OSRM), pre-compute a distance matrix before solving:
def build_real_distance_matrix(locations):
"""Fetch actual driving times from routing API (run once, before solving)."""
matrix = {}
for loc1 in locations:
for loc2 in locations:
if loc1 != loc2:
# Call Google Maps / Mapbox / OSRM once per pair
matrix[(loc1, loc2)] = call_routing_api(loc1, loc2)
return matrix
Never call external APIs during solving — pre-compute everything.
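A hedged wiring sketch: build the matrix once at startup, then hand it to the Location helper shown earlier. Whether set_distance_matrix accepts exactly this pair-keyed dict depends on routing.py, so check the expected format there first:
# Run once before solving; never inside constraint evaluation
all_locations = (
    [vehicle.home_location for vehicle in route_plan.vehicles]
    + [visit.location for visit in route_plan.visits]
)
matrix = build_real_distance_matrix(all_locations)
Location.set_distance_matrix(matrix)   # assumption: format matches what routing.py expects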
Scaling Strategies
Problem size guidelines (30 second solve):
- Up to 100 visits × 5 vehicles: Good solutions
- 100-200 visits: Increase solve time to 2-5 minutes
- 200-500 visits: Increase to 10-30 minutes
- 500+ visits: Consider decomposition strategies
Decomposition approaches:
Geographic clustering:
# Split large problem into regions
north_region_visits = [v for v in visits if v.location.latitude > 40.5]
south_region_visits = [v for v in visits if v.location.latitude <= 40.5]
# Solve each region separately
north_solution = solve(north_vehicles, north_region_visits)
south_solution = solve(south_vehicles, south_region_visits)
Time-based decomposition:
# Solve AM deliveries, then PM deliveries
am_visits = [v for v in visits if v.max_end_time.hour < 13]
pm_visits = [v for v in visits if v.min_start_time.hour >= 13]
Multi-day routing:
# Route planning for weekly schedule
for day in ["Monday", "Tuesday", "Wednesday", ...]:
day_visits = get_visits_for_day(day)
solution = solve(vehicles, day_visits)
save_solution(day, solution)
Handling Infeasible Problems
Sometimes no feasible solution exists (e.g., time windows impossible to meet, insufficient vehicle capacity).
Detect and report:
solution = solver_manager.get_final_best_solution(job_id)
if solution.score.hard_score < 0:
# Analyze what's infeasible
unassigned = [v for v in solution.visits if v.vehicle is None]
over_capacity = [
v for v in solution.vehicles
if v.calculate_total_demand() > v.capacity
]
late_visits = [
v for v in solution.visits
if v.is_service_finished_after_max_end_time()
]
return {
"status": "infeasible",
"hard_score": solution.score.hard_score,
"issues": {
"unassigned_visits": len(unassigned),
"capacity_violations": len(over_capacity),
"time_window_violations": len(late_visits)
},
"suggestions": [
"Add more vehicles" if unassigned else None,
"Increase vehicle capacity" if over_capacity else None,
"Relax time windows" if late_visits else None
]
}
Relaxation strategies:
Soft capacity violations:
# Change capacity from hard to soft constraint
.penalize(HardSoftScore.ONE_SOFT, lambda v: v.total_demand - v.capacity)
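A fuller version of that relaxation as a complete constraint function (a sketch, using calculate_total_demand() as elsewhere in this guide; if you adopt it, remove or re-weight the original hard capacity constraint):
def soft_vehicle_capacity(constraint_factory: ConstraintFactory):
    """Relaxed capacity: overloads are allowed but penalized per unit over capacity."""
    return (
        constraint_factory.for_each(Vehicle)
        .filter(lambda vehicle: vehicle.calculate_total_demand() > vehicle.capacity)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda vehicle: vehicle.calculate_total_demand() - vehicle.capacity
        )
        .as_constraint("Soft vehicle capacity")
    )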
Penalized unassigned visits:
# Allow unassigned visits with large penalty
factory.for_each(Visit)
.filter(lambda v: v.vehicle is None)
.penalize(HardSoftScore.of_soft(100000))
Flexible time windows:
# Allow late arrivals with graduated penalty
.penalize(HardSoftScore.ONE_SOFT, lambda v: v.service_finished_delay_in_minutes())
Real-Time Routing Adjustments
Scenario: Need to re-route due to:
- New urgent orders received
- Vehicle breakdown
- Traffic delays
Dynamic re-routing:
def add_urgent_visit(current_solution: VehicleRoutePlan, new_visit: Visit):
"""Add urgent visit and re-optimize."""
# Add to problem
current_solution.visits.append(new_visit)
# Use current solution as warm start
job_id = f"urgent-{uuid4()}"
solver_manager.solve_and_listen(
job_id,
current_solution, # Starts from current routes
callback,
problem_change=ProblemChange.add_entity(new_visit)
)
return job_id
def handle_vehicle_breakdown(solution: VehicleRoutePlan, broken_vehicle_id: str):
"""Re-assign visits from broken vehicle."""
broken_vehicle = next(v for v in solution.vehicles if v.id == broken_vehicle_id)
# Unassign all visits from this vehicle
for visit in broken_vehicle.visits:
visit.vehicle = None
broken_vehicle.visits = []
# Mark vehicle unavailable
broken_vehicle.capacity = 0
# Re-solve
solver_manager.solve_and_listen("emergency-replan", solution, callback)
Optimization concept: Warm starting from current solution makes re-routing much faster than solving from scratch. The solver starts with current routes and only modifies what’s necessary.
Monitoring and Logging
Track key metrics:
import logging
import time
logger = logging.getLogger(__name__)
start_time = time.time()
solver_manager.solve_and_listen(job_id, route_plan, callback)
# ... wait for completion ...
solution = solver_manager.get_final_best_solution(job_id)
duration = time.time() - start_time
# Calculate metrics
total_visits = len(solution.visits)
total_vehicles = len(solution.vehicles)
assigned_visits = sum(1 for v in solution.visits if v.vehicle is not None)
total_distance = -solution.score.soft_score if solution.score else 0
logger.info(
f"Solved route plan {job_id}: "
f"duration={duration:.1f}s, "
f"score={solution.score}, "
f"visits={assigned_visits}/{total_visits}, "
f"vehicles={total_vehicles}, "
f"distance={total_distance}s"
)
# Alert if infeasible
if solution.score and solution.score.hard_score < 0:
logger.warning(
f"Infeasible solution for {job_id}: "
f"hard_score={solution.score.hard_score}"
)
Production monitoring:
- Solve duration: Alert if suddenly increases (data quality issue?)
- Infeasibility rate: Track percentage of infeasible solutions
- Score trends: Monitor if soft scores degrading over time
- Capacity utilization: Are vehicles underutilized? (might need fewer vehicles)
- Time window tightness: Frequent time violations? (might need more vehicles)
Quick Reference
File Locations
| Need to… | Edit this file |
|---|---|
| Add/change business rule | src/vehicle_routing/constraints.py |
| Add field to Vehicle | src/vehicle_routing/domain.py + converters.py |
| Add field to Visit | src/vehicle_routing/domain.py + converters.py |
| Change solve time | src/vehicle_routing/solver.py |
| Change distance calculation | src/vehicle_routing/routing.py |
| Configure routing mode | src/vehicle_routing/routing.py |
| Add REST endpoint | src/vehicle_routing/rest_api.py |
| Change demo data | src/vehicle_routing/demo_data.py |
| Change UI/map | static/index.html, static/app.js |
Common Constraint Patterns
Unary constraint (single entity):
constraint_factory.for_each(Vehicle)
.filter(lambda vehicle: ...)  # your condition here
.penalize(HardSoftScore.ONE_HARD)
Filtering by route property:
constraint_factory.for_each(Vehicle)
.filter(lambda vehicle: len(vehicle.visits) > 0) # Has visits
.filter(lambda vehicle: vehicle.calculate_total_demand() > vehicle.capacity)
.penalize(...)
Visit constraints:
constraint_factory.for_each(Visit)
.filter(lambda visit: visit.vehicle is not None) # Assigned only
.filter(lambda visit: ...)  # your condition here
.penalize(...)
Summing over vehicles:
constraint_factory.for_each(Vehicle)
.penalize(
HardSoftScore.ONE_SOFT,
lambda vehicle: vehicle.calculate_total_driving_time_seconds()
)
Variable penalty amount:
.penalize(
HardSoftScore.ONE_HARD,
lambda entity: calculate_penalty_amount(entity)
)
Common Domain Patterns
Check if visit assigned:
if visit.vehicle is not None:
# Visit is assigned to a vehicle
Iterate through route:
for i, visit in enumerate(vehicle.visits):
print(f"Stop {i+1}: {visit.name}")
Calculate route metrics:
total_demand = sum(v.demand for v in vehicle.visits)
total_time = (vehicle.arrival_time - vehicle.departure_time).total_seconds()
avg_demand = total_demand / len(vehicle.visits) if vehicle.visits else 0
Time calculations:
arrival = visit.arrival_time
service_start = max(arrival, visit.min_start_time) # Wait if arrived early
service_end = service_start + visit.service_duration
is_late = service_end > visit.max_end_time
Debugging Tips
Enable verbose logging:
import logging
logging.basicConfig(level=logging.DEBUG)
Analyze solution score:
from vehicle_routing.solver import solution_manager
analysis = solution_manager.analyze(route_plan)
print(analysis.summary())
# See detailed constraint breakdown
for constraint in analysis.constraint_analyses:
print(f"{constraint.name}: {constraint.score}")
for match in constraint.matches:
print(f" - {match.justification}")
Test constraint in isolation:
from vehicle_routing.constraints import define_constraints
from solverforge_legacy.solver.test import ConstraintVerifier
verifier = ConstraintVerifier.build(
define_constraints,
VehicleRoutePlan,
Vehicle,
Visit
)
# Test specific scenario
verifier.verify_that(vehicle_capacity) \
.given(test_vehicle, test_visit) \
.penalizes_by(expected_penalty)
Visualize route in tests:
def print_route(vehicle: Vehicle):
"""Debug helper to print route."""
print(f"Vehicle {vehicle.id}:")
print(f" Capacity: {vehicle.total_demand}/{vehicle.capacity}")
print(f" Route:")
print(f" Depot → {vehicle.departure_time}")
for i, visit in enumerate(vehicle.visits):
print(f" {visit.name} → arrive {visit.arrival_time}, "
f"depart {visit.calculate_departure_time()}")
print(f" Depot → {vehicle.arrival_time}")
Common Gotchas
Forgot to handle empty routes
- Check len(vehicle.visits) > 0 before accessing route properties
- Symptom: IndexError or None errors
Shadow variables not updated
- Use the connect() helper in tests to properly link visits
- In production, solver maintains these automatically
- Symptom: arrival_time is None or incorrect
Distance calculation too slow
- Pre-compute distance matrices before solving
- Never call external APIs during constraint evaluation
- Symptom: Solving extremely slow (< 100 evaluations/second)
Forgot to register constraint
- Add to define_constraints() return list
- Symptom: Constraint not enforced
Time zone issues
- Use timezone-aware datetime objects consistently
- Or use naive datetime (no timezone) consistently
- Symptom: Time calculations off by hours
Capacity violations not penalized
- Ensure calculate_total_demand() is used, not manual sum
- Check filter logic: should penalize when demand > capacity
- Symptom: Solutions with impossible loads
Typical evaluation speeds (on modern hardware):
| Problem Size | Evaluations/Second | 30-Second Results |
|---|---|---|
| 20 visits, 2 vehicles | 10,000+ | Near-optimal |
| 50 visits, 5 vehicles | 5,000+ | High quality |
| 100 visits, 8 vehicles | 2,000+ | Good quality |
| 200 visits, 10 vehicles | 500-1000 | Decent quality |
If your speeds are significantly lower, review constraint complexity and pre-compute expensive operations.
Conclusion
You now have a complete understanding of constraint-based vehicle routing:
✅ Problem modeling — How to represent routing problems with vehicles, visits, and locations
✅ Constraint logic — How to express capacity, time windows, and distance minimization
✅ Route construction — How list variables and shadow variables build efficient routes
✅ Customization patterns — How to extend for your routing needs
✅ Production readiness — Performance, scaling, and infeasibility handling
Next Steps
- Run the application and experiment with the three demo datasets
- Modify an existing constraint — change capacity limits or time windows
- Add your own constraint — implement a rule from your domain (max distance, breaks, priorities)
- Test thoroughly — write unit tests for your constraints
- Customize the data model — add vehicle types, visit priorities, or other business fields
- Deploy with real data — integrate with your customer database and mapping service
Key Takeaways
List Variables for Routing:
- PlanningListVariable on vehicle.visits handles both assignment and sequencing
- Shadow variables automatically maintain route chain integrity
- Arrival times cascade through the chain for efficient time calculations
Hard vs Soft Constraints:
- Hard: Capacity and time windows (must satisfy for valid routes)
- Soft: Total distance (optimize after ensuring validity)
Graded Penalties:
- Penalize by excess amount (not just binary yes/no)
- Helps solver navigate incrementally toward feasibility
- Example: 20 units over capacity is “less wrong” than 50 units over
Metaheuristics for Routing:
- Efficiently explore massive solution spaces (millions of possibilities)
- Anytime algorithms: improve continuously, stop when satisfied
- No guarantee of global optimum, but high-quality solutions in practical time
The Power of Constraints:
- Most business logic lives in one file (constraints.py)
- Easy to add new rules without changing core routing logic
- Declarative: describe what you want, solver figures out how
Comparison to Other Quickstarts
vs. Employee Scheduling:
- Scheduling: Temporal (when to schedule shifts)
- Routing: Spatial + temporal (where to go and when to arrive)
- Scheduling: Simple planning variable (which employee)
- Routing: List variable (which visits, in what order)
vs. Other Routing Variants:
- CVRP (Capacitated Vehicle Routing): Just capacity, no time windows
- VRPTW (this quickstart): Capacity + time windows
- VRPPD (Pickup & Delivery): Precedence constraints between pickup/delivery pairs
- MDVRP (Multi-Depot): Multiple starting locations
This quickstart teaches core concepts applicable to all routing variants.
Additional Resources
Questions? Start by solving the demo datasets and observing how the routes are constructed. Try modifying capacity or time windows to see how the solver adapts. The best way to learn routing optimization is to experiment and visualize the results.
Happy routing! 🚚📦