diff --git a/.credo.exs b/.credo.exs new file mode 100644 index 0000000..f396a01 --- /dev/null +++ b/.credo.exs @@ -0,0 +1,204 @@ +# This file contains the configuration for Credo and you are probably reading +# this after creating it with `mix credo.gen.config`. +# +# If you find anything wrong or unclear in this file, please report an +# issue on GitHub: https://github.com/rrrene/credo/issues +# +%{ + # + # You can have as many configs as you like in the `configs:` field. + configs: [ + %{ + # + # Run any config using `mix credo -C `. If no config name is given + # "default" is used. + # + name: "default", + # + # These are the files included in the analysis: + files: %{ + # + # You can give explicit globs or simply directories. + # In the latter case `**/*.{ex,exs}` will be used. + # + included: [ + "lib/", + "test/", + "priv/repo/" + ], + excluded: [~r"/_build/", ~r"/deps/", ~r"/node_modules/"] + }, + # + # Load and configure plugins here: + # + plugins: [], + # + # If you create your own checks, you must specify the source files for + # them here, so they can be loaded by Credo before running the analysis. + # + requires: [], + # + # If you want to enforce a style guide and need a more traditional linting + # experience, you can change `strict` to `true` below: + # + strict: false, + # + # To modify the timeout for parsing files, change this value: + # + parse_timeout: 5000, + # + # If you want to use uncolored output by default, you can change `color` + # to `false` below: + # + color: true, + # + # You can customize the parameters of any check by adding a second element + # to the tuple. + # + # To disable a check put `false` as second element: + # + # {Credo.Check.Design.DuplicatedCode, false} + # + checks: %{ + enabled: [ + # + ## Consistency Checks + # + {Credo.Check.Consistency.ExceptionNames, []}, + {Credo.Check.Consistency.LineEndings, []}, + {Credo.Check.Consistency.ParameterPatternMatching, []}, + {Credo.Check.Consistency.SpaceAroundOperators, []}, + {Credo.Check.Consistency.SpaceInParentheses, []}, + {Credo.Check.Consistency.TabsOrSpaces, []}, + + # + ## Design Checks + # + # You can customize the priority of any check + # Priority values are: `low, normal, high, higher` + # + {Credo.Check.Design.AliasUsage, + [priority: :low, if_nested_deeper_than: 2, if_called_more_often_than: 0]}, + {Credo.Check.Design.TagTODO, false}, + {Credo.Check.Design.TagFIXME, false}, + + # + ## Readability Checks + # + {Credo.Check.Readability.AliasOrder, []}, + {Credo.Check.Readability.FunctionNames, []}, + {Credo.Check.Readability.LargeNumbers, []}, + {Credo.Check.Readability.MaxLineLength, [priority: :low, max_length: 120]}, + {Credo.Check.Readability.ModuleAttributeNames, []}, + {Credo.Check.Readability.ModuleDoc, []}, + {Credo.Check.Readability.ModuleNames, []}, + {Credo.Check.Readability.ParenthesesInCondition, []}, + {Credo.Check.Readability.ParenthesesOnZeroArityDefs, []}, + {Credo.Check.Readability.PipeIntoAnonymousFunctions, []}, + {Credo.Check.Readability.PredicateFunctionNames, []}, + {Credo.Check.Readability.PreferImplicitTry, []}, + {Credo.Check.Readability.RedundantBlankLines, []}, + {Credo.Check.Readability.Semicolons, []}, + {Credo.Check.Readability.SpaceAfterCommas, []}, + {Credo.Check.Readability.StringSigils, []}, + {Credo.Check.Readability.TrailingBlankLine, []}, + {Credo.Check.Readability.TrailingWhiteSpace, []}, + {Credo.Check.Readability.UnnecessaryAliasExpansion, []}, + {Credo.Check.Readability.VariableNames, []}, + {Credo.Check.Readability.WithSingleClause, []}, + + # + ## Refactoring Opportunities + # + {Credo.Check.Refactor.Apply, []}, + {Credo.Check.Refactor.CondStatements, []}, + {Credo.Check.Refactor.CyclomaticComplexity, [max_complexity: 10]}, + {Credo.Check.Refactor.FunctionArity, []}, + {Credo.Check.Refactor.LongQuoteBlocks, []}, + {Credo.Check.Refactor.MatchInCondition, []}, + {Credo.Check.Refactor.MapJoin, []}, + {Credo.Check.Refactor.NegatedConditionsInUnless, []}, + {Credo.Check.Refactor.NegatedConditionsWithElse, []}, + {Credo.Check.Refactor.Nesting, [max_nesting: 3]}, + {Credo.Check.Refactor.UnlessWithElse, []}, + {Credo.Check.Refactor.WithClauses, []}, + {Credo.Check.Refactor.FilterCount, []}, + {Credo.Check.Refactor.FilterFilter, []}, + {Credo.Check.Refactor.RejectReject, []}, + {Credo.Check.Refactor.RedundantWithClauseResult, []}, + + # + ## Warnings + # + {Credo.Check.Warning.ApplicationConfigInModuleAttribute, []}, + {Credo.Check.Warning.BoolOperationOnSameValues, []}, + {Credo.Check.Warning.Dbg, []}, + {Credo.Check.Warning.ExpensiveEmptyEnumCheck, []}, + {Credo.Check.Warning.IExPry, []}, + {Credo.Check.Warning.IoInspect, []}, + {Credo.Check.Warning.MissedMetadataKeyInLoggerConfig, []}, + {Credo.Check.Warning.OperationOnSameValues, []}, + {Credo.Check.Warning.OperationWithConstantResult, []}, + {Credo.Check.Warning.RaiseInsideRescue, []}, + {Credo.Check.Warning.SpecWithStruct, []}, + {Credo.Check.Warning.WrongTestFileExtension, []}, + {Credo.Check.Warning.UnusedEnumOperation, []}, + {Credo.Check.Warning.UnusedFileOperation, []}, + {Credo.Check.Warning.UnusedKeywordOperation, []}, + {Credo.Check.Warning.UnusedListOperation, []}, + {Credo.Check.Warning.UnusedPathOperation, []}, + {Credo.Check.Warning.UnusedRegexOperation, []}, + {Credo.Check.Warning.UnusedStringOperation, []}, + {Credo.Check.Warning.UnusedTupleOperation, []}, + {Credo.Check.Warning.UnsafeExec, []} + ], + disabled: [ + # + # Controversial and experimental checks (opt-in, just move the check to `:enabled` + # and be sure to use `mix credo --strict` to see low priority checks) + # + {Credo.Check.Consistency.MultiAliasImportRequireUse, []}, + {Credo.Check.Consistency.UnusedVariableNames, []}, + {Credo.Check.Design.DuplicatedCode, []}, + {Credo.Check.Design.SkipTestWithoutComment, []}, + {Credo.Check.Readability.AliasAs, []}, + {Credo.Check.Readability.BlockPipe, []}, + {Credo.Check.Readability.ImplTrue, []}, + {Credo.Check.Readability.MultiAlias, []}, + {Credo.Check.Readability.NestedFunctionCalls, []}, + {Credo.Check.Readability.OneArityFunctionInPipe, []}, + {Credo.Check.Readability.SeparateAliasRequire, []}, + {Credo.Check.Readability.SingleFunctionToBlockPipe, []}, + {Credo.Check.Readability.SinglePipe, []}, + {Credo.Check.Readability.Specs, []}, + {Credo.Check.Readability.StrictModuleLayout, []}, + {Credo.Check.Readability.WithCustomTaggedTuple, []}, + {Credo.Check.Readability.OnePipePerLine, []}, + {Credo.Check.Refactor.ABCSize, []}, + {Credo.Check.Refactor.AppendSingleItem, []}, + {Credo.Check.Refactor.DoubleBooleanNegation, []}, + {Credo.Check.Refactor.FilterReject, []}, + {Credo.Check.Refactor.IoPuts, []}, + {Credo.Check.Refactor.MapMap, []}, + {Credo.Check.Refactor.ModuleDependencies, []}, + {Credo.Check.Refactor.NegatedIsNil, []}, + {Credo.Check.Refactor.PassAsyncInTestCases, []}, + {Credo.Check.Refactor.PipeChainStart, []}, + {Credo.Check.Refactor.RejectFilter, []}, + {Credo.Check.Refactor.VariableRebinding, []}, + {Credo.Check.Warning.LazyLogging, []}, + {Credo.Check.Warning.LeakyEnvironment, []}, + {Credo.Check.Warning.MapGetUnsafePass, []}, + {Credo.Check.Warning.MixEnv, []}, + {Credo.Check.Warning.UnsafeToAtom, []} + + # {Credo.Check.Refactor.MapInto, []}, + + # + # Custom checks can be created using `mix credo.gen.check`. + # + ] + } + } + ] +} diff --git a/config/config.exs b/config/config.exs index 73b7632..870aef2 100644 --- a/config/config.exs +++ b/config/config.exs @@ -51,6 +51,13 @@ config :atlas, Oban, imports: 1, exchanges: 1, schedule_generator: 1 + ], + plugins: [ + {Oban.Plugins.Cron, + crontab: [ + # Run every 5 minutes + {"*/5 * * * *", Atlas.Workers.ShiftExchange} + ]} ] # Import environment specific config. This must remain at the bottom diff --git a/lib/atlas/exchange.ex b/lib/atlas/exchange.ex index ee336f3..68c296b 100644 --- a/lib/atlas/exchange.ex +++ b/lib/atlas/exchange.ex @@ -42,7 +42,7 @@ defmodule Atlas.Exchange do |> apply_filters(opts) |> where([r], r.status == :pending) |> distinct([r], [r.shift_from, r.shift_to]) - |> order_by([r], asc: r.inserted_at) + |> order_by([r], asc: r.shift_from, asc: r.shift_to, asc: r.inserted_at) |> Repo.all() end @@ -236,10 +236,58 @@ defmodule Atlas.Exchange do Constants.set("exchange_period_end", nil) end + # Advisory lock key for exchange solving - must be unique across the application + @exchange_lock_key 736_849_275 + + @doc """ + Processes all pending exchange requests: first solves cycles, then auto-approves eligible requests. + Uses an advisory lock to prevent concurrent execution which could cause race conditions. + """ + def process_exchanges(opts \\ []) do + case Repo.query("SELECT pg_try_advisory_lock($1)", [@exchange_lock_key]) do + {:ok, %{rows: [[true]]}} -> + try do + solve_result = do_solve_exchanges(opts) + do_maybe_auto_approve_pending_requests(opts) + solve_result + after + Repo.query("SELECT pg_advisory_unlock($1)", [@exchange_lock_key]) + end + + {:ok, %{rows: [[false]]}} -> + %{cycles_found: 0, requests_approved: 0, skipped: :lock_held} + + {:error, _} -> + %{cycles_found: 0, requests_approved: 0, skipped: :lock_error} + end + end + @doc """ Attempts to solve pending shift exchange requests by finding cycles in the exchange graph and approving them. + Uses an advisory lock to prevent concurrent execution which could cause race conditions. """ def solve_exchanges(opts \\ []) do + # Use pg_try_advisory_lock to attempt to acquire an exclusive lock + # If another worker is already processing, we skip this run + case Repo.query("SELECT pg_try_advisory_lock($1)", [@exchange_lock_key]) do + {:ok, %{rows: [[true]]}} -> + try do + do_solve_exchanges(opts) + after + # Always release the lock when done + Repo.query("SELECT pg_advisory_unlock($1)", [@exchange_lock_key]) + end + + {:ok, %{rows: [[false]]}} -> + # Another worker is already processing exchanges, skip this run + %{cycles_found: 0, requests_approved: 0, skipped: :lock_held} + + {:error, _} -> + %{cycles_found: 0, requests_approved: 0, skipped: :lock_error} + end + end + + defp do_solve_exchanges(opts) do pending = list_unique_pending_shift_exchange_requests(opts) graph = build_graph(pending) cycles = find_cycles(graph) @@ -257,6 +305,25 @@ defmodule Atlas.Exchange do end def maybe_auto_approve_pending_requests(opts \\ []) do + # Use the same advisory lock as solve_exchanges to prevent concurrent modifications + case Repo.query("SELECT pg_try_advisory_lock($1)", [@exchange_lock_key]) do + {:ok, %{rows: [[true]]}} -> + try do + do_maybe_auto_approve_pending_requests(opts) + after + Repo.query("SELECT pg_advisory_unlock($1)", [@exchange_lock_key]) + end + + {:ok, %{rows: [[false]]}} -> + # Another worker is already processing exchanges, skip + :skipped + + {:error, _} -> + :error + end + end + + defp do_maybe_auto_approve_pending_requests(opts) do pending_requests = list_pending_shift_exchange_requests(opts) Enum.each(pending_requests, fn req -> @@ -347,18 +414,78 @@ defmodule Atlas.Exchange do end defp approve_cycle(g, cycle_vertices) do - cycle_set = MapSet.new(cycle_vertices) + # Get only the edges that form this specific cycle path + # For cycle [A, B, C], the edges are: A→B, B→C, C→A + cycle_edges = + cycle_vertices + |> Enum.with_index() + |> Enum.map(fn {vertex, idx} -> + next_vertex = Enum.at(cycle_vertices, rem(idx + 1, length(cycle_vertices))) + {vertex, next_vertex} + end) + |> MapSet.new() - # Get all requests whose edges are inside the cycle + # Get only the requests for edges that are part of this specific cycle requests = g |> Graph.edges() - |> Enum.filter(fn e -> e.v1 in cycle_set and e.v2 in cycle_set end) + |> Enum.filter(fn e -> MapSet.member?(cycle_edges, {e.v1, e.v2}) end) |> Enum.map(& &1.label) |> Enum.filter(&match?(%ShiftExchangeRequest{}, &1)) + # Validate that we have exactly one request per edge in the cycle + # If not, the cycle is incomplete and should not be approved + if length(requests) != length(cycle_vertices) do + {:error, :incomplete_cycle} + else + approve_complete_cycle(g, cycle_vertices, requests) + end + end + + defp approve_complete_cycle(_g, cycle_vertices, requests) do + # Build the multi with locking to prevent race conditions multi = - Enum.reduce(requests, Multi.new(), fn %ShiftExchangeRequest{} = req, m -> + Multi.new() + # First, lock all shifts involved in the cycle to serialize concurrent operations + |> Multi.run(:lock_shifts, fn _repo, _changes -> + shift_ids = cycle_vertices + + locked_shifts = + Repo.all( + from(s in Atlas.University.Degrees.Courses.Shifts.Shift, + where: s.id in ^shift_ids, + lock: "FOR UPDATE", + order_by: s.id + ) + ) + + {:ok, locked_shifts} + end) + # Then, lock and verify all requests are still pending + |> Multi.run(:verify_requests_pending, fn _repo, _changes -> + request_ids = Enum.map(requests, & &1.id) + + current_requests = + Repo.all( + from(r in ShiftExchangeRequest, + where: r.id in ^request_ids, + lock: "FOR UPDATE" + ) + ) + + # Check that all requests are still pending + all_pending = Enum.all?(current_requests, &(&1.status == :pending)) + + if all_pending and length(current_requests) == length(requests) do + {:ok, current_requests} + else + {:error, :requests_already_processed} + end + end) + + # Add the actual operations for each request + multi = + Enum.reduce(requests, multi, fn %ShiftExchangeRequest{} = req, m -> # Update the request status to approved m = Multi.update( @@ -367,7 +494,7 @@ defmodule Atlas.Exchange do Ecto.Changeset.change(req, status: :approved) ) - # Delete the student’s enrollment in the origin shift + # Delete the student's enrollment in the origin shift m = Multi.delete_all( m, @@ -424,7 +551,43 @@ defmodule Atlas.Exchange do defp maybe_auto_approve_request(%ShiftExchangeRequest{} = req) do Multi.new() + |> Multi.run(:verify_request_pending, fn _repo, _changes -> + # Lock the request and verify it's still pending + # This prevents race conditions with cycle approval + current_request = + Repo.one( + from(r in ShiftExchangeRequest, + where: r.id == ^req.id, + lock: "FOR UPDATE" + ) + ) + + case current_request do + nil -> {:error, :request_not_found} + %{status: :pending} -> {:ok, current_request} + _ -> {:error, :request_already_processed} + end + end) |> Multi.run(:shift_has_space, fn _repo, _changes -> + # Lock both shifts to prevent concurrent modifications + # This ensures that capacity checks are serialized + from_shift = + Repo.one!( + from(s in Atlas.University.Degrees.Courses.Shifts.Shift, + where: s.id == ^req.shift_from, + lock: "FOR UPDATE" + ) + ) + + to_shift = + Repo.one!( + from(s in Atlas.University.Degrees.Courses.Shifts.Shift, + where: s.id == ^req.shift_to, + lock: "FOR UPDATE" + ) + ) + + # Now count enrollments while holding the locks enrolled_count = Repo.one( from(se in ShiftEnrollment, @@ -433,15 +596,19 @@ defmodule Atlas.Exchange do ) ) - shift = Shifts.get_shift!(req.shift_to) - from_shift = Shifts.get_shift!(req.shift_from) - from_shift_occupation = University.get_shift_enrollment_count(req.shift_from) + from_shift_occupation = + Repo.one( + from(se in ShiftEnrollment, + where: se.shift_id == ^req.shift_from and se.status in [:active, :inactive], + select: count(se.student_id, :distinct) + ) + ) cond do from_shift_occupation - 1 <= round(from_shift.capacity * 0.8) -> {:error, :shift_from_underoccupied} - enrolled_count < shift.capacity -> + enrolled_count < to_shift.capacity -> {:ok, :has_space} true -> diff --git a/lib/atlas/workers/shift_exchange.ex b/lib/atlas/workers/shift_exchange.ex index e5be1f5..bc48fee 100644 --- a/lib/atlas/workers/shift_exchange.ex +++ b/lib/atlas/workers/shift_exchange.ex @@ -8,8 +8,7 @@ defmodule Atlas.Workers.ShiftExchange do @impl Oban.Worker def perform(_job) do - Exchange.solve_exchanges() - Exchange.maybe_auto_approve_pending_requests() + Exchange.process_exchanges() :ok end