Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/poll-refresh' into fork
Browse files Browse the repository at this point in the history
  • Loading branch information
mkljczk committed Oct 3, 2024
2 parents 91c7d79 + a3038aa commit 3c78a45
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 175 deletions.
1 change: 1 addition & 0 deletions changelog.d/poll-refresh.change
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Poll results refreshing is handled asynchronously and will not attempt to keep fetching updates to a closed poll.
21 changes: 0 additions & 21 deletions lib/pleroma/object.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,27 +99,6 @@ defmodule Pleroma.Object do
def get_by_id(nil), do: nil
def get_by_id(id), do: Repo.get(Object, id)

@spec get_by_id_and_maybe_refetch(integer(), list()) :: Object.t() | nil
def get_by_id_and_maybe_refetch(id, opts \\ []) do
with %Object{updated_at: updated_at} = object <- get_by_id(id) do
if opts[:interval] &&
NaiveDateTime.diff(NaiveDateTime.utc_now(), updated_at) > opts[:interval] do
case Fetcher.refetch_object(object) do
{:ok, %Object{} = object} ->
object

e ->
Logger.error("Couldn't refresh #{object.data["id"]}:\n#{inspect(e)}")
object
end
else
object
end
else
nil -> nil
end
end

def get_by_ap_id(nil), do: nil

def get_by_ap_id(ap_id) do
Expand Down
18 changes: 16 additions & 2 deletions lib/pleroma/web/mastodon_api/controllers/poll_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Pleroma.Web.MastodonAPI.PollController do
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Plugs.OAuthScopesPlug
alias Pleroma.Workers.PollWorker

action_fallback(Pleroma.Web.MastodonAPI.FallbackController)

Expand All @@ -27,12 +28,16 @@ defmodule Pleroma.Web.MastodonAPI.PollController do
defdelegate open_api_operation(action), to: Pleroma.Web.ApiSpec.PollOperation

@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
@poll_refresh_interval 120

@doc "GET /api/v1/polls/:id"
def show(%{assigns: %{user: user}, private: %{open_api_spex: %{params: %{id: id}}}} = conn, _) do
with %Object{} = object <- Object.get_by_id_and_maybe_refetch(id, interval: 60),
%Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]),
with %Object{} = object <- Object.get_by_id(id),
%Activity{} = activity <-
Activity.get_create_by_object_ap_id_with_object(object.data["id"]),
true <- Visibility.visible_for_user?(activity, user) do
maybe_refresh_poll(activity)

try_render(conn, "show.json", %{object: object, for: user})
else
error when is_nil(error) or error == false ->
Expand Down Expand Up @@ -70,4 +75,13 @@ defmodule Pleroma.Web.MastodonAPI.PollController do
end
end)
end

defp maybe_refresh_poll(%Activity{object: %Object{} = object} = activity) do
with false <- activity.local,
{:ok, end_time} <- NaiveDateTime.from_iso8601(object.data["closed"]),
{_, :lt} <- {:closed_compare, NaiveDateTime.compare(object.updated_at, end_time)} do
PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id})
|> Oban.insert(unique: [period: @poll_refresh_interval])
end
end
end
37 changes: 30 additions & 7 deletions lib/pleroma/workers/poll_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,44 @@ defmodule Pleroma.Workers.PollWorker do
alias Pleroma.Activity
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.Object.Fetcher

@stream_out_impl Pleroma.Config.get(
[__MODULE__, :stream_out],
Pleroma.Web.ActivityPub.ActivityPub
)

@impl true
def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
with %Activity{} = activity <- find_poll_activity(activity_id),
with {_, %Activity{} = activity} <- {:activity, Activity.get_by_id(activity_id)},
{:ok, notifications} <- Notification.create_poll_notifications(activity) do
# Schedule a final refresh
__MODULE__.new(%{"op" => "refresh", "activity_id" => activity_id})
|> Oban.insert()

Notification.stream(notifications)
else
{:error, :poll_activity_not_found} = e -> {:cancel, e}
{:activity, nil} -> {:cancel, :poll_activity_not_found}
e -> {:error, e}
end
end

