This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Design Patterns

Common patterns for handling real-world planning scenarios.

Real-world planning problems often require more than basic optimization. This section covers patterns for common scenarios.

Topics

Real-Time Planning

Handle dynamic changes during solving:

from solverforge_legacy.solver import ProblemChange

class AddLessonChange(ProblemChange[Timetable]):
    def __init__(self, lesson: Lesson):
        self.lesson = lesson

    def do_change(self, working_solution: Timetable, score_director):
        # Add the new lesson to the working solution
        working_solution.lessons.append(self.lesson)
        score_director.after_entity_added(self.lesson)

# Apply change while solver is running
solver.add_problem_change(AddLessonChange(new_lesson))

Continuous Planning

For problems that span long time periods, use a rolling horizon:

  1. Plan Window - Only optimize a subset of the timeline
  2. Published Window - Lock decisions that are being executed
  3. Draft Window - Future decisions that can still change

When to Use These Patterns

ScenarioPattern
New orders arrive during planningReal-Time Planning
Plan extends into the futureContinuous Planning
Daily/weekly batch optimizationRepeated Planning
Vehicle breakdowns, cancellationsReal-Time Planning
Rolling weekly schedulesContinuous Planning

1 - Real-Time Planning

Handle changes while the solver is running.

Real-time planning allows you to modify the problem while the solver is running. This is essential for handling dynamic changes like new orders, cancellations, or resource changes.

Problem Changes

Use ProblemChange to modify the working solution:

from solverforge_legacy.solver import ProblemChange

class AddLessonChange(ProblemChange[Timetable]):
    def __init__(self, lesson: Lesson):
        self.lesson = lesson

    def do_change(self, working_solution: Timetable, score_director):
        # Add to solution
        working_solution.lessons.append(self.lesson)
        # Notify score director
        score_director.after_entity_added(self.lesson)

Applying Changes

# With Solver
new_lesson = Lesson("new-1", "Art", "S. Dali", "Group A")
solver.add_problem_change(AddLessonChange(new_lesson))

# With SolverManager
solver_manager.add_problem_change(job_id, AddLessonChange(new_lesson))

Common Change Types

Add Entity

class AddVisitChange(ProblemChange[RoutePlan]):
    def __init__(self, visit: Visit):
        self.visit = visit

    def do_change(self, solution: RoutePlan, score_director):
        solution.visits.append(self.visit)
        score_director.after_entity_added(self.visit)

Remove Entity

class RemoveVisitChange(ProblemChange[RoutePlan]):
    def __init__(self, visit_id: str):
        self.visit_id = visit_id

    def do_change(self, solution: RoutePlan, score_director):
        visit = next(v for v in solution.visits if v.id == self.visit_id)

        # Remove from vehicle if assigned
        if visit.vehicle:
            score_director.before_list_variable_changed(
                visit.vehicle, "visits", visit.vehicle.visits
            )
            visit.vehicle.visits.remove(visit)
            score_director.after_list_variable_changed(
                visit.vehicle, "visits", visit.vehicle.visits
            )

        # Remove from solution
        score_director.before_entity_removed(visit)
        solution.visits.remove(visit)
        score_director.after_entity_removed(visit)

Modify Entity

class UpdateVisitDemandChange(ProblemChange[RoutePlan]):
    def __init__(self, visit_id: str, new_demand: int):
        self.visit_id = visit_id
        self.new_demand = new_demand

    def do_change(self, solution: RoutePlan, score_director):
        visit = next(v for v in solution.visits if v.id == self.visit_id)

        score_director.before_problem_property_changed(visit)
        visit.demand = self.new_demand
        score_director.after_problem_property_changed(visit)

Add Problem Fact

class AddVehicleChange(ProblemChange[RoutePlan]):
    def __init__(self, vehicle: Vehicle):
        self.vehicle = vehicle

    def do_change(self, solution: RoutePlan, score_director):
        solution.vehicles.append(self.vehicle)
        score_director.after_problem_fact_added(self.vehicle)

Score Director Notifications

Always notify the score director of changes:

MethodWhen to Use
after_entity_added()Added planning entity
before/after_entity_removed()Removing planning entity
before/after_variable_changed()Changed planning variable
before/after_list_variable_changed()Changed list variable
before/after_problem_property_changed()Changed entity property
after_problem_fact_added()Added problem fact
before/after_problem_fact_removed()Removing problem fact

Order Matters

For removals and changes, call before_* first:

score_director.before_entity_removed(entity)
# Actually remove
solution.entities.remove(entity)
score_director.after_entity_removed(entity)

Real-Time API Example

from fastapi import FastAPI
from solverforge_legacy.solver import SolverManager, ProblemChange

app = FastAPI()
solver_manager: SolverManager


@app.post("/visits")
async def add_visit(visit: VisitRequest, job_id: str):
    """Add a visit to an active solving job."""
    new_visit = Visit(
        id=str(uuid.uuid4()),
        location=visit.location,
        demand=visit.demand,
    )

    solver_manager.add_problem_change(
        job_id,
        AddVisitChange(new_visit)
    )

    return {"id": new_visit.id, "status": "added"}


