This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Constraints

Define constraints using the fluent Constraint Streams API.

Constraints define the rules that make a solution valid and optimal. SolverForge uses a fluent Constraint Streams API that lets you express constraints declaratively.

Topics

Constraint Types

TypePurposeExample
HardMust be satisfied for feasibilityNo two lessons in the same room at the same time
SoftPreferences to optimizeTeachers prefer consecutive lessons
MediumBetween hard and soft (optional)Important but not mandatory constraints

Example

from solverforge_legacy.solver.score import (
    constraint_provider, ConstraintFactory, Constraint, Joiners, HardSoftScore
)

@constraint_provider
def define_constraints(constraint_factory: ConstraintFactory) -> list[Constraint]:
    return [
        room_conflict(constraint_factory),
        teacher_conflict(constraint_factory),
        teacher_room_stability(constraint_factory),
    ]

def room_conflict(constraint_factory: ConstraintFactory) -> Constraint:
    # Hard constraint: No two lessons in the same room at the same time
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.timeslot),
            Joiners.equal(lambda lesson: lesson.room),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Room conflict")
    )

def teacher_room_stability(constraint_factory: ConstraintFactory) -> Constraint:
    # Soft constraint: Teachers prefer teaching in the same room
    return (
        constraint_factory
        .for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda lesson: lesson.teacher),
        )
        .filter(lambda lesson1, lesson2: lesson1.room != lesson2.room)
        .penalize(HardSoftScore.ONE_SOFT)
        .as_constraint("Teacher room stability")
    )

1 - Constraint Streams

Build constraints using the fluent Constraint Streams API.

The Constraint Streams API is a fluent, declarative way to define constraints. It’s inspired by Java Streams and SQL, allowing you to express complex scoring logic concisely.

Basic Structure

Every constraint follows this pattern:

from solverforge_legacy.solver.score import (
    constraint_provider, ConstraintFactory, Constraint, HardSoftScore
)

@constraint_provider
def define_constraints(factory: ConstraintFactory) -> list[Constraint]:
    return [
        my_constraint(factory),
    ]

def my_constraint(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(MyEntity)        # 1. Select entities
        .filter(lambda e: e.is_active)    # 2. Filter matches
        .penalize(HardSoftScore.ONE_HARD) # 3. Apply score impact
        .as_constraint("My constraint")   # 4. Name the constraint
    )

Stream Types

Streams are typed by the number of entities they carry:

Stream TypeEntitiesExample Use
UniConstraintStream1Single entity constraints
BiConstraintStream2Pair constraints
TriConstraintStream3Triple constraints
QuadConstraintStream4Quad constraints

Starting a Stream

for_each()

Start with all instances of an entity class:

factory.for_each(Lesson)
# Stream of: Lesson1, Lesson2, Lesson3, ...

for_each_unique_pair()

Get all unique pairs (no duplicates, no self-pairs):

factory.for_each_unique_pair(Lesson)
# Stream of: (L1,L2), (L1,L3), (L2,L3), ...
# NOT: (L1,L1), (L2,L1), ...

With joiners for efficient filtering:

factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot),
    Joiners.equal(lambda l: l.room),
)
# Only pairs with same timeslot AND same room

for_each_including_unassigned()

Include entities with unassigned planning variables:

factory.for_each_including_unassigned(Lesson)
# Includes lessons where timeslot=None or room=None

Filtering

filter()

Remove non-matching items:

factory.for_each(Lesson)
.filter(lambda lesson: lesson.teacher == "A. Turing")

For bi-streams:

factory.for_each_unique_pair(Lesson)
.filter(lambda l1, l2: l1.room != l2.room)

Joining

join()

Combine streams:

factory.for_each(Lesson)
.join(Room)
# BiStream of (Lesson, Room) for all combinations

With joiners:

factory.for_each(Lesson)
.join(
    Room,
    Joiners.equal(lambda lesson: lesson.room, lambda room: room)
)
# BiStream of (Lesson, Room) where lesson.room == room

See Joiners for available joiner types.

if_exists() / if_not_exists()

Check for existence without creating pairs:

# Lessons that have at least one other lesson in the same room
factory.for_each(Lesson)
.if_exists(
    Lesson,
    Joiners.equal(lambda l: l.room),
    Joiners.filtering(lambda l1, l2: l1.id != l2.id)
)
# Employees not assigned to any shift
factory.for_each(Employee)
.if_not_exists(
    Shift,
    Joiners.equal(lambda emp: emp, lambda shift: shift.employee)
)

Grouping

group_by()

Aggregate entities:

from solverforge_legacy.solver.score import ConstraintCollectors

# Count lessons per teacher
factory.for_each(Lesson)
.group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.count()
)
# BiStream of (teacher, count)

Multiple collectors:

# Get count and list of lessons per teacher
factory.for_each(Lesson)
.group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.count(),
    ConstraintCollectors.to_list(lambda l: l)
)
# TriStream of (teacher, count, lesson_list)

See Collectors for available collector types.

Mapping

map()

Transform stream elements:

factory.for_each(Lesson)
.map(lambda lesson: lesson.teacher)
# UniStream of teachers (with duplicates)

expand()

Add derived values:

factory.for_each(Lesson)
.expand(lambda lesson: lesson.duration_minutes)
# BiStream of (Lesson, duration)

distinct()

Remove duplicates:

factory.for_each(Lesson)
.map(lambda lesson: lesson.teacher)
.distinct()
# UniStream of unique teachers

Scoring

penalize()

Apply negative score for matches:

# Hard constraint
.penalize(HardSoftScore.ONE_HARD)

# Soft constraint
.penalize(HardSoftScore.ONE_SOFT)

# Dynamic weight
.penalize(HardSoftScore.ONE_SOFT, lambda lesson: lesson.priority)

reward()

Apply positive score for matches:

# Reward preferred assignments
.reward(HardSoftScore.ONE_SOFT, lambda lesson: lesson.preference_score)

impact()

Apply positive or negative score based on value:

# Positive values reward, negative values penalize
.impact(HardSoftScore.ONE_SOFT, lambda l: l.score_impact)

Finalizing

as_constraint()

Name the constraint (required):

.as_constraint("Room conflict")

justify_with()

Add custom justification for score explanation:

.penalize(HardSoftScore.ONE_HARD)
.justify_with(lambda l1, l2, score: RoomConflictJustification(l1, l2, score))
.as_constraint("Room conflict")

indict_with()

Specify which entities to blame:

.penalize(HardSoftScore.ONE_HARD)
.indict_with(lambda l1, l2: [l1, l2])
.as_constraint("Room conflict")

Complete Examples

Room Conflict (Hard)

def room_conflict(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda l: l.timeslot),
            Joiners.equal(lambda l: l.room),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Room conflict")
    )

Teacher Room Stability (Soft)

def teacher_room_stability(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda l: l.teacher)
        )
        .filter(lambda l1, l2: l1.room != l2.room)
        .penalize(HardSoftScore.ONE_SOFT)
        .as_constraint("Teacher room stability")
    )

Balance Workload (Soft)

def balance_workload(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Shift)
        .group_by(
            lambda shift: shift.employee,
            ConstraintCollectors.count()
        )
        .filter(lambda employee, count: count > 5)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda employee, count: count - 5  # Penalize excess shifts
        )
        .as_constraint("Balance workload")
    )

Best Practices

Do

  • Use joiners in for_each_unique_pair() for efficiency
  • Name constraints descriptively
  • Break complex constraints into helper functions

Don’t

  • Use filter() when a joiner would work (less efficient)
  • Create overly complex single constraints (split them)
  • Forget to call as_constraint()

Next Steps

2 - Joiners

Efficiently filter and match entities in constraint streams.

Joiners efficiently filter pairs of entities during joins and unique pair operations. They’re more efficient than post-join filtering because they use indexing.

Basic Usage

from solverforge_legacy.solver.score import Joiners

factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda lesson: lesson.timeslot),
    Joiners.equal(lambda lesson: lesson.room),
)

Multiple joiners are combined with AND logic.

Available Joiners

equal()

Match when property values are equal:

# Same timeslot
Joiners.equal(lambda lesson: lesson.timeslot)

# In a join, specify both sides
factory.for_each(Lesson).join(
    Room,
    Joiners.equal(lambda lesson: lesson.room, lambda room: room)
)

less_than() / less_than_or_equal()

Match when first value is less than second:

# l1.priority < l2.priority
Joiners.less_than(lambda lesson: lesson.priority)

# l1.start_time <= l2.start_time
Joiners.less_than_or_equal(lambda lesson: lesson.start_time)

greater_than() / greater_than_or_equal()

Match when first value is greater than second:

# l1.priority > l2.priority
Joiners.greater_than(lambda lesson: lesson.priority)

# l1.end_time >= l2.end_time
Joiners.greater_than_or_equal(lambda lesson: lesson.end_time)

overlapping()

Match when ranges overlap:

# Time overlap: [start1, end1) overlaps [start2, end2)
Joiners.overlapping(
    lambda l: l.start_time,   # Start of range 1
    lambda l: l.end_time,     # End of range 1
    lambda l: l.start_time,   # Start of range 2
    lambda l: l.end_time,     # End of range 2
)

For a join between different types:

factory.for_each(Meeting).join(
    Availability,
    Joiners.overlapping(
        lambda m: m.start_time,
        lambda m: m.end_time,
        lambda a: a.start_time,
        lambda a: a.end_time,
    )
)

filtering()

Custom filter function (less efficient, use as last resort):

# Custom logic that can't be expressed with other joiners
Joiners.filtering(lambda l1, l2: l1.is_compatible_with(l2))

Combining Joiners

Joiners are combined with AND:

factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot),    # Same timeslot AND
    Joiners.equal(lambda l: l.room),         # Same room
)

Performance Considerations

Index-Based Joiners (Preferred)

