Skip to content

Commit

Permalink
Merge branch 'publisher-cc-fix' into 'develop'
Browse files Browse the repository at this point in the history
Fix follow requests which get stuck pending

See merge request pleroma/pleroma!4208
  • Loading branch information
feld committed Aug 6, 2024
2 parents 9cf684d + 0bfe592 commit c019589
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 83 deletions.
1 change: 1 addition & 0 deletions changelog.d/follow-request.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed malformed follow requests that cause them to appear stuck pending due to the recipient being unable to process them.
66 changes: 47 additions & 19 deletions lib/pleroma/web/activity_pub/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.ActivityPub.Publisher.Prepared
alias Pleroma.Web.ActivityPub.Relay
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Workers.PublisherWorker
Expand Down Expand Up @@ -76,14 +77,13 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
end

@doc """
Publish a single message to a peer. Takes a struct with the following
parameters set:
Prepare an activity for publishing from an Oban job
* `inbox`: the inbox to publish to
* `activity_id`: the internal activity id
* `cc`: the cc recipients relevant to this inbox (optional)
"""
def publish_one(%{inbox: inbox, activity_id: activity_id} = params) do
@spec prepare_one(map()) :: Prepared.t()
def prepare_one(%{inbox: inbox, activity_id: activity_id} = params) do
activity = Activity.get_by_id_with_user_actor(activity_id)
actor = activity.user_actor

Expand All @@ -93,7 +93,7 @@ defmodule Pleroma.Web.ActivityPub.Publisher do

{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)

cc = Map.get(params, :cc)
cc = Map.get(params, :cc, [])

json =
data
Expand All @@ -113,27 +113,49 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
date: date
})

%Prepared{
activity_id: activity_id,
json: json,
date: date,
signature: signature,
digest: digest,
inbox: inbox,
unreachable_since: params[:unreachable_since]
}
end

@doc """
Publish a single message to a peer. Takes a struct with the following
parameters set:
* `activity_id`: the activity id
* `json`: the json payload
* `date`: the signed date from Pleroma.Signature.signed_date()
* `signature`: the signature from Pleroma.Signature.sign/2
* `digest`: base64 encoded the hash of the json payload prefixed with "SHA-256="
* `inbox`: the inbox URI of this delivery
* `unreachable_since`: timestamp the instance was marked unreachable
"""
def publish_one(%Prepared{} = p) do
with {:ok, %{status: code}} = result when code in 200..299 <-
HTTP.post(
inbox,
json,
p.inbox,
p.json,
[
{"Content-Type", "application/activity+json"},
{"Date", date},
{"signature", signature},
{"digest", digest}
{"Date", p.date},
{"signature", p.signature},
{"digest", p.digest}
]
) do
if not Map.has_key?(params, :unreachable_since) || params[:unreachable_since] do
Instances.set_reachable(inbox)
end
maybe_set_reachable(p.unreachable_since, p.inbox)

result
else
{_post_result, %{status: code} = response} = e ->
unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
Logger.metadata(activity: activity_id, inbox: inbox, status: code)
Logger.error("Publisher failed to inbox #{inbox} with status #{code}")
maybe_set_unreachable(p.unreachable_since, p.inbox)
Logger.metadata(activity: p.activity_id, inbox: p.inbox, status: code)
Logger.error("Publisher failed to inbox #{p.inbox} with status #{code}")

case response do
%{status: 400} -> {:cancel, :bad_request}
Expand All @@ -152,15 +174,21 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
connection_pool_snooze()

e ->
unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
Logger.metadata(activity: activity_id, inbox: inbox)
Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}")
maybe_set_unreachable(p.unreachable_since, p.inbox)
Logger.metadata(activity: p.activity_id, inbox: p.inbox)
Logger.error("Publisher failed to inbox #{p.inbox} #{inspect(e)}")
{:error, e}
end
end

defp connection_pool_snooze, do: {:snooze, 3}

defp maybe_set_reachable(%NaiveDateTime{}, inbox), do: Instances.set_reachable(inbox)
defp maybe_set_reachable(_, _), do: :ok

defp maybe_set_unreachable(nil, inbox), do: Instances.set_unreachable(inbox)
defp maybe_set_unreachable(%NaiveDateTime{}, _), do: :ok

defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
if port == URI.default_port(scheme) do
host
Expand Down
8 changes: 8 additions & 0 deletions lib/pleroma/web/activity_pub/publisher/prepared.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.ActivityPub.Publisher.Prepared do
@type t :: %__MODULE__{}
defstruct [:activity_id, :json, :date, :signature, :digest, :inbox, :unreachable_since]
end
5 changes: 4 additions & 1 deletion lib/pleroma/web/federator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ defmodule Pleroma.Web.Federator do
# Job Worker Callbacks

@spec perform(atom(), any()) :: {:ok, any()} | {:error, any()}
def perform(:publish_one, params), do: Publisher.publish_one(params)
def perform(:publish_one, params) do
Publisher.prepare_one(params)
|> Publisher.publish_one()
end

def perform(:publish, activity) do
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
Expand Down
Loading

0 comments on commit c019589

Please sign in to comment.