Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix endorser Nats connection going stale #1122

Merged
merged 24 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion app/services/onboarding/util/register_issuer_did.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import os
from logging import Logger

from aries_cloudcontroller import (
Expand All @@ -18,6 +19,8 @@
)
from shared import ACAPY_ENDORSER_ALIAS

MAX_ATTEMPTS = int(os.getenv("WAIT_ISSUER_DID_MAX_ATTEMPTS", "15"))


async def create_connection_with_endorser(
*,
Expand Down Expand Up @@ -241,7 +244,7 @@ async def wait_issuer_did_transaction_endorsed(
issuer_controller: AcaPyClient,
issuer_connection_id: str,
logger: Logger,
max_attempts: int = 15,
max_attempts: int = MAX_ATTEMPTS,
retry_delay: float = 1.0,
) -> None:
attempt = 0
Expand Down
3 changes: 3 additions & 0 deletions app/tests/e2e/test_did_exchange.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Optional

import pytest
Expand Down Expand Up @@ -99,8 +100,9 @@
filter_map={"their_did": alice_did},
)
finally:
await asyncio.sleep(1) # Short sleep assists in avoiding 500 error
# Delete connection records:
await alice_member_client.delete(

Check failure on line 105 in app/tests/e2e/test_did_exchange.py

View workflow job for this annotation

GitHub Actions / JUnit Test Report

test_did_exchange.test_create_did_exchange_request[clean-clean-None-None-False]

fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}
Raw output
self = <shared.util.rich_async_client.RichAsyncClient object at 0x7f3cefbde8a0>
url = '/v1/connections/b8223e61-b6ab-418b-a1c7-d24a1943a061', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_GPXZA - HTTP DELETE `/v1/connections/b8223e61-b6ab-418b-a1c7-d24a1943a061` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
>               response.raise_for_status()

shared/util/rich_async_client.py:61: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Response [500 Internal Server Error]>

    def raise_for_status(self) -> Response:
        """
        Raise the `HTTPStatusError` if one occurred.
        """
        request = self._request
        if request is None:
            raise RuntimeError(
                "Cannot call `raise_for_status` as the request "
                "instance has not been set on this response."
            )
    
        if self.is_success:
            return self
    
        if self.has_redirect_location:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "Redirect location: '{0.headers[location]}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
        else:
            message = (
                "{error_type} '{0.status_code} {0.reason_phrase}' for url '{0.url}'\n"
                "For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/{0.status_code}"
            )
    
        status_class = self.status_code // 100
        error_types = {
            1: "Informational response",
            3: "Redirect response",
            4: "Client error",
            5: "Server error",
        }
        error_type = error_types.get(status_class, "Invalid status code")
        message = message.format(self, error_type=error_type)
>       raise HTTPStatusError(message, request=request, response=self)
E       httpx.HTTPStatusError: Server error '500 Internal Server Error' for url 'https://governance-tenant-web.cloudapi.dev.didxtech.com/tenant/v1/connections/b8223e61-b6ab-418b-a1c7-d24a1943a061'
E       For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500

/usr/local/lib/python3.12/site-packages/httpx/_models.py:763: HTTPStatusError

The above exception was the direct cause of the following exception:

