SolverManager
Manage concurrent and asynchronous solving jobs.
SolverManager handles concurrent solving jobs, making it ideal for web applications and services.
Creating a SolverManager
from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(minutes=5)),
)
solver_factory = SolverFactory.create(config)
solver_manager = SolverManager.create(solver_factory)
Basic Solving
solve()
Non-blocking solve that returns a future:
import uuid
job_id = str(uuid.uuid4())
# Start solving (non-blocking)
future = solver_manager.solve(job_id, problem)
# ... do other work ...
# Get result (blocks until done)
solution = future.get_final_best_solution()
print(f"Score: {solution.score}")
solve_and_listen()
Solve with progress callbacks:
def on_best_solution_changed(solution: Timetable):
print(f"New best: {solution.score}")
# Update UI, save to database, etc.
def on_exception(error):
print(f"Solving failed: {error}")
solver_manager.solve_and_listen(
job_id,
problem_finder=lambda _: problem,
best_solution_consumer=on_best_solution_changed,
exception_handler=on_exception,
)
Managing Jobs
Check Job Status
status = solver_manager.get_solver_status(job_id)
# Returns: NOT_SOLVING, SOLVING_ACTIVE, SOLVING_ENDED
Get Current Best Solution
solution = solver_manager.get_best_solution(job_id)
if solution:
print(f"Current best: {solution.score}")
Terminate Early
solver_manager.terminate_early(job_id)
FastAPI Integration
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import uuid
solver_manager: SolverManager | None = None
solutions: dict[str, Timetable] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
global solver_manager
config = SolverConfig(...)
factory = SolverFactory.create(config)
solver_manager = SolverManager.create(factory)
yield
solver_manager.close()
app = FastAPI(lifespan=lifespan)
@app.post("/solve")
async def start_solving(problem: TimetableRequest) -> str:
job_id = str(uuid.uuid4())
def on_best_solution(solution: Timetable):
solutions[job_id] = solution
solver_manager.solve_and_listen(
job_id,
problem_finder=lambda _: problem.to_domain(),
best_solution_consumer=on_best_solution,
)
return job_id
@app.get("/solution/{job_id}")
async def get_solution(job_id: str):
if job_id not in solutions:
raise HTTPException(404, "Job not found")
solution = solutions[job_id]
status = solver_manager.get_solver_status(job_id)
return {
"status": status.name,
"score": str(solution.score),
"solution": TimetableResponse.from_domain(solution),
}
@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
solver_manager.terminate_early(job_id)
return {"status": "terminating"}
Concurrent Jobs
SolverManager handles multiple jobs concurrently:
# Start multiple jobs
job1 = solver_manager.solve("job1", problem1)
job2 = solver_manager.solve("job2", problem2)
job3 = solver_manager.solve("job3", problem3)
# Each runs in its own thread
# Results available when ready
solution1 = job1.get_final_best_solution()
Resource Limits
By default, jobs run with no limit on concurrent execution. For resource management:
# Limit concurrent solvers
solver_manager = SolverManager.create(
solver_factory,
parallel_solver_count=4, # Max 4 concurrent jobs
)
Problem Changes During Solving
Add changes to running jobs:
from solverforge_legacy.solver import ProblemChange
class AddEntity(ProblemChange[Timetable]):
def __init__(self, entity):
self.entity = entity
def do_change(self, working_solution, score_director):
working_solution.lessons.append(self.entity)
score_director.after_entity_added(self.entity)
# Add change to running job
solver_manager.add_problem_change(job_id, AddEntity(new_lesson))
Cleanup
Always close the SolverManager when done:
# Using context manager
with SolverManager.create(factory) as manager:
# ... use manager ...
# Automatically closed
# Manual cleanup
try:
# ... use manager ...
finally:
solver_manager.close()
Error Handling
def on_exception(job_id: str, exception: Exception):
logger.error(f"Job {job_id} failed: {exception}")
# Clean up, notify user, etc.
solver_manager.solve_and_listen(
job_id,
problem_finder=lambda _: problem,
best_solution_consumer=on_solution,
exception_handler=on_exception,
)
Best Practices
Do
- Use
solve_and_listen()for progress updates - Store solutions externally (database, cache)
- Handle exceptions properly
- Close SolverManager on shutdown
Don’t
- Block the main thread waiting for results
- Store solutions only in memory (lose on restart)
- Forget to handle job cleanup
Next Steps
- SolutionManager - Analyze solutions
- Real-Time Planning - Problem changes
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.