These joiners use internal indexes for O(1) or O(log n) lookup:

  • equal() - Hash index
  • less_than(), greater_than() - Tree index
  • overlapping() - Interval tree

Filtering Joiner (Slower)

filtering() checks every pair, O(n²):

# Avoid when possible - checks all pairs
Joiners.filtering(lambda l1, l2: some_complex_check(l1, l2))

Optimization Tips

Good: Index joiners first, filtering last:

factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot),       # Index first
    Joiners.filtering(lambda l1, l2: custom(l1, l2))  # Filter remaining
)

Bad: Only filtering (checks all pairs):

factory.for_each_unique_pair(
    Lesson,
    Joiners.filtering(lambda l1, l2: l1.timeslot == l2.timeslot and custom(l1, l2))
)

Examples

Time Conflict Detection

def time_conflict(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each_unique_pair(
            Shift,
            Joiners.equal(lambda s: s.employee),
            Joiners.overlapping(
                lambda s: s.start_time,
                lambda s: s.end_time,
                lambda s: s.start_time,
                lambda s: s.end_time,
            ),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Employee time conflict")
    )

Same Day Sequential

def same_day_sequential(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Lesson)
        .join(
            Lesson,
            Joiners.equal(lambda l: l.teacher),
            Joiners.equal(lambda l: l.timeslot.day_of_week),
            Joiners.less_than(lambda l: l.timeslot.start_time),
            Joiners.filtering(lambda l1, l2:
                (l2.timeslot.start_time - l1.timeslot.end_time).seconds <= 1800
            ),
        )
        .reward(HardSoftScore.ONE_SOFT)
        .as_constraint("Teacher consecutive lessons")
    )

Resource Assignment

def resource_assignment(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Task)
        .join(
            Resource,
            Joiners.equal(lambda t: t.required_skill, lambda r: r.skill),
            Joiners.greater_than_or_equal(lambda t: t.priority, lambda r: r.min_priority),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Resource skill match")
    )

Joiner vs Filter

Use Joiner WhenUse Filter When
Checking equalityComplex logic
Comparing valuesMultiple conditions with OR
Range overlapCalling methods on entities
Performance mattersSimple one-off checks

Next Steps

3 - Collectors

Aggregate data in constraint streams using collectors.

Collectors aggregate data when grouping entities. They’re used with group_by() to compute counts, sums, lists, and other aggregations.

Basic Usage

from solverforge_legacy.solver.score import ConstraintCollectors

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,       # Group key
    ConstraintCollectors.count()        # Collector
)
# Result: BiStream of (employee, count)

Available Collectors

count()

Count items in each group:

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,
    ConstraintCollectors.count()
)
# (Employee, int)

count_distinct()

Count unique values:

factory.for_each(Lesson)
.group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.count_distinct(lambda l: l.room)
)
# (Teacher, number of distinct rooms)

sum()

Sum numeric values:

factory.for_each(Visit)
.group_by(
    lambda visit: visit.vehicle,
    ConstraintCollectors.sum(lambda v: v.demand)
)
# (Vehicle, total demand)

min() / max()

Find minimum or maximum:

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,
    ConstraintCollectors.min(lambda s: s.start_time)
)
# (Employee, earliest start time)

With comparator:

ConstraintCollectors.max(
    lambda shift: shift,
    key=lambda s: s.priority
)
# Returns the shift with highest priority

average()

Calculate average:

factory.for_each(Task)
.group_by(
    lambda task: task.worker,
    ConstraintCollectors.average(lambda t: t.duration)
)
# (Worker, average task duration)

to_list()

Collect into a list:

factory.for_each(Visit)
.group_by(
    lambda visit: visit.vehicle,
    ConstraintCollectors.to_list(lambda v: v)
)
# (Vehicle, list of visits)

to_set()

Collect into a set (unique values):

factory.for_each(Lesson)
.group_by(
    lambda lesson: lesson.teacher,
    ConstraintCollectors.to_set(lambda l: l.room)
)
# (Teacher, set of rooms)

to_sorted_set()

Collect into a sorted set:

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,
    ConstraintCollectors.to_sorted_set(lambda s: s.start_time)
)
# (Employee, sorted set of start times)

compose()

Combine multiple collectors:

ConstraintCollectors.compose(
    ConstraintCollectors.count(),
    ConstraintCollectors.sum(lambda s: s.hours),
    lambda count, total_hours: (count, total_hours)
)
# Returns (count, sum) tuple

conditional()

Collect only matching items:

ConstraintCollectors.conditional(
    lambda shift: shift.is_night,
    ConstraintCollectors.count()
)
# Count only night shifts

Multiple Collectors

Use multiple collectors in one group_by:

factory.for_each(Shift)
.group_by(
    lambda shift: shift.employee,
    ConstraintCollectors.count(),
    ConstraintCollectors.sum(lambda s: s.hours),
    ConstraintCollectors.min(lambda s: s.start_time),
)
# QuadStream: (Employee, count, total_hours, earliest_start)

