Created
March 6, 2024 18:52
-
-
Save jwage/3dfb8f42c36dd6a9b16f3641ab9d77a0 to your computer and use it in GitHub Desktop.
Symfony MessageBusInterface implementation that handles AMQPConnectionException and retries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Messenger; | |
use AMQPConnectionException; | |
use App\Event\Messenger\WorkerMessageDispatchedEvent; | |
use App\Messenger\Stamp\DispatchedAtStamp; | |
use Psr\Log\LoggerInterface; | |
use Symfony\Component\Messenger\Envelope; | |
use Symfony\Component\Messenger\MessageBusInterface; | |
use Symfony\Component\Messenger\Stamp\TransportNamesStamp; | |
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; | |
use Throwable; | |
/**
 * Message bus decorator that stamps outgoing messages, announces the dispatch
 * through the event dispatcher, and retries the wrapped bus when the failure
 * was caused by an AMQP connection error.
 */
class MessageBus implements MessageBusInterface
{
    /** Maximum number of additional dispatch attempts after a retryable failure. */
    private const MAX_RETRIES = 1;

    /**
     * @param MessageBusInterface      $wrappedBus      Bus that performs the actual dispatch.
     * @param EventDispatcherInterface $eventDispatcher Receives a WorkerMessageDispatchedEvent before dispatch.
     * @param LoggerInterface          $logger          Receives a warning for every retry.
     */
    public function __construct(
        private MessageBusInterface $wrappedBus,
        private EventDispatcherInterface $eventDispatcher,
        private LoggerInterface $logger,
    ) {
    }

    /**
     * Dispatches the message through the wrapped bus.
     *
     * A DispatchedAtStamp is always appended; when the (unwrapped) message is a
     * RoutableMessage, a TransportNamesStamp built from its transport names is
     * appended as well. A WorkerMessageDispatchedEvent carrying the envelope is
     * dispatched once, before the first attempt.
     *
     * @param object        $message The message, or the message pre-wrapped in an Envelope.
     * @param array<mixed>  $stamps  Extra stamps to attach to the envelope.
     *
     * @throws Throwable Whatever the wrapped bus throws, once the failure is not
     *                   retryable or MAX_RETRIES retries have been used up.
     *
     * @inheritDoc
     */
    public function dispatch(object $message, array $stamps = []): Envelope
    {
        // The bus contract allows a pre-wrapped Envelope; routing and logging
        // must look at the message inside it, not at the Envelope itself.
        $innerMessage = $message instanceof Envelope ? $message->getMessage() : $message;

        $stamps[] = new DispatchedAtStamp();

        if ($innerMessage instanceof RoutableMessage) {
            $stamps[] = new TransportNamesStamp($innerMessage->getTransportNames());
        }

        $event = new WorkerMessageDispatchedEvent(Envelope::wrap($message, $stamps));
        $this->eventDispatcher->dispatch($event);

        $retries = 0;

        while (true) {
            try {
                // The envelope is read back from the event on every attempt, so
                // whatever the event holds after its listeners ran is what is sent.
                return $this->wrappedBus->dispatch($event->getEnvelope());
            } catch (Throwable $e) {
                if ($retries >= self::MAX_RETRIES || ! $this->isRetryableException($e)) {
                    throw $e;
                }

                ++$retries;

                $this->logger->warning('Retrying send message {class}', [
                    'class' => $innerMessage::class,
                    'exception' => $e,
                    'message' => $e->getMessage(),
                    'retries' => $retries,
                    'maxRetries' => self::MAX_RETRIES,
                ]);
            }
        }
    }

    /**
     * Proxies all method calls to the wrapped bus.
     *
     * @param array<mixed> $arguments
     */
    public function __call(string $method, array $arguments): mixed
    {
        return $this->wrappedBus->{$method}(...$arguments);
    }

    /**
     * Tells whether a failed dispatch should be attempted again.
     *
     * Only the exception itself and its immediate previous exception are
     * inspected. Deeper causes are deliberately ignored so that a connection
     * error buried further down the chain does not trigger a re-dispatch.
     */
    private function isRetryableException(Throwable $e): bool
    {
        return $e instanceof AMQPConnectionException
            || $e->getPrevious() instanceof AMQPConnectionException;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment