Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[issue #17] fix retry and faillure queue #22

Merged
Merged
68 changes: 68 additions & 0 deletions .github/workflows/quality.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Quality

on:
push:
branches: [ "2.x", "issue_17_failed_message_never_retried" ]
pull_request:
branches: [ "2.x" ]

permissions:
contents: read

jobs:
symfony-tests:
strategy:
fail-fast: false
matrix:
php: ['8.0', '8.1', '8.2', '8.3']
runs-on: ubuntu-latest

services:
mariadb:
image: mariadb:10.11
ports:
- 3306:3306
env:
MYSQL_USER: user
MYSQL_PASSWORD: nopassword
MYSQL_DATABASE: app_test
MYSQL_ROOT_PASSWORD: nopassword
options: --health-cmd="mysqladmin ping" --health-interval=5s --health-timeout=2s --health-retries=3


env:
DATABASE_URL: mysql://root:[email protected]:3306/app?serverVersion=mariadb-10.11.2&charset=utf8mb4
APP_ENV: test
steps:
# To automatically get bug fixes and new Php versions for shivammathur/setup-php,
# change this to (see https://github.com/shivammathur/setup-php#bookmark-versioning):
# uses: shivammathur/setup-php@v2
- uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
coverage: xdebug

- uses: actions/checkout@v4
# - name: Copy .env.test.local
# run: php -r "file_exists('.env.test.local') || copy('.env.test', '.env.test.local');"
- name: Cache Composer packages
id: composer-cache
uses: actions/cache@v4
with:
path: vendor
key: ${{ runner.os }}-php${{ matrix.php }}-${{ hashFiles('**/composer.lock') }}
restore-keys: |
${{ runner.os }}-php${{ matrix.php }}-
- name: Install Dependencies
run: composer install -q --no-ansi --no-interaction --no-scripts --no-progress --prefer-dist
- name: Create Database
run: |
cd tests/Application
bin/console doctrine:migration:migrate -n
- name: Execute tests (Unit and Feature tests) via PHPUnit
run: vendor/bin/phpunit --process-isolation
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
"symfony/dotenv": "^5.4|^6.0",
"symfony/runtime": "^5.4|^6.0",
"symfony/messenger": "^5.4|^6.0",
"symfony/doctrine-messenger": "^5.4|^6.0",
"symfony/monolog-bundle": "^3.10",
"doctrine/doctrine-bundle": "^2.12",
"doctrine/doctrine-migrations-bundle": "^3.3",
"doctrine/orm": "^2.19"
"doctrine/orm": "^2.19",
"phpunit/phpunit": "^9.6"
},
"suggest": {
"ext-win32service": "On Windows only, install this extension to run PHP Service on Windows Service Manager"
Expand Down
41 changes: 41 additions & 0 deletions lib/DependencyInjection/MessengerPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\OutOfBoundsException;
use Symfony\Component\DependencyInjection\Reference;
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener;
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener;

final class MessengerPass implements CompilerPassInterface
{
Expand All @@ -16,6 +19,13 @@ final class MessengerPass implements CompilerPassInterface
private string $win32ServiceRunnerTag = TagRunnerCompilerPass::WIN32SERVICE_RUNNER_TAG.'.messenger';

public function process(ContainerBuilder $container): void
{
$this->processService($container);
$this->processRetryConfig($container);
$this->processFailledConfig($container);
}

private function processService(ContainerBuilder $container): void
{
$busIds = [];
foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
Expand All @@ -42,6 +52,7 @@ public function process(ContainerBuilder $container): void
$serviceRunnerDefinition = $container->getDefinition($win32ServiceId);

$serviceRunnerDefinition->replaceArgument(1, new Reference('messenger.routable_message_bus'));
$serviceRunnerDefinition->replaceArgument(7, new Reference(ResetServicesListener::class));

$serviceRunnerDefinition->replaceArgument(6, array_values($receiverNames));
try {
Expand All @@ -51,4 +62,34 @@ public function process(ContainerBuilder $container): void
}
}
}

private function processFailledConfig(ContainerBuilder $container): void
{
if (
$container->hasDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') === false
|| $container->hasDefinition(SendFailedMessageToFailureTransportListener::class) === false
) {
return;
}

$serviceSF = $container->findDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');

$serviceWin32 = $container->findDefinition(SendFailedMessageToFailureTransportListener::class);
$serviceWin32->replaceArgument('$failureSenders', $serviceSF->getArgument(0));
}

private function processRetryConfig(ContainerBuilder $container): void
{
if (
$container->hasDefinition('messenger.retry.send_failed_message_for_retry_listener') === false
|| $container->hasDefinition(SendFailedMessageForRetryListener::class) === false
) {
return;
}

$serviceSF = $container->findDefinition('messenger.retry.send_failed_message_for_retry_listener');

$serviceWin32 = $container->findDefinition(SendFailedMessageForRetryListener::class);
$serviceWin32->replaceArgument('$sendersLocator', $serviceSF->getArgument(0));
}
}
22 changes: 22 additions & 0 deletions lib/Event/MessengerWorkerStoppedEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace Win32ServiceBundle\Event;

use Win32ServiceBundle\Model\MessengerServiceRunner;

final class MessengerWorkerStoppedEvent
{
private MessengerServiceRunner $messengerServiceRunner;

public function __construct(MessengerServiceRunner $messengerServiceRunner)
{
$this->messengerServiceRunner = $messengerServiceRunner;
}

public function getMessengerServiceRunner(): MessengerServiceRunner
{
return $this->messengerServiceRunner;
}
}
31 changes: 31 additions & 0 deletions lib/MessengerSubscriber/AddErrorDetailsStampListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Win32ServiceBundle\MessengerSubscriber;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;

final class AddErrorDetailsStampListener implements EventSubscriberInterface
{
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
{
$stamp = ErrorDetailsStamp::create($event->getThrowable());
$previousStamp = $event->getEnvelope()->last(ErrorDetailsStamp::class);

// Do not append duplicate information
if ($previousStamp === null || !$previousStamp->equals($stamp)) {
$event->addStamps($stamp);
}
}

public static function getSubscribedEvents(): array
{
return [
// must have higher priority than SendFailedMessageForRetryListener
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', 200],
];
}
}
43 changes: 43 additions & 0 deletions lib/MessengerSubscriber/ResetServicesListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

declare(strict_types=1);

namespace Win32ServiceBundle\MessengerSubscriber;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;

/**
* @author Grégoire Pineau <[email protected]>
*/
class ResetServicesListener implements EventSubscriberInterface
{
private ServicesResetter $servicesResetter;

public function __construct(ServicesResetter $servicesResetter)
{
$this->servicesResetter = $servicesResetter;
}

public function resetServices(MessengerWorkerRunningEvent $event): void
{
if (!$event->isWorkerIdle()) {
$this->servicesResetter->reset();
}
}

public function resetServicesAtStop(MessengerWorkerStoppedEvent $event): void
{
$this->servicesResetter->reset();
}

public static function getSubscribedEvents(): array
{
return [
MessengerWorkerRunningEvent::class => ['resetServices', -1024],
MessengerWorkerStoppedEvent::class => ['resetServicesAtStop', -1024],
];
}
}
Loading