@impl true
def timeout(_job), do: :timer.seconds(5)
def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do
with {_, %Activity{object: object}} <-
{:activity, Activity.get_by_id_with_object(activity_id)},
{_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do
stream_update(activity_id)

defp find_poll_activity(activity_id) do
with nil <- Activity.get_by_id(activity_id) do
{:error, :poll_activity_not_found}
:ok
else
{:activity, nil} -> {:cancel, :poll_activity_not_found}
{:refetch, _} = e -> {:cancel, e}
end
end

@impl true
def timeout(_job), do: :timer.seconds(5)

def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = activity) do
with %Object{data: %{"type" => "Question", "closed" => closed}} when is_binary(closed) <-
Object.normalize(activity),
Expand All @@ -49,4 +66,10 @@ defmodule Pleroma.Workers.PollWorker do
end

def schedule_poll_end(activity), do: {:error, activity}

defp stream_update(activity_id) do
Activity.get_by_id(activity_id)
|> Activity.normalize()
|> @stream_out_impl.stream_out()
end
end
144 changes: 0 additions & 144 deletions test/pleroma/object_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ defmodule Pleroma.ObjectTest do
use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo

import ExUnit.CaptureLog
import Mox
import Pleroma.Factory
import Tesla.Mock

alias Pleroma.Activity
alias Pleroma.Hashtag
alias Pleroma.Object
alias Pleroma.Repo
Expand Down Expand Up @@ -282,148 +280,6 @@ defmodule Pleroma.ObjectTest do
end
end

describe "get_by_id_and_maybe_refetch" do
setup do
mock(fn
%{method: :get, url: "https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d"} ->
%Tesla.Env{
status: 200,
body: File.read!("test/fixtures/tesla_mock/poll_original.json"),
headers: HttpRequestMock.activitypub_object_headers()
}

env ->
apply(HttpRequestMock, :request, [env])
end)

mock_modified = fn resp ->
mock(fn
%{method: :get, url: "https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d"} ->
resp

env ->
apply(HttpRequestMock, :request, [env])
end)
end

on_exit(fn -> mock(fn env -> apply(HttpRequestMock, :request, [env]) end) end)

[mock_modified: mock_modified]
end

test "refetches if the time since the last refetch is greater than the interval", %{
mock_modified: mock_modified
} do
%Object{} =
object =
Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d",
fetch: true
)

Object.set_cache(object)

assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0

mock_modified.(%Tesla.Env{
status: 200,
body: File.read!("test/fixtures/tesla_mock/poll_modified.json"),
headers: HttpRequestMock.activitypub_object_headers()
})

updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: -1)
object_in_cache = Object.get_cached_by_ap_id(object.data["id"])
assert updated_object == object_in_cache
assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 8
assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 3
end

test "returns the old object if refetch fails", %{mock_modified: mock_modified} do
%Object{} =
object =
Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d",
fetch: true
)

Object.set_cache(object)

assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0

assert capture_log(fn ->
mock_modified.(%Tesla.Env{status: 404, body: ""})

updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: -1)
object_in_cache = Object.get_cached_by_ap_id(object.data["id"])
assert updated_object == object_in_cache
assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 0
end) =~
"[error] Couldn't refresh https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d"
end

test "does not refetch if the time since the last refetch is greater than the interval", %{
mock_modified: mock_modified
} do
%Object{} =
object =
Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d",
fetch: true
)

Object.set_cache(object)

assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0

mock_modified.(%Tesla.Env{
status: 200,
body: File.read!("test/fixtures/tesla_mock/poll_modified.json"),
headers: HttpRequestMock.activitypub_object_headers()
})

updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: 100)
object_in_cache = Object.get_cached_by_ap_id(object.data["id"])
assert updated_object == object_in_cache
assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 0
end

test "preserves internal fields on refetch", %{mock_modified: mock_modified} do
%Object{} =
object =
Object.normalize("https://patch.cx/objects/9a172665-2bc5-452d-8428-2361d4c33b1d",
fetch: true
)

Object.set_cache(object)

assert Enum.at(object.data["oneOf"], 0)["replies"]["totalItems"] == 4
assert Enum.at(object.data["oneOf"], 1)["replies"]["totalItems"] == 0

user = insert(:user)
activity = Activity.get_create_by_object_ap_id(object.data["id"])
{:ok, activity} = CommonAPI.favorite(activity.id, user)
object = Object.get_by_ap_id(activity.data["object"])

assert object.data["like_count"] == 1

