Constraints
Define constraints using the fluent Constraint Streams API.
Constraints define the rules that make a solution valid and optimal. SolverForge uses a fluent Constraint Streams API that lets you express constraints declaratively.
Topics
Constraint Types
| Type | Purpose | Example |
|---|---|---|
| Hard | Must be satisfied for feasibility | No two lessons in the same room at the same time |
| Soft | Preferences to optimize | Teachers prefer consecutive lessons |
| Medium | Between hard and soft (optional) | Important but not mandatory constraints |
Example
from solverforge_legacy.solver.score import (
constraint_provider, ConstraintFactory, Constraint, Joiners, HardSoftScore
)
@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory) -> list[Constraint]:
return [
room_conflict(constraint_factory),
teacher_conflict(constraint_factory),
teacher_room_stability(constraint_factory),
]
def room_conflict(constraint_factory: ConstraintFactory) -> Constraint:
# Hard constraint: No two lessons in the same room at the same time
return (
constraint_factory
.for_each_unique_pair(
Lesson,
Joiners.equal(lambda lesson: lesson.timeslot),
Joiners.equal(lambda lesson: lesson.room),
)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Room conflict")
)
def teacher_room_stability(constraint_factory: ConstraintFactory) -> Constraint:
# Soft constraint: Teachers prefer teaching in the same room
return (
constraint_factory
.for_each_unique_pair(
Lesson,
Joiners.equal(lambda lesson: lesson.teacher),
)
.filter(lambda lesson1, lesson2: lesson1.room != lesson2.room)
.penalize(HardSoftScore.ONE_SOFT)
.as_constraint("Teacher room stability")
)
1 - Constraint Streams
Build constraints using the fluent Constraint Streams API.
The Constraint Streams API is a fluent, declarative way to define constraints. It’s inspired by Java Streams and SQL, allowing you to express complex scoring logic concisely.
Basic Structure
Every constraint follows this pattern:
from solverforge_legacy.solver.score import (
constraint_provider, ConstraintFactory, Constraint, HardSoftScore
)
@constraint_provider
def define_constraints(factory: ConstraintFactory) -> list[Constraint]:
return [
my_constraint(factory),
]
def my_constraint(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(MyEntity) # 1. Select entities
.filter(lambda e: e.is_active) # 2. Filter matches
.penalize(HardSoftScore.ONE_HARD) # 3. Apply score impact
.as_constraint("My constraint") # 4. Name the constraint
)
Stream Types
Streams are typed by the number of entities they carry:
| Stream Type | Entities | Example Use |
|---|---|---|
| `UniConstraintStream` | 1 | Single entity constraints |
| `BiConstraintStream` | 2 | Pair constraints |
| `TriConstraintStream` | 3 | Triple constraints |
| `QuadConstraintStream` | 4 | Quad constraints |
Starting a Stream
for_each()
Start with all instances of an entity class:
factory.for_each(Lesson)
# Stream of: Lesson1, Lesson2, Lesson3, ...
for_each_unique_pair()
Get all unique pairs (no duplicates, no self-pairs):
factory.for_each_unique_pair(Lesson)
# Stream of: (L1,L2), (L1,L3), (L2,L3), ...
# NOT: (L1,L1), (L2,L1), ...
With joiners for efficient filtering:
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
)
# Only pairs with same timeslot AND same room
for_each_including_unassigned()
Include entities with unassigned planning variables:
factory.for_each_including_unassigned(Lesson)
# Includes lessons where timeslot=None or room=None
Filtering
filter()
Remove non-matching items:
factory.for_each(Lesson)
.filter(lambda lesson: lesson.teacher == "A. Turing")
For bi-streams:
factory.for_each_unique_pair(Lesson)
.filter(lambda l1, l2: l1.room != l2.room)
Joining
join()
Combine streams:
factory.for_each(Lesson)
.join(Room)
# BiStream of (Lesson, Room) for all combinations
With joiners:
factory.for_each(Lesson)
.join(
Room,
Joiners.equal(lambda lesson: lesson.room, lambda room: room)
)
# BiStream of (Lesson, Room) where lesson.room == room
See Joiners for available joiner types.
if_exists() / if_not_exists()
Check for existence without creating pairs:
# Lessons that have at least one other lesson in the same room
factory.for_each(Lesson)
.if_exists(
Lesson,
Joiners.equal(lambda l: l.room),
Joiners.filtering(lambda l1, l2: l1.id != l2.id)
)
# Employees not assigned to any shift
factory.for_each(Employee)
.if_not_exists(
Shift,
Joiners.equal(lambda emp: emp, lambda shift: shift.employee)
)
Grouping
group_by()
Aggregate entities:
from solverforge_legacy.solver.score import ConstraintCollectors
# Count lessons per teacher
factory.for_each(Lesson)
.group_by(
lambda lesson: lesson.teacher,
ConstraintCollectors.count()
)
# BiStream of (teacher, count)
Multiple collectors:
# Get count and list of lessons per teacher
factory.for_each(Lesson)
.group_by(
lambda lesson: lesson.teacher,
ConstraintCollectors.count(),
ConstraintCollectors.to_list(lambda l: l)
)
# TriStream of (teacher, count, lesson_list)
See Collectors for available collector types.
Mapping
map()
Transform stream elements:
factory.for_each(Lesson)
.map(lambda lesson: lesson.teacher)
# UniStream of teachers (with duplicates)
expand()
Add derived values:
factory.for_each(Lesson)
.expand(lambda lesson: lesson.duration_minutes)
# BiStream of (Lesson, duration)
distinct()
Remove duplicates:
factory.for_each(Lesson)
.map(lambda lesson: lesson.teacher)
.distinct()
# UniStream of unique teachers
Scoring
penalize()
Apply negative score for matches:
# Hard constraint
.penalize(HardSoftScore.ONE_HARD)
# Soft constraint
.penalize(HardSoftScore.ONE_SOFT)
# Dynamic weight
.penalize(HardSoftScore.ONE_SOFT, lambda lesson: lesson.priority)
reward()
Apply positive score for matches:
# Reward preferred assignments
.reward(HardSoftScore.ONE_SOFT, lambda lesson: lesson.preference_score)
impact()
Apply positive or negative score based on value:
# Positive values reward, negative values penalize
.impact(HardSoftScore.ONE_SOFT, lambda l: l.score_impact)
Finalizing
as_constraint()
Name the constraint (required):
.as_constraint("Room conflict")
justify_with()
Add custom justification for score explanation:
.penalize(HardSoftScore.ONE_HARD)
.justify_with(lambda l1, l2, score: RoomConflictJustification(l1, l2, score))
.as_constraint("Room conflict")
indict_with()
Specify which entities to blame:
.penalize(HardSoftScore.ONE_HARD)
.indict_with(lambda l1, l2: [l1, l2])
.as_constraint("Room conflict")
Complete Examples
Room Conflict (Hard)
def room_conflict(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Room conflict")
)
Teacher Room Stability (Soft)
def teacher_room_stability(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.teacher)
)
.filter(lambda l1, l2: l1.room != l2.room)
.penalize(HardSoftScore.ONE_SOFT)
.as_constraint("Teacher room stability")
)
Balance Workload (Soft)
def balance_workload(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.count()
)
.filter(lambda employee, count: count > 5)
.penalize(
HardSoftScore.ONE_SOFT,
lambda employee, count: count - 5 # Penalize excess shifts
)
.as_constraint("Balance workload")
)
Best Practices
Do
- Use joiners in `for_each_unique_pair()` for efficiency
- Name constraints descriptively
- Break complex constraints into helper functions
Don’t
- Use `filter()` when a joiner would work (less efficient)
- Create overly complex single constraints (split them)
- Forget to call `as_constraint()`
Next Steps
2 - Joiners
Efficiently filter and match entities in constraint streams.
Joiners efficiently filter pairs of entities during joins and unique pair operations. They’re more efficient than post-join filtering because they use indexing.
Basic Usage
from solverforge_legacy.solver.score import Joiners
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda lesson: lesson.timeslot),
Joiners.equal(lambda lesson: lesson.room),
)
Multiple joiners are combined with AND logic.
Available Joiners
equal()
Match when property values are equal:
# Same timeslot
Joiners.equal(lambda lesson: lesson.timeslot)
# In a join, specify both sides
factory.for_each(Lesson).join(
Room,
Joiners.equal(lambda lesson: lesson.room, lambda room: room)
)
less_than() / less_than_or_equal()
Match when first value is less than second:
# l1.priority < l2.priority
Joiners.less_than(lambda lesson: lesson.priority)
# l1.start_time <= l2.start_time
Joiners.less_than_or_equal(lambda lesson: lesson.start_time)
greater_than() / greater_than_or_equal()
Match when first value is greater than second:
# l1.priority > l2.priority
Joiners.greater_than(lambda lesson: lesson.priority)
# l1.end_time >= l2.end_time
Joiners.greater_than_or_equal(lambda lesson: lesson.end_time)
overlapping()
Match when ranges overlap:
# Time overlap: [start1, end1) overlaps [start2, end2)
Joiners.overlapping(
lambda l: l.start_time, # Start of range 1
lambda l: l.end_time, # End of range 1
lambda l: l.start_time, # Start of range 2
lambda l: l.end_time, # End of range 2
)
For a join between different types:
factory.for_each(Meeting).join(
Availability,
Joiners.overlapping(
lambda m: m.start_time,
lambda m: m.end_time,
lambda a: a.start_time,
lambda a: a.end_time,
)
)
filtering()
Custom filter function (less efficient, use as last resort):
# Custom logic that can't be expressed with other joiners
Joiners.filtering(lambda l1, l2: l1.is_compatible_with(l2))
Combining Joiners
Joiners are combined with AND:
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot), # Same timeslot AND
Joiners.equal(lambda l: l.room), # Same room
)
Index-Based Joiners (Preferred)
These joiners use internal indexes for O(1) or O(log n) lookup:
- `equal()` - Hash index
- `less_than()`, `greater_than()` - Tree index
- `overlapping()` - Interval tree
Filtering Joiner (Slower)
filtering() checks every pair, O(n²):
# Avoid when possible - checks all pairs
Joiners.filtering(lambda l1, l2: some_complex_check(l1, l2))
Optimization Tips
Good: Index joiners first, filtering last:
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot), # Index first
Joiners.filtering(lambda l1, l2: custom(l1, l2)) # Filter remaining
)
Bad: Only filtering (checks all pairs):
factory.for_each_unique_pair(
Lesson,
Joiners.filtering(lambda l1, l2: l1.timeslot == l2.timeslot and custom(l1, l2))
)
Examples
Time Conflict Detection
def time_conflict(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each_unique_pair(
Shift,
Joiners.equal(lambda s: s.employee),
Joiners.overlapping(
lambda s: s.start_time,
lambda s: s.end_time,
lambda s: s.start_time,
lambda s: s.end_time,
),
)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Employee time conflict")
)
Same Day Sequential
def seconds_between(earlier_end, later_start) -> float:
    """Return the signed gap in seconds from ``earlier_end`` to ``later_start``.

    Both arguments may be ``datetime.time`` values (which cannot be
    subtracted directly) or any pair of values whose difference is a
    ``timedelta`` (for example two ``datetime`` objects). The result is
    negative when ``later_start`` lies before ``earlier_end``.
    """
    from datetime import date, datetime, time

    if isinstance(earlier_end, time) and isinstance(later_start, time):
        # Anchor both clock times to the same arbitrary day so they can be
        # subtracted; the joiners already guarantee the same day of week.
        anchor = date.min
        gap = datetime.combine(anchor, later_start) - datetime.combine(anchor, earlier_end)
    else:
        gap = later_start - earlier_end
    # total_seconds() keeps the sign; timedelta.seconds would wrap a negative
    # gap around to a large positive number.
    return gap.total_seconds()