Grouping Patterns

Count Per Category

def balance_shift_count(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Shift)
        .group_by(
            lambda shift: shift.employee,
            ConstraintCollectors.count()
        )
        .filter(lambda employee, count: count > 5)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda employee, count: (count - 5) ** 2
        )
        .as_constraint("Balance shift count")
    )

Sum with Threshold

def vehicle_capacity(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Visit)
        .group_by(
            lambda visit: visit.vehicle,
            ConstraintCollectors.sum(lambda v: v.demand)
        )
        .filter(lambda vehicle, total: total > vehicle.capacity)
        .penalize(
            HardSoftScore.ONE_HARD,
            lambda vehicle, total: total - vehicle.capacity
        )
        .as_constraint("Vehicle capacity")
    )

Load Distribution

def fair_distribution(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Task)
        .group_by(
            lambda task: task.worker,
            ConstraintCollectors.count()
        )
        .group_by(
            ConstraintCollectors.min(lambda worker, count: count),
            ConstraintCollectors.max(lambda worker, count: count),
        )
        .filter(lambda min_count, max_count: max_count - min_count > 2)
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda min_count, max_count: max_count - min_count
        )
        .as_constraint("Fair task distribution")
    )

Consecutive Detection

def consecutive_shifts(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Shift)
        .group_by(
            lambda shift: shift.employee,
            ConstraintCollectors.to_sorted_set(lambda s: s.date)
        )
        .filter(lambda employee, dates: has_consecutive_days(dates, 6))
        .penalize(HardSoftScore.ONE_HARD)
        .as_constraint("Max consecutive days")
    )

def has_consecutive_days(dates: set, max_consecutive: int) -> bool:
    sorted_dates = sorted(dates)
    consecutive = 1
    for i in range(1, len(sorted_dates)):
        if (sorted_dates[i] - sorted_dates[i-1]).days == 1:
            consecutive += 1
            if consecutive > max_consecutive:
                return True
        else:
            consecutive = 1
    return False

Performance Tips

Prefer count() over to_list()

# Good: Efficient counting
ConstraintCollectors.count()

# Avoid: Creates list just to count
ConstraintCollectors.to_list(lambda x: x).map(len)

Use conditional() for Filtered Counts

# Good: Single pass
ConstraintCollectors.conditional(
    lambda s: s.is_weekend,
    ConstraintCollectors.count()
)

# Avoid: Filter then count
factory.for_each(Shift)
.filter(lambda s: s.is_weekend)
.group_by(...)

Minimize Data in Collectors

# Good: Collect only needed data
ConstraintCollectors.to_list(lambda s: s.start_time)

# Avoid: Collect entire objects
ConstraintCollectors.to_list(lambda s: s)

Next Steps

4 - Score Types

Choose the right score type for your constraints.

Score types determine how constraint violations and rewards are measured. Choose the type that matches your problem’s structure.

Available Score Types

Score TypeLevelsUse Case
SimpleScore1Single optimization objective
HardSoftScore2Feasibility + optimization
HardMediumSoftScore3Hard + important + nice-to-have
BendableScoreNCustom number of levels
*DecimalScore variants-Decimal precision

SimpleScore

For single-objective optimization:

from solverforge_legacy.solver.score import SimpleScore

# In domain model
score: Annotated[SimpleScore, PlanningScore] = field(default=None)

# In constraints
.penalize(SimpleScore.ONE)
.reward(SimpleScore.of(10))

Use when: You only need to maximize or minimize one thing (e.g., total profit, total distance).

HardSoftScore

The most common type—separates feasibility from optimization:

from solverforge_legacy.solver.score import HardSoftScore

# In domain model
score: Annotated[HardSoftScore, PlanningScore] = field(default=None)

# In constraints
.penalize(HardSoftScore.ONE_HARD)     # Broken constraint
.penalize(HardSoftScore.ONE_SOFT)     # Suboptimal
.penalize(HardSoftScore.of_hard(5))   # Weighted hard
.penalize(HardSoftScore.of_soft(10))  # Weighted soft

