Create asynchronous notifier

This article provides step-by-step instructions on how to create a notification feature that leverages asynchronous processing.

Let's say we want to notify the registered customers about a new product that has been added to the system.

1. Create a message class NewProductNotificationMessage

This class represents a message that will be sent to the queue for further processing.

// app/src/Model/Product/Notification/ProductNotificationMessage.php

declare(strict_types=1);

namespace App\Model\Product\Notification;

class ProductNotificationMessage
{
    /**
     * @param int $productId
     */
    public function __construct(
        public readonly int $productId,
    ) {
    }
}

Notice that we store only the product ID in the message. This is because the message will be serialized and stored in the queue and any necessary data will be retrieved from the database when the message is processed.

2. Create a message dispatcher class ProductNotificationMessageDispatcher

This class is responsible for dispatching the message to the queue.

// app/src/Model/Product/Notification/ProductNotificationMessageDispatcher.php

declare(strict_types=1);

namespace App\Model\Product\Notification;

use Shopsys\FrameworkBundle\Component\Messenger\AbstractMessageDispatcher;

class ProductRecalculationDispatcher extends AbstractMessageDispatcher
{
    /**
     * @param int $productId
     */
    public function dispatchProductId(int $productId): void
    {
        $this->messageBus->dispatch(new ProductNotificationMessage($productId));
    }
}

The messageBus dependency is injected by the parent class AbstractMessageDispatcher and ready at your disposal.

This class will be autoconfigured as a service, so now we can call it from the ProductFacade class in a method responsible for product creation.

// app/src/Model/Product/ProductFacade.php

declare(strict_types=1);

namespace App\Model\Product;

use Shopsys\FrameworkBundle\Model\Product\ProductFacade as BaseProductFacade;
// ... other use statements

class ProductFacade extends BaseProductFacade

    public function __construct(
        // ... other dependencies
        private readonly ProductNotificationMessageDispatcher $productNotificationMessageDispatcher,
    ) {
        // parent::__construct(...);
    }

    public function create(ProductData $productData)
    {
        $product = parent::create($productData);

        $this->productNotificationMessageDispatcher->dispatchProductId($product->getId());

        return $product;
    }

3. Symfony messenger configuration

Now we have a base for our asynchronous notification feature, but we need to configure the Symfony messenger component to know about our message and how to handle it.

# app/config/packages/messenger.yaml

framework:
    messenger:
        # ... other configuration
        transports:
            # ... other transports
            product_notification_transport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    vhost: '%env(MESSENGER_TRANSPORT_VHOST)%'
                    exchange:
                        name: product_notification
                        type: direct
                    queues:
                        product_notification: ~

        routing:
            # ... other routing
            App\Model\Product\Notification\ProductNotificationMessage: product_notification_transport

Now we have transport called product_notification_transport that is configured to use the RabbitMQ queue called product_notification. The message of type ProductNotificationMessage will be sent to this queue thanks to the routing configuration.

In the other words, anytime we call the ProductNotificationMessageDispatcher::dispatchProductId() method, the message will be sent to the RabbitMQ queue.

4. Create a message handler class ProductNotificationMessageHandler

Now we need to create a class that will handle the message from the queue. This class will be autoconfigured as a service, so we can inject any dependencies we need.

This class will do the heavy lifting of the notification feature – it will retrieve the data it needs and send the notification to the customers. Notice that the __invoke() gets the message as an argument, so anything we sent in the message will be available in the handler.

// app/src/Model/Product/Notification/ProductNotificationMessageHandler.php

declare(strict_types=1);

namespace App\Model\Product\Notification;

use App\Model\Component\Notification\CustomerNotificationFacade;
use Shopsys\FrameworkBundle\Model\Product\ProductFacade;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class ProductNotificationMessageHandler
{
    public function __construct(
        protected readonly ProductFacade $productFacade,
        protected readonly CustomerNotificationFacade $customerNotificationFacade,
    ) {
    }

    public function __invoke(ProductNotificationMessage $productNotificationMessage): void
    {
        $product = $this->productFacade->getById($productNotificationMessage->productId);

        $this->customerNotificationFacade->sendNotificationToRegisteredCustomers($product);
    }
}

Note

Thanks to the #[AsMessageHandler] attribute the class will be autoconfigured as a message handler.

The exact implementation of the CustomerNotificationFacade is not important for this article. Let's presume the sendNotificationToRegisteredCustomers() method accepts the product and sends email to all registered customers.

Now we can create a new product in the administration, and run the consumer that will process the messages from the queue and test our feature.

# run inside the docker container

php bin/console messenger:consume product_notification_transport

5. Local consumer configuration

The previous step works, but it is not very convenient to run the consumer manually every time we want to process the messages from the queue. We can configure the consumer to run automatically in the background by just adding a line to the app/docker/php-fpm/consumer-entrypoint.sh file.

    #!/bin/sh

    TIME_LIMIT=${1:-60}

    php ./bin/console messenger:consume \
        product_recalculation_priority_high \
        product_recalculation_priority_regular \
        placed_order_transport \
+       product_notification_transport \
        --time-limit=$TIME_LIMIT

You can notice the running container php-consumer. This container is responsible for running the consumer in the background and will process messages from all transports configured in this file. For more information, see the Consumer infrastructure section.

6. Prepare for deployment

The last step is to prepare the deployment configuration. Thanks to the shopsys/deployment package, adding the new consumer is just a matter of a line in a app/deploy/deploy-project.sh file:

    # ... rest of the script

    function merge() {
        # Specify consumers configuration with the default configuration in the format:
        # <consumer-name>:<transport-names-separated-by-space>:<number-of-consumers>
        DEFAULT_CONSUMERS=(
            "product-recalculation:product_recalculation_priority_high product_recalculation_priority_regular:1"
            "placed_order:placed_order_transport:1"
+           "product-notification:product_notification_transport:1"
        )

        source "${BASE_PATH}/vendor/shopsys/deployment/deploy/functions.sh"
        merge_configuration
        create_consumer_manifests $DEFAULT_CONSUMERS
    }

Now we can deploy the project, and the new consumer will be automatically configured and started. For more information, see the Consumer infrastructure section.

Conclusion

Now we have a fully functional asynchronous notification feature that will email all registered customers when a new product is created. Thanks to the asynchronous processing, the product creation will not be slowed down by the notification feature – it's not necessary to load the customer's data during the product creation.

For more information about the asynchronous processing, see the Asynchronous Processing and the Symfony documentation about the Messenger component.