alice_member_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7f3cefbde8a0>
faber_client = <shared.util.rich_async_client.RichAsyncClient object at 0x7f3cefbfb620>
alice_acapy_client = <aries_cloudcontroller.acapy_client.AcaPyClient object at 0x7f3cefbde900>
faber_acapy_client = <aries_cloudcontroller.acapy_client.AcaPyClient object at 0x7f3cefbddc40>
use_did = None, use_did_method = None, use_public_did = False

    @pytest.mark.anyio
    @pytest.mark.parametrize(
        "use_did,use_did_method,use_public_did",
        [
            (None, None, False),
            (True, None, False),
            (None, "did:peer:2", False),
            (None, "did:peer:4", False),
            (True, "did:peer:4", False),
            (None, None, True),
        ],
    )
    async def test_create_did_exchange_request(
        alice_member_client: RichAsyncClient,
        faber_client: RichAsyncClient,
        alice_acapy_client: AcaPyClient,
        faber_acapy_client: AcaPyClient,
        use_did: Optional[str],
        use_did_method: Optional[str],
        use_public_did: bool,
    ):
        faber_public_did = await acapy_wallet.get_public_did(controller=faber_acapy_client)
    
        request_data = {"their_public_did": qualified_did_sov(faber_public_did.did)}
    
        if use_did:
            new_did = await acapy_wallet.create_did(controller=alice_acapy_client)
            request_data["use_did"] = new_did.did
    
        if use_did_method:
            request_data["use_did_method"] = use_did_method
    
        if use_public_did:
            request_data["use_public_did"] = use_public_did
    
        if use_public_did:  # Alice doesn't have a public DID
            with pytest.raises(HTTPException) as exc_info:
                response = await alice_member_client.post(
                    f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
                    params=request_data,
                )
            assert exc_info.value.status_code == 400
            assert exc_info.value.detail == """{"detail":"No public DID configured."}"""
    
        elif use_did and use_did_method:
            with pytest.raises(HTTPException) as exc_info:
                await alice_member_client.post(
                    f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
                    params=request_data,
                )
            assert exc_info.value.status_code == 400
            assert (
                exc_info.value.detail
                == """{"detail":"Cannot specify both use_did and use_did_method."}"""
            )
        else:
            response = await alice_member_client.post(
                f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request", params=request_data
            )
            assert response.status_code == 200
            connection_record = response.json()
            assert_that(connection_record).contains("connection_id", "state")
            assert_that(connection_record["state"]).is_equal_to("request-sent")
    
            alice_connection_id = connection_record["connection_id"]
            alice_did = connection_record["my_did"]
    
            try:
                # Due to auto-accepts, Alice's connection is complete
                assert await check_webhook_state(
                    alice_member_client,
                    topic="connections",
                    state="completed",
                    filter_map={"connection_id": alice_connection_id},
                )
                # Faber now has a complete connection too
                assert await check_webhook_state(
                    faber_client,
                    topic="connections",
                    state="completed",
                    filter_map={"their_did": alice_did},
                )
            finally:
                await asyncio.sleep(1)  # Short sleep assists in avoiding 500 error
                # Delete connection records:
>               await alice_member_client.delete(
                    f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
                )

app/tests/e2e/test_did_exchange.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <shared.util.rich_async_client.RichAsyncClient object at 0x7f3cefbde8a0>
url = '/v1/connections/b8223e61-b6ab-418b-a1c7-d24a1943a061', kwargs = {}
response = <Response [500 Internal Server Error]>, code = 500
message = '{"detail":"Internal Server Error"}'
log_message = 'Tenant alice_GPXZA - HTTP DELETE `/v1/connections/b8223e61-b6ab-418b-a1c7-d24a1943a061` failed. Status code: 500. Response: `{"detail":"Internal Server Error"}`.'

    async def delete(self, url: str, **kwargs) -> Response:
        try:
            response = await super().delete(url, **kwargs)
            if self.raise_status_error:
                response.raise_for_status()
        except HTTPStatusError as e:
            code = e.response.status_code
            message = e.response.text
            log_message = f"{self.name} DELETE `{url}` failed. Status code: {code}. Response: `{message}`."
            logger.error(log_message)
    
>           raise HTTPException(status_code=code, detail=message) from e
E           fastapi.exceptions.HTTPException: 500: {"detail":"Internal Server Error"}

shared/util/rich_async_client.py:68: HTTPException
f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
)

Expand Down Expand Up @@ -171,6 +173,7 @@
filter_map={"connection_id": faber_connection_id},
)
finally:
await asyncio.sleep(1) # Short sleep assists in avoiding 500 error
# Delete connection records:
await alice_member_client.delete(
f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ services:
env_file:
- environments/endorser/endorser.env
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:3009/health || exit 1"]
test: ["CMD-SHELL", "curl -f http://localhost:3009/health/ready || exit 1"]
interval: 30s
timeout: 10s
retries: 3
Expand Down
33 changes: 32 additions & 1 deletion endorser/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
from contextlib import asynccontextmanager

Expand Down Expand Up @@ -58,7 +59,7 @@ async def scalar_html():
)


