Skip to content

Instantly share code, notes, and snippets.

@pascualmg
Last active November 24, 2022 10:26
Show Gist options
  • Save pascualmg/f334591c20b76ccfa06126d9e626c97e to your computer and use it in GitHub Desktop.
Save pascualmg/f334591c20b76ccfa06126d9e626c97e to your computer and use it in GitHub Desktop.
MiddleWare HOC to drop messages with concrete conditions
<?php
namespace PcComponentes\AfterSales\Infrastructure\MessagingMiddleware;
use PcComponentes\Ddd\Util\Message\Message;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
use Throwable;
class DropExceptionWhenMiddleware implements MiddlewareInterface
{
private $predicateHasToDrop;
/**
* @param callable|null $predicate
* @example new DropExceptionWhenMiddleware(function (Message $message, Throwable $throwable) { return $Throwable instanceof UnrecoverableExceptionInterface ; })
*/
public function __construct(callable $predicate)
{
$this->predicateHasToDrop = $predicate;
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$isNotFromConsumer = static fn(Envelope $envelope) => $envelope->all(AmqpReceivedStamp::class) === [];
$messageFromEnvelope = static fn(Envelope $envelope): Message => $envelope->getMessage();
if ($isNotFromConsumer($envelope)) {
return $stack->next()->handle($envelope, $stack);
}
try {
return $stack->next()->handle($envelope, $stack);
} catch (Throwable $exception) {
if (($this->predicateHasToDrop)(
$messageFromEnvelope($envelope),
$exception,
)) {
return $envelope;
}
throw $exception;
}
}
}
@pascualmg
Copy link
Author

pascualmg commented Nov 24, 2022

based on

<?php

declare(strict_types=1);

namespace PcComponentes\AfterSales\Infrastructure\MessagingMiddleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;

class ExpectedExceptionMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $isFromConsumer = static fn (Envelope $envelope) => $envelope->all(AmqpReceivedStamp::class) !== [];

        if (false === $isFromConsumer($envelope)) {
            return $stack->next()->handle($envelope, $stack);
        }

        try {
            return $stack->next()->handle($envelope, $stack);
        } catch (UnrecoverableExceptionInterface $unrecoverableException) {
            return $envelope;
        }
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment