Adding a New Elasticsearch Index

In this cookbook, we will add a new Elasticsearch index for categories, implement the basic methods for data export, create a cron module, and add support for partial export.

New Elasticsearch mapping

As a first step, we need to define the Elasticsearch mapping in src/Resources/definition/category/, with one JSON file per domain (e.g., for 2 domains with IDs 1 and 2: 1.json and 2.json).

Example mapping for the English domain with ID 1 (category/1.json):

{
    "settings": {
        "index": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "analysis": {
            "filter": {
                "english_stop": {
                    "type": "stop",
                    "stopwords": "_english_"
                },
                "english_stemmer": {
                    "type": "stemmer",
                    "language": "english"
                },
                "edge_ngram": {
                    "type": "edgeNGram",
                    "min_gram": 2,
                    "max_gram": 20
                }
            },
            "tokenizer": {
                "keep_special_chars": {
                    "type": "pattern",
                    "pattern": "[^\\p{L}\\d-/]+"
                }
            },
            "analyzer": {
                "full_with_diacritic": {
                    "tokenizer": "keep_special_chars",
                    "filter": ["lowercase"]
                },
                "full_without_diacritic": {
                    "tokenizer": "keep_special_chars",
                    "filter": ["lowercase", "asciifolding"]
                },
                "stemming": {
                    "tokenizer": "standard",
                    "filter": ["lowercase", "english_stemmer", "english_stop", "asciifolding"]
                },
                "edge_ngram_with_diacritic": {
                    "tokenizer": "keep_special_chars",
                    "filter": ["edge_ngram", "lowercase"]
                },
                "edge_ngram_without_diacritic": {
                    "tokenizer": "keep_special_chars",
                    "filter": ["edge_ngram", "lowercase", "asciifolding"]
                },
                "edge_ngram_without_diacritic_html": {
                    "char_filter": "html_strip",
                    "tokenizer": "keep_special_chars",
                    "filter": ["edge_ngram", "lowercase", "asciifolding"]
                },
                "edge_ngram_unanalyzed": {
                    "tokenizer": "keyword",
                    "filter": ["edge_ngram"]
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "name": {
                "type": "text",
                "analyzer": "stemming",
                "fields": {
                    "full_with_diacritic": {
                        "type": "text",
                        "analyzer": "full_with_diacritic"
                    },
                    "full_without_diacritic": {
                        "type": "text",
                        "analyzer": "full_without_diacritic"
                    },
                    "edge_ngram_with_diacritic": {
                        "type": "text",
                        "analyzer": "edge_ngram_with_diacritic"
                    },
                    "edge_ngram_without_diacritic": {
                        "type": "text",
                        "analyzer": "edge_ngram_without_diacritic"
                    },
                    "keyword": {
                        "type": "icu_collation_keyword",
                        "language": "en",
                        "index": false
                    }
                }
            },
            "description": {
                "type": "text",
                "analyzer": "edge_ngram_without_diacritic_html"
            },
            "parentId": {
                "type": "integer"
            },
            "level": {
                "type": "integer"
            },
            "uuid": {
                "type": "text"
            }
        }
    }
}

New CategoryIndex

Registering a child of Shopsys\FrameworkBundle\Component\Elasticsearch\AbstractIndex into the application will allow you to manage the Elasticsearch index easily for common tasks (create, delete, migrate structure and export data). All you need to do is implement abstract methods from the parent class. In this cookbook, we will do it step by step.

Create class CategoryIndex in src/Model/Category/Elasticsearch. The class must extend the class Shopsys\FrameworkBundle\Component\Elasticsearch\AbstractIndex.

declare(strict_types=1);

namespace App\Model\Category\Elasticsearch;

use Shopsys\FrameworkBundle\Component\Elasticsearch\AbstractIndex;

class CategoryIndex extends AbstractIndex
{
    /**
     * @return string
     */
    public static function getName(): string
    {
        // TODO: Implement getName() method.
    }

    /**
     * @param int $domainId
     * @return int
     */
    public function getTotalCount(int $domainId): int
    {
        // TODO: Implement getTotalCount() method.
    }

    /**
     * @param int $domainId
     * @param array $restrictToIds
     * @return array
     */
    public function getExportDataForIds(int $domainId, array $restrictToIds): array
    {
        // TODO: Implement getExportDataForIds() method.
    }

    /**
     * @param int $domainId
     * @param int $lastProcessedId
     * @param int $batchSize
     * @return array
     */
    public function getExportDataForBatch(int $domainId, int $lastProcessedId, int $batchSize): array
    {
        // TODO: Implement getExportDataForBatch() method.
    }
}

Register the new index in config/services.yaml

App\Model\Category\Elasticsearch\CategoryIndex: ~

Create a new index in Elasticsearch

To create an index in Elasticsearch, we need to implement the getName() method in CategoryIndex.

The name must be the same as the name of the directory that holds the Elasticsearch mapping in src/Resources/definition/.

/**
 * @return string
 */
public static function getName(): string
{
    return 'category';
}

This is the minimal implementation. With it in place, we can create an index in Elasticsearch by running ./phing elasticsearch-index-migrate -D elasticsearch.index=category.

Warning

If you are using version v9.1.0 or lower, you have to use the command ./phing elasticsearch-index-create -D elasticsearch.index=category to create the index.

Also, we can use ./phing elasticsearch-index-recreate or ./phing elasticsearch-index-delete.

Note

The command ./phing elasticsearch-index-migrate -D elasticsearch.index=category (notice the -D parameter) creates the Elasticsearch index only for our CategoryIndex.
Running ./phing elasticsearch-index-migrate without the -D parameter creates the Elasticsearch indexes for all indexes registered in your project (product, category, and so on).

Export data into Elasticsearch

Creating and deleting the index is not useful on its own while the index stays empty. As the next step, we will implement the methods getTotalCount() and getExportDataForBatch() to be able to export data.

We can use the already existing method in \Shopsys\FrameworkBundle\Model\Category\CategoryRepository::getTranslatedVisibleSubcategoriesByDomain(). The method getTranslatedVisibleSubcategoriesByDomain() needs as a second argument an instance of DomainConfig, so we need to inject an instance of Domain class along with the instance of CategoryRepository into CategoryIndex.

/**
 * @var \Shopsys\FrameworkBundle\Model\Category\CategoryRepository
 */
protected $categoryRepository;

/**
 * @var \Shopsys\FrameworkBundle\Component\Domain\Domain
 */
protected $domain;

/**
 * @param \Shopsys\FrameworkBundle\Model\Category\CategoryRepository $categoryRepository
 * @param \Shopsys\FrameworkBundle\Component\Domain\Domain $domain
 */
public function __construct(CategoryRepository $categoryRepository, Domain $domain)
{
   $this->categoryRepository = $categoryRepository;
   $this->domain = $domain;
}

With the services injected, we can implement getTotalCount():

/**
 * @param int $domainId
 * @return int
 */
public function getTotalCount(int $domainId): int
{
    return count($this->categoryRepository->getTranslatedVisibleSubcategoriesByDomain(
        $this->categoryRepository->getRootCategory(),
        $this->domain->getDomainConfigById($domainId)
    ));
}

and also a getExportDataForBatch() with a private converting method convertToElastic()

/**
 * @param int $domainId
 * @param int $lastProcessedId
 * @param int $batchSize
 * @return array
 */
public function getExportDataForBatch(int $domainId, int $lastProcessedId, int $batchSize): array
{
    $domainConfig = $this->domain->getDomainConfigById($domainId);
    $categories = $this->categoryRepository->getTranslatedVisibleSubcategoriesByDomain(
        $this->categoryRepository->getRootCategory(),
        $domainConfig
    );
    $locale = $domainConfig->getLocale();

    // initialize the result so that an empty category list still returns an array
    $result = [];
    foreach ($categories as $category) {
        $result[$category->getId()] = $this->convertToElastic($category, $domainId, $locale);
    }

    return $result;
}

/**
 * @param \App\Model\Category\Category $category
 * @param int $domainId
 * @param string $locale
 * @return array
 */
private function convertToElastic(Category $category, int $domainId, string $locale): array
{
    return [
        'name' => $category->getName($locale),
        'description' => $category->getDescription($domainId),
        'parentId' => $category->getParent()->getId(),
        'level' => $category->getLevel(),
        'uuid' => $category->getUuid(),
    ];
}

Note

The getExportDataForBatch() method must return an array of rows, each already converted to a plain array of field values and indexed by the row's ID.
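
As an illustration of that shape, the following minimal sketch builds such an array by hand. The two rows and all their values are hypothetical placeholders; only the field names come from convertToElastic() above.

```php
<?php

declare(strict_types=1);

// Hypothetical sample rows; the values are placeholders, not real shop data.
$exportData = [
    2 => [
        'name' => 'Electronics',
        'description' => 'Example description',
        'parentId' => 1,
        'level' => 1,
        'uuid' => 'placeholder-uuid-2',
    ],
    3 => [
        'name' => 'TV, audio',
        'description' => null,
        'parentId' => 2,
        'level' => 2,
        'uuid' => 'placeholder-uuid-3',
    ],
];

// Every key is a category ID and every value is a plain array of field values.
$exportedIds = array_keys($exportData);
```

Here $exportedIds contains the category IDs 2 and 3, which is how the keys of the returned array are meant to be read.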

Now we can export categories data (name, description, parentId, level, and uuid) into Elasticsearch with ./phing elasticsearch-export -D elasticsearch.index=category (index has to be created first, see the step above).

Exporting via cron

We can automate the export process with a cron module. To do so, we create a new class CategoryExportCronModule in src/Model/Category/Elasticsearch which extends AbstractExportCronModule.

The most important task here is to override the parent constructor and change the type-hint of the first argument to our created index (CategoryIndex).

declare(strict_types=1);

namespace App\Model\Category\Elasticsearch;

use Shopsys\FrameworkBundle\Component\Domain\Domain;
use Shopsys\FrameworkBundle\Component\Elasticsearch\AbstractExportCronModule;
use Shopsys\FrameworkBundle\Component\Elasticsearch\IndexDefinitionLoader;
use Shopsys\FrameworkBundle\Component\Elasticsearch\IndexFacade;

class CategoryExportCronModule extends AbstractExportCronModule
{
    /**
     * @param \App\Model\Category\Elasticsearch\CategoryIndex $index
     * @param \Shopsys\FrameworkBundle\Component\Elasticsearch\IndexFacade $indexFacade
     * @param \Shopsys\FrameworkBundle\Component\Elasticsearch\IndexDefinitionLoader $indexDefinitionLoader
     * @param \Shopsys\FrameworkBundle\Component\Domain\Domain $domain
     */
    public function __construct(
        CategoryIndex $index,
        IndexFacade $indexFacade,
        IndexDefinitionLoader $indexDefinitionLoader,
        Domain $domain
    ) {
        parent::__construct($index, $indexFacade, $indexDefinitionLoader, $domain);
    }
}

Now, if cron is properly configured in the project, the new cron module will be started automatically along with the others. You may also want to configure this cron module with different timing.
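
As a rough sketch of such a configuration, the fragment below registers the module with its own schedule. It assumes that your project registers cron modules as services tagged with shopsys.cron and that the tag accepts hours and minutes attributes; the file location and the five-minute interval are illustrative only, so check the cron configuration of your project version before copying it.

```yaml
# config/services/cron.yaml (assumed location; use the file where your project registers cron modules)
services:
    App\Model\Category\Elasticsearch\CategoryExportCronModule:
        tags:
            # assumed tag attributes: run every 5 minutes, every hour
            - { name: shopsys.cron, hours: '*', minutes: '*/5' }
```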

Implement partial update

Sometimes, we want to export data immediately after the original data change (a category is hidden, renamed, etc.). For this purpose, we need to implement CategoryIndex::getExportDataForIds(), a scheduler for queuing, and a subscriber for processing the queue.

Anywhere you want to add a row to immediate export, you may call CategoryExportScheduler::scheduleRowIdForImmediateExport() to add a row to the queue.

Scheduler

The scheduler is used as a queue of IDs we want to export. When we make any changes, we may add an affected category ID into this queue, and the subscriber will pick it up after the request is done.
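
To make the idea of such a queue concrete, here is a minimal, self-contained sketch. InMemoryExportQueue and its methods are hypothetical stand-ins written for this illustration; they are not the framework's AbstractExportScheduler and do not mirror its API.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for illustration only; it is not the framework's AbstractExportScheduler.
final class InMemoryExportQueue
{
    /** @var int[] */
    private array $rowIds = [];

    public function schedule(int $rowId): void
    {
        // Use the ID as the key so scheduling the same row twice keeps a single entry.
        $this->rowIds[$rowId] = $rowId;
    }

    public function hasAny(): bool
    {
        return $this->rowIds !== [];
    }

    /**
     * Returns the queued IDs in insertion order and empties the queue.
     *
     * @return int[]
     */
    public function drain(): array
    {
        $rowIds = array_values($this->rowIds);
        $this->rowIds = [];

        return $rowIds;
    }
}

$queue = new InMemoryExportQueue();
$queue->schedule(5);
$queue->schedule(7);
$queue->schedule(5); // duplicate, ignored

$scheduledIds = $queue->drain();
```

The point of the sketch is the deduplication: a category edited several times within one request only needs to be exported once when the queue is processed.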

Create the class CategoryExportScheduler which extends AbstractExportScheduler. We do not need to override or implement any method.

declare(strict_types=1);

namespace App\Model\Category\Elasticsearch;

use Shopsys\FrameworkBundle\Component\Elasticsearch\AbstractExportScheduler;

class CategoryExportScheduler extends AbstractExportScheduler
{
}

Subscriber

Create the class CategoryExportSubscriber which extends AbstractExportSubscriber. Override its __construct() and getSubscribedEvents() methods.

It is important to override the type hints of the constructor's arguments: replace the abstract classes from \Shopsys\FrameworkBundle\Component\Elasticsearch with our new implementations (CategoryExportScheduler, CategoryIndex).

We have to implement getSubscribedEvents() so that the subscriber listens for the kernel response event and then calls the parent's method exportScheduledRows() with the proper priority. More information about subscribers can be found in the Symfony documentation.

declare(strict_types=1);

namespace App\Model\Category\Elasticsearch;

use Doctrine\ORM\EntityManagerInterface;
use Shopsys\FrameworkBundle\Component\Domain\Domain;
use Shopsys\FrameworkBundle\Component\Elasticsearch\AbstractExportSubscriber;
use Shopsys\FrameworkBundle\Component\Elasticsearch\IndexDefinitionLoader;
use Shopsys\FrameworkBundle\Component\Elasticsearch\IndexFacade;
use Symfony\Component\HttpKernel\KernelEvents;

class CategoryExportSubscriber extends AbstractExportSubscriber
{
    /**
     * @param \App\Model\Category\Elasticsearch\CategoryExportScheduler $categoryExportScheduler
     * @param \Doctrine\ORM\EntityManagerInterface $entityManager
     * @param \Shopsys\FrameworkBundle\Component\Elasticsearch\IndexFacade $indexFacade
     * @param \Shopsys\FrameworkBundle\Component\Elasticsearch\IndexDefinitionLoader $indexDefinitionLoader
     * @param \App\Model\Category\Elasticsearch\CategoryIndex $index
     * @param \Shopsys\FrameworkBundle\Component\Domain\Domain $domain
     */
    public function __construct(
        CategoryExportScheduler $categoryExportScheduler,
        EntityManagerInterface $entityManager,
        IndexFacade $indexFacade,
        IndexDefinitionLoader $indexDefinitionLoader,
        CategoryIndex $index,
        Domain $domain
    ) {
        parent::__construct($categoryExportScheduler, $entityManager, $indexFacade, $indexDefinitionLoader, $index, $domain);
    }

    /**
     * @inheritDoc
     */
    public static function getSubscribedEvents(): array
    {
        return [
            KernelEvents::RESPONSE => [
                ['exportScheduledRows', 0],
            ],
        ];
    }
}

CategoryIndex::getExportDataForIds()

To finish partial exports, we need to implement the last unimplemented method in CategoryIndex – to return all categories by their identifiers. We may also use an already existing method from CategoryRepository.

/**
 * @param int $domainId
 * @param array $restrictToIds
 * @return array
 */
public function getExportDataForIds(int $domainId, array $restrictToIds): array
{
    $categories = $this->categoryRepository->getCategoriesByIds($restrictToIds);

    $domainConfig = $this->domain->getDomainConfigById($domainId);
    $locale = $domainConfig->getLocale();

    // initialize the result so that an empty ID list still returns an array
    $result = [];
    foreach ($categories as $category) {
        $result[$category->getId()] = $this->convertToElastic($category, $domainId, $locale);
    }

    return $result;
}

Export only changed categories

Now that we have a way to export partial data, we can extend the functionality further and export only changed categories. For this purpose, our index needs to implement the \Shopsys\FrameworkBundle\Component\Elasticsearch\IndexSupportChangesOnlyInterface interface and add two more methods: CategoryIndex::getChangedCount() and CategoryIndex::getChangedIdsForBatch().

Note

You will need to implement some way to distinguish changed categories.
For this purpose, you can, for example, add a new attribute to an entity or implement some sort of queue.

CategoryIndex::getChangedIdsForBatch()

This method has to return the IDs of categories that are considered changed and should be exported. If you use an entity attribute to distinguish changed categories, you can simply return the IDs of the categories with this attribute set.

declare(strict_types=1);

namespace App\Model\Category\Elasticsearch;

use Shopsys\FrameworkBundle\Component\Elasticsearch\AbstractIndex;
use Shopsys\FrameworkBundle\Component\Elasticsearch\IndexSupportChangesOnlyInterface;

class CategoryIndex extends AbstractIndex implements IndexSupportChangesOnlyInterface
{

    // ...

    /**
     * @param int $domainId
     * @param int $lastProcessedId
     * @param int $batchSize
     * @return int[]
     */
    public function getChangedIdsForBatch(int $domainId, int $lastProcessedId, int $batchSize): array
    {
        $allCategories = $this->categoryRepository->getTranslatedVisibleSubcategoriesByDomain(
            $this->categoryRepository->getRootCategory(),
            $this->domain->getDomainConfigById($domainId)
        );

        $categoryIds = [];
        foreach ($allCategories as $category) {
            // the boolean entity attribute `shouldExport` has to be added beforehand, see the "Adding New Attribute to an Entity" cookbook
            if ($category->shouldExport === true) {
                $categoryIds[] = $category->getId();
            }
        }

        return $categoryIds;
    }
}

Note

In a real application, you should implement the logic with the database query to avoid fetching all data unnecessarily.

Note

In this cookbook, we return all affected categories at once.
On larger data sets, you can offset and limit the results with the $lastProcessedId and $batchSize.
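
A minimal sketch of such batching on a plain array of IDs follows. The helper sliceChangedIdsForBatch() is hypothetical, and the sketch assumes that $lastProcessedId is the highest ID already handled by the previous batch; in a real application the same filter and limit would belong in the database query.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns at most $batchSize IDs greater than $lastProcessedId, in ascending order.
 *
 * @param int[] $changedIds
 * @return int[]
 */
function sliceChangedIdsForBatch(array $changedIds, int $lastProcessedId, int $batchSize): array
{
    // Sort so that "greater than the last processed ID" defines a stable position in the list.
    sort($changedIds);

    $remainingIds = array_filter(
        $changedIds,
        static function (int $id) use ($lastProcessedId): bool {
            return $id > $lastProcessedId;
        }
    );

    // array_filter() preserves keys, so reindex before taking the first $batchSize items.
    return array_slice(array_values($remainingIds), 0, $batchSize);
}

$firstBatch = sliceChangedIdsForBatch([5, 2, 9, 7], 0, 2);
$secondBatch = sliceChangedIdsForBatch([5, 2, 9, 7], 5, 2);
```

With the sample IDs, the first call yields the two lowest IDs and the second call continues after ID 5, so consecutive calls walk through the changed categories without overlap.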

CategoryIndex::getChangedCount()

This method has to return the count of changed categories.

/**
 * @param int $domainId
 * @return int
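 */
public function getChangedCount(int $domainId): int
{
    // getChangedIdsForBatch() requires all three arguments; the implementation above ignores
    // $lastProcessedId and $batchSize, so these values simply request all changed IDs
    return count($this->getChangedIdsForBatch($domainId, 0, PHP_INT_MAX));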
}

Exporting changed categories via cron

We may automate the export process like we did for full data export earlier. All we need to do is to create a new class CategoryExportChangedCronModule in src/Model/Category/Elasticsearch which extends AbstractExportChangedCronModule.

The most important task here is to override the parent constructor and change the type-hint of the first argument to our created index (CategoryIndex).

declare(strict_types=1);

namespace App\Model\Category\Elasticsearch;

use Shopsys\FrameworkBundle\Component\Domain\Domain;
use Shopsys\FrameworkBundle\Component\Elasticsearch\AbstractExportChangedCronModule;
use Shopsys\FrameworkBundle\Component\Elasticsearch\IndexDefinitionLoader;
use Shopsys\FrameworkBundle\Component\Elasticsearch\IndexFacade;

class CategoryExportChangedCronModule extends AbstractExportChangedCronModule
{
    /**
     * @param \App\Model\Category\Elasticsearch\CategoryIndex $index
     * @param \Shopsys\FrameworkBundle\Component\Elasticsearch\IndexFacade $indexFacade
     * @param \Shopsys\FrameworkBundle\Component\Elasticsearch\IndexDefinitionLoader $indexDefinitionLoader
     * @param \Shopsys\FrameworkBundle\Component\Domain\Domain $domain
     */
    public function __construct(
        CategoryIndex $index,
        IndexFacade $indexFacade,
        IndexDefinitionLoader $indexDefinitionLoader,
        Domain $domain
    ) {
        parent::__construct($index, $indexFacade, $indexDefinitionLoader, $domain);
    }
}

Mark exported categories as exported

Export now looks for the categories with the attribute shouldExport set to true. This is convenient, but after the export is finished, we should set the attribute back, so the next iterations will not export the same category over and over.

For this, we can use the IndexExportedEvent::INDEX_EXPORTED event, which is fired after the export of each index has finished for all domains.

We create a simple subscriber to this event and mark categories as exported.

declare(strict_types=1);

namespace App\Model\Category\Elasticsearch;

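use App\Model\Category\Category;
use Doctrine\ORM\EntityManagerInterface;
use Shopsys\FrameworkBundle\Component\Elasticsearch\IndexExportedEvent;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

class CategoryExportedSubscriber implements EventSubscriberInterface
{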

    /**
     * @var \Doctrine\ORM\EntityManagerInterface
     */
    protected $entityManager;

    /**
     * @param \Doctrine\ORM\EntityManagerInterface $entityManager
     */
    public function __construct(EntityManagerInterface $entityManager)
    {
        $this->entityManager = $entityManager;
    }

    /**
     * @param \Shopsys\FrameworkBundle\Component\Elasticsearch\IndexExportedEvent $indexExportedEvent
     */
    public function markAllAsExported(IndexExportedEvent $indexExportedEvent): void
    {
        if ($indexExportedEvent->getIndex() instanceof CategoryIndex) {
            $this->entityManager
                ->createQuery('UPDATE ' . Category::class . ' c SET c.shouldExport = FALSE')
                ->execute();
        }
    }

    /**
     * @return array
     */
    public static function getSubscribedEvents(): array
    {
        return [
            IndexExportedEvent::INDEX_EXPORTED => 'markAllAsExported',
        ];
    }
}

Conclusion

We have created a new index category in Elasticsearch. We were able to fill it with data (by a cron or immediately after a row is changed).

Categories can be marked for export (from different parts of the application, integration with IS, etc.), and only such categories can be exported.

From now on, you can use Elasticsearch instead of PostgreSQL as a data source for categories (the data-providing functionality still needs to be implemented), which can improve the performance of your application.