Construyendo un Event Storage

Cuando utilizamos event sourcing, como parte de su funcionamiento se realiza el almacenamiento de eventos con el fin de realizar la reconstrucción del estado de una entidad a partir de un conjunto de eventos almacenados.

En el presente artículo se mostrará un ejemplo de implementación de un Event Storage con varios de los problemas que suelen ser comunes al momento de realizar dicha implementación. Se debe considerar que aunque el ejemplo muestra una solución que no es altamente eficiente, sin embargo, satisface la necesidad de un gran porcentaje de aplicaciones.

Un Event Storage puede utilizar con almacén de información: archivos, base de datos relacionales, bases no sql, blob storage, etc, para este caso se hará uso de una base de datos relacional dado que se pretende evitar varios problemas técnicos que no forman parte de este ejemplo, como podrían ser: modelos de confirmación de transacciones, tipo de almacenamiento de datos para obtener un buen desempeño en la lectura.

Estructura

Un Event Storage básico puede representarse en base de datos relacional utilizando solo dos tablas:

Tabla Event

ColumnaTipo
AggregateIdBinary(16)
DataBlob
VersionInt

Esta tabla representa el registro de eventos. Existirá un registro (fila) por evento en esta tabla y el evento en sí se almacenará en la columna Data de la tabla.

El evento se almacenará utilizando algún mecanismo de serialización, aunque el uso del patrón memento puede ser altamente ventajoso para este tipo de problemas.

Esta tabla contiene la cantidad mínima de columnas requerida para implementar el Event Storage, sin embargo se pueden agregar más columnas como: la fecha y hora en la que se realizó el cambio, o metadatos con información relevante del contexto asociado al cambio de estado.

Los metadatos pueden incluir información como el nombre de usuario que inició el cambio, la dirección Ip, nivel de permisos, etc.

La columna Version es utilizada para almacenar el número de versión de cada evento, en la mayoría de los casos un número entero incremental funcionará, así cada evento que se almacena tendrá un número de versión incrementada. El número de versión es único y secuencial solo dentro del contexto de un agregado dado, està información es relevante para asegurar la consistencia

La columna AggregateId es la llave foránea que debe ser indexada, esto nos lleva a la siguiente tabla que es la tabla

Tabla Aggregate

ColumnaTipo
AggregateIdBinary(16)
TypeVarchar
Versionint

La tabla Aggregate representa los agregado existentes en el sistema, cada uno de ellos debe contener un registro en esta tabla. Junto con el identificador hay una desnormalización del número de versión actual. Esto se debe a una optimización, ya que podría derivarse de la tabla Events, pero es mucho más rápido consultar la desnormalización que consultar la tabla de Events. Este valor también se utiliza en la verificación de concurrencia optimista.

Para este ejemplo, también se incluye una columna [Type], que corresponde al nombre completo del agregado que se almacena. Es útil por varios motivos, por ejemplo procesos de debug, sin embargo no es necesario para la creación de un Event Storage básico.

Operaciones

Un Event Storage es mucho más simple que los otros mecanismos de almacenamiento de datos dado que no admiten consultas de propósito general. El Event Storage en su nivel más simple tiene dos operaciones, lo que hace que el almacenamiento de eventos sea más simple que la mayoría de mecanismos de almacenamiento de datos, esto también facilita su optimización.

La primera operación es obtener todos los eventos para un agregado. Es extremadamente importante que los eventos se ordenen en el mismo orden en el que fueron escritos, el número de versión se puede utilizar para este propósito. Todo esto se puede hacer simplemente utilizando una consulta de nuestro sistema de gestión de bases de datos relacionales.

SELECT * FROM events WHERE aggregate_id = '' ORDER BY version;

Esta es la única consulta que debe ejecutar un sistema productivo contra el Event Storage. Una posible segunda consulta que podría ser de utilidad es el limitar el conjunto de resultados en función de una fecha para conocer el estado de un objeto en un punto en el tiempo, pero generalmente un sistema en producción no debería realizar este tipo de operaciones.

La otra operación que debe soportar el Event Storage es la escritura de un conjunto de eventos de un Raíz Agregado. Esto se puede realizar por código o mediante un procedimiento almacenado. Se prefiere utilizar un procedimiento almacenado o código SQL generado dinámicamente que contiene sentencias de control sobre un proceso de inserción que realice la ejecución de múltiples consultas de ida y vuelta. El pseudocódigo para el proceso de inserción se lo puede observar a continuación:

BEGIN
	version = SELECT version FROM aggregates WHERE aggregate_id = ''
	IF version IS NULL
		INSERT INTO aggregates version = 0
	END
	IF expectedversion != version
		RAISE concurrency problem
	FOREEACH event
		INSERT event WITH incremented version number
	UPDATE aggregate with last version number
END TRANSACTION

La operación de escritura también es relativamente simple, aunque encontramos algunas sutilezas dentro de ella. El proceso básico es:

  • se verifica si existe un agregado con el identificador único que debe usar
  • si no hay uno, lo creará y considerará que la versión actual es cero
  • luego, intentará hacer una prueba de concurrencia optimista con los datos recibidos para insertar, si la versión esperada no coincide con la versión real, generará una excepción de concurrencia
  • siempre que las versiones sean las mismas, se recorrerá los eventos que se están guardando y los insertará en la tabla de Events, incrementando el número de versión en uno para cada evento
  • finalmente actualizará la tabla Aggregates con el nuevo número de versión actual del agregado

Es importante tener en cuenta que estas operaciones se realizan en una transacción, dado que es necesario garantizar la concurrencia optimista funcione en un entorno distribuido.

El contrato para un Event Storage se puede definir con la siguiente interfaz:

public interface IEventStore {
    void SaveChanges(Uuid AggregateId, int OriginatingVersion, IEnumerable<Event> events);
    IEnumerable<Event> GetEventsFor(Uuid AggregateId); 
}

Aunque no es un tarea trivial el crear un Event Storage para utilizarlo en un ambiente productivo, los conceptos generales detrás de un Event Storage son relativamente sencillos. Sin embargo, hay una optimización muy importante que debería existir en la mayoría de los sistemas y es el concepto de “Instantánea rodante” o “Rolling Snapshot”.

Rolling Snapshots

Los Rolling Snapshots evitan la necesidad de cargar todos los eventos al realizar una consulta para reconstruir un Agregado. Son una desnormalización del agregado en un punto dado del tiempo. Un cambio en la lógica de consulta y una tabla adicional son todo lo que se necesita para agregar la funcionalidad de snapshots al Event Storage.

Tabla Snapshot

ColumnaTipo
AggregateIdUuid
SerializedDataBlob
Versionint

La tabla Snapshot es sencilla. Consta de la columna SerializedData que contiene la versión serializada del agregado en un momento dado. Los datos serializados pueden estar en cualquiera de los muchos esquemas posibles, binarios, XML, texto sin formato, json, etc. La decisión sobre cómo serializar las snapshots depende realmente del sistema que se esté construyendo. Se incluye un número de versión con el snapshot, que representa qué versión del agregado.

Para crear un snapshot, se debe introducir un proceso que maneje su creación. Este proceso puede vivir fuera del servidor de aplicaciones como un proceso en segundo plano y puede existir un solo proceso en ejecución o muchos dependiendo de las necesidades debido al rendimiento. Todos los snapshots ocurren de forma asincrónica. La siguiente figura muestra una arquitectura conceptual incluyendo el proceso [SnapShotter].

ToDo: Insertar gráfico

El [SnapShotter] se encuentra bajo el Event Storage y consulta periódicamente cualquier Agregado que necesite crear un snapshot porque ha superado el número permitido de eventos. Esta consulta se puede hacer con bastante facilidad en Event Storage uniendo la tabla Aggregates a la tabla Snapshot utilizando la columna AggregateId.

La diferencia se calcula restando la última versión en la tabla Snapshot de la versión actual utilizando la cláusula where para que únicamente retorne los agregados con una diferencia mayor que algún número.

Esta consulta devolverá todos los agregados que se crearán en la tabla Snapshot. El SnapShotter luego iteraría a través de esta lista de Agregados para crear los snapshots (si se usan múltiples SnapShotters, el patrón de Competing Consumer funcionará bien).

El proceso de creación de un snapshot implica que el dominio cargue la versión actual del agregado y luego tome un snapshot de la misma. La creación del snapshot se puede hacer de muchas maneras. Una vez que se ha tomado el snapshot, se vuelve a guardar en la tabla de Snapshots para que las consultas tengan el snapshot disponible.

Muchos utilizan la funcionalidad de serialización disponible con la plataforma de desarrollo que utilizan con buenos resultados, aunque el patrón Memento es bastante útil cuando se trata de snapshots.

El patrón Memento (o serialización personalizada) aísla mejor el dominio con el tiempo a medida que cambia la estructura de los objetos del dominio.

El serializador predeterminado suele tener problemas para el manejo de versiones cuando se libera una nueva estructura del dominio (los snapshots existentes deben eliminarse y recrearse o actualizarse para que coincidan con el nuevo esquema). El uso del patrón Memento permite el control de versiones separadas del esquema de snapshots del propio objeto de dominio.

Ten en cuenta que cuando el SnapShotter se da cuenta que el Raiz Agregado necesita que se realice un snapshot carga el Agregado y toma el snapshot. Desafortunadamente, puede suceder que mientras hacía esto, uno de los clientes realizó un cambio en el mismo Agregado. Como el snapshot depende de la posición dentro del log de eventos, recibiría una falla de concurrencia optimista. La respuesta fácil sería simplemente repetir el proceso, pero ¿qué pasaría si fallara nuevamente? El snapshot de un Agregado muy utilizado podría terminar en una situación en la que tendría una probabilidad muy baja de escribir realmente snapshot con éxito.

Al separar los snapshots en su propia tabla y asociarlas a una versión del agregado, se resuelve este problema. No es necesario ordenar los snapshot, los snapshots ni siquiera necesita estar en la última versión, el snapshot que se toma es válido para la versión en la cuál se generó.

Los snapshots son una heurística que mejorará drásticamente el rendimiento de muchos sistemas, aunque no todos los sistemas necesitan snapshots. En general, se recomienda manejar el desarrollo sin snapshots, ya que siempre se puede introducir más tarde como una simple mejora del rendimiento del sistema.

Event Storage como una Cola

Se sabe que los eventos que salen de un dominio también son un [Modelo de integración]. Muy a menudo, estos eventos no solo se guardan, sino que también se publican en la cola donde se envían de forma asincrónica a los suscriptores dentro del mismo sistema o para otras aplicaciones (la generación de reportes es un buen ejemplo).

Un problema que existe con muchos sistemas de publicación de eventos es que requieren una confirmación de dos fases entre cualquier almacenamiento que estén utilizando (relacional o de otro tipo) y la publicación de sus eventos en la cola.

La razón por la que se necesita la confirmación en dos fases es que podría ocurrir una catástrofe durante el pequeño período de tiempo entre la confirmación de la escritura en el almacenamiento de datos y la confirmación de la escritura en la cola.

Si ocurriera una falla durante este período, el mensaje no se publicaría en la cola (o en la otra dirección, se puede publicar pero el cambio no se guardaría). Si ocurriera cualquiera de los casos, los suscriptores de los eventos no estarían sincronizados con el productor.

La confirmación en dos fases puede ser costosa, pero para los sistemas de baja latencia hay un problema mayor cuando se trata con esta situación. En general, la cola en sí es persistente, por lo que el evento se escribe en el disco dos veces en la confirmación de dos fases, una en el Event Storage y otra en la cola persistente.

Dado que la mayoría de los sistemas tienen dos escrituras no es tan importante, pero si tiene requisitos de baja latencia, puede convertirse en una operación bastante costosa, ya que también forzará búsquedas en el disco. La siguiente figura ilustra la confirmación en dos fases entre el almacenamiento de datos y la publicación en una cola.

ToDo: Insertar gŕafico

Se podría solucionar este problema escribiendo solo en una cola y luego «algo» en el otro lado de la cola debería actualizar el almacenamiento de datos con los cambios representados por los eventos, sin embargo, esto tiene algunos problemas. El mayor problema es que no todos los eventos podrán escribirse en el almacenamiento, se ha introducido una consistencia eventual y es posible que ocurra un problema de concurrencia optimista en la escritura de los eventos. Tratar este problema en un sistema de producción no es trivial.

Por otro lado, muchas soluciones hacen lo contrario, usan el almacenamiento de eventos como una cola. Agregar un número de secuencia a la tabla Events discutida anteriormente permite usar el Event Storage como una cola. La siguiente tabla ilustra el cambio en el esquema de la tabla Events.

ColumnaTipo
AggregateIdBinary(16)
DataBlob
SequenceNumberLong
VersionInt

La base de datos aseguraría que los valores de SequenceNumber serían únicos e incrementales, esto se puede hacer fácilmente usando un tipo de incremento automático.

Debido a que los valores son únicos e incrementales un proceso secundario puede estar atento a la tabla de Events y publicar los eventos en otra cola. El proceso de «estar atento» simplemente tendría que almacenar el valor del número de secuencia del último evento que había procesado, incluso podría actualizar este valor con un compromiso de dos fases que lleva la actualización y la publicación a la cola en la misma transacción. Este proceso se puede ver en la siguiente figura.

ToDo: Insertar gráfico

La responsabilidad ha sido retirada del procesamiento inicial de una manera segura. La publicación puede ocurrir de forma asíncrona a la escritura real. Esto reduce la latencia de completar la operación inicial, también limitará el número de escrituras de disco en el procesamiento de la solicitud inicial a uno.

Esta estrategia puede ser extremadamente valiosa cuando se trata con requisitos de baja latencia, ya que permite que gran parte del trabajo en el procesamiento inicial se descargue a otro proceso de forma asincrónica y segura, hay poca diferencia si la publicación ocurre como parte del procesamiento inicial o de forma asincrónica, ya que generalmente los mensajes se publican de forma asincrónica de todos modos, el uso de Event Storage como cola solo aumenta el tiempo hasta que el mensaje se publique ligeramente, esto puede verse como un aumento leve del SLA.

 

Construyendo un Bus de Comandos

Con seguridad existen varios paquetes disponibles que implementan un bus de comandos con distinto nivel de complejidad y funcionalidad, sin embargo si deseas implementar algo sencillo o tener este componente totalmente en tu control, a continuación mostraré un ejemplo que puede servir como base.

La infraestructura de un bus de comandos propuesta estará compuesta por tres componentes principales:

  1. Comandos, que son objetos de datos que se pasan al bus de comandos
  2. Controladores (Handlers), que se ocupan de receptar un comando y completar la tarea requerida.
  3. Bus de comandos, que es responsable de despachar comandos a los controladores (handlers) y crear instancias de los objetos necesarios.

Está implmenetación no es muy diferente a la implementación de un bus de eventos, sin embargo la gran diferencia es que los comandos tienen un solo controlador, mientras que los eventos tienen uno o varios controladores.

La interfaz Command

Lo primero que necesitamos crear es la interfaz Command

type Command interface {
}

La interfaz Command no tiene definiciones de métodos, dado que por ahora un Comando es básicamente un objeto plano que contiene datos.

Pero, ¿entonces, para qué creamos la interfaz? Existe mucho valor en implementar interfaces, incluso si la interfaz no contiene ningún método dado que hace que el código sea mucho más legible y fácil de entender para otros desarrolladores.

La interfaz Handler

Ahora, es necesario definir la interfaz Handler:

type Handler interface {
 handle(command Command)
}

Cada controlador requiere implementar un método handle(), que debería aceptar un comando (Command). Realmente no nos importa cómo maneja el controlador al comando, solo nos importa decirle qué hacer.

El contenedor

El bus de comandos será responsable de crear una nueva instancia de un controlador particular para cada comando que reciba, esto significa que el bus de comandos requiere una manera de crear instancias de otros objetos.

Vamos a definir una interfaz Container que esté en nuestro control con la finalidad de poder realizar implementaciones distintas que satisfagan nuestro contrato.

type Container interface {
 make(typeName string)
}

El inflector

El bus de comandos también puede ser responsable de asignar los comandos con su respectivo controlado, sin embargo en lugar de acoplar esta responsabilidad en el bus de comandos, podemos escribir un inflector que pueda hacer el trabajo por nosotros.

type Inflector interface {
 inflect(command Command)
}

Una implementación sencilla del inflector podría ser la siguiente:

func (ni *NameInflector) Inflect(command Command) string {
 t := reflect.TypeOf(command)
 h := t.Elem().Name()
 return strings.Replace(h, "Command", "Handler", -1)
}

Implementacion del bus de comandos

Finalmente vamos a implementar el bus de comandos. Al bus de comandos se le deben inyectar instancias de una implementación de Container y otra de una implementación de Inflector, adicionalmente debe tener un método execute() que acepte un Command.

type CommandBus struct {
 Container Container
 Inflector Inflector
}
func (cb *CommandBus) handler(c Command) Handler {
 name := cb.Inflector.Inflect(c)
 return cb.Container.make(name).(Handler)
}
func (cb *CommandBus) Execute(c Command) {
 cb.handler(c).Handle(c)
}

Como se puede ver, el método execute() no devuelve ningún valor. Adicionalmente contiene un método privado para determinar qué handler utilizar y luego buscarlo en el contenedor.

Notas finales

La capa de aplicación en DDD es una parte más de nuestra arquitectura. Esta capa maneja la comunicación con el mundo exterior aceptando solicitudes y respondiendolas. Al actuar con un componente delimitador en realidad no le importa de dónde viene la solicitud o hacia dónde va.

Hay un par de enfoques frecuentemente utilizados para construir la capa de aplicación. Dos opciones populares son los Servicios de Aplicación o el uso de Comandos y Manejadores. Ambos enfoques tienen aspectos positivos y negativos, por lo que depende de la necesidad decidir cuál es el adecuado para el problema en cuestión.

Este artículo muestra una implementación simple del bus de comandos, sin embargo puede ser útil como base para robustecer una solución más específica o simplemente para entender los conceptos relacionados. 

 

Bus de Servicios

En términos simples, un bus de servicio es un mecanismo para intercambiar mensajes entre componentes. Los mensajes son DTOs (Data Transfer Object / Objectos de transferencia de datos) que contienen información relevante que nos permite interactuar sobre dicha información.

Existe un componente conocido como “emisor” cuya responsabilidad es la creación del mensaje y su entrega al bus. Por otro lado, existe un segundo componente conocido como “receptor” que le especifica al bus qué tipo de mensajes le interesa recibir.

Cuando el bus recibe un mensaje, envía el mensaje a los receptores, entonces, en realidad el bus actúa como el límite entre los distintos componentes creando desacoplamiento en nuestra solución de software. Tanto los emisores, como los receptores, desconocen la existencia de otros componentes.

Debido a este desacoplamiento, un bus de servicio puede permitir que diferentes componentes trabajen juntos de manera eficiente. Dado que el bus se encuentra en el intermedio de todos los mensajes, puede agregar funcionalidad a todos estos mensajes sin cambiarlos. Un ejemplo puede ser el registros de todos los mensajes en un sistema de logs o su encolamiento en una cola de mensajes.

Tipos de bus

En los párrafos anteriores se describió un bus de servicio de maneral general, un bus que únicamente despacha mensajes. No restringe a los mensajes o a los manejadores de mensajes de manera alguna.

Es normal que distintos tipos de mensajes requieran distinta lógica gestión, es por ello que se puede hablar de tipos de bus y hoy me interesa comentar sobre los siguientes tres:

Bus de comandos

Se caracteriza por:

  • Los mensajes (comandos) señalan la intención del usuario, por ejemplo: CrearSolicitud o RegistrarUsuario
  • Un comando es manejado por exactamente un controlador (handler)
  • Un comando no retorna valor alguno.

Bus de consultas

En ingles conocido como Query bus, se caracteriza por:

  • Los mensajes (consultas / queries) identifican una pregunta, que no está relacionada necesariamente con una consulta a base de datos (sql query). Algunos ejemplos serían: UltimasSolicitudes o ComentariosDeUnArticulo
  • Una consulta es manejada por exactamente un controlador (handler)
  • Las consultas retornan datos
  • Las consultas no deben cambiar el estado de la aplicación

Bus de eventos

Un bus de eventos se caracteriza por:

  • Los mensajes (eventos) indican que ha sucedido un evento, como por ejemplo: ArticuloCreado. UsuarioRegistrado o SolicitudFormalizada
  • Un evento puede ser manejado por cualquier numero de controladores / handlers ([0, inf])
  • Solo contiene un conjunto de valores primitivos (cadenas de texto, enteros, booleanos), no clases completas.
  • Los eventos no deben devolver valores

Consideraciones importantes

Validaciones

Los mensajes siempre deben ser válidos. Esto significa que el objeto / estructura que contiene el mensaje debe validar cada una de sus propiedades. De esta forma, solo se despachan mensajes válidos. Sin embargo, hay un límite para esto.

Por ejemplo, el comando RegistrarUsuario puede requerir (entre otras cosas) un nombre de usuario. El comando debe validar que el nombre de usuario sea una cadena de texto con una longitud entre 6 y 100 caracteres. Si el nombre de usuario es único o no, probablemente no debería ser validado por el comando y quien asumirá dicha responsabilidad es el controlador (handler)

Patrones de diseño

La implementación de comandos y consultas es parte (commands & queries) son parte del patrón CQRS. Sin embargo se puede utilizar buses de servicio sin aplicar CQRS.

Los comandos y eventos a menudo se utilizan juntos, entonces, cuando se ejecuta el comando RegistrarUsuario se activa el evento UsuarioRegistrado.

 

Complejidad esencial y complejidad adicional

Cuando desarrollamos software, podemos decir que nos enfocamos principalmente en resolver problemas y al resolver dichos problemas nos podemos encontrar con dos tipos de complejidad: la complejidad esencial y la complejidad adicional.

Complejidad esencial

Se refiere a la complejidad propia de construir una característica del software

Complejidad adicional

Se refiere a la complejidad que agregamos por nuestra cuenta mientras construimos la característica del software, complicando la resolución del problema por distintos factores.

Al inicio de la construcción de un sistema, su complejidad suele ser igual a la complejidad esencial y conforme pasa el tiempo la complejidad del sistema es el resultado de la suma de la complejidad esencial y complejidad adicional.

Una buena arquitectura de software debe pretender que el peso de la complejidad adicional no se incremente en demasía.

Como ejemplos podemos pensar imaginar:

  • una aplicación que como backend tiene un solo endpoint, su complejidad adicional será menor que una que tenga un backend con muchos endpoints acoplados
  • una organización en la cual durante un sprint se desarrollen varias reuniones poco productivas le agrega complejidad adicional al desarrollo.
  • evitar implementar un correcto sistema de despliegue en un inicio por ganar tiempo agrega complejidad adicional cuando el desarrollo involucre a muchos desarrolladores desplegando código.

Como podemos observar, la complejidad adicional no solo tiene que ver con el código que escribimos, la misma incluye todo el entorno en el cual desarrollamos el software, sin embargo en cuanto a código también podemos agregar gratuitamente complejidad adicional cuando el mismo no incluye mejores prácticas, estándares de desarrollo o el uso de clean code.

 

Qué es arquitectura de software

La arquitectura de software son las reglas autoimpuestas al definir como diseñamos software.

La arquitectura de software no incluye el tratamiento de asuntos relacionados al hardware de manera directa.

Respecto al diseño, existen los enfoques de micro-diseño y macro-diseño.

Por ejemplo, el micro-diseño hace referencia al diseño que realizamos cuando probamos el código de una función en un desarrollo dirigido por pruebas y el macro-diseño va más alla de la función que estamos implementando, tiene que ver con modelamos nuestro dominio a un nivel más alto, como dividimos nuestra aplicacion por capas, servicios, etc.