Hard constraints:

  • Must be satisfied for a feasible solution
  • Score format: Xhard/Ysoft
  • 0hard/*soft = feasible

Soft constraints:

  • Preferences to optimize
  • Better soft scores are preferred among feasible solutions

Use when: You have rules that must be followed AND preferences to optimize.

HardMediumSoftScore

Three levels of priority:

from solverforge_legacy.solver.score import HardMediumSoftScore

# In domain model
score: Annotated[HardMediumSoftScore, PlanningScore] = field(default=None)

# In constraints
.penalize(HardMediumSoftScore.ONE_HARD)    # Must satisfy
.penalize(HardMediumSoftScore.ONE_MEDIUM)  # Important preference
.penalize(HardMediumSoftScore.ONE_SOFT)    # Nice to have

Use when:

  • Medium = “Assign as many as possible”
  • Medium = “Important but not mandatory”
  • Medium = “Prefer over soft, but not as critical as hard”

Example: Meeting scheduling where:

  • Hard: Required attendees must be available
  • Medium: Preferred attendees should attend
  • Soft: Room size preferences

BendableScore

Custom number of hard and soft levels:

from solverforge_legacy.solver.score import BendableScore

# Configure levels (3 hard, 2 soft)
score: Annotated[BendableScore, PlanningScore] = field(default=None)

# In constraints
.penalize(BendableScore.of_hard(0, 1))   # First hard level
.penalize(BendableScore.of_hard(1, 1))   # Second hard level
.penalize(BendableScore.of_soft(0, 1))   # First soft level

Use when: You need more than 3 priority levels.

Decimal Score Variants

For precise calculations:

from solverforge_legacy.solver.score import HardSoftDecimalScore

score: Annotated[HardSoftDecimalScore, PlanningScore] = field(default=None)

# In constraints
from decimal import Decimal
.penalize(HardSoftDecimalScore.of_soft(Decimal("0.01")))

Available variants:

  • SimpleDecimalScore
  • HardSoftDecimalScore
  • HardMediumSoftDecimalScore
  • BendableDecimalScore

Use when: Integer scores aren’t precise enough (e.g., money, distances).

Score Constants

Common score values are predefined:

# SimpleScore
SimpleScore.ZERO
SimpleScore.ONE
SimpleScore.of(n)

# HardSoftScore
HardSoftScore.ZERO
HardSoftScore.ONE_HARD
HardSoftScore.ONE_SOFT
HardSoftScore.of_hard(n)
HardSoftScore.of_soft(n)
HardSoftScore.of(hard, soft)

# HardMediumSoftScore
HardMediumSoftScore.ZERO
HardMediumSoftScore.ONE_HARD
HardMediumSoftScore.ONE_MEDIUM
HardMediumSoftScore.ONE_SOFT
HardMediumSoftScore.of(hard, medium, soft)

Dynamic Weights

Apply weights based on entity properties:

def weighted_penalty(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each(Task)
        .filter(lambda t: t.is_late())
        .penalize(
            HardSoftScore.ONE_SOFT,
            lambda task: task.priority  # High priority = bigger penalty
        )
        .as_constraint("Late task")
    )

Score Comparison

Scores are compared level by level:

# Hard first, then soft
0hard/-100soft > -1hard/0soft    (first is feasible)
-1hard/-50soft > -2hard/-10soft  (first has better hard)
0hard/-50soft > 0hard/-100soft   (same hard, better soft)

Score Properties

score = HardSoftScore.of(-2, -100)

score.is_feasible          # False (hard < 0)
score.hard_score           # -2
score.soft_score           # -100
str(score)                 # "-2hard/-100soft"

HardSoftScore.parse("-2hard/-100soft")  # Parse from string

Choosing a Score Type

QuestionRecommendation
Need feasibility check?Use HardSoftScore
Single objective only?Use SimpleScore
“Assign as many as possible”?Use HardMediumSoftScore
More than 3 priority levels?Use BendableScore
Need decimal precision?Use *DecimalScore variant

Best Practices

Do

  • Use HardSoftScore as default choice
  • Keep hard constraints truly hard (legal requirements, physical limits)
  • Use consistent weight scales within each level

Don’t

  • Use medium level for actual hard constraints
  • Over-complicate with BendableScore when HardMediumSoftScore works
  • Mix units in the same level (e.g., minutes and dollars)

Next Steps

5 - Score Analysis

Understand why a solution has its score.

Score analysis helps you understand why a solution received its score. This is essential for debugging constraints and explaining results to users.

SolutionManager

Use SolutionManager to analyze solutions:

from solverforge_legacy.solver import SolverFactory, SolutionManager

solver_factory = SolverFactory.create(solver_config)
solution_manager = SolutionManager.create(solver_factory)

# Analyze a solution
analysis = solution_manager.analyze(solution)

Score Explanation

Get a breakdown of constraint scores:

analysis = solution_manager.analyze(solution)

# Overall score
print(f"Score: {analysis.score}")

# Per-constraint breakdown
for constraint_analysis in analysis.constraint_analyses():
    print(f"{constraint_analysis.constraint_name}: {constraint_analysis.score}")
    print(f"  Match count: {constraint_analysis.match_count}")

Example output:

Score: -2hard/-15soft
Room conflict: -2hard
  Match count: 2
Teacher room stability: -10soft
  Match count: 10
Teacher time efficiency: -5soft
  Match count: 5

Constraint Matches

See exactly which entities triggered each constraint:

for constraint_analysis in analysis.constraint_analyses():
    print(f"\n{constraint_analysis.constraint_name}:")
    for match in constraint_analysis.matches():
        print(f"  Match: {match.justification}")
        print(f"  Score: {match.score}")

Indictments

Find which entities are causing problems:

# Get indictments (entities blamed for score impact)
for indictment in analysis.indictments():
    print(f"\nEntity: {indictment.indicted_object}")
    print(f"Total score impact: {indictment.score}")
    for match in indictment.matches():
        print(f"  - {match.constraint_name}: {match.score}")

Example output:

Entity: Lesson(id=1, subject='Math')
Total score impact: -1hard/-3soft
  - Room conflict: -1hard
  - Teacher room stability: -3soft

Custom Justifications

Add explanations to your constraints:

@dataclass
class RoomConflictJustification:
    lesson1: Lesson
    lesson2: Lesson
    timeslot: Timeslot
    room: Room

    def __str__(self):
        return (f"{self.lesson1.subject} and {self.lesson2.subject} "
                f"both scheduled in {self.room} at {self.timeslot}")

def room_conflict(factory: ConstraintFactory) -> Constraint:
    return (
        factory.for_each_unique_pair(
            Lesson,
            Joiners.equal(lambda l: l.timeslot),
            Joiners.equal(lambda l: l.room),
        )
        .penalize(HardSoftScore.ONE_HARD)
        .justify_with(lambda l1, l2, score: RoomConflictJustification(
            l1, l2, l1.timeslot, l1.room
        ))
        .as_constraint("Room conflict")
    )

Debugging Constraints

Verify Score Calculation

# Calculate score without solving
score = solution_manager.update(solution)
print(f"Calculated score: {score}")

Find Missing Constraints

If a constraint isn’t firing when expected:

# Check if specific entities match
for constraint_analysis in analysis.constraint_analyses():
    if constraint_analysis.constraint_name == "Room conflict":
        if constraint_analysis.match_count == 0:
            print("No room conflicts detected!")
            # Check your joiners and filters

Verify Feasibility

if not solution.score.is_feasible:
    print("Solution is infeasible!")
    for ca in analysis.constraint_analyses():
        if ca.score.hard_score < 0:
            print(f"Hard constraint broken: {ca.constraint_name}")
            for match in ca.matches():
                print(f"  {match.justification}")

Integration with FastAPI

Expose score analysis in your API:

from fastapi import FastAPI

@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
    solution = solutions.get(job_id)
    if not solution:
        raise HTTPException(404, "Job not found")

    analysis = solution_manager.analyze(solution)

    return {
        "score": str(analysis.score),
        "is_feasible": analysis.score.is_feasible,
        "constraints": [
            {
                "name": ca.constraint_name,
                "score": str(ca.score),
                "match_count": ca.match_count,
            }
            for ca in analysis.constraint_analyses()
        ]
    }

Best Practices

Do

  • Use justify_with() for user-facing explanations
  • Check score analysis when debugging constraints
  • Expose score breakdown in your UI

Don’t

  • Analyze every solution during solving (performance)
  • Ignore indictments when troubleshooting
  • Forget to handle infeasible solutions

Score Comparison

Compare two solutions:

def compare_solutions(old: Timetable, new: Timetable):
    old_analysis = solution_manager.analyze(old)
    new_analysis = solution_manager.analyze(new)

    print(f"Score improved: {old.score} -> {new.score}")

    old_constraints = {ca.constraint_name: ca for ca in old_analysis.constraint_analyses()}
    new_constraints = {ca.constraint_name: ca for ca in new_analysis.constraint_analyses()}

    for name in old_constraints:
        old_ca = old_constraints[name]
        new_ca = new_constraints.get(name)
        if new_ca and old_ca.score != new_ca.score:
            print(f"  {name}: {old_ca.score} -> {new_ca.score}")

Next Steps

6 - Constraint Performance

Optimize constraint evaluation for faster solving.

Efficient constraint evaluation is critical for solver performance. Most solving time is spent calculating scores, so optimizing constraints has a direct impact on solution quality.

Performance Principles

1. Use Joiners Instead of Filters

Joiners use indexes for O(1) lookups. Filters check every item.

# Good: Uses index
factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot)
)

# Bad: Checks all pairs
factory.for_each_unique_pair(Lesson)
.filter(lambda l1, l2: l1.timeslot == l2.timeslot)

2. Put Selective Joiners First

More selective joiners reduce the search space faster:

# Good: timeslot has few values, filters early
factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot),  # Few timeslots
    Joiners.equal(lambda l: l.teacher),    # More teachers
)

# Less efficient: teacher might have many values
factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.teacher),    # Many teachers
    Joiners.equal(lambda l: l.timeslot),   # Then timeslot
)

3. Avoid Expensive Lambda Operations

# Good: Simple property access
Joiners.equal(lambda l: l.timeslot)

# Bad: Complex calculation in joiner
Joiners.equal(lambda l: calculate_complex_hash(l))

4. Use Cached Properties

@planning_entity
@dataclass
class Lesson:
    # Pre-calculate expensive values
    @cached_property
    def combined_key(self):
        return (self.timeslot, self.room)

# Use cached property in constraint
Joiners.equal(lambda l: l.combined_key)

Common Optimizations

Replace for_each + filter with for_each_unique_pair

# Before: Inefficient
factory.for_each(Lesson)
.join(Lesson)
.filter(lambda l1, l2: l1.id != l2.id and l1.timeslot == l2.timeslot)

# After: Efficient
factory.for_each_unique_pair(
    Lesson,
    Joiners.equal(lambda l: l.timeslot)
)

Use if_exists() Instead of Join + group_by

# Before: Creates pairs then groups
factory.for_each(Employee)
.join(Shift, Joiners.equal(lambda e: e, lambda s: s.employee))
.group_by(lambda e, s: e, ConstraintCollectors.count())
.filter(lambda e, count: count > 0)

# After: Just checks existence
factory.for_each(Employee)
.if_exists(Shift, Joiners.equal(lambda e: e, lambda s: s.employee))

Avoid Redundant Constraints

# Redundant: Two constraints that overlap
def constraint1(factory):
    # Penalizes A and B in same room
    ...

def constraint2(factory):
    # Penalizes A and B in same room and same timeslot
    ...  # This overlaps with constraint1!

# Better: One specific constraint
def room_conflict(factory):
    # Only penalizes same room AND same timeslot
    factory.for_each_unique_pair(
        Lesson,
        Joiners.equal(lambda l: l.timeslot),
        Joiners.equal(lambda l: l.room),
    )

Limit Collection Sizes in Collectors

# Bad: Collects everything
ConstraintCollectors.to_list(lambda s: s)

# Better: Collect only what's needed
ConstraintCollectors.to_list(lambda s: s.start_time)

# Best: Use aggregate if possible
ConstraintCollectors.count()

Incremental Score Calculation

SolverForge uses incremental score calculation—only recalculating affected constraints when a move is made. Help this work efficiently:

Keep Constraints Independent

# Good: Constraints don't share state
def room_conflict(factory):
    return factory.for_each_unique_pair(...)

def teacher_conflict(factory):
    return factory.for_each_unique_pair(...)

# Bad: Shared calculation affects both
shared_data = calculate_once()  # Recalculated on every change!

Avoid Global State

# Bad: References external data
external_config = load_config()

def my_constraint(factory):
    return factory.for_each(Lesson)
    .filter(lambda l: l.priority > external_config.threshold)  # External ref

Benchmarking Constraints

Enable Debug Logging

import logging
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)

Time Individual Constraints

import time

def timed_constraint(factory):
    start = time.time()
    result = actual_constraint(factory)
    print(f"Constraint built in {time.time() - start:.3f}s")
    return result

Use the Benchmarker

For systematic comparison, use the Benchmarker (see Benchmarking).

Score Corruption Detection

Enable environment mode for debugging:

from solverforge_legacy.solver.config import EnvironmentMode

SolverConfig(
    environment_mode=EnvironmentMode.FULL_ASSERT,  # Detects score corruption
    ...
)

Modes:

  • NON_REPRODUCIBLE - Fastest, no checks
  • REPRODUCIBLE - Deterministic but no validation
  • FAST_ASSERT - Quick validation checks
  • FULL_ASSERT - Complete validation (slowest)

Use FULL_ASSERT during development, REPRODUCIBLE or NON_REPRODUCIBLE in production.

Common Performance Issues

SymptomLikely CauseSolution
Very slow startComplex constraint buildingSimplify or cache
Slow throughoutFilter instead of joinerUse joiners
Memory issuesLarge collectionsUse aggregates
Score corruptionIncorrect incremental calcEnable FULL_ASSERT

Next Steps

7 - Testing Constraints

Test constraints in isolation for correctness.

Testing constraints ensures they behave correctly before integrating with the full solver. This catches bugs early and documents expected behavior.

Basic Constraint Testing

Test individual constraints with minimal data:

import pytest
from solverforge_legacy.solver import SolverFactory
from solverforge_legacy.solver.config import (
    SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
from datetime import time

from my_app.domain import Timetable, Timeslot, Room, Lesson
from my_app.constraints import define_constraints


@pytest.fixture
def solution_manager():
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(spent_limit=Duration(seconds=1))
    )
    factory = SolverFactory.create(config)
    return SolutionManager.create(factory)


def test_room_conflict(solution_manager):
    """Two lessons in the same room at the same time should penalize."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    # Two lessons in same room and timeslot
    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
    lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    # Should have -1 hard for the room conflict
    assert analysis.score.hard_score == -1


def test_no_room_conflict(solution_manager):
    """Two lessons in different rooms should not conflict."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room_a = Room("Room A")
    room_b = Room("Room B")

    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room_a)
    lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room_b)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room_a, room_b],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    # Should have no hard constraint violations
    assert analysis.score.hard_score == 0

Testing Constraint Weight

Verify the magnitude of penalties:

def test_teacher_room_stability_weight(solution_manager):
    """Teacher using multiple rooms should incur soft penalty per extra room."""
    timeslot1 = Timeslot("MONDAY", time(8, 30), time(9, 30))
    timeslot2 = Timeslot("MONDAY", time(9, 30), time(10, 30))
    room_a = Room("Room A")
    room_b = Room("Room B")

    # Same teacher, different rooms
    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot1, room_a)
    lesson2 = Lesson("2", "Math", "Teacher A", "Group 2", timeslot2, room_b)

    problem = Timetable(
        id="test",
        timeslots=[timeslot1, timeslot2],
        rooms=[room_a, room_b],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    # Should have soft penalty for room instability
    assert analysis.score.soft_score < 0

    # Verify specific constraint triggered
    room_stability = next(
        ca for ca in analysis.constraint_analyses()
        if ca.constraint_name == "Teacher room stability"
    )
    assert room_stability.match_count == 1

Testing with Fixtures

Create reusable test fixtures:

@pytest.fixture
def timeslots():
    return [
        Timeslot("MONDAY", time(8, 30), time(9, 30)),
        Timeslot("MONDAY", time(9, 30), time(10, 30)),
        Timeslot("TUESDAY", time(8, 30), time(9, 30)),
    ]


@pytest.fixture
def rooms():
    return [Room("A"), Room("B"), Room("C")]


@pytest.fixture
def empty_problem(timeslots, rooms):
    return Timetable(
        id="test",
        timeslots=timeslots,
        rooms=rooms,
        lessons=[]
    )


def test_empty_problem_is_feasible(solution_manager, empty_problem):
    """Empty problem should have zero score."""
    analysis = solution_manager.analyze(empty_problem)
    assert analysis.score == HardSoftScore.ZERO

Testing Edge Cases

Unassigned Variables

def test_unassigned_lesson(solution_manager):
    """Unassigned lessons should not cause conflicts."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    # One assigned, one not
    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
    lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", None, None)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    # Should not have room conflict (lesson2 is unassigned)
    assert analysis.score.hard_score == 0

Multiple Violations

def test_multiple_conflicts(solution_manager):
    """Three lessons in same room/time should create multiple conflicts."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    lesson1 = Lesson("1", "Math", "A", "G1", timeslot, room)
    lesson2 = Lesson("2", "Physics", "B", "G2", timeslot, room)
    lesson3 = Lesson("3", "Chemistry", "C", "G3", timeslot, room)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=[lesson1, lesson2, lesson3]
    )

    analysis = solution_manager.analyze(problem)

    # 3 lessons create 3 unique pairs: (1,2), (1,3), (2,3)
    assert analysis.score.hard_score == -3

Feasibility Testing

Test that the solver can find a feasible solution:

def test_feasible_solution():
    """Solver should find a feasible solution for small problems."""
    config = SolverConfig(
        solution_class=Timetable,
        entity_class_list=[Lesson],
        score_director_factory_config=ScoreDirectorFactoryConfig(
            constraint_provider_function=define_constraints
        ),
        termination_config=TerminationConfig(spent_limit=Duration(seconds=5))
    )

    factory = SolverFactory.create(config)
    solver = factory.build_solver()

    problem = generate_small_problem()
    solution = solver.solve(problem)

    assert solution.score.is_feasible, f"Solution infeasible: {solution.score}"

Parameterized Tests

Test multiple scenarios efficiently:

@pytest.mark.parametrize("num_lessons,expected_conflicts", [
    (1, 0),  # Single lesson: no conflicts
    (2, 1),  # Two lessons: one pair
    (3, 3),  # Three lessons: three pairs
    (4, 6),  # Four lessons: six pairs
])
def test_all_in_same_room_timeslot(solution_manager, num_lessons, expected_conflicts):
    """n lessons in same room/time should create n*(n-1)/2 conflicts."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    lessons = [
        Lesson(str(i), f"Subject{i}", f"Teacher{i}", "Group", timeslot, room)
        for i in range(num_lessons)
    ]

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=lessons
    )

    analysis = solution_manager.analyze(problem)
    assert analysis.score.hard_score == -expected_conflicts

Testing Justifications

def test_constraint_justification(solution_manager):
    """Constraint should provide meaningful justification."""
    timeslot = Timeslot("MONDAY", time(8, 30), time(9, 30))
    room = Room("Room A")

    lesson1 = Lesson("1", "Math", "Teacher A", "Group 1", timeslot, room)
    lesson2 = Lesson("2", "Physics", "Teacher B", "Group 2", timeslot, room)

    problem = Timetable(
        id="test",
        timeslots=[timeslot],
        rooms=[room],
        lessons=[lesson1, lesson2]
    )

    analysis = solution_manager.analyze(problem)

    room_conflict_ca = next(
        ca for ca in analysis.constraint_analyses()
        if ca.constraint_name == "Room conflict"
    )

    match = next(room_conflict_ca.matches())
    assert "Room A" in str(match.justification)
    assert "MONDAY" in str(match.justification)

Best Practices

Do

  • Test each constraint in isolation
  • Test both positive and negative cases
  • Test edge cases (empty, unassigned, maximum)
  • Use descriptive test names

Don’t

  • Skip constraint testing
  • Only test happy paths
  • Use production data sizes in unit tests
  • Ignore constraint weights

Next Steps