Si queremos sacar más partido a nuestros datos y obtener información adicional de ellos es muy probable que queramos ejecutar algún tipo de agregación. Cuando usamos Logstash y Elasticsearch hay principalmente dos estrategias para agregar los datos:
- Agregar los datos con Logstash antes de ingestar en Elasticsearch: Para ello existen principalmente dos plugins:
- El filtro Aggregate: Nos permite agregar información de múltiples fuentes en base a uno o varios campos que usamos como identificadores para agrupar todos los eventos asociados y ejecutar código ruby, lo cuál nos da mucha libertad y potencia.
- El filtro Metrics: Nos permite ejecutar algunas agregaciones muy básicas como un conteo de eventos, la media, etc. Es fácil de usar y sencillo, aunque si tenemos varias instancias de logstash generará datos insuficientemente agregados.
- Usar agregaciones desde Elasticsearch: En este caso los datos ya se han ingestado, y ejecutamos las agregaciones a posteriori. Elasticsearch tiene una gran cantidad de agregaciones.
Cada estrategia tiene sus propias ventajas e inconvenientes. En general, agregar desde Logstash puede parecer una buena idea porque podemos insertar los datos directamente en Elasticsearch ya agregados, evitando a Elastic emplear recursos en ello, pero es bastante problemático si tenemos muchos datos, ya que el filtro Aggregate requiere que usemos un único worker (si empleásemos varios workers, un evento podría entrar por un worker y otro evento que debería ir al mismo agregado podría entrar por otro worker, y como éstos son independientes, no se llegarían a ver, generando dos agregados distintos), lo cuál es un gran cuello de botella. Por otra parte, agregar en elasticsearch ofrece muchas posibilidades, pero a veces no es factible ejecutar una agregación cada vez que queremos obtener algún dato agregado, porque algunas agregaciones son muy costosas y pueden generar una alta carga en los nodos de Elasticsearch.
Además, la forma en la que se originan los datos puede introducir distintos niveles de complejidad. Por ejemplo, imaginemos que tenemos una tienda online y queremos usar agregaciones para obtener datos como la cantidad total de productos que hemos vendido, el dinero obtenido para cada producto, etc. Tener un único servidor en el que cada vez que se produce una venta se genera un simple log con toda la información sería conveniente. Pero…¿Y si para cada venta de un producto necesitamos la interacción de varios microservicios, que además generan logs en distintos formatos?¿Y si la información de una venta de un producto está dispersa en varios eventos (por ejemplo, en un evento se muestra la información del usuario, en otro evento los productos seleccionados y en otro evento la información del pago)? Las agregaciones se complican. En general, cuanto más complicadas sean las agregaciones, más problemas pueden generar en elasticsearch y más vamos a querer moverlas a Logstash. Pero ya hemos visto que Logstash también tiene bastantes limitaciones a nivel de escalabilidad.
Transformaciones al rescate
Desde Elasticsearch 7.3 tenemos una herramienta que puede ayudarnos a solucionar nuestros problemas con la agregación de datos. Las transformaciones.
Las transformaciones son un proceso que se encarga de:
- Agrupar un conjunto de eventos en elasticsearch (pueden venir de distintos índices).
- Ejecutar una agregación sobre dichos eventos.
- Escribir los resultados en un nuevo índice.
Este proceso puede ejecutarse una única vez (batch transform) o continuamente (continuous transform). Lo más habitual es ejecutar las transformaciones en modo continuo.
Hay dos tipos de transformaciones: pivot y latest (a partir de Elasticsearch 7.12). Podríamos decir que el tipo latest no es más que una simplificación del tipo pivot para mostrar el último documento para cada elemento o conjunto de elementos. Así que vamos a centrarnos en el tipo pivot, que es lo que normalmente usaremos para nuestras agregaciones.
Las transformaciones pivot nos permitirán generar índices de tipo entity-centric . Es decir, antes teníamos distintos eventos para cada entidad (una entidad puede ser cualquier campo o conjunto de campos que identifican un elemento, como un usuario, una sesión, una transacción, etc.). Con las transformaciones generamos un nuevo índice en el que toda la información de una entidad queda resumida en un único evento, que puede irse actualizando a medida que aparecen nuevos eventos de dicha entidad.
Funcionamiento de las transformaciones
Hay dos campos muy importantes a la hora definir las transformaciones de tipo pivot:
- group_by: Es el conjunto de campos que definen cómo se agruparán nuestros datos. Toda la información de un grupo irá a parar al mismo documento. Puede ser un término (por ejemplo, si nuestros eventos tienen un campo user_id, podemos usar ese campo para agrupar, juntando todos los eventos del mismo user_id). También podemos agrupar en buckets temporales o en base a geolocalización. Todos estos campos además se pueden combinar. Es decir, podemos agrupar por tiempo para que todos los eventos se agrupen en buckets de 30 minutos, y luego dividir estos buckets por algún término como el user_id que mencionábamos antes. Es decir, para cada user_id terminaríamos con un documento cada 30 minutos (siempre y cuando haya algún evento en ese periodo de tiempo).
- aggregation: Indica la agregación que se ejecutará sobre los eventos. Actualmente hay muchos tipos de agregaciones soportadas (consultar la documentación de elasticsearch para más detalle).
Por tanto, lo que hacemos es una agrupación de datos en base a uno o varios campos definidos en el group_by. Ejecutaremos una agregación sobre todos los documentos de este grupo, y el resultado de la agregación irá a un único documento en un nuevo índice. Es decir, en el nuevo índice tendremos un documento para cada grupo de eventos definido en el group_by. Pero este documento no es algo estático. Cada vez que aparezca un nuevo documento en un grupo, la transformación se encargará de volver a ejecutar la agregación sobre este grupo. Esto es muy conveniente para agregaciones que incluyan documentos separados por días o incluso semanas.
Cuando ejecutamos la transformación en modo continuo, Elasticsearch ejecuta periódicamente las transformaciones en base a checkpoints. Es decir, calcula qué entidades (una entidad está definida por los campos que usamos en el group_by) han cambiado desde el último checkpoint hasta el momento actual y ejecuta la agregación sobre cada una de estas entidades. La primera vez que ejecutemos la transformación no tendremos disponible ningún checkpoint, y por tanto, es posible que tengamos muchos datos que procesar, lo cuál puede hacer que este proceso tarde mucho tiempo. Por eso, si tenemos muchos datos es recomendable acotar temporalmente las consultas (en el campo query de la transform) para no sacar datos demasiado antiguos.
Las transformaciones nos ofrecen varias ventajas:
- Sólo se calculan una vez (salvo que aparezcan nuevos documentos de un grupo).
- Se ejecutan continuamente, lo cuál ayuda a repartir la carga y no tener que hacer grandes esfuerzos.
- Los índices que generan nos permiten hacer búsquedas sobre datos agregados, lo cuál nos puede permitir encontrar más rápidamente la información.
- A diferencia de las agregaciones de logstash, podemos agregar información con días o semanas de diferencia.
- Nos permiten pintar en grafana directamente estos datos agregados sin tener que repetir una y otra vez complejas agregaciones (eficiencia).
No obstante, un gran poder conlleva una gran responsabilidad, y usar transformaciones supone añadir un nuevo punto de fallo. Es fundamental comprobar bien que los campos que usamos en las agregaciones de las transformaciones existen y están correctamente indexados. Si no, las transformaciones pueden fallar fácilmente. Además, hay que ir con especial cuidado ante los problemas de ingesta en elasticsearch, ya que si nos llegan documentos con demasiado lag, la transformación puede no detectar estos documentos (recordemos que cada vez que se ejecuta, busca los grupos que han cambiado desde el último checkpoint hasta el momento actual) y por tanto es como si no hubieran existido. Por lo que al final podemos “perder” datos en el nuevo índice de datos agregados generado por la transformación. Es por ello que siempre se recomienda usar el campo sync.time.delay para dar un tiempo razonable a que lleguen los datos a Elastic.
Probando transformaciones
La mejor forma de probar las transformaciones es usando la API de preview, que nos permite ejecutar la transformación con un pequeño subconjunto de los datos a modo de prueba. Esto es bastante cómodo para ir probando cambios de forma rápida. No obstante, a veces falla o no muestra resultados, y la única forma de entender qué está pasando es creando la transformación y ejecutándola.
Las transformaciones de elastic soportan bastantes tipos de agregaciones, pero creo que la más interesante es la de tipo scripted metric, ya que nos permite ejecutar un script en Painless (básicamente Java) sobre todos los documentos del grupo, lo cuál es muy potente y nos da una gran flexibilidad.
Volviendo a nuestro ejemplo de la tienda online, supongamos que la información de los pagos nos viene dispersa: En algunos documentos viene el user_data.id, en otros nos viene el producto y el precio, en otros documentos nos viene el modo de pago, y para algunas compras, puede aparecer el campo user_data.dni. Pero todos estos eventos comparten un campo que se llama transaction_id. Podemos hacer una transformación que agregue los documentos en base a dicho campo, donde el resultado será un nuevo índice en el que hay un documento para cada transaction_id donde tenemos toda la información resumida, como se muestra en el ejemplo:
POST _transform/_preview { "source": { "index": [ "mi-tienda-online-*" ], "query": { "bool": { "filter": [ { "range": { "@timestamp": { "gte": "now-5d" } } } ] } } }, "pivot": { "group_by": { "correlator": { "terms": { "field": "transaction_id" } } }, "aggregations": { "transaction_data": { "scripted_metric": { "init_script": "state.docs = []", "map_script": """ Map span = [ '@timestamp': doc['@timestamp'].value, ]; def optional_fields = ['producto', 'user_data', 'precio', 'modo_pago']; for (f in optional_fields){ if (doc.containsKey(f) && doc[f].size() > 0){ span[f] = doc[f].value; } } state.docs.add(span); """, "combine_script": "return state.docs;", "reduce_script": """ def all_docs = []; for (s in states) { for (span in s) { all_docs.add(span); } } all_docs.sort((HashMap o1, HashMap o2)->o1["@timestamp"].toInstant().toEpochMilli().compareTo(o2["@timestamp"].toInstant().toEpochMilli())); def ret = new HashMap(); ret["start_time"] = all_docs[0]["@timestamp"]; ret["end_time"] = all_docs[all_docs.length-1]["@timestamp"]; ret["duration_ms"] = ret["end_time"].toInstant().toEpochMilli() - ret["start_time"].toInstant().toEpochMilli(); for (d in all_docs){ if (d["user_data"] != null) { if (d["user_data"].containsKey("id")) { ret["user_id"] = d["user_data"]["id"]; } if (d["user_data"].containsKey("dni")) { ret["dni"] = d["user_data"]["dni"]; } } for (f in ['producto', 'precio', 'modo_pago']) { if (d.containsKey(f) && !ret.containsKey(f)) ret[f] = d[f]; } } return ret; """ } } } }, "frequency": "3m", "dest": { "index": "transform-mitiendaonline" }, "sync": { "time": { "field": "@timestamp", "delay": "2m" } }, "retention_policy": { "time": { "field": "transaction_data.end_time", "max_age": "21d" } }, "settings": { "max_page_search_size": 500 } }
siendo el resultado de la prueba algo como:
{ "preview" : [ { "transaction_id" : "0003301a8e8c1830", "transaction_data" : { "duration_ms" : 1, "user_id": fulanito "dni": "77777777X" "start_time" : "2022-10-10T12:56:03.557Z", "end_time" : "2022-10-10T12:56:03.558Z", "producto": "cepillo_dientes_tipo_2", “precio”: 2.45 } }, ... ]
Como podemos observar, además de poder juntar todos los campos, también podemos calcular la duración de la petición y guardarnos el inicio y el fin. Realmente no hay muchos límites, es básicamente programar en Java y tenemos una gran flexibilidad.
En definitiva…
Las transformaciones nos ofrecen una solución muy conveniente para cuando necesitamos agregar nuestros datos en Elasticsearch. Permiten agregar de forma continua nuestros datos, evitando que se produzcan grandes picos carga que puedan afectar a la estabilidad del clúster. No obstante, son un nuevo punto de fallo que conviene vigilar para evitar pérdida de información. En la documentación oficial de Elasticsearch se pueden encontrar más ejemplos.