<?php
declare(strict_types=0);
/*
* WellCommerce Foundation
*
* This file is part of the WellCommerce package.
*
* (c) Adam Piotrowski <adam@wellcommerce.org>, Adrian Potepa <adrian@wellcommerce.org>
*
* For the full copyright and license information,
* please view the LICENSE file that was distributed with this source code.
*/
namespace WellCommerce\Bundle\AppBundle\EventListener;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use WellCommerce\Bundle\AppBundle\Entity\MessengerQueue;
use WellCommerce\Bundle\CoreBundle\DependencyInjection\AbstractServiceSubscriber;
use WellCommerce\Bundle\CoreBundle\Service\Messenger\MessageInterface;
/**
 * Class MessengerQueueSubscriber
 *
 * Listens to Messenger worker events and stores one MessengerQueue entry for
 * every processed message that implements MessageInterface, flagging whether
 * the message was handled or failed.
 *
 * @author Adam Piotrowski <adam@wellcommerce.org>
 */
class MessengerQueueSubscriber extends AbstractServiceSubscriber implements EventSubscriberInterface
{
    /**
     * Maps the worker "handled" and "failed" events to the listener methods
     * of this class, both registered with priority 0.
     *
     * @return array
     */
    public static function getSubscribedEvents()
    {
        return [
            WorkerMessageHandledEvent::class => ['onWorkerMessageHandledEvent', 0],
            WorkerMessageFailedEvent::class  => ['onWorkerMessageFailedEvent', 0],
        ];
    }

    /**
     * Records a successful queue entry for a handled message.
     *
     * Messages that do not implement MessageInterface are ignored.
     *
     * @param WorkerMessageHandledEvent $event
     */
    public function onWorkerMessageHandledEvent(WorkerMessageHandledEvent $event)
    {
        $handled = $event->getEnvelope()->getMessage();

        if (!$handled instanceof MessageInterface) {
            return;
        }

        $this->registerMessengerQueue($handled);
    }

    /**
     * Records a failed queue entry, including the throwable's message, for a
     * message whose handling failed.
     *
     * Messages that do not implement MessageInterface are ignored.
     *
     * @param WorkerMessageFailedEvent $event
     */
    public function onWorkerMessageFailedEvent(WorkerMessageFailedEvent $event)
    {
        $failedMessage = $event->getEnvelope()->getMessage();

        if (!$failedMessage instanceof MessageInterface) {
            return;
        }

        $this->registerMessengerQueue($failedMessage, $event->getThrowable());
    }

    /**
     * Creates a MessengerQueue entry describing the outcome of a message and
     * persists and flushes it through the entity manager.
     *
     * The entry stores the message class name and identifier. It is marked as
     * successful when no throwable is given; otherwise it is marked as failed
     * and the throwable's message is stored as the error message.
     *
     * @param MessageInterface $message   Message that was processed
     * @param \Throwable|null  $throwable Failure cause, or null on success
     */
    protected function registerMessengerQueue(MessageInterface $message, ?\Throwable $throwable = null)
    {
        $failed = null !== $throwable;

        $entry = new MessengerQueue();
        $entry->setType(get_class($message));
        $entry->setIdentifier($message->getIdentifier());
        $entry->setSuccess(!$failed);

        if ($failed) {
            $entry->setErrorMsg($throwable->getMessage());
        }

        $entityManager = $this->getEntityManager();
        $entityManager->persist($entry);
        $entityManager->flush();
    }
}