def same_day_sequential(factory: ConstraintFactory) -> Constraint:
    """Reward a teacher's lessons that follow each other within 30 minutes.

    Pairs lessons of the same teacher on the same day where the first lesson
    starts earlier, then keeps only pairs whose break between the end of the
    first and the start of the second is between 0 and 1800 seconds.
    """
    return (
        factory.for_each(Lesson)
        .join(
            Lesson,
            Joiners.equal(lambda lesson: lesson.teacher),
            Joiners.equal(lambda lesson: lesson.timeslot.day_of_week),
            Joiners.less_than(lambda lesson: lesson.timeslot.start_time),
            # Custom check goes last so the indexed joiners narrow the pairs first.
            Joiners.filtering(
                lambda first, second: 0 <= seconds_between(
                    first.timeslot.end_time, second.timeslot.start_time
                ) <= 1800
            ),
        )
        .reward(HardSoftScore.ONE_SOFT)
        .as_constraint("Teacher consecutive lessons")
    )
Resource Assignment
def resource_assignment(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Task)
.join(
Resource,
Joiners.equal(lambda t: t.required_skill, lambda r: r.skill),
Joiners.greater_than_or_equal(lambda t: t.priority, lambda r: r.min_priority),
)
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Resource skill match")
)
Joiner vs Filter
| Use Joiner When | Use Filter When |
|---|---|
| Checking equality | Complex logic |
| Comparing values | Multiple conditions with OR |
| Range overlap | Calling methods on entities |
| Performance matters | Simple one-off checks |
Next Steps
3 - Collectors
Aggregate data in constraint streams using collectors.
Collectors aggregate data when grouping entities. They’re used with group_by() to compute counts, sums, lists, and other aggregations.
Basic Usage
from solverforge_legacy.solver.score import ConstraintCollectors
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee, # Group key
ConstraintCollectors.count() # Collector
)
# Result: BiStream of (employee, count)
Available Collectors
count()
Count items in each group:
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.count()
)
# (Employee, int)
count_distinct()
Count unique values:
factory.for_each(Lesson)
.group_by(
lambda lesson: lesson.teacher,
ConstraintCollectors.count_distinct(lambda l: l.room)
)
# (Teacher, number of distinct rooms)
sum()
Sum numeric values:
factory.for_each(Visit)
.group_by(
lambda visit: visit.vehicle,
ConstraintCollectors.sum(lambda v: v.demand)
)
# (Vehicle, total demand)
min() / max()
Find minimum or maximum:
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.min(lambda s: s.start_time)
)
# (Employee, earliest start time)
With comparator:
ConstraintCollectors.max(
lambda shift: shift,
key=lambda s: s.priority
)
# Returns the shift with highest priority
average()
Calculate average:
factory.for_each(Task)
.group_by(
lambda task: task.worker,
ConstraintCollectors.average(lambda t: t.duration)
)
# (Worker, average task duration)
to_list()
Collect into a list:
factory.for_each(Visit)
.group_by(
lambda visit: visit.vehicle,
ConstraintCollectors.to_list(lambda v: v)
)
# (Vehicle, list of visits)
to_set()
Collect into a set (unique values):
factory.for_each(Lesson)
.group_by(
lambda lesson: lesson.teacher,
ConstraintCollectors.to_set(lambda l: l.room)
)
# (Teacher, set of rooms)
to_sorted_set()
Collect into a sorted set:
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.to_sorted_set(lambda s: s.start_time)
)
# (Employee, sorted set of start times)
compose()
Combine multiple collectors:
ConstraintCollectors.compose(
ConstraintCollectors.count(),
ConstraintCollectors.sum(lambda s: s.hours),
lambda count, total_hours: (count, total_hours)
)
# Returns (count, sum) tuple
conditional()
Collect only matching items:
ConstraintCollectors.conditional(
lambda shift: shift.is_night,
ConstraintCollectors.count()
)
# Count only night shifts
Multiple Collectors
Use multiple collectors in one group_by:
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.count(),
ConstraintCollectors.sum(lambda s: s.hours),
ConstraintCollectors.min(lambda s: s.start_time),
)
# QuadStream: (Employee, count, total_hours, earliest_start)
Grouping Patterns
Count Per Category
def balance_shift_count(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.count()
)
.filter(lambda employee, count: count > 5)
.penalize(
HardSoftScore.ONE_SOFT,
lambda employee, count: (count - 5) ** 2
)
.as_constraint("Balance shift count")
)
Sum with Threshold
def vehicle_capacity(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Visit)
.group_by(
lambda visit: visit.vehicle,
ConstraintCollectors.sum(lambda v: v.demand)
)
.filter(lambda vehicle, total: total > vehicle.capacity)
.penalize(
HardSoftScore.ONE_HARD,
lambda vehicle, total: total - vehicle.capacity
)
.as_constraint("Vehicle capacity")
)
Load Distribution
def fair_distribution(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Task)
.group_by(
lambda task: task.worker,
ConstraintCollectors.count()
)
.group_by(
ConstraintCollectors.min(lambda worker, count: count),
ConstraintCollectors.max(lambda worker, count: count),
)
.filter(lambda min_count, max_count: max_count - min_count > 2)
.penalize(
HardSoftScore.ONE_SOFT,
lambda min_count, max_count: max_count - min_count
)
.as_constraint("Fair task distribution")
)
Consecutive Detection
def consecutive_shifts(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Shift)
.group_by(
lambda shift: shift.employee,
ConstraintCollectors.to_sorted_set(lambda s: s.date)
)
.filter(lambda employee, dates: has_consecutive_days(dates, 6))
.penalize(HardSoftScore.ONE_HARD)
.as_constraint("Max consecutive days")
)
def has_consecutive_days(dates: set, max_consecutive: int) -> bool:
    """Return True if ``dates`` contains a run longer than ``max_consecutive`` days.

    Args:
        dates: Collection of ``date``-like values; subtracting two of them
            must yield an object with a ``days`` attribute.
        max_consecutive: Largest run of back-to-back days that is still
            acceptable.

    Returns:
        True as soon as a run of more than ``max_consecutive`` consecutive
        days is found, otherwise False. An empty collection never matches.
    """
    ordered = sorted(set(dates))
    if not ordered:
        return False

    run_length = 1
    # A single day is already a run of one, so it violates a limit below 1.
    if run_length > max_consecutive:
        return True

    for previous, current in zip(ordered, ordered[1:]):
        if (current - previous).days == 1:
            run_length += 1
            if run_length > max_consecutive:
                return True
        else:
            # Gap of more than one day: the run starts over at ``current``.
            run_length = 1
    return False
Prefer count() over to_list()
# Good: Efficient counting
ConstraintCollectors.count()
# Avoid: Creates list just to count
ConstraintCollectors.to_list(lambda x: x).map(len)
Use conditional() for Filtered Counts
# Good: Single pass
ConstraintCollectors.conditional(
lambda s: s.is_weekend,
ConstraintCollectors.count()
)
# Avoid: Filter then count
factory.for_each(Shift)
.filter(lambda s: s.is_weekend)
.group_by(...)
Minimize Data in Collectors
# Good: Collect only needed data
ConstraintCollectors.to_list(lambda s: s.start_time)
# Avoid: Collect entire objects
ConstraintCollectors.to_list(lambda s: s)
Next Steps
4 - Score Types
Choose the right score type for your constraints.
Score types determine how constraint violations and rewards are measured. Choose the type that matches your problem’s structure.
Available Score Types
| Score Type | Levels | Use Case |
|---|---|---|
| `SimpleScore` | 1 | Single optimization objective |
| `HardSoftScore` | 2 | Feasibility + optimization |
| `HardMediumSoftScore` | 3 | Hard + important + nice-to-have |
| `BendableScore` | N | Custom number of levels |
| `*DecimalScore` variants | - | Decimal precision |
SimpleScore
For single-objective optimization:
from solverforge_legacy.solver.score import SimpleScore
# In domain model
score: Annotated[SimpleScore, PlanningScore] = field(default=None)
# In constraints
.penalize(SimpleScore.ONE)
.reward(SimpleScore.of(10))
Use when: You only need to maximize or minimize one thing (e.g., total profit, total distance).
HardSoftScore
The most common type—separates feasibility from optimization:
from solverforge_legacy.solver.score import HardSoftScore
# In domain model
score: Annotated[HardSoftScore, PlanningScore] = field(default=None)
# In constraints
.penalize(HardSoftScore.ONE_HARD) # Broken constraint
.penalize(HardSoftScore.ONE_SOFT) # Suboptimal
.penalize(HardSoftScore.of_hard(5)) # Weighted hard
.penalize(HardSoftScore.of_soft(10)) # Weighted soft
Hard constraints:
- Must be satisfied for a feasible solution
- Score format: `Xhard/Ysoft`
- `0hard/*soft` = feasible
Soft constraints:
- Preferences to optimize
- Better soft scores are preferred among feasible solutions
Use when: You have rules that must be followed AND preferences to optimize.
HardMediumSoftScore
Three levels of priority:
from solverforge_legacy.solver.score import HardMediumSoftScore
# In domain model
score: Annotated[HardMediumSoftScore, PlanningScore] = field(default=None)
# In constraints
.penalize(HardMediumSoftScore.ONE_HARD) # Must satisfy
.penalize(HardMediumSoftScore.ONE_MEDIUM) # Important preference
.penalize(HardMediumSoftScore.ONE_SOFT) # Nice to have
Use when:
- Medium = “Assign as many as possible”
- Medium = “Important but not mandatory”
- Medium = “Prefer over soft, but not as critical as hard”
Example: Meeting scheduling where:
- Hard: Required attendees must be available
- Medium: Preferred attendees should attend
- Soft: Room size preferences
BendableScore
Custom number of hard and soft levels:
from solverforge_legacy.solver.score import BendableScore
# Configure levels (3 hard, 2 soft)
score: Annotated[BendableScore, PlanningScore] = field(default=None)
# In constraints
.penalize(BendableScore.of_hard(0, 1)) # First hard level
.penalize(BendableScore.of_hard(1, 1)) # Second hard level
.penalize(BendableScore.of_soft(0, 1)) # First soft level
Use when: You need more than 3 priority levels.
Decimal Score Variants
For precise calculations:
from solverforge_legacy.solver.score import HardSoftDecimalScore
score: Annotated[HardSoftDecimalScore, PlanningScore] = field(default=None)
# In constraints
from decimal import Decimal
.penalize(HardSoftDecimalScore.of_soft(Decimal("0.01")))
Available variants:
- `SimpleDecimalScore`
- `HardSoftDecimalScore`
- `HardMediumSoftDecimalScore`
- `BendableDecimalScore`
Use when: Integer scores aren’t precise enough (e.g., money, distances).
Score Constants
Common score values are predefined:
# SimpleScore
SimpleScore.ZERO
SimpleScore.ONE
SimpleScore.of(n)
# HardSoftScore
HardSoftScore.ZERO
HardSoftScore.ONE_HARD
HardSoftScore.ONE_SOFT
HardSoftScore.of_hard(n)
HardSoftScore.of_soft(n)
HardSoftScore.of(hard, soft)
# HardMediumSoftScore
HardMediumSoftScore.ZERO
HardMediumSoftScore.ONE_HARD
HardMediumSoftScore.ONE_MEDIUM
HardMediumSoftScore.ONE_SOFT
HardMediumSoftScore.of(hard, medium, soft)
Dynamic Weights
Apply weights based on entity properties:
def weighted_penalty(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each(Task)
.filter(lambda t: t.is_late())
.penalize(
HardSoftScore.ONE_SOFT,
lambda task: task.priority # High priority = bigger penalty
)
.as_constraint("Late task")
)
Score Comparison
Scores are compared level by level:
# Hard first, then soft
0hard/-100soft > -1hard/0soft (first is feasible)
-1hard/-50soft > -2hard/-10soft (first has better hard)
0hard/-50soft > 0hard/-100soft (same hard, better soft)
Score Properties
score = HardSoftScore.of(-2, -100)
score.is_feasible # False (hard < 0)
score.hard_score # -2
score.soft_score # -100
str(score) # "-2hard/-100soft"
HardSoftScore.parse("-2hard/-100soft") # Parse from string
Choosing a Score Type
| Question | Recommendation |
|---|---|
| Need feasibility check? | Use HardSoftScore |
| Single objective only? | Use SimpleScore |
| “Assign as many as possible”? | Use HardMediumSoftScore |
| More than 3 priority levels? | Use BendableScore |
| Need decimal precision? | Use *DecimalScore variant |
Best Practices
Do
- Use `HardSoftScore` as default choice
- Keep hard constraints truly hard (legal requirements, physical limits)
- Use consistent weight scales within each level
Don’t
- Use medium level for actual hard constraints
- Over-complicate with BendableScore when HardMediumSoftScore works
- Mix units in the same level (e.g., minutes and dollars)
Next Steps
5 - Score Analysis
Understand why a solution has its score.
Score analysis helps you understand why a solution received its score. This is essential for debugging constraints and explaining results to users.
SolutionManager
Use SolutionManager to analyze solutions:
from solverforge_legacy.solver import SolverFactory, SolutionManager
solver_factory = SolverFactory.create(solver_config)
solution_manager = SolutionManager.create(solver_factory)
# Analyze a solution
analysis = solution_manager.analyze(solution)
Score Explanation
Get a breakdown of constraint scores:
analysis = solution_manager.analyze(solution)
# Overall score
print(f"Score: {analysis.score}")
# Per-constraint breakdown
for constraint_analysis in analysis.constraint_analyses():
print(f"{constraint_analysis.constraint_name}: {constraint_analysis.score}")
print(f" Match count: {constraint_analysis.match_count}")
Example output:
Score: -2hard/-15soft
Room conflict: -2hard
Match count: 2
Teacher room stability: -10soft
Match count: 10
Teacher time efficiency: -5soft
Match count: 5
Constraint Matches
See exactly which entities triggered each constraint:
for constraint_analysis in analysis.constraint_analyses():
print(f"\n{constraint_analysis.constraint_name}:")
for match in constraint_analysis.matches():
print(f" Match: {match.justification}")
print(f" Score: {match.score}")
Indictments
Find which entities are causing problems:
# Get indictments (entities blamed for score impact)
for indictment in analysis.indictments():
print(f"\nEntity: {indictment.indicted_object}")
print(f"Total score impact: {indictment.score}")
for match in indictment.matches():
print(f" - {match.constraint_name}: {match.score}")
Example output:
Entity: Lesson(id=1, subject='Math')
Total score impact: -1hard/-3soft
- Room conflict: -1hard
- Teacher room stability: -3soft
Custom Justifications
Add explanations to your constraints:
@dataclass
class RoomConflictJustification:
lesson1: Lesson
lesson2: Lesson
timeslot: Timeslot
room: Room
def __str__(self):
return (f"{self.lesson1.subject} and {self.lesson2.subject} "
f"both scheduled in {self.room} at {self.timeslot}")
def room_conflict(factory: ConstraintFactory) -> Constraint:
return (
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
)
.penalize(HardSoftScore.ONE_HARD)
.justify_with(lambda l1, l2, score: RoomConflictJustification(
l1, l2, l1.timeslot, l1.room
))
.as_constraint("Room conflict")
)
Debugging Constraints
Verify Score Calculation
# Calculate score without solving
score = solution_manager.update(solution)
print(f"Calculated score: {score}")
Find Missing Constraints
If a constraint isn’t firing when expected:
# Check if specific entities match
for constraint_analysis in analysis.constraint_analyses():
if constraint_analysis.constraint_name == "Room conflict":
if constraint_analysis.match_count == 0:
print("No room conflicts detected!")
# Check your joiners and filters
Verify Feasibility
if not solution.score.is_feasible:
print("Solution is infeasible!")
for ca in analysis.constraint_analyses():
if ca.score.hard_score < 0:
print(f"Hard constraint broken: {ca.constraint_name}")
for match in ca.matches():
print(f" {match.justification}")
Integration with FastAPI
Expose score analysis in your API:
from fastapi import FastAPI, HTTPException

# Reuse your existing application instance here if you already have one.
app = FastAPI()


@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
    """Return the score breakdown of the stored solution for ``job_id``.

    ``solutions`` (a mapping from job id to solution) and
    ``solution_manager`` are expected to be provided by your application.
    Responds with HTTP 404 when no solution is stored under ``job_id``.
    """
    solution = solutions.get(job_id)
    if solution is None:
        raise HTTPException(status_code=404, detail="Job not found")

    analysis = solution_manager.analyze(solution)
    return {
        # Scores are converted to strings so the response is JSON-serializable.
        "score": str(analysis.score),
        "is_feasible": analysis.score.is_feasible,
        "constraints": [
            {
                "name": ca.constraint_name,
                "score": str(ca.score),
                "match_count": ca.match_count,
            }
            for ca in analysis.constraint_analyses()
        ],
    }
Best Practices
Do
- Use `justify_with()` for user-facing explanations
- Check score analysis when debugging constraints
- Expose score breakdown in your UI
Don’t
- Analyze every solution during solving (performance)
- Ignore indictments when troubleshooting
- Forget to handle infeasible solutions
Score Comparison
Compare two solutions:
def compare_solutions(old: Timetable, new: Timetable):
    """Print the overall and per-constraint score differences of two solutions.

    Both solutions are analysed with ``solution_manager``. The printed scores
    come from the analyses rather than from the ``score`` attribute of the
    solutions, which may still be unset on a solution that was never scored.
    """
    old_analysis = solution_manager.analyze(old)
    new_analysis = solution_manager.analyze(new)
    # Neutral wording: the new score is not necessarily an improvement.
    print(f"Score: {old_analysis.score} -> {new_analysis.score}")

    old_constraints = {ca.constraint_name: ca for ca in old_analysis.constraint_analyses()}
    new_constraints = {ca.constraint_name: ca for ca in new_analysis.constraint_analyses()}

    # Walk the union of names (old order first) so a constraint that appears
    # in only one of the analyses is still reported.
    for name in {**old_constraints, **new_constraints}:
        old_ca = old_constraints.get(name)
        new_ca = new_constraints.get(name)
        old_score = old_ca.score if old_ca is not None else "n/a"
        new_score = new_ca.score if new_ca is not None else "n/a"
        if old_score != new_score:
            print(f"  {name}: {old_score} -> {new_score}")