@app.delete("/visits/{visit_id}")
async def remove_visit(visit_id: str, job_id: str):
    """Remove a visit from an active solving job."""
    solver_manager.add_problem_change(
        job_id,
        RemoveVisitChange(visit_id)
    )

    return {"status": "removed"}

Change Ordering

Changes are applied in the order they’re submitted:

solver.add_problem_change(change1)  # Applied first
solver.add_problem_change(change2)  # Applied second
solver.add_problem_change(change3)  # Applied third

Best Practices

Do

  • Keep changes small and focused
  • Notify score director of all modifications
  • Use before_* methods for removals/changes
  • Test changes in isolation

Don’t

  • Make changes without notifying score director
  • Modify multiple entities in one complex change
  • Forget to handle entity relationships

Debugging Changes

class DebugChange(ProblemChange[Solution]):
    def __init__(self, inner: ProblemChange):
        self.inner = inner

    def do_change(self, solution, score_director):
        print(f"Before: {len(solution.entities)} entities")
        self.inner.do_change(solution, score_director)
        print(f"After: {len(solution.entities)} entities")

Next Steps

2 - Continuous Planning

Rolling horizon and replanning strategies.

Continuous planning handles problems that span long time periods by using a rolling planning window. Instead of planning everything at once, you plan a window and move it forward as time passes.

The Challenge

Planning a full year of shifts at once:

  • Huge problem size
  • Far-future plans become irrelevant
  • Real-world changes invalidate long-term plans

Rolling Horizon

Plan only a window of time, then slide it forward:

Time ──────────────────────────────────────────►

Window 1: [====Plan====]
Window 2:    [====Plan====]
Window 3:       [====Plan====]

Implementation

from datetime import datetime, timedelta

def plan_window(start_date: date, window_days: int, problem: Schedule) -> Schedule:
    """Plan a time window."""
    end_date = start_date + timedelta(days=window_days)

    # Filter entities to window
    window_shifts = [
        s for s in problem.shifts
        if start_date <= s.date < end_date
    ]

    window_problem = Schedule(
        employees=problem.employees,
        shifts=window_shifts,
    )

    solver = create_solver()
    return solver.solve(window_problem)


def continuous_plan(problem: Schedule, window_days: int = 14):
    """Run continuous planning with rolling windows."""
    current_date = date.today()
    end_date = max(s.date for s in problem.shifts)

    while current_date < end_date:
        solution = plan_window(current_date, window_days, problem)
        save_solution(solution)

        # Move window forward
        current_date += timedelta(days=7)  # Overlap

Published vs Draft

Divide the window into published (locked) and draft (changeable):

Time ──────────────────────────────────────────►

      [Published][====Draft====]
      (Locked)   (Can change)

Implementation with Pinning

def prepare_window(problem: Schedule, publish_deadline: datetime):
    """Pin published shifts, leave draft unpinned."""
    for shift in problem.shifts:
        if shift.start_time < publish_deadline:
            shift.pinned = True
        else:
            shift.pinned = False

    return problem

Replanning Triggers

Replan when:

  1. Time-based: Every hour, day, or week
  2. Event-based: New orders, cancellations, resource changes
  3. Threshold-based: When score degrades below threshold

Event-Based Replanning

def on_new_order(order: Order, active_job_id: str):
    """Trigger replanning when new order arrives."""
    solver_manager.terminate_early(active_job_id)

    updated_problem = load_current_state()
    updated_problem.orders.append(order)

    new_job_id = start_solving(updated_problem)
    return new_job_id

Warm Starting

Start from the previous solution to preserve good assignments:

def warm_start_plan(previous: Schedule, new_shifts: list[Shift]) -> Schedule:
    """Start from previous solution, add new shifts."""
    # Keep previous assignments (pinned or as starting point)
    for shift in previous.shifts:
        if shift.employee is not None:
            shift.pinned = True  # Or just leave assigned

    # Add new unassigned shifts
    for shift in new_shifts:
        shift.employee = None
        shift.pinned = False
        previous.shifts.append(shift)

    return solve(previous)

Time Windows

Sliding Window

Week 1: Plan days 1-14
Week 2: Plan days 8-21 (7-day overlap)
Week 3: Plan days 15-28

The overlap allows replanning of near-future assignments.

Growing Window

For finite problems, grow the window:

Day 1: Plan days 1-7
Day 2: Plan days 1-14
Day 3: Plan days 1-21
...until complete

Handling Conflicts

When replanning conflicts with executed work:

def merge_with_reality(planned: Schedule, actual: Schedule) -> Schedule:
    """Merge planned schedule with actual execution."""
    for planned_shift in planned.shifts:
        actual_shift = find_actual(actual, planned_shift.id)

        if actual_shift and actual_shift.is_started:
            # Can't change started shifts
            planned_shift.employee = actual_shift.employee
            planned_shift.pinned = True

    return planned

Best Practices

Do

  • Use overlapping windows for smoother transitions
  • Pin executed/committed work
  • Warm start from previous solutions
  • Handle edge cases (window boundaries)

Don’t

  • Plan too far ahead (changes will invalidate)
  • Forget to merge with reality
  • Ignore the transition between windows