@app.get("/health")
@app.get("/health/live")
@inject
async def health_check(
endorsement_processor: EndorsementProcessor = Depends(
Expand All @@ -71,3 +72,33 @@ async def health_check(
raise HTTPException(
status_code=503, detail="One or more background tasks are not running."
)


@app.get("/health/ready")
@inject
async def health_ready(
    endorsement_processor: EndorsementProcessor = Depends(
        Provide[Container.endorsement_processor]
    ),
):
    """Readiness probe backed by the endorsement processor's JetStream check.

    Awaits ``endorsement_processor.check_jetstream()`` with a 5 second
    timeout and translates the outcome into an HTTP response.

    Returns:
        ``{"status": "ready", "jetstream": <status dict>}`` when the status
        dict reports ``is_working`` as truthy.

    Raises:
        HTTPException: 503 if the check times out, 503 if the check reports
            ``is_working`` as falsy, 500 if the check raises any other
            exception.
    """
    try:
        jetstream_status = await asyncio.wait_for(
            endorsement_processor.check_jetstream(), timeout=5.0
        )
    except asyncio.TimeoutError as e:
        # Chain the original error so the traceback shows it as the direct cause.
        raise HTTPException(
            status_code=503,
            detail={"status": "not ready", "error": "JetStream health check timed out"},
        ) from e
    except Exception as e:  # pylint: disable=W0718
        raise HTTPException(
            status_code=500, detail={"status": "error", "error": str(e)}
        ) from e

    if not jetstream_status["is_working"]:
        raise HTTPException(
            status_code=503,
            detail={"status": "not ready", "jetstream": "JetStream not ready"},
        )

    return {"status": "ready", "jetstream": jetstream_status}
25 changes: 22 additions & 3 deletions endorser/services/endorsement_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from aries_cloudcontroller import AcaPyClient
from nats.errors import BadSubscriptionError, Error, TimeoutError
from nats.js.client import JetStreamContext
from nats.js.errors import FetchTimeoutError

from endorser.util.endorsement import accept_endorsement, should_accept_endorsement
from shared.constants import (
Expand Down Expand Up @@ -85,7 +86,7 @@ async def _process_endorsement_requests(self) -> NoReturn:
subscription = await self._subscribe()
while True:
try:
messages = await subscription.fetch(batch=1, timeout=60)
messages = await subscription.fetch(batch=1, timeout=60, heartbeat=1)
for message in messages:
message_subject = message.subject
message_data = message.data.decode()
Expand All @@ -103,9 +104,13 @@ async def _process_endorsement_requests(self) -> NoReturn:
)
finally:
await message.ack()
except TimeoutError:
logger.trace("Timeout fetching messages continuing...")
except FetchTimeoutError:
logger.trace("FetchTimeoutError continuing...")
await asyncio.sleep(0.1)
except TimeoutError as e:
logger.warning("Timeout error fetching messages re-subscribing: {}", e)
await subscription.unsubscribe()
subscription = await self._subscribe()
except Exception: # pylint: disable=W0718
logger.exception("Unexpected error in endorsement processing loop")
await asyncio.sleep(2)
Expand Down Expand Up @@ -192,3 +197,17 @@ async def _subscribe(self) -> JetStreamContext.PullSubscription:
logger.debug("Subscribed to NATS subject")

return subscription

async def check_jetstream(self):
    """Fetch JetStream account info and report whether any streams exist.

    Returns:
        On success, a dict with ``is_working`` (True when the account's
        stream count is greater than zero), ``streams_count`` and
        ``consumers_count``. If anything raises, the exception is logged
        and ``{"is_working": False}`` is returned instead.
    """
    try:
        info = await self.jetstream.account_info()
        has_streams = info.streams > 0
        logger.trace("JetStream check completed. Is working: {}", has_streams)
        status = {
            "is_working": has_streams,
            "streams_count": info.streams,
            "consumers_count": info.consumers,
        }
        return status
    except Exception:  # pylint: disable=W0718
        # Best-effort check: any failure is reported as "not working".
        logger.exception("Caught exception while checking jetstream status")
        return {"is_working": False}
3 changes: 2 additions & 1 deletion endorser/tests/test_endorser_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from nats.aio.client import Client as NATS
from nats.errors import BadSubscriptionError, Error, TimeoutError
from nats.js.client import JetStreamContext
from nats.js.errors import FetchTimeoutError

from endorser.services.endorsement_processor import EndorsementProcessor
from shared.constants import (
Expand Down Expand Up @@ -128,7 +129,7 @@ async def test_process_endorsement_requests_timeout(
mock_nats_client.pull_subscribe.return_value = mock_subscription

# Simulate a timeout, then a CancelledError to stop the loop
mock_subscription.fetch.side_effect = [TimeoutError, asyncio.CancelledError]
mock_subscription.fetch.side_effect = [FetchTimeoutError, asyncio.CancelledError]

# Test
with patch("asyncio.sleep") as mock_sleep:
Expand Down
95 changes: 92 additions & 3 deletions endorser/tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import pytest
from fastapi import FastAPI, HTTPException

from endorser.main import app, app_lifespan, health_check
from endorser.main import app, app_lifespan, health_check, health_ready
from endorser.services.endorsement_processor import EndorsementProcessor


def test_create_app():
Expand All @@ -14,8 +16,9 @@ def test_create_app():
# Get all routes in app
routes = [route.path for route in app.routes]

expected_routes = "/health"
assert expected_routes in routes
expected_routes = ["/health/live", "/health/ready", "/docs"]
for route in expected_routes:
assert route in routes


@pytest.mark.anyio
Expand Down Expand Up @@ -63,3 +66,89 @@ async def test_health_check_unhealthy():
await health_check(endorsement_processor=endorsement_processor_mock)
assert exc_info.value.status_code == 503
assert exc_info.value.detail == "One or more background tasks are not running."


@pytest.mark.anyio
async def test_health_ready_success():
    """health_ready returns the ready payload when JetStream reports working."""
    processor = AsyncMock(spec=EndorsementProcessor)
    processor.check_jetstream.return_value = {
        "is_working": True,
        "streams_count": 1,
        "consumers_count": 1,
    }

    result = await health_ready(endorsement_processor=processor)

    expected = {
        "status": "ready",
        "jetstream": {"is_working": True, "streams_count": 1, "consumers_count": 1},
    }
    assert result == expected


@pytest.mark.anyio
async def test_health_ready_jetstream_not_working():
    """health_ready raises a 503 when the JetStream check reports not working."""
    processor = AsyncMock(spec=EndorsementProcessor)
    processor.check_jetstream.return_value = {
        "is_working": False,
        "error": "No streams available",
    }

    with pytest.raises(HTTPException) as raised:
        await health_ready(endorsement_processor=processor)

    error = raised.value
    assert error.status_code == 503
    assert error.detail == {
        "status": "not ready",
        "jetstream": "JetStream not ready",
    }


@pytest.mark.anyio
async def test_health_ready_timeout():
    """health_ready raises a 503 when the JetStream check raises TimeoutError."""
    processor = AsyncMock(spec=EndorsementProcessor)
    processor.check_jetstream.side_effect = asyncio.TimeoutError()

    with pytest.raises(HTTPException) as raised:
        await health_ready(endorsement_processor=processor)

    error = raised.value
    assert error.status_code == 503
    assert error.detail == {
        "status": "not ready",
        "error": "JetStream health check timed out",
    }


@pytest.mark.anyio
async def test_health_ready_unexpected_error():
    """health_ready raises a 500 carrying the message of an unexpected error."""
    processor = AsyncMock(spec=EndorsementProcessor)
    processor.check_jetstream.side_effect = Exception("Unexpected error")

    with pytest.raises(HTTPException) as raised:
        await health_ready(endorsement_processor=processor)

    error = raised.value
    assert error.status_code == 500
    assert error.detail == {"status": "error", "error": "Unexpected error"}


@pytest.mark.anyio
async def test_health_ready_with_timeout():
    """health_ready raises a 503 when the check's side effect is TimeoutError.

    The mock raises the exception class immediately; no real delay occurs.
    """
    processor = AsyncMock(spec=EndorsementProcessor)
    # The exception class (not an instance) is used as the side effect here.
    processor.check_jetstream.side_effect = asyncio.TimeoutError

    with pytest.raises(HTTPException) as raised:
        await health_ready(endorsement_processor=processor)

    error = raised.value
    assert error.status_code == 503
    assert error.detail == {
        "status": "not ready",
        "error": "JetStream health check timed out",
    }
Loading