Next Steps
6 - Constraint Performance
Optimize constraint evaluation for faster solving.
Efficient constraint evaluation is critical for solver performance. Most solving time is spent calculating scores, so optimizing constraints has a direct impact on solution quality.
1. Use Joiners Instead of Filters
Joiners use indexes for O(1) lookups. Filters check every item.
# Good: Uses index
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot)
)
# Bad: Checks all pairs
factory.for_each_unique_pair(Lesson)
.filter(lambda l1, l2: l1.timeslot == l2.timeslot)
2. Put Selective Joiners First
More selective joiners reduce the search space faster:
# Good: timeslot has few values, filters early
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot), # Few timeslots
Joiners.equal(lambda l: l.teacher), # More teachers
)
# Less efficient: teacher might have many values
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.teacher), # Many teachers
Joiners.equal(lambda l: l.timeslot), # Then timeslot
)
3. Avoid Expensive Lambda Operations
# Good: Simple property access
Joiners.equal(lambda l: l.timeslot)
# Bad: Complex calculation in joiner
Joiners.equal(lambda l: calculate_complex_hash(l))
4. Use Cached Properties
from functools import cached_property


@planning_entity
@dataclass
class Lesson:
    # ... fields as declared in your domain model ...

    # Pre-calculate expensive values, but only from fields that never change
    # while solving. cached_property computes the value once per instance, so
    # a key built from planning variables (timeslot, room) would go stale as
    # soon as the solver reassigns them and would corrupt the score.
    # NOTE(review): assumes subject and teacher are fixed problem facts.
    @cached_property
    def combined_key(self):
        return (self.subject, self.teacher)