mock_modified.(%Tesla.Env{
status: 200,
body: File.read!("test/fixtures/tesla_mock/poll_modified.json"),
headers: HttpRequestMock.activitypub_object_headers()
})

updated_object = Object.get_by_id_and_maybe_refetch(object.id, interval: -1)
object_in_cache = Object.get_cached_by_ap_id(object.data["id"])
assert updated_object == object_in_cache
assert Enum.at(updated_object.data["oneOf"], 0)["replies"]["totalItems"] == 8
assert Enum.at(updated_object.data["oneOf"], 1)["replies"]["totalItems"] == 3

assert updated_object.data["like_count"] == 1
end
end

describe ":hashtags association" do
test "Hashtag records are created with Object record and updated on its change" do
user = insert(:user)
Expand Down
28 changes: 28 additions & 0 deletions test/pleroma/web/mastodon_api/controllers/poll_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.MastodonAPI.PollControllerTest do
use Oban.Testing, repo: Pleroma.Repo
use Pleroma.Web.ConnCase, async: true

alias Pleroma.Object
Expand All @@ -27,6 +28,33 @@ defmodule Pleroma.Web.MastodonAPI.PollControllerTest do
response = json_response_and_validate_schema(conn, 200)
id = to_string(object.id)
assert %{"id" => ^id, "expired" => false, "multiple" => false} = response

# Local activities should not generate an Oban job to refresh
assert activity.local

refute_enqueued(
worker: Pleroma.Workers.PollWorker,
args: %{"op" => "refresh", "activity_id" => activity.id}
)
end

test "creates an oban job to refresh poll if activity is remote", %{conn: conn} do
user = insert(:user, local: false)
question = insert(:question, user: user)
activity = insert(:question_activity, question: question, local: false)

# Ensure this is not represented as a local activity
refute activity.local

object = Object.normalize(activity, fetch: false)

get(conn, "/api/v1/polls/#{object.id}")
|> json_response_and_validate_schema(200)

assert_enqueued(
worker: Pleroma.Workers.PollWorker,
args: %{"op" => "refresh", "activity_id" => activity.id}
)
end

test "does not expose polls for private statuses", %{conn: conn} do
Expand Down
35 changes: 35 additions & 0 deletions test/pleroma/workers/poll_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,41 @@ defmodule Pleroma.Workers.PollWorkerTest do
# Ensure notifications were streamed out when job executes
assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], :_))
assert called(Pleroma.Web.Push.send(:_))

# Ensure we scheduled a final refresh of the poll
assert_enqueued(
worker: PollWorker,
args: %{"op" => "refresh", "activity_id" => activity.id}
)
end
end

test "poll refresh" do
user = insert(:user, local: false)
question = insert(:question, user: user)
activity = insert(:question_activity, question: question)

PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id})
|> Oban.insert()

expected_job_args = %{"activity_id" => activity.id, "op" => "refresh"}

assert_enqueued(args: expected_job_args)

with_mocks([
{
Pleroma.Web.Streamer,
[],
[
stream: fn _, _ -> nil end
]
}
]) do
[job] = all_enqueued(worker: PollWorker)
PollWorker.perform(job)

# Ensure updates are streamed out
assert called(Pleroma.Web.Streamer.stream(["user", "list", "public", "public:local"], :_))
end
end
end
3 changes: 2 additions & 1 deletion test/support/factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ defmodule Pleroma.Factory do

def question_factory(attrs \\ %{}) do
user = attrs[:user] || insert(:user)
closed = attrs[:closed] || DateTime.utc_now() |> DateTime.add(86_400) |> DateTime.to_iso8601()

data = %{
"id" => Pleroma.Web.ActivityPub.Utils.generate_object_id(),
Expand All @@ -251,7 +252,7 @@ defmodule Pleroma.Factory do
"to" => ["https://www.w3.org/ns/activitystreams#Public"],
"cc" => [user.follower_address],
"context" => Pleroma.Web.ActivityPub.Utils.generate_context_id(),
"closed" => DateTime.utc_now() |> DateTime.add(86_400) |> DateTime.to_iso8601(),
"closed" => closed,
"content" => "Which flavor of ice cream do you prefer?",
"oneOf" => [
%{
Expand Down

0 comments on commit 3c78a45

Please sign in to comment.