Adding information with transformations in Elasticsearch


If we want to get more out of our data and extract additional information from it, we will most likely need to run some kind of aggregation. When using Logstash and Elasticsearch there are two main strategies for aggregating data:

 

  1. Aggregate the data with Logstash before ingesting it into Elasticsearch. For this there are mainly two plugins:
  • The Aggregate filter: it allows us to aggregate information from multiple sources based on one or more fields that we use as identifiers to group all related events, and to run Ruby code on them, which gives us a lot of freedom and power.
  • The Metrics filter: it allows us to run some very basic aggregations such as an event count, average, etc. It is simple and easy to use, although if we have several Logstash instances it will generate insufficiently aggregated data, since each instance only aggregates the events it sees.
  2. Use aggregations from Elasticsearch: in this case the data has already been ingested and we run the aggregations after the fact. Elasticsearch offers a large number of aggregations (a query-time example follows this list).
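
As a quick illustration of the second strategy, here is a hedged sketch of a plain query-time aggregation run on data that is already in Elasticsearch. The index pattern and field names are borrowed from the shop example later in the post and are assumed to be mapped as keyword/numeric fields:

GET mi-tienda-online-*/_search
{
  "size": 0,
  "aggs": {
    "ventas_por_producto": {
      "terms": { "field": "producto" },
      "aggs": {
        "ingresos_totales": { "sum": { "field": "precio" } }
      }
    }
  }
}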

 

Each strategy has its own advantages and disadvantages. In general, aggregating from Logstash may seem like a good idea because we can insert the data into Elasticsearch already aggregated, saving Elasticsearch the resources needed to do it. However, it becomes quite problematic if we have a lot of data, because the Aggregate filter requires us to use a single worker: if we use several workers, one event could enter through one worker and another event that belongs to the same aggregate could enter through a different worker, and since the workers are independent, they would not see each other and would generate two different aggregates. That single worker is a big bottleneck. On the other hand, aggregating in Elasticsearch offers many possibilities, but sometimes it is not feasible to run an aggregation every time we want some aggregated data, because some aggregations are very expensive and can generate a high load on the Elasticsearch nodes.

 

In addition, the way the data arrives can introduce different levels of complexity. For example, let’s imagine we have an online shop and we want to use aggregations to obtain data such as the total number of products we have sold, the money obtained for each product, and so on. If we had a single server that generated a simple log with all the information every time a sale occurred, this would be easy. But what if each sale of a product involves several microservices, each generating logs in different formats? What if the information about a sale is spread across several events (for example, one event contains the user information, another the selected products and another the payment information)? Aggregations get complicated. In general, the more complicated the aggregations are, the more problems they can cause in Elasticsearch and the more we will want to move them to Logstash. But we have already seen that Logstash also has limitations in terms of scalability.

 

Transformations to the rescue

Since Elasticsearch 7.3 we have had a tool that can help us solve our problems with data aggregation: transformations.

 

A transformation is a process that takes care of three things (a minimal sketch follows this list):

  1. Grouping a set of events in Elasticsearch (they can come from different indexes).
  2. Executing an aggregation on these events.
  3. Writing the results to a new index.
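
A minimal sketch of that structure might look like the request below. The transform id, source indexes and fields are hypothetical placeholders; the pivot section is explained in the following sections:

PUT _transform/ejemplo-minimo
{
  "source": {
    "index": ["indice-origen-1", "indice-origen-2"]
  },
  "pivot": {
    "group_by": {
      "user_id": { "terms": { "field": "user_id" } }
    },
    "aggregations": {
      "num_eventos": { "value_count": { "field": "user_id" } }
    }
  },
  "dest": {
    "index": "indice-agregado"
  }
}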

 

This process can be executed once (batch transform) or continuously (continuous transform). It is most common to run the transformations in continuous mode.

 

There are two types of transformations: pivot and, as of Elasticsearch 7.12, latest. We could say that the latest type is just a simplification of the pivot type that keeps the latest document for each element or set of elements. So let’s focus on the pivot type, which is what we will normally use for our aggregations.

Pivot transformations will allow us to generate entity-centric indexes. That is, before we had different events for each entity (an entity can be any field or set of fields that identify an element, such as a user, a session, a transaction, etc.). With the transformations we generate a new index in which all the information of an entity is summarised in a single event, which can be updated as new events appear for that entity.

 

How transformations work

There are two very important fields when defining pivot transformations:

group_by: This is the set of fields that defines how our data will be grouped. All the information in a group will end up in the same document. We can group by a term (for example, if our events have a user_id field, we can use that field to group, putting together all the events with the same user_id). We can also group in time buckets or based on geolocation. All these groupings can be combined: we can group by time so that all events fall into 30-minute buckets, and then split those buckets by some term like the user_id mentioned before. That is, for each user_id we would end up with a document every 30 minutes (as long as there is at least one event in that period).

aggregations: Indicates the aggregations that will be executed on the events of each group. Many aggregation types are currently supported (see the Elasticsearch documentation for more details).
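
For example, the combination described above (30-minute buckets further split by user_id) could be expressed in the pivot section of a transform like this. It is only a sketch: the field names are placeholders and the two aggregations are arbitrary examples:

"pivot": {
  "group_by": {
    "user_id": { "terms": { "field": "user_id" } },
    "bucket_30m": {
      "date_histogram": { "field": "@timestamp", "fixed_interval": "30m" }
    }
  },
  "aggregations": {
    "num_eventos": { "value_count": { "field": "user_id" } },
    "gasto_total": { "sum": { "field": "precio" } }
  }
}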

 

So, what we do is a grouping of data based on one or more fields defined in the group_by. We will run an aggregation on all the documents in this group, and the result of the aggregation will go to a single document in a new index. That is, in the new index we will have a document for each group of events defined in the group_by. But this document is not static. Every time a new document appears in a group, the transformation will take care of re-executing the aggregation on this group. This is very convenient for aggregations that include documents separated by days or even weeks.

 

When we run the transformation in continuous mode, Elasticsearch executes it periodically based on checkpoints. That is, it calculates which entities (an entity is defined by the fields we use in the group_by) have changed between the last checkpoint and the current time, and executes the aggregation on each of these entities. The first time we run the transformation there is no checkpoint yet, so there may be a lot of data to process, which can make this first run take a long time. Therefore, if we have a lot of data, it is advisable to limit the query in time (in the query field of the transform) so as not to retrieve data that is too old.
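
For instance, the source section of a transform can carry a simple range filter so that the first run does not sweep the whole history. The 30-day window below is just an illustrative value:

"source": {
  "index": ["mi-tienda-online-*"],
  "query": {
    "range": {
      "@timestamp": { "gte": "now-30d" }
    }
  }
}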

Transformations offer several advantages:

 

– They are only calculated once (unless new documents appear in a group).

– They are executed continuously, which helps to spread the load over time instead of concentrating it in occasional heavy queries.

– The indexes they generate allow us to search on aggregated data, which can help us find information more quickly.

– Unlike Logstash aggregations, they can aggregate information from events that are days or even weeks apart.

– They allow us to plot this aggregated data directly in dashboards without having to repeat complex aggregations over and over again (efficiency).

 

However, with great power comes great responsibility: using transformations adds another point of failure. It is essential to check that the fields we use in the transformation’s aggregations exist and are correctly indexed; if not, transformations can easily fail. In addition, we must be especially careful with ingestion problems in Elasticsearch: if we receive documents with too much lag, the transformation may not detect them (remember that each time it runs, it looks for the groups that have changed between the last checkpoint and the current moment), and it is as if they had never existed. In the end we may “lose” data in the new aggregated index generated by the transformation. This is why it is always recommended to use the sync.time.delay field to give the data a reasonable time to reach Elasticsearch.

 

Testing transformations

 

The best way to test transformations is by using the preview API, which allows us to run the transformation on a small subset of the data for testing purposes. This is quite convenient for testing changes quickly. However, sometimes it fails or does not show results, and the only way to understand what is happening is to create the transformation and run it.
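
In that case, a typical workflow (sketched here with a hypothetical transform id) is to create the transform, start it, and then watch its state and checkpoints through the stats API:

# Create the transform (same body as the preview example below,
# sent to PUT _transform/<id> instead of the _preview endpoint)
PUT _transform/mitiendaonline
{ ... }

# Start it (it runs continuously if the definition includes a sync block)
POST _transform/mitiendaonline/_start

# Inspect state, progress and checkpoint information
GET _transform/mitiendaonline/_stats

# Stop it if we need to modify or troubleshoot it
POST _transform/mitiendaonline/_stop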

 

Elastic transformations support several types of aggregations, but I think the most interesting is the scripted_metric type, as it allows us to run a script in Painless (a Java-like scripting language) on all the documents in the group, which is very powerful and gives us great flexibility.

 

Returning to our example of the online shop, let’s suppose that the payment information is scattered: in some documents we get the user_data.id, in others the product and the price, in other documents the payment method, and for some purchases the user_data.dni field may appear. But all these events share a field called transaction_id. We can create a transformation that groups documents by that field; the result will be a new index with one document per transaction_id containing all the information summarised, as shown in the example:

 

POST _transform/_preview
{
  "source": {
    "index": [
      "mi-tienda-online-*"
    ],
    "query": {
      "bool": {
        "filter": [
          {
            "range": {
              "@timestamp": {
                "gte": "now-5d"
              }
            }
          }
        ]
      }
    }
  },
  "pivot": {
    "group_by": {
      "correlator": {
        "terms": {
          "field": "transaction_id"
        }
      }
    },
    "aggregations": {
      "transaction_data": {
        "scripted_metric": {
          "init_script": "state.docs = []",
          "map_script": """ 
          Map span = [ 
            '@timestamp': doc['@timestamp'].value,
          ];

          // Only leaf fields have doc values, so we list user_data.id and
          // user_data.dni explicitly and copy each field only when present
          def optional_fields = ['producto', 'precio', 'modo_pago', 'user_data.id', 'user_data.dni'];
          for (f in optional_fields){
            if (doc.containsKey(f) && doc[f].size() > 0){
              span[f] = doc[f].value;
            }
          }

          state.docs.add(span);
        """,
          "combine_script": "return state.docs;",
          "reduce_script": """
          def all_docs = [];
          for (s in states) {
            for (span in s) {
              all_docs.add(span);
            }
          }
          // Sort chronologically so the first and last documents give us the
          // start, end and duration of the transaction
          all_docs.sort((HashMap o1, HashMap o2)->o1["@timestamp"].toInstant().toEpochMilli().compareTo(o2["@timestamp"].toInstant().toEpochMilli()));
          def ret = new HashMap();
          ret["start_time"] = all_docs[0]["@timestamp"];
          ret["end_time"] = all_docs[all_docs.size()-1]["@timestamp"];
          ret["duration_ms"] = ret["end_time"].toInstant().toEpochMilli() - ret["start_time"].toInstant().toEpochMilli();
          // Merge the scattered fields: the first document carrying a value
          // for a field wins
          for (d in all_docs){
            if (d.containsKey("user_data.id") && !ret.containsKey("user_id")) {
              ret["user_id"] = d["user_data.id"];
            }
            if (d.containsKey("user_data.dni") && !ret.containsKey("dni")) {
              ret["dni"] = d["user_data.dni"];
            }
            for (f in ['producto', 'precio', 'modo_pago']) {
              if (d.containsKey(f) && !ret.containsKey(f)) ret[f] = d[f];
            }
          }

          return ret;
          """
        }
      }
    }
  },
  "frequency": "3m",
  "dest": {
    "index": "transform-mitiendaonline"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "2m"
    }
  },
  "retention_policy": {
    "time": {
      "field": "transaction_data.end_time",
      "max_age": "21d"
    }
  },
  "settings": {
    "max_page_search_size": 500
  }
}

 

with the result of the preview being something like this:

 

{
  "preview" : [
    {
      "transaction_id" : "0003301a8e8c1830",
      "transaction_data" : {
        "duration_ms" : 1,
        "user_id" : "fulanito",
        "dni" : "77777777X",
        "start_time" : "2022-10-10T12:56:03.557Z",
        "end_time" : "2022-10-10T12:56:03.558Z",
        "producto" : "cepillo_dientes_tipo_2",
        "precio" : 2.45
      }
    },
    ...
  ]
}

 

As we can see, besides joining all the fields, we can also calculate the duration of the transaction and save its start and end times. There are really not many limits; it is basically programming in Java, so we have great flexibility.

In short…

Transformations offer us a very convenient solution when we need to aggregate our data in Elasticsearch. They allow us to continuously aggregate our data, avoiding large load peaks that can affect the stability of the cluster. However, they are a new point of failure that should be monitored to avoid data loss. More examples can be found in the official Elasticsearch documentation.

 

 

Oscar Erades
Ana Ramírez