# Use cached property in constraint
Joiners.equal(lambda l: l.combined_key)
Common Optimizations
Replace for_each + filter with for_each_unique_pair
# Before: Inefficient
factory.for_each(Lesson)
.join(Lesson)
.filter(lambda l1, l2: l1.id != l2.id and l1.timeslot == l2.timeslot)
# After: Efficient
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot)
)
Use if_exists() Instead of Join + group_by
# Before: Creates pairs then groups
factory.for_each(Employee)
.join(Shift, Joiners.equal(lambda e: e, lambda s: s.employee))
.group_by(lambda e, s: e, ConstraintCollectors.count())
.filter(lambda e, count: count > 0)
# After: Just checks existence
factory.for_each(Employee)
.if_exists(Shift, Joiners.equal(lambda e: e, lambda s: s.employee))
Avoid Redundant Constraints
# Redundant: Two constraints that overlap
def constraint1(factory):
# Penalizes A and B in same room
...
def constraint2(factory):
# Penalizes A and B in same room and same timeslot
... # This overlaps with constraint1!
# Better: One specific constraint
def room_conflict(factory):
# Only penalizes same room AND same timeslot
factory.for_each_unique_pair(
Lesson,
Joiners.equal(lambda l: l.timeslot),
Joiners.equal(lambda l: l.room),
)
Limit Collection Sizes in Collectors
# Bad: Collects everything
ConstraintCollectors.to_list(lambda s: s)
# Better: Collect only what's needed
ConstraintCollectors.to_list(lambda s: s.start_time)
# Best: Use aggregate if possible
ConstraintCollectors.count()
Incremental Score Calculation
SolverForge uses incremental score calculation—only recalculating affected constraints when a move is made. Help this work efficiently:
Keep Constraints Independent
# Good: Constraints don't share state
def room_conflict(factory):
return factory.for_each_unique_pair(...)
def teacher_conflict(factory):
return factory.for_each_unique_pair(...)
# Bad: Shared calculation affects both
shared_data = calculate_once() # Recalculated on every change!
Avoid Global State
# Bad: References external data
external_config = load_config()
def my_constraint(factory):
return factory.for_each(Lesson)
.filter(lambda l: l.priority > external_config.threshold) # External ref
Benchmarking Constraints
Enable Debug Logging
import logging
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)
Time Individual Constraints
import time
def timed_constraint(factory):
    """Build ``actual_constraint`` and print how long the construction took.

    This times only the call that builds the constraint stream, not how long
    the solver spends evaluating it afterwards.
    """
    # perf_counter() is the clock intended for measuring short intervals;
    # time.time() follows the wall clock and can jump when it is adjusted.
    started = time.perf_counter()
    result = actual_constraint(factory)
    print(f"Constraint built in {time.perf_counter() - started:.3f}s")
    return result
