|
| 1 | +import networkx as nx |
| 2 | +import numpy as np |
| 3 | + |
| 4 | +from votekit.pref_profile import RankProfile |
| 5 | +from votekit.graphs.pairwise_comparison_graph import ( |
| 6 | + pairwise_dict, |
| 7 | + get_dominating_tiers_digraph, |
| 8 | +) |
| 9 | +from votekit.utils import tiebreak_set |
| 10 | + |
| 11 | +from votekit.elections.election_types.ranking.abstract_ranking import RankingElection |
| 12 | +from votekit.elections.election_state import ElectionState |
| 13 | + |
| 14 | + |
| 15 | +class Schulze(RankingElection): |
| 16 | + """ |
| 17 | + See <https://link.springer.com/article/10.1007/s00355-010-0475-4> and <https://arxiv.org/pdf/1804.02973> |
| 18 | +
|
| 19 | + The Schulze method uses the widest path algorithm to determine winners. For each pair |
| 20 | + of candidates, it computes the strength of the strongest path (where the strength of |
| 21 | + a path is the strength of its weakest link). For example, if Alice beats Bob by 2 votes, |
| 22 | + Bob beats Charlie by 4 votes, then the beatpath strength from Alice to Bob is 2. Candidate |
| 23 | + A is preferred to candidate B if the strongest path from A to B is stronger than the |
| 24 | + strongest path from B to A. |
| 25 | +
|
| 26 | + The Schulze method computes the strongest paths between all pairs of candidates: |
| 27 | + 1. Initialize p[i,j] = d[i,j] - d[j,i] (margin of victory) |
| 28 | + 2. For each intermediate candidate k, update p[i,j] = max(p[i,j], min(p[i,k], p[k,j])) |
| 29 | + 3. Candidate i beats j if p[i,j] > p[j,i] |
| 30 | +
|
| 31 | + Args: |
| 32 | + profile (RankProfile): Profile to conduct election on. |
| 33 | + m (int, optional): Number of seats to elect. Defaults to 1. |
| 34 | + tiebreak (str, optional): Method for breaking ties. Defaults to "lexicographic". |
| 35 | + """ |
| 36 | + |
| 37 | + def __init__( |
| 38 | + self, |
| 39 | + profile: RankProfile, |
| 40 | + tiebreak: str = "lexicographic", |
| 41 | + m: int = 1, |
| 42 | + ): |
| 43 | + if m <= 0: |
| 44 | + raise ValueError("m must be strictly positive") |
| 45 | + if len(profile.candidates_cast) < m: |
| 46 | + raise ValueError("Not enough candidates received votes to be elected.") |
| 47 | + self.m = m |
| 48 | + self.tiebreak = tiebreak |
| 49 | + |
| 50 | + def quick_tiebreak_candidates(profile: RankProfile) -> dict[str, float]: |
| 51 | + candidate_set = frozenset(profile.candidates) |
| 52 | + tiebroken_candidates = tiebreak_set(candidate_set, tiebreak=self.tiebreak) |
| 53 | + |
| 54 | + if len(tiebroken_candidates) != len(profile.candidates): |
| 55 | + raise RuntimeError("Tiebreak did not resolve all candidates.") |
| 56 | + |
| 57 | + return {next(iter(c)): i for i, c in enumerate(tiebroken_candidates[::-1])} |
| 58 | + |
| 59 | + super().__init__( |
| 60 | + profile, |
| 61 | + score_function=quick_tiebreak_candidates, |
| 62 | + sort_high_low=True, |
| 63 | + ) |
| 64 | + |
| 65 | + def _is_finished(self): |
| 66 | + """ |
| 67 | + Check if the election is finished. |
| 68 | + """ |
| 69 | + # single round election |
| 70 | + elected_cands = [c for s in self.get_elected() for c in s] |
| 71 | + |
| 72 | + if len(elected_cands) == self.m: |
| 73 | + return True |
| 74 | + return False |
| 75 | + |
| 76 | + def _run_step( |
| 77 | + self, profile: RankProfile, prev_state: ElectionState, store_states=False |
| 78 | + ) -> RankProfile: |
| 79 | + """ |
| 80 | + Run one step of an election from the given profile and previous state. Since this is |
| 81 | + a single-round election, this will complete the election and return the final profile. |
| 82 | +
|
| 83 | + The Schulze method computes the strongest paths between all pairs of candidates: |
| 84 | + 1. Initialize p[i,j] = d[i,j] - d[j,i] (margin of victory) |
| 85 | + 2. For each intermediate candidate k, update p[i,j] = max(p[i,j], min(p[i,k], p[k,j])) |
| 86 | + 3. Candidate i beats j if p[i,j] > p[j,i] |
| 87 | +
|
| 88 | + Args: |
| 89 | + profile (RankProfile): Profile of ballots. |
| 90 | + prev_state (ElectionState): The previous ElectionState. |
| 91 | + store_states (bool, optional): Included for compatibility with the base class but not |
| 92 | + used in this election type. |
| 93 | +
|
| 94 | + Returns: |
| 95 | + RankProfile: The profile of ballots after the round is completed. |
| 96 | + """ |
| 97 | + # Get pairwise comparison data: d[i,j] = number of voters who prefer i to j |
| 98 | + pairwise = pairwise_dict(profile) |
| 99 | + candidates = list(profile.candidates_cast) |
| 100 | + n = len(candidates) |
| 101 | + |
| 102 | + # Create candidate index mapping |
| 103 | + cand_to_idx = {cand: idx for idx, cand in enumerate(candidates)} |
| 104 | + |
| 105 | + # Initialize p[i,j] matrix (strongest path strengths) using NumPy |
| 106 | + # p[i,j] represents the strength of the strongest path from i to j |
| 107 | + p = np.zeros((n, n), dtype=np.float64) |
| 108 | + |
| 109 | + # Step 1: Initialize p[i,j] = d[i,j] - d[j,i] for all pairs (i != j) |
| 110 | + # pairwise_dict returns (a, b): (weight_a, weight_b) where: |
| 111 | + # weight_a = number of voters preferring a to b |
| 112 | + # weight_b = number of voters preferring b to a |
| 113 | + for (a, b), (weight_a, weight_b) in pairwise.items(): |
| 114 | + i = cand_to_idx[a] |
| 115 | + j = cand_to_idx[b] |
| 116 | + # p[i,j] is the margin by which i beats j (can be negative if j beats i) |
| 117 | + p[i, j] = weight_a - weight_b |
| 118 | + # Also set the reverse direction |
| 119 | + p[j, i] = weight_b - weight_a |
| 120 | + |
| 121 | + # Step 2: Floyd-Warshall style algorithm to compute strongest (widest) paths |
| 122 | + # Schulze requires: p[i,j] = max(p[i,j], min(p[i,k], p[k,j])) |
| 123 | + # We use NumPy broadcasting to vectorize the inner two loops for performance. |
| 124 | + for k in range(n): |
| 125 | + # p[:, k:k+1] is column k (shape n x 1), p[k:k+1, :] is row k (shape 1 x n) |
| 126 | + p = np.maximum(p, np.minimum(p[:, k : k + 1], p[k : k + 1, :])) |
| 127 | + |
| 128 | + # Step 3: Build directed graph where i -> j if p[i,j] > p[j,i] |
| 129 | + graph: nx.DiGraph = nx.DiGraph() |
| 130 | + graph.add_nodes_from(candidates) |
| 131 | + |
| 132 | + for i in range(n): |
| 133 | + for j in range(n): |
| 134 | + if i != j and p[i, j] > p[j, i]: |
| 135 | + graph.add_edge(candidates[i], candidates[j]) |
| 136 | + |
| 137 | + # Get dominating tiers from the graph |
| 138 | + dominating_tiers = get_dominating_tiers_digraph(graph) |
| 139 | + |
| 140 | + tiebreak_resolutions = {} |
| 141 | + for candidate_tier_set in dominating_tiers: |
| 142 | + if len(candidate_tier_set) > 1: |
| 143 | + tiebreak_resolutions[frozenset(candidate_tier_set)] = tiebreak_set( |
| 144 | + frozenset(candidate_tier_set), tiebreak=self.tiebreak |
| 145 | + ) |
| 146 | + |
| 147 | + ordered_candidates = [ |
| 148 | + candidate |
| 149 | + for candidate_set in dominating_tiers |
| 150 | + for candidate in sorted(candidate_set) |
| 151 | + ] |
| 152 | + |
| 153 | + elected = tuple(frozenset({c}) for c in ordered_candidates[: self.m]) |
| 154 | + remaining = tuple(frozenset({c}) for c in ordered_candidates[self.m :]) |
| 155 | + |
| 156 | + if store_states: |
| 157 | + new_state = ElectionState( |
| 158 | + round_number=prev_state.round_number + 1, |
| 159 | + elected=elected, |
| 160 | + remaining=remaining, |
| 161 | + tiebreaks=tiebreak_resolutions, |
| 162 | + ) |
| 163 | + |
| 164 | + self.election_states.append(new_state) |
| 165 | + |
| 166 | + return profile |
0 commit comments