Example: Weekly Scheduling

class WeeklyScheduler:
    def __init__(self):
        self.solver_manager = create_solver_manager()

    def plan_next_week(self):
        """Run weekly planning cycle."""
        # Load current state
        current = load_current_schedule()

        # Determine window
        today = date.today()
        window_start = today + timedelta(days=(7 - today.weekday()))  # Next Monday
        window_end = window_start + timedelta(days=14)

        # Pin this week (being executed)
        for shift in current.shifts:
            if shift.date < window_start:
                shift.pinned = True
            elif shift.date < window_end:
                shift.pinned = False  # Can replan
            else:
                continue  # Outside window

        # Solve
        solution = self.solve(current)

        # Publish next week
        publish_week(solution, window_start, window_start + timedelta(days=7))

        return solution

Next Steps

3 - Repeated Planning

Batch optimization and periodic replanning.

Repeated planning runs the solver on a regular schedule, optimizing batches of work. Unlike continuous planning, each run is independent.

Use Cases

  • Daily route optimization
  • Weekly shift scheduling
  • Periodic resource allocation
  • Batch order assignment

Basic Pattern

from datetime import datetime
import schedule
import time

def daily_optimization():
    """Run optimization every day at 2 AM."""
    # Load today's problem
    problem = load_todays_problem()

    # Solve
    solver = create_solver()
    solution = solver.solve(problem)

    # Save results
    save_solution(solution)
    notify_stakeholders(solution)

# Schedule daily run
schedule.every().day.at("02:00").do(daily_optimization)

while True:
    schedule.run_pending()
    time.sleep(60)

Batch Processing

Process multiple independent problems:

def optimize_all_regions():
    """Optimize each region independently."""
    regions = load_regions()
    results = {}

    for region in regions:
        problem = load_region_problem(region)
        solution = solve(problem)
        results[region] = solution
        save_solution(region, solution)

    return results

Parallel Batch Processing

from concurrent.futures import ThreadPoolExecutor

def optimize_regions_parallel():
    """Optimize regions in parallel."""
    regions = load_regions()

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(solve_region, region): region
            for region in regions
        }

        results = {}
        for future in futures:
            region = futures[future]
            results[region] = future.result()

    return results

Time-Based Replanning

Fixed Schedule

# Every hour
schedule.every().hour.do(replan)

# Every day at specific time
schedule.every().day.at("06:00").do(replan)

# Every Monday
schedule.every().monday.at("00:00").do(weekly_plan)

Cron-Based

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

# Run at 2 AM every day
scheduler.add_job(daily_optimization, 'cron', hour=2)

# Run every 30 minutes
scheduler.add_job(frequent_replan, 'cron', minute='*/30')

scheduler.start()

Handling Failures

def robust_optimization():
    """Optimization with retry and fallback."""
    max_retries = 3

    for attempt in range(max_retries):
        try:
            problem = load_problem()
            solution = solve(problem)
            save_solution(solution)
            return solution

        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(60)  # Wait before retry
            else:
                # Use previous solution as fallback
                return load_previous_solution()

Comparing Solutions

Track solution quality over time:

def track_solution_quality(solution: Schedule):
    """Log solution metrics for analysis."""
    metrics = {
        "timestamp": datetime.now().isoformat(),
        "score": str(solution.score),
        "feasible": solution.score.is_feasible,
        "entity_count": len(solution.shifts),
        "assigned_count": sum(1 for s in solution.shifts if s.employee),
    }

    log_metrics(metrics)

    # Alert if quality degrades
    if not solution.score.is_feasible:
        send_alert("Infeasible solution generated!")

Incremental vs Fresh

Fresh Start

Each run starts from scratch:

def fresh_optimization():
    problem = load_problem()
    # All entities unassigned
    for entity in problem.entities:
        entity.planning_variable = None
    return solve(problem)

Incremental (Warm Start)

Start from previous solution:

def incremental_optimization():
    previous = load_previous_solution()

    # Keep good assignments, clear bad ones
    for entity in previous.entities:
        if should_keep(entity):
            entity.pinned = True
        else:
            entity.planning_variable = None
            entity.pinned = False

    return solve(previous)

Monitoring

class OptimizationMonitor:
    def __init__(self):
        self.runs = []

    def record_run(self, solution, duration):
        self.runs.append({
            "time": datetime.now(),
            "score": solution.score,
            "duration": duration,
            "feasible": solution.score.is_feasible,
        })

    def get_statistics(self):
        if not self.runs:
            return None

        feasible_rate = sum(r["feasible"] for r in self.runs) / len(self.runs)
        avg_duration = sum(r["duration"] for r in self.runs) / len(self.runs)

        return {
            "total_runs": len(self.runs),
            "feasibility_rate": feasible_rate,
            "avg_duration_seconds": avg_duration,
        }

Best Practices

Do

  • Log all runs for analysis
  • Implement retry logic
  • Monitor solution quality trends
  • Use appropriate scheduling library

Don’t

  • Run optimization during peak hours
  • Ignore failures silently
  • Forget to save results
  • Overload with too frequent replanning

Next Steps