Use the Benchmarker
For systematic comparison, use the Benchmarker (see Benchmarking).
Score Corruption Detection
Enable environment mode for debugging:
from solverforge_legacy.solver.config import EnvironmentMode
SolverConfig(
environment_mode=EnvironmentMode.FULL_ASSERT, # Detects score corruption
...
)
Modes:
- `NON_REPRODUCIBLE` - Fastest, no checks
- `REPRODUCIBLE` - Deterministic but no validation
- `FAST_ASSERT` - Quick validation checks
- `FULL_ASSERT` - Complete validation (slowest)
Use FULL_ASSERT during development, REPRODUCIBLE or NON_REPRODUCIBLE in production.
| Symptom | Likely Cause | Solution |
|---|---|---|
| Very slow start | Complex constraint building | Simplify or cache |
| Slow throughout | Filter instead of joiner | Use joiners |
| Memory issues | Large collections | Use aggregates |
| Score corruption | Incorrect incremental calc | Enable FULL_ASSERT |
Next Steps
7 - Testing Constraints
Test constraints in isolation for correctness.
Testing constraints ensures they behave correctly before integrating with the full solver. This catches bugs early and documents expected behavior.
Basic Constraint Testing
Test individual constraints with minimal data:
from datetime import time

import pytest
from solverforge_legacy.solver import SolutionManager, SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
from solverforge_legacy.solver.score import HardSoftScore

from my_app.domain import Timetable, Timeslot, Room, Lesson
from my_app.constraints import define_constraints
@pytest.fixture
def solution_manager():
    """Provide a SolutionManager built from the constraints under test."""
    score_director_config = ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    )
    # Termination is capped at one second of spent time
    termination = TerminationConfig(spent_limit=Duration(seconds=1))
    solver_config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=score_director_config,
        termination_config=termination,
    )
    return SolutionManager.create(SolverFactory.create(solver_config))
def test_room_conflict(solution_manager):
    """A shared room and timeslot must cost exactly one hard point."""
    monday_slot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    shared_room = Room("Room A")
    # Both lessons occupy the same room during the same timeslot
    clashing_lessons = [
        Lesson("1", "Math", "Teacher A", "Group 1", monday_slot, shared_room),
        Lesson("2", "Physics", "Teacher B", "Group 2", monday_slot, shared_room),
    ]
    timetable = Timetable(
        id="test",
        timeslots=[monday_slot],
        rooms=[shared_room],
        lessons=clashing_lessons,
    )
    hard_score = solution_manager.analyze(timetable).score.hard_score
    # One conflicting pair, so -1 hard
    assert hard_score == -1
def test_no_room_conflict(solution_manager):
    """Same timeslot but separate rooms must leave the hard score at zero."""
    slot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    first_room, second_room = Room("Room A"), Room("Room B")
    math = Lesson("1", "Math", "Teacher A", "Group 1", slot, first_room)
    physics = Lesson("2", "Physics", "Teacher B", "Group 2", slot, second_room)
    timetable = Timetable(
        id="test",
        timeslots=[slot],
        rooms=[first_room, second_room],
        lessons=[math, physics],
    )
    result = solution_manager.analyze(timetable)
    # No hard constraint is expected to fire for this timetable
    assert result.score.hard_score == 0
Testing Constraint Weight
Verify the magnitude of penalties:
def test_teacher_room_stability_weight(solution_manager):
    """One teacher split across two rooms yields exactly one stability match.

    The constraint penalizes each pair of same-teacher lessons held in
    different rooms, so two such lessons form a single match.
    """
    timeslot1 = Timeslot("MONDAY", time(8, 30), time(9, 30))
    timeslot2 = Timeslot("MONDAY", time(9, 30), time(10, 30))
    room_a = Room("Room A")
    room_b = Room("Room B")
    # Same teacher, different rooms
    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot1, room_a)
    lesson2 = Lesson("2", "Math", "Teacher A", "Group 2", timeslot2, room_b)
    problem = Timetable(
        id="test",
        timeslots=[timeslot1, timeslot2],
        rooms=[room_a, room_b],
        lessons=[lesson1, lesson2]
    )
    analysis = solution_manager.analyze(problem)
    # Should have soft penalty for room instability
    assert analysis.score.soft_score < 0
    # Look the constraint up with a default so a missing or renamed
    # constraint fails with a readable assertion instead of StopIteration
    room_stability = next(
        (
            ca for ca in analysis.constraint_analyses()
            if ca.constraint_name == "Teacher room stability"
        ),
        None,
    )
    assert room_stability is not None, "Teacher room stability not in analysis"
    assert room_stability.match_count == 1
