Design Patterns
Common patterns for handling real-world planning scenarios.
Real-world planning problems often require more than basic optimization. This section covers patterns for common scenarios.
Topics
Real-Time Planning
Handle dynamic changes during solving:
from solverforge_legacy.solver import ProblemChange
class AddLessonChange(ProblemChange[Timetable]):
def __init__(self, lesson: Lesson):
self.lesson = lesson
def do_change(self, working_solution: Timetable, score_director):
# Add the new lesson to the working solution
working_solution.lessons.append(self.lesson)
score_director.after_entity_added(self.lesson)
# Apply change while solver is running
solver.add_problem_change(AddLessonChange(new_lesson))
Continuous Planning
For problems that span long time periods, use a rolling horizon:
- Plan Window - Only optimize a subset of the timeline
- Published Window - Lock decisions that are being executed
- Draft Window - Future decisions that can still change
When to Use These Patterns
| Scenario | Pattern |
|---|
| New orders arrive during planning | Real-Time Planning |
| Plan extends into the future | Continuous Planning |
| Daily/weekly batch optimization | Repeated Planning |
| Vehicle breakdowns, cancellations | Real-Time Planning |
| Rolling weekly schedules | Continuous Planning |
1 - Real-Time Planning
Handle changes while the solver is running.
Real-time planning allows you to modify the problem while the solver is running. This is essential for handling dynamic changes like new orders, cancellations, or resource changes.
Problem Changes
Use ProblemChange to modify the working solution:
from solverforge_legacy.solver import ProblemChange
class AddLessonChange(ProblemChange[Timetable]):
def __init__(self, lesson: Lesson):
self.lesson = lesson
def do_change(self, working_solution: Timetable, score_director):
# Add to solution
working_solution.lessons.append(self.lesson)
# Notify score director
score_director.after_entity_added(self.lesson)
Applying Changes
# With Solver
new_lesson = Lesson("new-1", "Art", "S. Dali", "Group A")
solver.add_problem_change(AddLessonChange(new_lesson))
# With SolverManager
solver_manager.add_problem_change(job_id, AddLessonChange(new_lesson))
Common Change Types
Add Entity
class AddVisitChange(ProblemChange[RoutePlan]):
def __init__(self, visit: Visit):
self.visit = visit
def do_change(self, solution: RoutePlan, score_director):
solution.visits.append(self.visit)
score_director.after_entity_added(self.visit)
Remove Entity
class RemoveVisitChange(ProblemChange[RoutePlan]):
def __init__(self, visit_id: str):
self.visit_id = visit_id
def do_change(self, solution: RoutePlan, score_director):
visit = next(v for v in solution.visits if v.id == self.visit_id)
# Remove from vehicle if assigned
if visit.vehicle:
score_director.before_list_variable_changed(
visit.vehicle, "visits", visit.vehicle.visits
)
visit.vehicle.visits.remove(visit)
score_director.after_list_variable_changed(
visit.vehicle, "visits", visit.vehicle.visits
)
# Remove from solution
score_director.before_entity_removed(visit)
solution.visits.remove(visit)
score_director.after_entity_removed(visit)
Modify Entity
class UpdateVisitDemandChange(ProblemChange[RoutePlan]):
def __init__(self, visit_id: str, new_demand: int):
self.visit_id = visit_id
self.new_demand = new_demand
def do_change(self, solution: RoutePlan, score_director):
visit = next(v for v in solution.visits if v.id == self.visit_id)
score_director.before_problem_property_changed(visit)
visit.demand = self.new_demand
score_director.after_problem_property_changed(visit)
Add Problem Fact
class AddVehicleChange(ProblemChange[RoutePlan]):
def __init__(self, vehicle: Vehicle):
self.vehicle = vehicle
def do_change(self, solution: RoutePlan, score_director):
solution.vehicles.append(self.vehicle)
score_director.after_problem_fact_added(self.vehicle)
Score Director Notifications
Always notify the score director of changes:
| Method | When to Use |
|---|
after_entity_added() | Added planning entity |
before/after_entity_removed() | Removing planning entity |
before/after_variable_changed() | Changed planning variable |
before/after_list_variable_changed() | Changed list variable |
before/after_problem_property_changed() | Changed entity property |
after_problem_fact_added() | Added problem fact |
before/after_problem_fact_removed() | Removing problem fact |
Order Matters
For removals and changes, call before_* first:
score_director.before_entity_removed(entity)
# Actually remove
solution.entities.remove(entity)
score_director.after_entity_removed(entity)
Real-Time API Example
from fastapi import FastAPI
from solverforge_legacy.solver import SolverManager, ProblemChange
app = FastAPI()
solver_manager: SolverManager
@app.post("/visits")
async def add_visit(visit: VisitRequest, job_id: str):
"""Add a visit to an active solving job."""
new_visit = Visit(
id=str(uuid.uuid4()),
location=visit.location,
demand=visit.demand,
)
solver_manager.add_problem_change(
job_id,
AddVisitChange(new_visit)
)
return {"id": new_visit.id, "status": "added"}
@app.delete("/visits/{visit_id}")
async def remove_visit(visit_id: str, job_id: str):
"""Remove a visit from an active solving job."""
solver_manager.add_problem_change(
job_id,
RemoveVisitChange(visit_id)
)
return {"status": "removed"}
Change Ordering
Changes are applied in the order they’re submitted:
solver.add_problem_change(change1) # Applied first
solver.add_problem_change(change2) # Applied second
solver.add_problem_change(change3) # Applied third
Best Practices
Do
- Keep changes small and focused
- Notify score director of all modifications
- Use
before_* methods for removals/changes - Test changes in isolation
Don’t
- Make changes without notifying score director
- Modify multiple entities in one complex change
- Forget to handle entity relationships
Debugging Changes
class DebugChange(ProblemChange[Solution]):
def __init__(self, inner: ProblemChange):
self.inner = inner
def do_change(self, solution, score_director):
print(f"Before: {len(solution.entities)} entities")
self.inner.do_change(solution, score_director)
print(f"After: {len(solution.entities)} entities")
Next Steps
2 - Continuous Planning
Rolling horizon and replanning strategies.
Continuous planning handles problems that span long time periods by using a rolling planning window. Instead of planning everything at once, you plan a window and move it forward as time passes.
The Challenge
Planning a full year of shifts at once:
- Huge problem size
- Far-future plans become irrelevant
- Real-world changes invalidate long-term plans
Rolling Horizon
Plan only a window of time, then slide it forward:
Time ──────────────────────────────────────────►
Window 1: [====Plan====]
Window 2: [====Plan====]
Window 3: [====Plan====]
Implementation
from datetime import datetime, timedelta
def plan_window(start_date: date, window_days: int, problem: Schedule) -> Schedule:
"""Plan a time window."""
end_date = start_date + timedelta(days=window_days)
# Filter entities to window
window_shifts = [
s for s in problem.shifts
if start_date <= s.date < end_date
]
window_problem = Schedule(
employees=problem.employees,
shifts=window_shifts,
)
solver = create_solver()
return solver.solve(window_problem)
def continuous_plan(problem: Schedule, window_days: int = 14):
"""Run continuous planning with rolling windows."""
current_date = date.today()
end_date = max(s.date for s in problem.shifts)
while current_date < end_date:
solution = plan_window(current_date, window_days, problem)
save_solution(solution)
# Move window forward
current_date += timedelta(days=7) # Overlap
Published vs Draft
Divide the window into published (locked) and draft (changeable):
Time ──────────────────────────────────────────►
[Published][====Draft====]
(Locked) (Can change)
Implementation with Pinning
def prepare_window(problem: Schedule, publish_deadline: datetime):
"""Pin published shifts, leave draft unpinned."""
for shift in problem.shifts:
if shift.start_time < publish_deadline:
shift.pinned = True
else:
shift.pinned = False
return problem
Replanning Triggers
Replan when:
- Time-based: Every hour, day, or week
- Event-based: New orders, cancellations, resource changes
- Threshold-based: When score degrades below threshold
Event-Based Replanning
def on_new_order(order: Order, active_job_id: str):
"""Trigger replanning when new order arrives."""
solver_manager.terminate_early(active_job_id)
updated_problem = load_current_state()
updated_problem.orders.append(order)
new_job_id = start_solving(updated_problem)
return new_job_id
Warm Starting
Start from the previous solution to preserve good assignments:
def warm_start_plan(previous: Schedule, new_shifts: list[Shift]) -> Schedule:
"""Start from previous solution, add new shifts."""
# Keep previous assignments (pinned or as starting point)
for shift in previous.shifts:
if shift.employee is not None:
shift.pinned = True # Or just leave assigned
# Add new unassigned shifts
for shift in new_shifts:
shift.employee = None
shift.pinned = False
previous.shifts.append(shift)
return solve(previous)
Time Windows
Sliding Window
Week 1: Plan days 1-14
Week 2: Plan days 8-21 (7-day overlap)
Week 3: Plan days 15-28
The overlap allows replanning of near-future assignments.
Growing Window
For finite problems, grow the window:
Day 1: Plan days 1-7
Day 2: Plan days 1-14
Day 3: Plan days 1-21
...until complete
Handling Conflicts
When replanning conflicts with executed work:
def merge_with_reality(planned: Schedule, actual: Schedule) -> Schedule:
"""Merge planned schedule with actual execution."""
for planned_shift in planned.shifts:
actual_shift = find_actual(actual, planned_shift.id)
if actual_shift and actual_shift.is_started:
# Can't change started shifts
planned_shift.employee = actual_shift.employee
planned_shift.pinned = True
return planned
Best Practices
Do
- Use overlapping windows for smoother transitions
- Pin executed/committed work
- Warm start from previous solutions
- Handle edge cases (window boundaries)
Don’t
- Plan too far ahead (changes will invalidate)
- Forget to merge with reality
- Ignore the transition between windows
Example: Weekly Scheduling
class WeeklyScheduler:
def __init__(self):
self.solver_manager = create_solver_manager()
def plan_next_week(self):
"""Run weekly planning cycle."""
# Load current state
current = load_current_schedule()
# Determine window
today = date.today()
window_start = today + timedelta(days=(7 - today.weekday())) # Next Monday
window_end = window_start + timedelta(days=14)
# Pin this week (being executed)
for shift in current.shifts:
if shift.date < window_start:
shift.pinned = True
elif shift.date < window_end:
shift.pinned = False # Can replan
else:
continue # Outside window
# Solve
solution = self.solve(current)
# Publish next week
publish_week(solution, window_start, window_start + timedelta(days=7))
return solution
Next Steps
3 - Repeated Planning
Batch optimization and periodic replanning.
Repeated planning runs the solver on a regular schedule, optimizing batches of work. Unlike continuous planning, each run is independent.
Use Cases
- Daily route optimization
- Weekly shift scheduling
- Periodic resource allocation
- Batch order assignment
Basic Pattern
from datetime import datetime
import schedule
import time
def daily_optimization():
"""Run optimization every day at 2 AM."""
# Load today's problem
problem = load_todays_problem()
# Solve
solver = create_solver()
solution = solver.solve(problem)
# Save results
save_solution(solution)
notify_stakeholders(solution)
# Schedule daily run
schedule.every().day.at("02:00").do(daily_optimization)
while True:
schedule.run_pending()
time.sleep(60)
Batch Processing
Process multiple independent problems:
def optimize_all_regions():
"""Optimize each region independently."""
regions = load_regions()
results = {}
for region in regions:
problem = load_region_problem(region)
solution = solve(problem)
results[region] = solution
save_solution(region, solution)
return results
Parallel Batch Processing
from concurrent.futures import ThreadPoolExecutor
def optimize_regions_parallel():
"""Optimize regions in parallel."""
regions = load_regions()
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(solve_region, region): region
for region in regions
}
results = {}
for future in futures:
region = futures[future]
results[region] = future.result()
return results
Time-Based Replanning
Fixed Schedule
# Every hour
schedule.every().hour.do(replan)
# Every day at specific time
schedule.every().day.at("06:00").do(replan)
# Every Monday
schedule.every().monday.at("00:00").do(weekly_plan)
Cron-Based
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# Run at 2 AM every day
scheduler.add_job(daily_optimization, 'cron', hour=2)
# Run every 30 minutes
scheduler.add_job(frequent_replan, 'cron', minute='*/30')
scheduler.start()
Handling Failures
def robust_optimization():
"""Optimization with retry and fallback."""
max_retries = 3
for attempt in range(max_retries):
try:
problem = load_problem()
solution = solve(problem)
save_solution(solution)
return solution
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(60) # Wait before retry
else:
# Use previous solution as fallback
return load_previous_solution()
Comparing Solutions
Track solution quality over time:
def track_solution_quality(solution: Schedule):
"""Log solution metrics for analysis."""
metrics = {
"timestamp": datetime.now().isoformat(),
"score": str(solution.score),
"feasible": solution.score.is_feasible,
"entity_count": len(solution.shifts),
"assigned_count": sum(1 for s in solution.shifts if s.employee),
}
log_metrics(metrics)
# Alert if quality degrades
if not solution.score.is_feasible:
send_alert("Infeasible solution generated!")
Incremental vs Fresh
Fresh Start
Each run starts from scratch:
def fresh_optimization():
problem = load_problem()
# All entities unassigned
for entity in problem.entities:
entity.planning_variable = None
return solve(problem)
Incremental (Warm Start)
Start from previous solution:
def incremental_optimization():
previous = load_previous_solution()
# Keep good assignments, clear bad ones
for entity in previous.entities:
if should_keep(entity):
entity.pinned = True
else:
entity.planning_variable = None
entity.pinned = False
return solve(previous)
Monitoring
class OptimizationMonitor:
def __init__(self):
self.runs = []
def record_run(self, solution, duration):
self.runs.append({
"time": datetime.now(),
"score": solution.score,
"duration": duration,
"feasible": solution.score.is_feasible,
})
def get_statistics(self):
if not self.runs:
return None
feasible_rate = sum(r["feasible"] for r in self.runs) / len(self.runs)
avg_duration = sum(r["duration"] for r in self.runs) / len(self.runs)
return {
"total_runs": len(self.runs),
"feasibility_rate": feasible_rate,
"avg_duration_seconds": avg_duration,
}
Best Practices
Do
- Log all runs for analysis
- Implement retry logic
- Monitor solution quality trends
- Use appropriate scheduling library
Don’t
- Run optimization during peak hours
- Ignore failures silently
- Forget to save results
- Overload with too frequent replanning
Next Steps