The AggregateRepository is the interface for accessing Aggregates in the EventStore.

The AggregateRepository is typically used in CommandHandlers to load and add AggregateRoots.

The AggregateRepository uses the Unit-Of-Work and Identity-Map patterns to ensure each AggregateRoot is only loaded once per transaction, and that you always get the same AggregateRoot instance back.

The AggregateRepository keeps track of the Unit-Of-Work per thread, so can be shared between threads.

This also means that if you load an AggregateRoot in two different threads it will be two copies of the same AggregateRoot. Any changes made to the AggregateRoot will not be synchronized between the threads. When both threads make changes to the AggregateRoot, upon commit one of the threads will “win”. The other thread will fail with a Sequent::Core::EventStore::OptimisticLockingError.

When you use the AggregateRepository outside a CommandHandler and therefore outside of the scope of the CommandService, you need to manage the state yourself.

You can access the AggregateRepository via Sequent.aggregate_repository

The public API of AggregateRepository:

Adding an AggregateRoot

Sequent.aggregate_repository.add_aggregate(..)

This adds the AggregateRoot in the AggregateRepository. If you use the AggregateRepository outside a CommandHandler you need to ensure the Unit-Of-Work is cleaned using clear or clear! (See section Advanced usage outside the CommandService transaction below).

Loading AggregateRoots

# load single AggregateRoot by id and type
Sequent.aggregate_repository.load_aggregate('23456', Invoice)

# or multiple in single call
Sequent.aggregate_repository.load_aggregates(['65432', '23456'], Invoice)

# load single AggregateRoot up until a moment in time, skipping possible 
# snapshotevents
Sequent.aggregate_repository.load_aggregate_for_snapshotting('12345', Invoice, load_until: '2022-02-14 13:20:48')

The second parameter, the type of AggregateRoot, is optional. If given, it will fail if the type of the loaded AggregateRoot differs.

The third parameter, the load_until parameter, is also optional and only available for the load_aggregate_for_snapshotting method. If given it will load the AggregateRoot up until that moment in time.

Check if AggregateRoots exists


# Will fail with AggregateNotFound if not found
Sequent.aggregate_repository.ensure_exists(aggregate_id, clazz)

# Returns true or false
Sequent.aggregate_repository.contains_aggregate?(aggregate_id)

Advanced usage outside the CommandService transaction

In some use cases you might want to read the AggregateRoots outside the transaction started by the CommandService. Valid use cases are for instance background Workflows. When accessing the AggregateRepository in this case, you need to manually clear the AggregateRepository, otherwise it will keep all the loaded AggregateRoots in memory.

# This will remove all loaded AggregateRoots from the Unit-of-Work cache
Sequent.aggregate_repository.clear

# Idem, but will fail if there are uncommitted events for an AggregateRoot
Sequent.aggregate_repository.clear!