Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions src/votekit/ballot_generator/bloc_slate_generator/cumulative.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import numpy as np
import apportionment.methods as apportion
from typing import Optional

from votekit.ballot import ScoreBallot
from votekit.pref_profile import ScoreProfile
Expand All @@ -21,13 +22,16 @@
# ===========================================================


def _inner_name_cumulative(config: BlocSlateConfig) -> dict[str, ScoreProfile]:
def _inner_name_cumulative(
config: BlocSlateConfig, total_points: int
) -> dict[str, ScoreProfile]:
"""
Inner function to generate cumulative profiles by bloc using the name-Cumulative model.

Args:
config (BlocSlateConfig): Configuration object containing all necessary parameters for
working with a bloc-slate ballot generator.
total_points (int): The total number of points to distribute among candidates.

Returns:
dict[str, ScoreProfile]: A dictionary whose keys are bloc strings and values are
Expand Down Expand Up @@ -67,7 +71,7 @@ def _inner_name_cumulative(config: BlocSlateConfig) -> dict[str, ScoreProfile]:

# Vectorized: one multinomial per ballot -> shape (num_ballots, n_cands)
# Each row sums to n_voters and the entries are counts for each candidate
counts = rng.multinomial(n=n_voters, pvals=p, size=num_ballots)
counts = rng.multinomial(n=total_points, pvals=p, size=num_ballots)

ballots = [
ScoreBallot(scores=dict(zip(non_zero_cands, row.astype(float))), weight=1)
Expand All @@ -86,6 +90,7 @@ def _inner_name_cumulative(config: BlocSlateConfig) -> dict[str, ScoreProfile]:
def name_cumulative_profile_generator(
config: BlocSlateConfig,
*,
total_points: Optional[int] = None,
group_ballots: bool = True,
) -> ScoreProfile:
"""
Expand All @@ -97,14 +102,24 @@ def name_cumulative_profile_generator(
Args:
config (BlocSlateConfig): Configuration object containing all necessary parameters for
working with a bloc-slate ballot generator.

Kwargs:
total_points (Optional[int]): The total number of points to distribute among candidates.
If None, defaults to the number of candidates in the configuration. Defaults to None.
group_ballots (bool): If True, groups identical ballots in the resulting profile.
Defaults to True.

Returns:
ScoreProfile: A `ScoreProfile` object representing the generated ballots.
"""
config.is_valid(raise_errors=True)
pp_by_bloc = _inner_name_cumulative(config)

if total_points is None:
total_points = len(config.candidates)
if total_points <= 0:
raise ValueError("total_points must be a positive integer")

pp_by_bloc = _inner_name_cumulative(config, total_points=total_points)

pp = ScoreProfile()
for profile in pp_by_bloc.values():
Expand All @@ -119,6 +134,7 @@ def name_cumulative_profile_generator(
def name_cumulative_ballot_generator_by_bloc(
config: BlocSlateConfig,
*,
total_points: Optional[int] = None,
group_ballots: bool = True,
) -> dict[str, ScoreProfile]:
"""
Expand All @@ -130,15 +146,25 @@ def name_cumulative_ballot_generator_by_bloc(
Args:
config (BlocSlateConfig): Configuration object containing all necessary parameters for
working with a bloc-slate ballot generator.
group_ballots (bool): If True, groups identical ballots in the resulting profiles.

Kwargs:
total_points (Optional[int]): The total number of points to distribute among candidates.
If None, defaults to the number of candidates in the configuration. Defaults to None.
group_ballots (bool): If True, groups identical ballots in the resulting profile.
Defaults to True.

Returns:
dict[str, ScoreProfile]: A dictionary whose keys are bloc strings and values are
`ScoreProfile` objects representing the generated ballots for each bloc.
"""
config.is_valid(raise_errors=True)
pp_by_bloc = _inner_name_cumulative(config)

if total_points is None:
total_points = len(config.candidates)
if total_points <= 0:
raise ValueError("'total_points' must be a positive integer")

pp_by_bloc = _inner_name_cumulative(config, total_points=total_points)

if group_ballots:
for bloc in pp_by_bloc:
Expand Down
154 changes: 154 additions & 0 deletions tests/ballot_generators/bloc_slate_generators/test_cumulative_bg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import pytest

import votekit.ballot_generator as bg
from votekit import PreferenceInterval


def test_name_cumulative_total_points_None_is_n_cands():
n_voters = 100
bloc_proportions = {"all_voters": 1}
slate_to_candidates = {"all_voters": ["A", "B", "C"]}

preference_mapping = {
"all_voters": {
"all_voters": PreferenceInterval({"A": 0.80, "B": 0.15, "C": 0.05})
}
}

cohesion_mapping = {"all_voters": {"all_voters": 1}}

config = bg.BlocSlateConfig(
n_voters=n_voters,
bloc_proportions=bloc_proportions,
slate_to_candidates=slate_to_candidates,
preference_mapping=preference_mapping,
cohesion_mapping=cohesion_mapping,
)

profile = bg.name_cumulative_profile_generator(config)

df = profile.df

assert all(df[["A", "B", "C"]].sum(axis=1) == 3)
assert all(df[["A", "B", "C"]].max(axis=1) <= 3)
assert (df[["A", "B", "C"]].sum(axis=1) * df["Weight"]).sum() == len(
config.candidates
) * n_voters


def test_name_cumulative_total_points_less_than_n_candidates():
total_points = 2
n_voters = 100
bloc_proportions = {"all_voters": 1}
slate_to_candidates = {"all_voters": ["A", "B", "C"]}

preference_mapping = {
"all_voters": {
"all_voters": PreferenceInterval({"A": 0.80, "B": 0.15, "C": 0.05})
}
}

cohesion_mapping = {"all_voters": {"all_voters": 1}}

config = bg.BlocSlateConfig(
n_voters=n_voters,
bloc_proportions=bloc_proportions,
slate_to_candidates=slate_to_candidates,
preference_mapping=preference_mapping,
cohesion_mapping=cohesion_mapping,
)

profile = bg.name_cumulative_profile_generator(config, total_points=total_points)

df = profile.df

assert all(df[["A", "B", "C"]].sum(axis=1) == total_points)
assert all(df[["A", "B", "C"]].max(axis=1) <= total_points)
assert (
df[["A", "B", "C"]].sum(axis=1) * df["Weight"]
).sum() == total_points * n_voters


def test_name_cumulative_total_points_more_than_n_candidates():
total_points = 12
n_voters = 10
bloc_proportions = {"all_voters": 1}
slate_to_candidates = {"all_voters": ["A", "B", "C"]}

preference_mapping = {
"all_voters": {
"all_voters": PreferenceInterval({"A": 0.80, "B": 0.15, "C": 0.05})
}
}

cohesion_mapping = {"all_voters": {"all_voters": 1}}

config = bg.BlocSlateConfig(
n_voters=n_voters,
bloc_proportions=bloc_proportions,
slate_to_candidates=slate_to_candidates,
preference_mapping=preference_mapping,
cohesion_mapping=cohesion_mapping,
)

profile = bg.name_cumulative_profile_generator(config, total_points=total_points)

df = profile.df

assert all(df[["A", "B", "C"]].sum(axis=1) == total_points)
assert all(df[["A", "B", "C"]].max(axis=1) <= total_points)
assert (
df[["A", "B", "C"]].sum(axis=1) * df["Weight"]
).sum() == total_points * n_voters


def test_name_cumulative_total_points_zero_errors():
total_points = 0
n_voters = 10
bloc_proportions = {"all_voters": 1}
slate_to_candidates = {"all_voters": ["A", "B", "C"]}

preference_mapping = {
"all_voters": {
"all_voters": PreferenceInterval({"A": 0.80, "B": 0.15, "C": 0.05})
}
}

cohesion_mapping = {"all_voters": {"all_voters": 1}}

config = bg.BlocSlateConfig(
n_voters=n_voters,
bloc_proportions=bloc_proportions,
slate_to_candidates=slate_to_candidates,
preference_mapping=preference_mapping,
cohesion_mapping=cohesion_mapping,
)

with pytest.raises(ValueError, match="must be a positive integer"):
bg.name_cumulative_profile_generator(config, total_points=total_points)


def test_name_cumulative_total_points_negative_errors():
total_points = -1
n_voters = 10
bloc_proportions = {"all_voters": 1}
slate_to_candidates = {"all_voters": ["A", "B", "C"]}

preference_mapping = {
"all_voters": {
"all_voters": PreferenceInterval({"A": 0.80, "B": 0.15, "C": 0.05})
}
}

cohesion_mapping = {"all_voters": {"all_voters": 1}}

config = bg.BlocSlateConfig(
n_voters=n_voters,
bloc_proportions=bloc_proportions,
slate_to_candidates=slate_to_candidates,
preference_mapping=preference_mapping,
cohesion_mapping=cohesion_mapping,
)

with pytest.raises(ValueError, match="must be a positive integer"):
bg.name_cumulative_profile_generator(config, total_points=total_points)