Testing with Fixtures
Create reusable test fixtures:
@pytest.fixture
def timeslots():
    """Three timeslots: two on Monday and one on Tuesday."""
    first_hour = (time(8, 30), time(9, 30))
    second_hour = (time(9, 30), time(10, 30))
    return [
        Timeslot("MONDAY", *first_hour),
        Timeslot("MONDAY", *second_hour),
        Timeslot("TUESDAY", *first_hour),
    ]
@pytest.fixture
def rooms():
    """Rooms named A, B and C."""
    return [Room(name) for name in ("A", "B", "C")]
@pytest.fixture
def empty_problem(timeslots, rooms):
    """A timetable holding the shared timeslots and rooms but no lessons."""
    no_lessons = []
    return Timetable(id="test", timeslots=timeslots, rooms=rooms, lessons=no_lessons)
def test_empty_problem_is_feasible(solution_manager, empty_problem):
    """A timetable without lessons must score exactly zero."""
    score = solution_manager.analyze(empty_problem).score
    assert score == HardSoftScore.ZERO
Testing Edge Cases
Unassigned Variables
def test_unassigned_lesson(solution_manager):
    """A lesson with no timeslot or room must not produce a hard penalty."""
    slot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    only_room = Room("Room A")
    placed = Lesson("1", "Math", "Teacher A", "Group 1", slot, only_room)
    # The second lesson is left without a timeslot and without a room
    unplaced = Lesson("2", "Physics", "Teacher B", "Group 2", None, None)
    timetable = Timetable(
        id="test",
        timeslots=[slot],
        rooms=[only_room],
        lessons=[placed, unplaced],
    )
    result = solution_manager.analyze(timetable)
    # The unassigned lesson must not count as a room conflict
    assert result.score.hard_score == 0
Multiple Violations
def test_multiple_conflicts(solution_manager):
    """Three lessons sharing a room and timeslot must cost three hard points."""
    slot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    shared_room = Room("Room A")
    lesson_specs = [
        ("1", "Math", "A", "G1"),
        ("2", "Physics", "B", "G2"),
        ("3", "Chemistry", "C", "G3"),
    ]
    clashing = [Lesson(*spec, slot, shared_room) for spec in lesson_specs]
    timetable = Timetable(
        id="test",
        timeslots=[slot],
        rooms=[shared_room],
        lessons=clashing,
    )
    hard_score = solution_manager.analyze(timetable).score.hard_score
    # Every unique pair conflicts: (1,2), (1,3), (2,3)
    assert hard_score == -3
Feasibility Testing
Test that the solver can find a feasible solution:
def test_feasible_solution():
    """Solving a small generated problem must end with a feasible score."""
    score_director_config = ScoreDirectorFactoryConfig(
        constraint_provider_function=define_constraints
    )
    # Termination is capped at five seconds of spent time
    termination = TerminationConfig(spent_limit=Duration(seconds=5))
    solver_config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=score_director_config,
        termination_config=termination,
    )
    solver = SolverFactory.create(solver_config).build_solver()
    solution = solver.solve(generate_small_problem())
    assert solution.score.is_feasible, f"Solution infeasible: {solution.score}"
Parameterized Tests
Test multiple scenarios efficiently:
@pytest.mark.parametrize("num_lessons,expected_conflicts", [
    (1, 0),  # Single lesson: no conflicts
    (2, 1),  # Two lessons: one pair
    (3, 3),  # Three lessons: three pairs
    (4, 6),  # Four lessons: six pairs
])
def test_all_in_same_room_timeslot(solution_manager, num_lessons, expected_conflicts):
    """n lessons in same room/time should create n*(n-1)/2 conflicts."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")
    # Teacher and student group are unique per lesson so that only the shared
    # room/timeslot can contribute to the hard score being asserted.
    lessons = [
        Lesson(str(i), f"Subject{i}", f"Teacher{i}", f"Group{i}", timeslot, room)
        for i in range(num_lessons)
    ]
    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=lessons
    )
    analysis = solution_manager.analyze(problem)
    assert analysis.score.hard_score == -expected_conflicts
Testing Justifications
def test_constraint_justification(solution_manager):
    """The room-conflict match justification should name the room and the day."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")
    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
    lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room)
    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=[lesson1, lesson2]
    )
    analysis = solution_manager.analyze(problem)
    # Defaults turn a missing constraint or match into a readable assertion
    # failure rather than a bare StopIteration.
    room_conflict_ca = next(
        (
            ca for ca in analysis.constraint_analyses()
            if ca.constraint_name == "Room conflict"
        ),
        None,
    )
    assert room_conflict_ca is not None, "Room conflict not in analysis"
    # iter() makes this work whether matches() yields an iterator or a list
    match = next(iter(room_conflict_ca.matches()), None)
    assert match is not None, "Room conflict produced no matches"
    justification_text = str(match.justification)
    assert "Room A" in justification_text
    assert "MONDAY" in justification_text
Best Practices
Do
- Test each constraint in isolation
- Test both positive and negative cases
- Test edge cases (empty, unassigned, maximum)
- Use descriptive test names
Don’t
- Skip constraint testing
- Only test happy paths
- Use production data sizes in unit tests
- Ignore constraint weights
Next Steps