Collectors
Aggregation functions for group_by operations in constraint streams.
Collectors aggregate values within groups created by group_by(...). They
transform a stream of individual matches into grouped summaries. For simple
fairness rules, prefer balance(...); use collectors when you need explicit
counts, totals, or custom imbalance data.
In the current release, the collector trait is Collector<Input>. Input is
the borrowed stream match shape: unary and projected grouping pass &A or
&Out, while direct cross-join grouping passes (&A, &B). Stock collectors
such as count, sum, collect_vec, consecutive_runs, and
indexed_presence use the same protocol across those shapes.
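Here is a minimal std-only sketch that makes the idea of one protocol over several borrowed input shapes concrete. The trait `Accumulate<Input>` and the `CountAcc` and `SumAcc` types are hypothetical illustrations, not SolverForge's `Collector<Input>` definition (see docs.rs/solverforge for the real signatures). The sketch shows how one accumulator can accept `&A` from a unary stream and `(&A, &B)` from a direct cross-join stream, with insert and retract kept symmetric for incremental updates.

```rust
/// Hypothetical incremental protocol (NOT SolverForge's Collector<Input>).
/// `Input` is the borrowed match shape: `&A` for unary streams,
/// `(&A, &B)` for direct cross-join streams.
pub trait Accumulate<Input> {
    /// Add one match to this group's state.
    fn insert(&mut self, input: Input);
    /// Remove one match that was previously inserted.
    fn retract(&mut self, input: Input);
}

/// Counts matches. It never inspects the input, so one blanket impl covers every shape.
#[derive(Debug, Default)]
pub struct CountAcc {
    n: usize,
}

impl CountAcc {
    pub fn value(&self) -> usize {
        self.n
    }
}

impl<Input> Accumulate<Input> for CountAcc {
    fn insert(&mut self, _input: Input) {
        self.n += 1;
    }
    fn retract(&mut self, _input: Input) {
        self.n -= 1;
    }
}

/// Sums an i64 extracted by `mapper`. The mapper's argument type fixes the input shape.
pub struct SumAcc<F> {
    mapper: F,
    total: i64,
}

impl<F> SumAcc<F> {
    pub fn new(mapper: F) -> Self {
        Self { mapper, total: 0 }
    }
    pub fn value(&self) -> i64 {
        self.total
    }
}

impl<Input, F> Accumulate<Input> for SumAcc<F>
where
    F: Fn(Input) -> i64,
{
    fn insert(&mut self, input: Input) {
        self.total += (self.mapper)(input);
    }
    fn retract(&mut self, input: Input) {
        self.total -= (self.mapper)(input);
    }
}
```

Because `CountAcc` ignores match contents, a single blanket impl serves every input shape. `SumAcc` adapts through its mapper's argument type, much like the tuple-typed mapper in the cross-join example.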
Using Collectors
Pass a collector as the second argument to group_by:
factory.for_each(Schedule::shifts())
    .group_by(
        |shift: &Shift| shift.employee_idx, // grouping key
        count(),                            // collector
    )
    // Result: grouped stream of (key, usize)
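Conceptually, group_by(key_fn, collector) partitions the matches by key and folds each partition with the collector. The sketch below shows those batch semantics for count() in plain Rust. `group_count` and the trimmed-down `Shift` struct are hypothetical stand-ins, and the real stream maintains its groups incrementally rather than recomputing them from scratch.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical stand-in for the planning entity used in these examples.
#[derive(Debug)]
pub struct Shift {
    pub employee_idx: Option<usize>,
}

/// Batch equivalent of `.group_by(key_fn, count())`:
/// one (key, usize) entry per distinct key that has at least one match.
pub fn group_count<T, K, F>(items: &[T], key_fn: F) -> HashMap<K, usize>
where
    K: Eq + Hash,
    F: Fn(&T) -> K,
{
    let mut groups: HashMap<K, usize> = HashMap::new();
    for item in items {
        *groups.entry(key_fn(item)).or_insert(0) += 1;
    }
    groups
}
```

Because employee_idx is an Option in these examples, this sketch places unassigned shifts in their own None group. The later examples filter them out before grouping.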
Available Collectors
count()
Counts the number of matches in each group. Returns usize.
.group_by(|s: &Shift| s.employee_idx, count())
// → (key, usize)
sum(mapper)
Sums numeric values in each group. The mapper extracts the value to sum.
.group_by(|s: &Shift| s.employee_idx, sum(|s: &Shift| s.hours))
// → (key, i64)
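When a shift is reassigned, an incrementally maintained sum only needs to retract the shift's value from its old group and insert it into the new one. The sketch below models that bookkeeping with a HashMap of (total, match count) pairs. `GroupedSum` is hypothetical. It drops a group once its match count reaches zero, mirroring the rule that keys with no source matches produce no grouped row.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical grouped sum: key -> (running total, number of matches).
pub struct GroupedSum<K> {
    groups: HashMap<K, (i64, usize)>,
}

impl<K: Eq + Hash> GroupedSum<K> {
    pub fn new() -> Self {
        Self { groups: HashMap::new() }
    }

    /// Add one match's mapped value to the group for `key`.
    pub fn insert(&mut self, key: K, value: i64) {
        let group = self.groups.entry(key).or_insert((0, 0));
        group.0 += value;
        group.1 += 1;
    }

    /// Remove one previously inserted value, dropping the group when no matches remain.
    pub fn retract(&mut self, key: K, value: i64) {
        let now_empty = match self.groups.get_mut(&key) {
            Some(group) => {
                group.0 -= value;
                group.1 -= 1;
                group.1 == 0
            }
            None => false,
        };
        if now_empty {
            self.groups.remove(&key);
        }
    }

    /// Current total for `key`, or None if the group has no matches.
    pub fn total(&self, key: &K) -> Option<i64> {
        self.groups.get(key).map(|&(total, _)| total)
    }
}
```

Tracking the match count separately from the total matters because a group can legitimately sum to zero while still containing matches.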
For direct cross-join grouping, the mapper receives the joined pair as a tuple:
type Streams = ConstraintFactory<Plan, HardSoftScore>;
Streams::new()
    .for_each(Plan::assignments())
    // Pair each assignment with the capacity it points at.
    .join((
        Streams::new().for_each(Plan::capacities()),
        equal_bi(
            |assignment: &Assignment| assignment.capacity_id,
            |capacity: &Capacity| Some(capacity.id),
        ),
    ))
    .group_by(
        // The key function takes the pair as two arguments...
        |assignment: &Assignment, _capacity: &Capacity| assignment.bucket,
        // ...while the collector's mapper takes it as one tuple.
        sum(|(assignment, capacity): (&Assignment, &Capacity)| {
            capacity.amount - assignment.demand
        }),
    )
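The cross-join example can be read as follows: pair each assignment with the capacity whose id equals its capacity_id, then sum capacity.amount - assignment.demand per bucket. The std-only sketch below computes the same totals with a nested-loop join. The field names come from the example. Their integer types and the `bucket_slack` function are assumptions made for illustration.

```rust
use std::collections::HashMap;

#[derive(Debug)]
pub struct Assignment {
    pub capacity_id: Option<usize>,
    pub bucket: u32,
    pub demand: i64,
}

#[derive(Debug)]
pub struct Capacity {
    pub id: usize,
    pub amount: i64,
}

/// Hypothetical batch equivalent of join(equal_bi(...)) + group_by(bucket, sum(...)).
pub fn bucket_slack(assignments: &[Assignment], capacities: &[Capacity]) -> HashMap<u32, i64> {
    // Like the collector's mapper, this closure receives the joined pair as one tuple.
    let mapper = |(assignment, capacity): (&Assignment, &Capacity)| capacity.amount - assignment.demand;
    let mut totals = HashMap::new();
    for assignment in assignments {
        for capacity in capacities {
            // equal_bi: left key capacity_id must equal right key Some(capacity.id).
            if assignment.capacity_id == Some(capacity.id) {
                *totals.entry(assignment.bucket).or_insert(0) += mapper((assignment, capacity));
            }
        }
    }
    totals
}
```

An assignment with capacity_id = None never matches any capacity, so it contributes to no bucket.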
load_balance(key_fn, metric_fn)
Measures how unevenly load is spread across the values of key_fn within each group. metric_fn gives each match's contribution to that load. Returns a LoadBalance<K> with an unfairness metric.
.group_by(
    |s: &Shift| s.department_idx,
    load_balance(|s: &Shift| s.employee_idx, |_s: &Shift| 1i64),
)
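The sketch below gives a rough illustration of how an unfairness metric can be derived from per-key loads. It sums metric_fn per key_fn value and reports the square root of the sum of squared deviations from the mean load. That formula is an assumption chosen for illustration: LoadBalance<K> may compute its unfairness differently, so check the API reference before relying on specific values. `load_unfairness` is a hypothetical name.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical load-balance computation (not LoadBalance<K> itself):
/// sums metric_fn per key_fn value, then returns the per-key loads and
/// sqrt(sum of squared deviations from the mean load) as an unfairness score.
pub fn load_unfairness<T, K, KF, MF>(
    items: &[T],
    key_fn: KF,
    metric_fn: MF,
) -> (HashMap<K, i64>, f64)
where
    K: Eq + Hash,
    KF: Fn(&T) -> K,
    MF: Fn(&T) -> i64,
{
    let mut loads: HashMap<K, i64> = HashMap::new();
    for item in items {
        *loads.entry(key_fn(item)).or_insert(0) += metric_fn(item);
    }
    if loads.is_empty() {
        return (loads, 0.0);
    }
    let n = loads.len() as f64;
    let mean = loads.values().map(|&load| load as f64).sum::<f64>() / n;
    let squared: f64 = loads
        .values()
        .map(|&load| (load as f64 - mean).powi(2))
        .sum();
    (loads, squared.sqrt())
}
```

In the documented example, this computation would run once per department_idx group, with each shift contributing a load of 1 to its employee.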
consecutive_runs(index_fn)
Groups integer points into consecutive runs. Duplicate points increase
item_count() for the run but still count as one unique point for
point_count().
factory.for_each(Schedule::shifts())
    .filter(|shift: &Shift| shift.employee_idx.is_some())
    .group_by(
        // Filtered above, so the usize::MAX fallback is never used.
        |shift: &Shift| shift.employee_idx.unwrap_or(usize::MAX),
        consecutive_runs(|shift: &Shift| shift.date as i64),
    )
    .penalize(|_employee_idx: &usize, runs: &Runs| {
        // Count every day beyond the fifth in each streak.
        let excess_days: i64 = runs
            .runs()
            .iter()
            .map(|run| run.point_count().saturating_sub(5) as i64)
            .sum();
        HardSoftScore::of_soft(excess_days)
    })
    .named("Long work streaks")
Run exposes start(), end(), point_count(), and item_count(). Runs
exposes runs(), point_count(), item_count(), len(), and is_empty().
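The sketch below builds runs as this section describes: sorted unique points are grouped into maximal consecutive stretches, and duplicates count toward item_count but not point_count. `RunInfo` and `build_runs` are hypothetical. They use plain fields where the real Run exposes accessor methods, and the inclusive end is an assumption. `excess_days` reproduces the arithmetic of the "Long work streaks" penalty.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for Run: inclusive [start, end], unique points, total items.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunInfo {
    pub start: i64,
    pub end: i64,
    pub point_count: usize,
    pub item_count: usize,
}

/// Group integer points into consecutive runs, counting duplicates as extra items.
pub fn build_runs(points: &[i64]) -> Vec<RunInfo> {
    // BTreeMap keeps points sorted and records how many items landed on each point.
    let mut counts: BTreeMap<i64, usize> = BTreeMap::new();
    for &p in points {
        *counts.entry(p).or_insert(0) += 1;
    }
    let mut runs: Vec<RunInfo> = Vec::new();
    for (&point, &items) in &counts {
        // A point extends the last run only if it is exactly one past that run's end.
        let extends_last = matches!(runs.last(), Some(run) if run.end + 1 == point);
        if extends_last {
            let run = runs.last_mut().expect("a last run exists when extends_last is true");
            run.end = point;
            run.point_count += 1;
            run.item_count += items;
        } else {
            runs.push(RunInfo { start: point, end: point, point_count: 1, item_count: items });
        }
    }
    runs
}

/// Days beyond a five-day streak, summed over all runs.
pub fn excess_days(runs: &[RunInfo]) -> i64 {
    runs.iter().map(|run| run.point_count.saturating_sub(5) as i64).sum()
}
```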
collect_vec(mapper)
Collects mapped values for each group and returns a CollectedVec<T>.
The collector takes ownership of each mapped value and can retract exactly that
value during incremental updates, so T does not need Copy, Clone, or
PartialEq just to participate in grouped scoring.
.group_by(
    |shift: &Shift| shift.employee_idx.unwrap_or(usize::MAX),
    collect_vec(|shift: &Shift| shift.required_skill.clone()),
)
.penalize(|_employee_idx: &usize, labels: &CollectedVec<String>| {
    HardSoftScore::of_soft(labels.len().saturating_sub(5) as i64)
})
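One way a collector can retract values exactly without requiring Clone or PartialEq is to return a slot token on insertion and remove by token rather than by value. The slab-style sketch below illustrates that idea. `Bag`, `Token`, and the token scheme are assumptions about how such retraction could work, not a description of CollectedVec<T> internals.

```rust
/// Opaque handle returned on insert. Retracting by handle avoids comparing values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Token(usize);

/// Hypothetical slab of collected values with no trait bounds on T.
pub struct Bag<T> {
    slots: Vec<Option<T>>,
    free: Vec<usize>,
    len: usize,
}

impl<T> Bag<T> {
    pub fn new() -> Self {
        Self { slots: Vec::new(), free: Vec::new(), len: 0 }
    }

    /// Take ownership of `value` and return the token needed to retract it later.
    pub fn insert(&mut self, value: T) -> Token {
        self.len += 1;
        if let Some(i) = self.free.pop() {
            // Reuse a slot freed by an earlier retraction.
            self.slots[i] = Some(value);
            Token(i)
        } else {
            self.slots.push(Some(value));
            Token(self.slots.len() - 1)
        }
    }

    /// Remove exactly the value stored under `token`, handing ownership back.
    pub fn retract(&mut self, token: Token) -> Option<T> {
        let value = self.slots.get_mut(token.0)?.take();
        if value.is_some() {
            self.len -= 1;
            self.free.push(token.0);
        }
        value
    }

    pub fn len(&self) -> usize {
        self.len
    }

    /// Iterate over the values currently collected.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.slots.iter().filter_map(|slot| slot.as_ref())
    }
}
```

Two equal values inserted separately get different tokens, so retracting one never removes the other.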
indexed_presence(index_fn)
Tracks active ordinal positions and exposes both presence and complement runs. Use it when a rule needs to know which slots are covered and which slots are missing.
.group_by(
    |shift: &Shift| shift.employee_idx.unwrap_or(usize::MAX),
    indexed_presence(|shift: &Shift| shift.date as i64),
)
.penalize(|_employee_idx: &usize, presence: &IndexedPresence| {
    // One soft point per run of uncovered days in 0..7.
    HardSoftScore::of_soft(presence.complement_runs(0..7).len() as i64)
})
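The sketch below is a minimal model of presence and complement runs. It keeps a count per occupied index and scans a requested range for maximal stretches with no presence. `Presence` is a hypothetical stand-in for IndexedPresence. Returning gaps as inclusive (start, end) pairs is an assumption, and a real implementation would likely avoid the linear scan over the range.

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Hypothetical multiset of occupied indices (several matches may share an index).
pub struct Presence {
    counts: BTreeMap<i64, usize>,
}

impl Presence {
    pub fn new() -> Self {
        Self { counts: BTreeMap::new() }
    }

    pub fn insert(&mut self, index: i64) {
        *self.counts.entry(index).or_insert(0) += 1;
    }

    /// Remove one occurrence. The index stays present while other matches hold it.
    pub fn retract(&mut self, index: i64) {
        let emptied = match self.counts.get_mut(&index) {
            Some(n) => {
                *n -= 1;
                *n == 0
            }
            None => false,
        };
        if emptied {
            self.counts.remove(&index);
        }
    }

    /// Maximal runs of indices in `range` with no presence, as inclusive (start, end).
    pub fn complement_runs(&self, range: Range<i64>) -> Vec<(i64, i64)> {
        let end = range.end;
        let mut gaps = Vec::new();
        let mut gap_start: Option<i64> = None;
        for i in range {
            if self.counts.contains_key(&i) {
                // A present index closes any open gap.
                if let Some(start) = gap_start.take() {
                    gaps.push((start, i - 1));
                }
            } else if gap_start.is_none() {
                // First missing index of a new gap.
                gap_start = Some(i);
            }
        }
        // Close a gap that runs to the end of the range.
        if let Some(start) = gap_start {
            gaps.push((start, end - 1));
        }
        gaps
    }
}
```

In this sketch, days 1, 2, and 5 leave three gaps in 0..7, which is the kind of count the penalty above turns into soft score.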
Complemented Groups
Use complement(...) after group_by(...) when a grouped rule needs rows for
keys that have no source matches. For example, workload fairness often needs a
zero-count row for employees with no assigned shifts:
factory.for_each(Schedule::shifts())
    .filter(|shift: &Shift| shift.employee_idx.is_some())
    .group_by(
        |shift: &Shift| shift.employee_idx.unwrap_or(usize::MAX),
        count(),
    )
    // Add a 0usize row for every employee with no assigned shifts.
    .complement(
        Schedule::employees(),
        |employee: &Employee| employee.index,
        |_employee: &Employee| 0usize,
    )
    .penalize(|_employee_idx: &usize, count: &usize| {
        HardSoftScore::of_soft((*count as i64 - 4).abs())
    })
    .named("Balanced workload")
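In batch terms, complement(...) keeps every grouped row and adds default_fn's value for each source item whose key has no row. The sketch below shows that merge together with the workload penalty from the example. `complement_rows` and `workload_penalty` are hypothetical names used only for illustration.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical batch version of `.complement(source, key_fn, default_fn)`:
/// existing grouped rows are kept, and each missing key gets a default row.
pub fn complement_rows<K, V, S, KF, DF>(
    mut grouped: HashMap<K, V>,
    source: &[S],
    key_fn: KF,
    default_fn: DF,
) -> HashMap<K, V>
where
    K: Eq + Hash,
    KF: Fn(&S) -> K,
    DF: Fn(&S) -> V,
{
    for item in source {
        // or_insert_with only calls default_fn when the key has no grouped row.
        grouped.entry(key_fn(item)).or_insert_with(|| default_fn(item));
    }
    grouped
}

/// Soft penalty from the "Balanced workload" example: distance from four shifts.
pub fn workload_penalty(count: usize) -> i64 {
    (count as i64 - 4).abs()
}
```

Without the default row, employee 1 in the usage below would be invisible to the penalty even though their zero workload is the most unbalanced.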
Projected grouped streams can also use complement(...) after grouping when
the source rows were created by project(...).
Direct cross-join grouped streams can also use complement(...) after grouping.
This is useful when the group key comes from a joined target and the rule needs
default rows for targets that have no matching left-side rows:
type Streams = ConstraintFactory<Plan, HardSoftScore>;
Streams::new()
    .for_each(Plan::assignments())
    .join((
        Streams::new().for_each(Plan::capacities()),
        equal_bi(
            |assignment: &Assignment| assignment.capacity_id,
            |capacity: &Capacity| Some(capacity.id),
        ),
    ))
    .group_by(
        |_assignment: &Assignment, capacity: &Capacity| capacity.id,
        sum(|(assignment, _capacity): (&Assignment, &Capacity)| assignment.demand),
    )
    .complement(
        Plan::capacities(),
        |capacity: &Capacity| capacity.id,
        |_capacity: &Capacity| 0i64,
    )
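Because the join only produces matched pairs, a capacity with no matching assignment yields no pair and therefore no grouped row. The complement step is what restores a 0i64 row for it. The std-only sketch below shows both steps. The trimmed-down types and the `demand_per_capacity` function are assumptions made for illustration.

```rust
use std::collections::HashMap;

pub struct Assignment {
    pub capacity_id: Option<usize>,
    pub demand: i64,
}

pub struct Capacity {
    pub id: usize,
}

/// Hypothetical batch version of join + group_by(capacity.id, sum(demand)) + complement.
pub fn demand_per_capacity(assignments: &[Assignment], capacities: &[Capacity]) -> HashMap<usize, i64> {
    let mut totals: HashMap<usize, i64> = HashMap::new();
    // Joined pairs: only capacities with at least one matching assignment appear here.
    for assignment in assignments {
        for capacity in capacities {
            if assignment.capacity_id == Some(capacity.id) {
                *totals.entry(capacity.id).or_insert(0) += assignment.demand;
            }
        }
    }
    // Complement: add a 0 row for every capacity the join never produced.
    for capacity in capacities {
        totals.entry(capacity.id).or_insert(0);
    }
    totals
}
```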
Balance Stream Operation
For simple load balancing without group_by, use the balance stream operation directly:
factory.for_each(Schedule::shifts())
    .balance(|shift: &Shift| shift.employee_idx)
    .penalize(HardSoftScore::ONE_SOFT)
    .named("Fair distribution")
The key function returns Option<K>; None values (unassigned entities) are skipped.
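The None-skipping rule means unassigned shifts never contribute load to any key. The sketch below tallies per-key loads that way. `tally_assigned` is a hypothetical name. It deliberately stops at the tallies, because this page does not specify how balance(...) turns them into a penalty.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Count matches per key, skipping items whose key function returns None.
pub fn tally_assigned<T, K, F>(items: &[T], key_fn: F) -> HashMap<K, usize>
where
    K: Eq + Hash,
    F: Fn(&T) -> Option<K>,
{
    let mut loads = HashMap::new();
    // filter_map drops the None keys, so unassigned items never create a group.
    for key in items.iter().filter_map(|item| key_fn(item)) {
        *loads.entry(key).or_insert(0) += 1;
    }
    loads
}
```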
See Also
- Constraint Streams - The group_by operation
- Constraint Factory Methods - Generated collection sources
- docs.rs/solverforge - Full collector API reference