Integration
Integrate SolverForge with web frameworks and other systems.
SolverForge integrates easily with Python web frameworks and data systems.
Topics
FastAPI Example
from fastapi import FastAPI
from solverforge_legacy.solver import SolverManager
app = FastAPI()
solver_manager = SolverManager.create(solver_factory)
@app.post("/solve")
async def solve(problem: Timetable) -> str:
job_id = str(uuid.uuid4())
solver_manager.solve_and_listen(
job_id,
lambda _: problem,
on_best_solution_changed
)
return job_id
@app.get("/solution/{job_id}")
async def get_solution(job_id: str) -> Timetable:
return solver_manager.get_best_solution(job_id)
@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
solver_manager.terminate_early(job_id)
Serialization
SolverForge domain objects are standard Python dataclasses, making them easy to serialize:
import json
from dataclasses import asdict
# Serialize to JSON
json_str = json.dumps(asdict(solution))
# With Pydantic for validation
from pydantic.dataclasses import dataclass as pydantic_dataclass
@pydantic_dataclass
class TimetableDTO:
timeslots: list[TimeslotDTO]
rooms: list[RoomDTO]
lessons: list[LessonDTO]
Database Integration
Use any Python ORM (SQLAlchemy, Django ORM, etc.) for persistence:
- Load data from database into domain objects
- Run the solver
- Save results back to database
The solver works with in-memory Python objects, so any data source that can produce those objects will work.
1 - FastAPI Integration
Build REST APIs for your solver with FastAPI.
FastAPI is a modern Python web framework that works well with SolverForge. This guide shows common patterns for building solver APIs.
Basic Setup
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import uuid
from solverforge_legacy.solver import SolverFactory, SolverManager
from solverforge_legacy.solver.config import (
SolverConfig, ScoreDirectorFactoryConfig, TerminationConfig, Duration
)
# Global state
solver_manager: SolverManager | None = None
solutions: dict[str, Solution] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize solver on startup, cleanup on shutdown."""
global solver_manager
config = SolverConfig(
solution_class=Timetable,
entity_class_list=[Lesson],
score_director_factory_config=ScoreDirectorFactoryConfig(
constraint_provider_function=define_constraints
),
termination_config=TerminationConfig(spent_limit=Duration(minutes=5)),
)
solver_factory = SolverFactory.create(config)
solver_manager = SolverManager.create(solver_factory)
yield
if solver_manager:
solver_manager.close()
app = FastAPI(
title="Solver API",
description="Planning optimization API",
lifespan=lifespan,
)
API Endpoints
Submit Problem
@app.post("/solve", response_model=str)
async def submit_problem(request: ProblemRequest) -> str:
"""Submit a problem for solving. Returns job ID."""
job_id = str(uuid.uuid4())
problem = request.to_domain()
def on_best_solution(solution):
solutions[job_id] = solution
solver_manager.solve_and_listen(
job_id,
problem_finder=lambda _: problem,
best_solution_consumer=on_best_solution,
)
return job_id
Get Solution
@app.get("/solution/{job_id}", response_model=SolutionResponse)
async def get_solution(job_id: str) -> SolutionResponse:
"""Get the current best solution."""
if job_id not in solutions:
raise HTTPException(404, "Job not found")
solution = solutions[job_id]
status = solver_manager.get_solver_status(job_id)
return SolutionResponse.from_domain(solution, status)
Stop Solving
@app.delete("/solve/{job_id}")
async def stop_solving(job_id: str):
"""Stop solving early."""
solver_manager.terminate_early(job_id)
return {"status": "terminating"}
Get Score Analysis
@app.get("/analysis/{job_id}")
async def get_analysis(job_id: str):
"""Get detailed score analysis."""
if job_id not in solutions:
raise HTTPException(404, "Job not found")
solution = solutions[job_id]
analysis = solution_manager.analyze(solution)
return {
"score": str(analysis.score),
"is_feasible": analysis.score.is_feasible,
"constraints": [
{
"name": c.constraint_name,
"score": str(c.score),
"matches": c.match_count,
}
for c in analysis.constraint_analyses()
],
}
Request/Response Models
Use Pydantic for validation:
from pydantic import BaseModel
from datetime import time
class TimeslotDTO(BaseModel):
day: str
start_time: str
end_time: str
def to_domain(self) -> Timeslot:
return Timeslot(
self.day,
time.fromisoformat(self.start_time),
time.fromisoformat(self.end_time),
)
@classmethod
def from_domain(cls, timeslot: Timeslot) -> "TimeslotDTO":
return cls(
day=timeslot.day,
start_time=timeslot.start_time.isoformat(),
end_time=timeslot.end_time.isoformat(),
)
class ProblemRequest(BaseModel):
timeslots: list[TimeslotDTO]
rooms: list[RoomDTO]
lessons: list[LessonDTO]
def to_domain(self) -> Timetable:
timeslots = [t.to_domain() for t in self.timeslots]
rooms = [r.to_domain() for r in self.rooms]
lessons = [l.to_domain(timeslots, rooms) for l in self.lessons]
return Timetable("api", timeslots, rooms, lessons)
class SolutionResponse(BaseModel):
status: str
score: str | None
is_feasible: bool | None
lessons: list[LessonDTO]
@classmethod
def from_domain(cls, solution: Timetable, status) -> "SolutionResponse":
return cls(
status=status.name,
score=str(solution.score) if solution.score else None,
is_feasible=solution.score.is_feasible if solution.score else None,
lessons=[LessonDTO.from_domain(l) for l in solution.lessons],
)
Real-Time Updates
Problem Changes
@app.post("/solve/{job_id}/lessons")
async def add_lesson(job_id: str, lesson: LessonDTO):
"""Add a lesson to an active job."""
new_lesson = lesson.to_domain()
solver_manager.add_problem_change(
job_id,
AddLessonChange(new_lesson)
)
return {"status": "added", "id": new_lesson.id}
WebSocket Updates
from fastapi import WebSocket
@app.websocket("/ws/{job_id}")
async def websocket_updates(websocket: WebSocket, job_id: str):
await websocket.accept()
async def send_update(solution):
await websocket.send_json({
"score": str(solution.score),
"timestamp": datetime.now().isoformat(),
})
# Register listener
# (Implementation depends on your event system)
try:
while True:
await asyncio.sleep(1)
if job_id in solutions:
await send_update(solutions[job_id])
except WebSocketDisconnect:
pass
Error Handling
from fastapi import HTTPException
from fastapi.responses import JSONResponse
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
return JSONResponse(
status_code=500,
content={"error": str(exc)},
)
@app.get("/solution/{job_id}")
async def get_solution(job_id: str):
if job_id not in solutions:
raise HTTPException(
status_code=404,
detail=f"Job {job_id} not found"
)
# ...
Testing
from fastapi.testclient import TestClient
def test_submit_and_get():
client = TestClient(app)
# Submit problem
response = client.post("/solve", json=problem_data)
assert response.status_code == 200
job_id = response.json()
# Wait for solving
time.sleep(5)
# Get solution
response = client.get(f"/solution/{job_id}")
assert response.status_code == 200
assert response.json()["is_feasible"]
Deployment
Docker
FROM python:3.11-slim
# Install JDK
RUN apt-get update && apt-get install -y openjdk-17-jdk
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Running
# Development
uvicorn main:app --reload
# Production
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
Next Steps
2 - Serialization
JSON serialization with dataclasses and Pydantic.
SolverForge domain objects are Python dataclasses, making them easy to serialize to JSON for APIs and storage.
Basic JSON Serialization
Using dataclasses
from dataclasses import dataclass, asdict
import json
@dataclass
class Timeslot:
day: str
start_time: str
end_time: str
@dataclass
class Room:
name: str
# Serialize
timeslot = Timeslot("MONDAY", "08:30", "09:30")
json_str = json.dumps(asdict(timeslot))
# {"day": "MONDAY", "start_time": "08:30", "end_time": "09:30"}
# Deserialize
data = json.loads(json_str)
timeslot = Timeslot(**data)
Handling Complex Types
For types like datetime and time:
from dataclasses import dataclass
from datetime import time
import json
class TimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, time):
return obj.isoformat()
return super().default(obj)
def time_decoder(dct):
for key, value in dct.items():
if key.endswith('_time') and isinstance(value, str):
try:
dct[key] = time.fromisoformat(value)
except ValueError:
pass
return dct
# Serialize
json_str = json.dumps(asdict(obj), cls=TimeEncoder)
# Deserialize
data = json.loads(json_str, object_hook=time_decoder)
Pydantic Integration
Pydantic provides automatic validation and serialization:
DTO Pattern
Separate API models from domain models:
from pydantic import BaseModel
from datetime import time
# API model
class TimeslotDTO(BaseModel):
day: str
start_time: str
end_time: str
def to_domain(self) -> Timeslot:
return Timeslot(
self.day,
time.fromisoformat(self.start_time),
time.fromisoformat(self.end_time),
)
@classmethod
def from_domain(cls, timeslot: Timeslot) -> "TimeslotDTO":
return cls(
day=timeslot.day,
start_time=timeslot.start_time.isoformat(),
end_time=timeslot.end_time.isoformat(),
)
# Domain model (unchanged)
@dataclass
class Timeslot:
day: str
start_time: time
end_time: time
Full Example
from pydantic import BaseModel
from datetime import time
class LessonDTO(BaseModel):
id: str
subject: str
teacher: str
student_group: str
timeslot: TimeslotDTO | None = None
room: RoomDTO | None = None
def to_domain(self, timeslots: list[Timeslot], rooms: list[Room]) -> Lesson:
ts = None
if self.timeslot:
ts = find_timeslot(timeslots, self.timeslot)
rm = None
if self.room:
rm = find_room(rooms, self.room)
return Lesson(
id=self.id,
subject=self.subject,
teacher=self.teacher,
student_group=self.student_group,
timeslot=ts,
room=rm,
)
@classmethod
def from_domain(cls, lesson: Lesson) -> "LessonDTO":
return cls(
id=lesson.id,
subject=lesson.subject,
teacher=lesson.teacher,
student_group=lesson.student_group,
timeslot=TimeslotDTO.from_domain(lesson.timeslot) if lesson.timeslot else None,
room=RoomDTO.from_domain(lesson.room) if lesson.room else None,
)
class TimetableDTO(BaseModel):
id: str
timeslots: list[TimeslotDTO]
rooms: list[RoomDTO]
lessons: list[LessonDTO]
score: str | None = None
def to_domain(self) -> Timetable:
timeslots = [t.to_domain() for t in self.timeslots]
rooms = [r.to_domain() for r in self.rooms]
lessons = [l.to_domain(timeslots, rooms) for l in self.lessons]
return Timetable(self.id, timeslots, rooms, lessons)
@classmethod
def from_domain(cls, timetable: Timetable) -> "TimetableDTO":
return cls(
id=timetable.id,
timeslots=[TimeslotDTO.from_domain(t) for t in timetable.timeslots],
rooms=[RoomDTO.from_domain(r) for r in timetable.rooms],
lessons=[LessonDTO.from_domain(l) for l in timetable.lessons],
score=str(timetable.score) if timetable.score else None,
)
Reference Resolution
When deserializing, resolve references to shared objects:
def find_timeslot(timeslots: list[Timeslot], dto: TimeslotDTO) -> Timeslot:
"""Find matching timeslot by properties."""
for ts in timeslots:
if (ts.day == dto.day and
ts.start_time.isoformat() == dto.start_time):
return ts
raise ValueError(f"Timeslot not found: {dto}")
def find_room(rooms: list[Room], dto: RoomDTO) -> Room:
"""Find matching room by name."""
for room in rooms:
if room.name == dto.name:
return room
raise ValueError(f"Room not found: {dto}")
Score Serialization
from solverforge_legacy.solver.score import HardSoftScore
# To string
score_str = str(solution.score) # "-2hard/-15soft"
# From string
score = HardSoftScore.parse("-2hard/-15soft")
# To dict
score_dict = {
"hard": solution.score.hard_score,
"soft": solution.score.soft_score,
"feasible": solution.score.is_feasible,
}
Database Persistence
SQLAlchemy Example
from sqlalchemy import Column, String, Integer, ForeignKey
from sqlalchemy.orm import relationship
class TimeslotModel(Base):
__tablename__ = "timeslots"
id = Column(Integer, primary_key=True)
day = Column(String)
start_time = Column(String)
end_time = Column(String)
def to_domain(self) -> Timeslot:
return Timeslot(
self.day,
time.fromisoformat(self.start_time),
time.fromisoformat(self.end_time),
)
class LessonModel(Base):
__tablename__ = "lessons"
id = Column(String, primary_key=True)
subject = Column(String)
teacher = Column(String)
student_group = Column(String)
timeslot_id = Column(Integer, ForeignKey("timeslots.id"), nullable=True)
room_id = Column(Integer, ForeignKey("rooms.id"), nullable=True)
timeslot = relationship("TimeslotModel")
room = relationship("RoomModel")
Best Practices
Do
- Use DTOs for API boundaries
- Validate input with Pydantic
- Handle None values explicitly
- Use consistent naming conventions
Don’t
- Serialize domain objects directly (may expose internals)
- Forget to handle score serialization
- Ignore reference resolution
- Mix API and domain models
Next Steps
3 - Logging
Configure logging for debugging and monitoring.
Configure Python logging to monitor solver behavior and debug issues.
Basic Configuration
import logging
# Configure root logger
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Get logger for your app
logger = logging.getLogger("my_app")
Solver Logging
The solver uses the ai.timefold logger hierarchy:
# Enable solver debug logging
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)
# Or just specific components
logging.getLogger("ai.timefold.solver").setLevel(logging.DEBUG)
Log Levels
| Level | Use Case |
|---|
| DEBUG | Detailed solver internals |
| INFO | Progress updates, scores |
| WARNING | Potential issues |
| ERROR | Failures |
Progress Logging
Log solver progress with event listeners:
from solverforge_legacy.solver import BestSolutionChangedEvent
logger = logging.getLogger("solver")
def on_progress(event: BestSolutionChangedEvent):
logger.info(
f"Score: {event.new_best_score} | "
f"Time: {event.time_spent} | "
f"Initialized: {event.is_new_best_solution_initialized}"
)
solver.add_event_listener(on_progress)
File Logging
Write logs to a file:
import logging
# Create file handler
file_handler = logging.FileHandler("solver.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
))
# Add to logger
logging.getLogger().addHandler(file_handler)
Structured Logging
For production, use structured logging:
import json
import logging
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": self.formatTime(record),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if hasattr(record, "score"):
log_data["score"] = record.score
if hasattr(record, "job_id"):
log_data["job_id"] = record.job_id
return json.dumps(log_data)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.getLogger().addHandler(handler)
Logging with Context
def log_with_context(logger, job_id, message, **kwargs):
extra = {"job_id": job_id, **kwargs}
logger.info(message, extra=extra)
# Usage
log_with_context(logger, "job-123", "Solving started", entities=100)
FastAPI Logging
from fastapi import FastAPI, Request
import logging
import time
logger = logging.getLogger("api")
@app.middleware("http")
async def log_requests(request: Request, call_next):
start = time.time()
response = await call_next(request)
duration = time.time() - start
logger.info(
f"{request.method} {request.url.path} "
f"- {response.status_code} - {duration:.3f}s"
)
return response
Debugging Tips
Enable Verbose Logging
# Maximum verbosity
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger("ai.timefold").setLevel(logging.DEBUG)
Log Constraint Matches
def debug_constraints(solution):
logger = logging.getLogger("constraints")
analysis = solution_manager.analyze(solution)
for constraint in analysis.constraint_analyses():
logger.debug(
f"{constraint.constraint_name}: "
f"score={constraint.score}, matches={constraint.match_count}"
)
for match in constraint.matches():
logger.debug(f" - {match.justification}")
Log Configuration
def log_config(config: SolverConfig):
logger = logging.getLogger("config")
logger.info(f"Solution class: {config.solution_class}")
logger.info(f"Entity classes: {config.entity_class_list}")
logger.info(f"Termination: {config.termination_config}")
Production Recommendations
Log Aggregation
Send logs to a central system:
# Example with Python logging to stdout (for container orchestration)
logging.basicConfig(
level=logging.INFO,
format='%(message)s', # JSON formatted
stream=sys.stdout,
)
Metrics
Track key metrics:
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SolveMetrics:
job_id: str
start_time: datetime
end_time: datetime | None = None
final_score: str | None = None
is_feasible: bool | None = None
def log(self):
duration = (self.end_time - self.start_time).total_seconds() if self.end_time else 0
logger.info(
f"Job {self.job_id}: "
f"duration={duration:.1f}s, "
f"score={self.final_score}, "
f"feasible={self.is_feasible}"
)
Alerting
Alert on issues:
def check_solution_quality(solution, job_id):
if not solution.score.is_feasible:
logger.warning(f"Job {job_id} produced infeasible solution!")
send_alert(f"Infeasible solution for job {job_id}")
if solution.score.soft_score < -10000:
logger.warning(f"Job {job_id} has poor soft score: {solution.score}")
Next Steps