Collectors

Aggregation functions for group_by operations in constraint streams.

Collectors aggregate values within groups created by group_by(...). They transform a stream of individual matches into grouped summaries. For simple fairness rules, prefer balance(...); use collectors when you need explicit counts, totals, or custom imbalance data.

In the current release, the collector trait is Collector<Input>. Input is the borrowed stream match shape: unary and projected grouping pass &A or &Out, while direct cross-join grouping passes (&A, &B). Stock collectors such as count, sum, collect_vec, consecutive_runs, and indexed_presence use the same protocol across those shapes.

Using Collectors

Pass a collector as the second argument to group_by:

factory.for_each(Schedule::shifts())
    .group_by(
        |shift: &Shift| shift.employee_idx,   // grouping key
        count(),                              // collector
    )
    // Result: grouped stream of (key, usize)
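
To make the (key, usize) result shape concrete, the following is a minimal sketch in plain std Rust of what grouping and counting produces. It does not use the library and is not how the library computes groups internally; the Shift struct here is a simplified, hypothetical stand-in with a plain usize key.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-in for the document's Shift entity.
struct Shift {
    employee_idx: usize,
}

// Plain-std analogue of group_by(key_fn, count()):
// one (key, usize) row per distinct key.
fn count_by_employee(shifts: &[Shift]) -> BTreeMap<usize, usize> {
    let mut counts = BTreeMap::new();
    for shift in shifts {
        *counts.entry(shift.employee_idx).or_insert(0usize) += 1;
    }
    counts
}
```

Note that keys with no matches produce no row at all in this sketch, which is the gap that complement(...) addresses.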

Available Collectors

count()

Counts the number of matches in each group. Returns usize.

.group_by(|s: &Shift| s.employee_idx, count())
// → (key, usize)

sum(mapper)

Sums numeric values in each group. The mapper extracts the value to sum.

.group_by(|s: &Shift| s.employee_idx, sum(|s: &Shift| s.hours))
// → (key, i64)

For direct cross-join grouping, the mapper receives the joined pair as a tuple:

type Streams = ConstraintFactory<Plan, HardSoftScore>;

Streams::new()
    .for_each(Plan::assignments())
    .join((
        Streams::new().for_each(Plan::capacities()),
        equal_bi(
            |assignment: &Assignment| assignment.capacity_id,
            |capacity: &Capacity| Some(capacity.id),
        ),
    ))
    .group_by(
        |assignment: &Assignment, _capacity: &Capacity| assignment.bucket,
        sum(|(assignment, capacity): (&Assignment, &Capacity)| {
            capacity.amount - assignment.demand
        }),
    )
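
As a rough illustration of what this joined grouping computes, here is a std-only sketch that performs the same equality join with nested loops and sums a value per bucket. The field types (u32 ids, i64 amounts) are assumptions made for the example; the source does not state them, and the library's incremental join is not implemented this way.

```rust
use std::collections::BTreeMap;

// Hypothetical field types; the document only shows the field names.
struct Assignment {
    capacity_id: Option<u32>,
    bucket: u32,
    demand: i64,
}

struct Capacity {
    id: u32,
    amount: i64,
}

// Joins on assignment.capacity_id == Some(capacity.id), groups by bucket,
// and sums (capacity.amount - assignment.demand) over each joined pair.
fn slack_by_bucket(assignments: &[Assignment], capacities: &[Capacity]) -> BTreeMap<u32, i64> {
    let mut totals = BTreeMap::new();
    for assignment in assignments {
        for capacity in capacities {
            if assignment.capacity_id == Some(capacity.id) {
                // The mapper sees the joined pair as a tuple of borrows.
                let (a, c): (&Assignment, &Capacity) = (assignment, capacity);
                *totals.entry(a.bucket).or_insert(0i64) += c.amount - a.demand;
            }
        }
    }
    totals
}
```

Assignments whose capacity_id is None match no capacity and therefore contribute to no group in this sketch.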

load_balance(key_fn, metric_fn)

Measures load imbalance across a grouping key. Returns a LoadBalance<K> that carries an unfairness metric.

.group_by(
    |s: &Shift| s.department_idx,
    load_balance(|s: &Shift| s.employee_idx, |_s: &Shift| 1i64),
)
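
The source does not say how the unfairness metric is computed. The sketch below assumes one common definition, the square root of the summed squared deviations of each key's load from the mean load, purely to show the kind of number such a metric yields. The function names are hypothetical, the metric function is fixed at 1 per shift, and the outer department grouping is omitted.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-in for the document's Shift entity.
struct Shift {
    employee_idx: usize,
}

// Accumulates a load of 1 per shift for each employee key.
fn loads_by_employee(shifts: &[Shift]) -> BTreeMap<usize, i64> {
    let mut loads = BTreeMap::new();
    for shift in shifts {
        *loads.entry(shift.employee_idx).or_insert(0i64) += 1;
    }
    loads
}

// ASSUMED metric: sqrt of the sum of squared deviations from the mean load.
// The library's actual formula may differ.
fn unfairness_sketch(loads: &BTreeMap<usize, i64>) -> f64 {
    if loads.is_empty() {
        return 0.0;
    }
    let mean = loads.values().sum::<i64>() as f64 / loads.len() as f64;
    loads
        .values()
        .map(|&load| {
            let deviation = load as f64 - mean;
            deviation * deviation
        })
        .sum::<f64>()
        .sqrt()
}
```

Under this assumed definition, a perfectly even distribution scores 0.0 and the value grows as loads drift apart.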

consecutive_runs(index_fn)

Groups integer points into consecutive runs. Duplicate points increase item_count() for the run but still count as one unique point for point_count().

factory.for_each(Schedule::shifts())
    .filter(|shift: &Shift| shift.employee_idx.is_some())
    .group_by(
        |shift: &Shift| shift.employee_idx.unwrap_or(usize::MAX),
        consecutive_runs(|shift: &Shift| shift.date as i64),
    )
    .penalize(|_employee_idx: &usize, runs: &Runs| {
        let excess_days: i64 = runs
            .runs()
            .iter()
            .map(|run| run.point_count().saturating_sub(5) as i64)
            .sum();
        HardSoftScore::of_soft(excess_days)
    })
    .named("Long work streaks")

Run exposes start(), end(), point_count(), and item_count(). Runs exposes runs(), point_count(), item_count(), len(), and is_empty().
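
The distinction between point_count() and item_count() can be sketched in plain std Rust as follows. RunSketch and consecutive_runs_sketch are hypothetical names, the inclusive end is an assumption, and the real collector maintains its runs incrementally rather than rebuilding them from a slice.

```rust
use std::collections::BTreeMap;

// Hypothetical mirror of the accessors listed above; `end` is inclusive here.
#[derive(Debug, PartialEq)]
struct RunSketch {
    start: i64,
    end: i64,
    point_count: usize, // unique points in the run
    item_count: usize,  // all items, duplicates included
}

fn consecutive_runs_sketch(points: &[i64]) -> Vec<RunSketch> {
    // Sorted unique points with how many items landed on each.
    let mut multiplicity: BTreeMap<i64, usize> = BTreeMap::new();
    for &point in points {
        *multiplicity.entry(point).or_insert(0) += 1;
    }

    let mut runs: Vec<RunSketch> = Vec::new();
    for (&point, &items) in &multiplicity {
        let extends_last = runs.last().map_or(false, |run| run.end + 1 == point);
        if extends_last {
            let run = runs.last_mut().expect("a last run exists when extends_last is true");
            run.end = point;
            run.point_count += 1;
            run.item_count += items;
        } else {
            runs.push(RunSketch { start: point, end: point, point_count: 1, item_count: items });
        }
    }
    runs
}
```

With points 1, 2, 2, 3, 7, 8 this yields two runs: 1 through 3 with three points and four items, and 7 through 8 with two points and two items.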

collect_vec(mapper)

Collects mapped values for each group and returns a CollectedVec<T>. The collector takes ownership of each mapped value once and can retract exactly that value during incremental updates, so T does not need to implement Copy, Clone, or PartialEq just to participate in grouped scoring.

.group_by(
    |shift: &Shift| shift.employee_idx.unwrap_or(usize::MAX),
    collect_vec(|shift: &Shift| shift.required_skill.clone()),
)
.penalize(|_employee_idx: &usize, labels: &CollectedVec<String>| {
    HardSoftScore::of_soft(labels.len().saturating_sub(5) as i64)
})
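
The source does not describe how exact retraction is tracked. One way to get it without Clone or PartialEq, shown here only as an assumption, is to hand back a slot handle on insert and retract by that handle instead of by value comparison. CollectedSketch and its methods are hypothetical and are not the CollectedVec<T> API.

```rust
// Hypothetical slot-based container: values are moved in once and
// removed by handle, so T needs no Copy, Clone, or PartialEq.
struct CollectedSketch<T> {
    slots: Vec<Option<T>>,
    free: Vec<usize>,
    len: usize,
}

impl<T> CollectedSketch<T> {
    fn new() -> Self {
        Self { slots: Vec::new(), free: Vec::new(), len: 0 }
    }

    // Takes ownership of the value and returns the slot it occupies.
    fn insert(&mut self, value: T) -> usize {
        self.len += 1;
        if let Some(slot) = self.free.pop() {
            self.slots[slot] = Some(value);
            slot
        } else {
            self.slots.push(Some(value));
            self.slots.len() - 1
        }
    }

    // Removes exactly the value stored in `slot`, if it is still there.
    fn retract(&mut self, slot: usize) -> Option<T> {
        let value = self.slots.get_mut(slot)?.take();
        if value.is_some() {
            self.len -= 1;
            self.free.push(slot);
        }
        value
    }

    fn len(&self) -> usize {
        self.len
    }
}
```

Retracting by handle removes the specific inserted value even when several equal-looking values are present, which is what makes the update exact.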

indexed_presence(index_fn)

Tracks active ordinal positions and exposes both presence and complement runs. Use it when a rule needs to know which slots are covered and which slots are missing.

.group_by(
    |shift: &Shift| shift.employee_idx.unwrap_or(usize::MAX),
    indexed_presence(|shift: &Shift| shift.date as i64),
)
.penalize(|_employee_idx: &usize, presence: &IndexedPresence| {
    HardSoftScore::of_soft(presence.complement_runs(0..7).len() as i64)
})
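
To illustrate what "complement runs" means, the std-only sketch below lists the gaps inside a query range that no present index covers. The function name and the half-open Range<i64> return type are assumptions; the source only shows that complement_runs(0..7) returns something with len().

```rust
use std::collections::BTreeSet;
use std::ops::Range;

// Returns the maximal uncovered stretches of `range` as half-open ranges.
// Hypothetical helper; not the IndexedPresence API.
fn complement_runs_sketch(present: &BTreeSet<i64>, range: Range<i64>) -> Vec<Range<i64>> {
    let mut gaps = Vec::new();
    let mut gap_start: Option<i64> = None;
    for index in range.clone() {
        if present.contains(&index) {
            // A present index closes any gap that was open.
            if let Some(start) = gap_start.take() {
                gaps.push(start..index);
            }
        } else if gap_start.is_none() {
            gap_start = Some(index);
        }
    }
    // A gap still open at the end runs to the end of the range.
    if let Some(start) = gap_start {
        gaps.push(start..range.end);
    }
    gaps
}
```

For present indices 1, 2, and 5 queried over 0..7, the gaps are 0..1, 3..5, and 6..7, so a rule like the one above would penalize three missing stretches.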

Complemented Groups

Use complement(...) after group_by(...) when a grouped rule needs rows for keys that have no source matches. For example, workload fairness often needs a zero-count row for employees with no assigned shifts:

factory.for_each(Schedule::shifts())
    .filter(|shift: &Shift| shift.employee_idx.is_some())
    .group_by(
        |shift: &Shift| shift.employee_idx.unwrap_or(usize::MAX),
        count(),
    )
    .complement(Schedule::employees(), |employee: &Employee| employee.index, |_employee| 0usize)
    .penalize(|_employee_idx: &usize, count: &usize| {
        HardSoftScore::of_soft((*count as i64 - 4).abs())
    })
    .named("Balanced workload")
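
The effect of complement(...) on the grouped rows can be sketched in plain std Rust: every employee missing from the grouped result receives a default row before scoring. The helper names are hypothetical and the Employee struct is a simplified stand-in; this only illustrates the semantics described above.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-in for the document's Employee entity.
struct Employee {
    index: usize,
}

// Adds a zero-count row for every employee that has no grouped row yet.
fn complement_sketch(
    mut grouped: BTreeMap<usize, usize>,
    employees: &[Employee],
) -> BTreeMap<usize, usize> {
    for employee in employees {
        grouped.entry(employee.index).or_insert(0usize);
    }
    grouped
}

// Same penalty shape as the rule above: distance of each count from 4.
fn workload_penalty(rows: &BTreeMap<usize, usize>) -> i64 {
    rows.values().map(|&count| (count as i64 - 4).abs()).sum()
}
```

With counts {0: 4, 1: 6} and three employees, the uncomplemented penalty is 2, while the complemented rows add employee 2 with a count of 0 and raise the penalty to 6. Without the default row, an idle employee would go unpenalized.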

Projected grouped streams can also use complement(...) after grouping when the source rows were created by project(...).

Direct cross-join grouped streams can also use complement(...) after grouping. This is useful when the group key comes from a joined target and the rule needs default rows for targets that have no matching left-side rows:

type Streams = ConstraintFactory<Plan, HardSoftScore>;

Streams::new()
    .for_each(Plan::assignments())
    .join((
        Streams::new().for_each(Plan::capacities()),
        equal_bi(
            |assignment: &Assignment| assignment.capacity_id,
            |capacity: &Capacity| Some(capacity.id),
        ),
    ))
    .group_by(
        |_assignment: &Assignment, capacity: &Capacity| capacity.id,
        sum(|(assignment, _capacity): (&Assignment, &Capacity)| assignment.demand),
    )
    .complement(
        Plan::capacities(),
        |capacity: &Capacity| capacity.id,
        |_capacity: &Capacity| 0i64,
    )

Balance Stream Operation

For simple load balancing without group_by, use the balance stream operation directly:

factory.for_each(Schedule::shifts())
    .balance(|shift: &Shift| shift.employee_idx)
    .penalize(HardSoftScore::ONE_SOFT)
    .named("Fair distribution")

The key function returns Option<K>. None values (unassigned entities) are skipped.
