Thursday, 27 February 2020

Storing 50 million events per second in Elasticsearch: How we did it

Introduction

DataDome is a global cybersecurity company that offers a SaaS solution designed to protect customer websites against OWASP automated threats: credential stuffing, layer 7 DDoS attacks, SQL injection & intensive scraping. The solution protects all our customers’ vulnerability points (web, mobile apps & APIs) with cutting-edge artificial intelligence technology, providing real-time bot detection and automated blocking decisions.
The DataDome solution uses Apache Flink for real-time event analysis in order to detect new threats, and Elasticsearch to store requests from all our customers’ visitors. We receive these requests from our customers’ web servers, and use them to provide bot traffic statistics, feedback loops, business insights and bot attack details in the user dashboard.
A few numbers: our cluster stores more than 150 TB of data and 15 trillion events in 60 billion documents, spread across 3,000 indexes and 15,000 shards on 80 nodes. Each document stores 250 events, each in a separate field.
Each day, at peak load, our Elasticsearch cluster writes more than 200,000 documents per second and serves a search rate of more than 20,000 requests per second.
Our indexes are created on a daily basis, and we have one index per customer in order to provide a logical separation of the data.

Our sharding policy challenge

One year ago, our cluster was composed of 30 dedicated servers split in a hot-warm architecture. Our hot layer was composed of 15 servers, each with a 20-thread CPU and 5 SSDs in RAID 0, while our warm layer was composed of 15 servers with less powerful CPUs to reduce cost, since the data in the warm layer is queried less often.
We provide up to 30 days of data retention to our customers. The first seven days of data were stored in the hot layer, and the rest in the warm layer.
A common best practice is to keep shards at around 50 GB. As described above, we have a dedicated index for each customer, but not all customers have the same workload: our biggest customers write tens of thousands of documents per second, while our smallest write a few hundred. Furthermore, we must constantly adapt the number of shards to the evolution of our customers’ traffic in order to respect this 50 GB-per-shard best practice.
To solve this issue, we introduced a job which runs every day to update the mapping template and create the next day’s index with the right number of shards, based on the number of hits each customer received the previous day. Since we know that an indexed document weighs around 1,000 bytes, we can predict the number of shards each index needs in order to respect the best practice of 50 GB per shard.
If we index a document into a nonexistent index, the document triggers the creation of that index. However, because of our daily index design, this can cause trouble for the cluster (yellow status, unassigned shards…): we have a lot of throughput, and the master nodes would need to allocate all the new shards at the same time (around midnight).
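As a rough sketch (the client ID, date and shard count below are purely illustrative), if the job predicts around 200 GB of data for a given customer (roughly 200 million documents at 1,000 bytes each), it creates tomorrow’s index with 4 shards, the mappings being picked up from the template it has just updated:
PUT index_10_2019-01-04
{
  "settings": {
    "number_of_shards": 4
  }
}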
Thanks to our job, however, we can create the index in advance (one day before) and avoid these issues.
In the example below, our index pattern is “index_clientID_date”
[Figure: sharding policy example, with daily indexes named index_clientID_date and a different number of shards per customer]
With this sharding policy, we had up to 72 shards for our largest customer and only 1 shard for our smallest customers.
With the constant flow of new customers using our solution, our cluster became over-sharded and we experienced a decrease in performance, both in read and write. We also saw a significant increase in load average on our data nodes.
We therefore benchmarked some search and write requests, and found that the more our shards grew during the day, the more our search and write performance decreased. In the evening, when traffic spiked and the shards were bigger than in the morning, Elasticsearch performance was particularly poor.
Whenever a node had trouble and went down, our cluster suffered, because relocating a big index (72 shards of 50 GB) costs a lot in write threads, disk I/O, CPU and bandwidth, especially during writes.
Our benchmark showed that the perfect shard size for us is around 20GB, given our use case, our document size, our traffic, our index mapping and our node type. Beyond this size, the performance (both in read and write) starts to decrease.
So how could we keep our shards around 20GB? 
  • By adding more shards to our indexes and making our cluster even more oversharded? Certainly not! 
  • By using rollover? Yes!

How we solved the sharding policy challenge

Thanks to the rollover, we divided our shard count by three, and also reduced the load and CPU consumption on our nodes. The rollover allows us to write to fewer shards simultaneously (i.e. it reduces our load average and CPU usage). The biggest index now has at most one shard on each hot node (so 15 in total), and small and medium indexes can have from 1 to 6 shards depending on their workload.
The rollover also helped optimize our read performance by using the cache more efficiently, because each write on an index invalidates the whole cache.
Furthermore, we are more comfortable when a node crashes and a lot of shards relocate, because smaller shards mean less recovery time, less bandwidth and fewer resources consumed.
You can find the official documentation of the rollover here: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-rollover-index.html
In a few words, the rollover is composed of an alias which receives the requests for both reads and writes. Behind the alias, we have one or many indexes. The read requests are forwarded to all the indexes, while the write requests are forwarded only to the index with the flag “is_write_index” set to true. 
Last but not least, we applied a “max_size” rollover condition: each time an index reaches 400 GB, a rollover occurs and a new index is created.
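As a minimal sketch, assuming a write alias named index_10 (the alias name and the threshold are the only parts to adapt), a scheduled job can call the rollover API periodically, and Elasticsearch only rolls the index over once the condition is met:
POST index_10/_rollover
{
  "conditions": {
    "max_size": "400gb"
  }
}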
[Figure: the rollover sharding policy, with indexes index_10_2019-01-01-000001 and index_10_2019-01-01-000002 behind a write alias]
As you can see, a write on “index_10_2019-01-01-000002” will not invalidate the cache of “index_10_2019-01-01-000001”. Also, both our indexes and our shards are smaller than before.
As a result, our shards do not exceed 20GB, and we reduced the load average and optimized the write and read performance as well.

The hotspot challenge

With this, had we managed to design the perfect cluster? Unfortunately not.
After some weeks during which our cluster performed very well, it became unstable shortly after one hot node went down, was recovered, and was put back into the cluster.
What happened? 
A few hours after we brought the node back to life in our cluster, a lot of indexes triggered a rollover and all the new shards went to this node. We got an unbalanced cluster where only one node received almost all the write traffic. 
Why? 
Because, by default, Elasticsearch takes care to balance the number of shards on each node within the same layer (hot or warm). As a result, almost all the newly rolled-over shards were allocated to the recovered node, even the 14 shards of the big index.
Let’s look at an example which shows how our cluster could become unbalanced. Assume it is the 3rd of January 2019. We have set the number of replicas to 0 in our index settings, and remember that we have one index per day and per customer (10, 20 and 30 are our customers’ IDs). In this case the balancing is perfect, because the writes are spread evenly across our nodes: we have one in-write shard per node.
[Figure: a balanced cluster, with one in-write shard per node]
Now let’s assume that node 3 goes down:
[Figure: node 3 goes down and its shards are relocated to nodes 1 and 2]
As expected, all shards from node 3 are moved to node 1 and node 2. And now, what will happen if node 3 comes back to life and a rollover occurs just a few seconds later?
[Figure: node 3 comes back and the rollover places all the new in-write shards on it]
Due to the default Elasticsearch shard placement heuristics, we now have all the shards in write on node 3!
How could this be solved? Could we change the heuristics of the shard allocation algorithm (https://www.elastic.co/guide/en/elasticsearch/reference/current/shards-allocation.html)?
Unfortunately, this wouldn’t have helped us. As mentioned, by default Elasticsearch tries to balance the number of shards per node. Changing this setting would make Elasticsearch balance the number of shards per index and per node instead of just the number of shards per node, but it would only have helped for big indexes which have one shard per node. For the other indexes, which have fewer shards than we have hot nodes (let’s say 10 shards), this setting doesn’t prevent Elasticsearch from putting all of their shards on only ten of the nodes.
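For reference, these heuristics are exposed as dynamic cluster settings. Below is a sketch of how one could weight the per-index balance more heavily than the per-node shard count (the values are arbitrary, and as explained above this would not have solved our problem):
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.balance.shard": 0.15,
    "cluster.routing.allocation.balance.index": 0.85
  }
}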
[Figure: shard allocation when balancing shards per index and per node]
In this example we can see 6 indexes:
  • index_10_2019-01-03-000001 which is in write
  • index_20_2019-01-03-000001 which is in write
  • index_30_2019-01-03-000001 which is in write
  • index_10_2019-01-02-000001 which is not in write
  • index_20_2019-01-02-000001 which is not in write
  • index_30_2019-01-02-000001 which is not in write
As you can see, even if we try to spread the shards by index and by node instead of just by node, we can find a case where our cluster will be unbalanced.
Here, one solution could be to set the number of shards equal to the number of nodes, but as discussed above, a shard has a cost.

How we solved the hotspot issue

In April 2019, Elasticsearch released version 7.0 which introduced a new feature: the index lifecycle management (aka ILM). 
Thanks to this new feature, we are now able to split our data nodes into three layers: hot, warm and cold.
The main benefit of index lifecycle management is that it allows us to move a shard from hot to warm immediately after the rollover of its index. We can keep a hot layer with only in-write indexes, a warm layer with read-only shards holding the last 7 days of data, and a cold layer with shards holding data that is 7 to 30 days old (or more).
Because the writes represent eighty percent of our activity, we want to have a hot layer with only shards in write. This will allow us to keep a balanced cluster, and we don’t need to worry about the hotspot issue.

How to set up ILM 

To set up ILM, create a lifecycle policy describing the hot, warm and cold phases (with the rollover condition in the hot phase), reference it in your index template, and finally create your first index in rollover mode, suffixed by “-000001” and carrying the write alias.
Let’s follow the lifecycle of an index from the hot nodes to the cold nodes through the warm nodes:
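Here is a minimal sketch of this setup. The policy name (datadome_policy), the box_type node attribute used to tag hot, warm and cold nodes, the alias index_10 and the shard count are assumptions for the example; the thresholds follow what was described earlier (rollover at 400 GB, data moving to cold after 7 days and deleted after 30 days):
PUT _ilm/policy/datadome_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_size": "400gb" }
        }
      },
      "warm": {
        "min_age": "0ms",
        "actions": {
          "allocate": { "require": { "box_type": "warm" } }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "allocate": { "require": { "box_type": "cold" } }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
PUT _template/index_10
{
  "index_patterns": ["index_10_*"],
  "settings": {
    "index.lifecycle.name": "datadome_policy",
    "index.lifecycle.rollover_alias": "index_10",
    "index.routing.allocation.require.box_type": "hot",
    "number_of_shards": 6,
    "number_of_replicas": 0
  }
}
PUT index_10_2019-01-01-000001
{
  "aliases": {
    "index_10": { "is_write_index": true }
  }
}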
[Figure: index lifecycle management, an index moving from the hot layer through the warm layer to the cold layer]

Conclusion

Thanks to the rollover and index lifecycle management features, we solved all the main performance and stability issues of our Elasticsearch cluster.
Improving our monitoring has allowed us to better understand what is happening inside our cluster.
For each index, no matter its size, we now have shards with no more than 25GB of data on each. Having smaller shards also enables better rebalancing and relocation when needed.
We avoid hotspot issues because our hot layer only has shards in write, and the hot, warm and cold architecture improves our cache utilization for read requests.
That being said, the cluster is still not perfect. We still need to:
  • avoid hotspots inside the warm and cold layers (even if our main concern is the scalability for write operations) 
  • closely monitor our jobs creating the rollover aliases 
  • create our indexes one day before using them.
So what’s next? Continuing to scale! As more and more security professionals realize the need for behavioral detection and real-time bot protection, the volume of requests we process is growing exponentially. Our next milestone is therefore to be able to process 500,000 write requests per second. Stay tuned!

from : https://datadome.co/bot-detection/store-50-million-event-per-second-in-elasticsearch/

And the big one said "Rollover" — Managing Elasticsearch time-based indices efficiently

Anybody who uses Elasticsearch for indexing time-based data such as log events is accustomed to the index-per-day pattern: use an index name derived from the timestamp of the logging event rounded to the nearest day, and new indices pop into existence as soon as they are required. The definition of the new index can be controlled ahead of time using index templates.
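As a quick sketch of that pattern (the template name, index pattern and settings are illustrative, using the legacy template syntax current at the time of this post), a single template covers every daily index, so an index such as logs-2016.07.01 springs into existence with the right settings on the first write it receives:
PUT _template/daily-logs
{
  "template": "logs-*",
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  }
}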
This is an easy pattern to understand and implement, but it glosses over some of the complexities of index management such as the following:
  •  To achieve high ingest rates, you want to spread the shards from your active index over as many nodes as possible.
  •  For optimal search and low resource usage you want as few shards as possible, but not shards that are so big that they become unwieldy.
  •  An index per day makes it easy to expire old data, but how many shards do you need for one day?
  •  Is every day the same, or do you end up with too many shards one day and not enough the next?
In this blog post I’m going to introduce the new Rollover Pattern and the APIs which support it: a simpler, more efficient way of managing time-based indices.

Rollover Pattern

The Rollover Pattern works as follows:
  •  There is one alias which is used for indexing and which points to the active index.
  •  Another alias points to active and inactive indices, and is used for searching.
  •  The active index can have as many shards as you have hot nodes, to take advantage of the indexing resources of all your expensive hardware.
  •  When the active index is too full or too old, it is rolled over: a new index is created and the indexing alias switches atomically from the old index to the new.
  •  The old index is moved to a cold node and is shrunk down to one shard, which can also be force-merged and compressed.

Getting started

Let’s assume we have a cluster with 10 hot nodes and a pool of cold nodes. Ideally, our active index (the one receiving all the writes) should have one shard on each hot node in order to split the indexing load over as many machines as possible.
We want to have one replica of each primary shard to ensure that we can tolerate the loss of a node without losing any data. This means that our active index should have 5 primary shards, giving us a total of 10 shards (or one per hot node). We could also use 10 primary shards (a total of 20 including replicas) and have two shards on each node.
First, we’ll create an index template for our active index:
PUT _template/active-logs
{
  "template": "active-logs-*",
  "settings": {
    "number_of_shards":   5,
    "number_of_replicas": 1,
    "routing.allocation.include.box_type": "hot",
    "routing.allocation.total_shards_per_node": 2
  },
  "aliases": {
    "search-logs": {}
  }
}
        
Indices created from this template will be allocated to nodes tagged with box_type: hot, and the total_shards_per_node setting will help to ensure that the shards are spread across as many hot nodes as possible. I’ve set it to 2 instead of 1, so that we can still allocate shards if a node fails.
We will use the active-logs alias to index into the current active index, and the search-logs alias to search across all log indices.
Here is the template which will be used by our inactive indices:
PUT _template/inactive-logs
{
  "template": "inactive-logs-*",
  "settings": {
    "number_of_shards":   1,
    "number_of_replicas": 0,
    "routing.allocation.include.box_type": "cold",
    "codec": "best_compression"
  }
}
        
Archived indices should be allocated to cold nodes and should use deflate compression to save space. I’ll explain why I’ve set replicas to 0 later.
Now we can create our first active index:
PUT active-logs-1
PUT active-logs-1/_alias/active-logs        
The -1 pattern in the name is recognised as a counter by the rollover API.

Indexing events

When we created the active-logs-1 index, we also created the active-logs alias. From this point on, we should index using only the alias, and our documents will be sent to the current active index:
POST active-logs/log/_bulk
{ "create": {}}
{ "text": "Some log message", "@timestamp": "2016-07-01T01:00:00Z" }
{ "create": {}}
{ "text": "Some log message", "@timestamp": "2016-07-02T01:00:00Z" }
{ "create": {}}
{ "text": "Some log message", "@timestamp": "2016-07-03T01:00:00Z" }
{ "create": {}}
{ "text": "Some log message", "@timestamp": "2016-07-04T01:00:00Z" }
{ "create": {}}
{ "text": "Some log message", "@timestamp": "2016-07-05T01:00:00Z" }

Rolling over the index

At some stage, the active index is going to become too big or too old and you will want to replace it with a new empty index. The rollover API allows you to specify just how big or how old an index is allowed to be before it is rolled over.
How big is too big? As always, it depends. It depends on the hardware you have, the types of searches you perform, the performance you expect to see, how long you are willing to wait for a shard to recover, etc etc. In practice, you can try out different shard sizes and see what works for you. To start, choose some arbitrary number like 100 million or 1 billion. You can adjust this number up or down depending on search performance, data retention periods, and available space.
There is a hard limit on the number of documents that a single shard can contain: 2,147,483,519. If you plan to shrink your active index down to a single shard, you must have fewer than 2.1 billion documents in your active index. If you have more documents than can fit into a single shard, you can shrink your index to more than one shard, as long as the target number of shards is a factor of the original, e.g. 6 → 3, or 6 → 2.
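For instance (the index names here are illustrative, and the same read-only and allocation preparation described below still applies), a 6-shard index could be shrunk to 3 shards by passing the target shard count in the shrink request body:
POST big-logs-1/_shrink/big-logs-1-shrunk
{
  "settings": {
    "index.number_of_shards": 3
  }
}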
Rolling an index over by age may be convenient as it allows you to parcel up your logs by hour, day, week, etc, but it is usually more efficient to base the rollover decision on the number of documents in the index. One of the benefits of size-based rollover is that all of your shards are of approximately the same weight, which makes them easier to balance.
The rollover API would be called regularly by a cron job to check whether the max_docs or max_age constraints have been breached. As soon as at least one constraint has been breached, the index is rolled over. Since we’ve only indexed 5 documents in the example above, we’ll specify a max_docs value of 5, and (for completeness), a max_age of one week:
POST active-logs/_rollover
{
  "conditions": {
    "max_age":   "7d",
    "max_docs":  5
  }
}
        
This request tells Elasticsearch to rollover the index pointed to by the active-logs alias if that index either was created at least seven days ago, or contains at least 5 documents. The response looks like this:
{
  "old_index": "active-logs-1",
  "new_index": "active-logs-2",
  "rolled_over": true,
  "dry_run": false,
  "conditions": {
    "[max_docs: 5]": true,
    "[max_age: 7d]": false
  }
}
        
The active-logs-1 index has been rolled over to the active-logs-2 index because the max_docs: 5 constraint was met. This means that a new index called active-logs-2 has been created, based on the active-logs template, and the active-logs alias has been switched from active-logs-1 to active-logs-2.
By the way, if you want to override any values from the index template such as settings or mappings, you can just pass them in to the _rollover request body like you would with the create index API.
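For example (the shard count is illustrative), the following request keeps the same conditions but gives the newly created index ten primary shards, overriding the value from the active-logs template:
POST active-logs/_rollover
{
  "conditions": {
    "max_docs": 5
  },
  "settings": {
    "index.number_of_shards": 10
  }
}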

Why don't we support a max_size constraint?

Given that the intention is to produce evenly sized shards, why don't we support a max_size constraint in addition to max_docs? The answer is that shard size is a less reliable measure because ongoing merges can produce significant temporary growth in shard size which disappears as soon as a merge is completed. Five primary shards, each in the process of merging to one 5GB shard, would temporarily increase the index size by 25GB! The doc count, on the other hand, grows predictably.

Shrinking the index

Now that active-logs-1 is no longer being used for writes, we can move it off to the cold nodes and shrink it down to a single shard in a new index called inactive-logs-1. Before shrinking, we have to:
  •  Make the index read-only.
  •  Move one copy of all shards to a single node. You can choose whichever node you like, probably whichever cold node has the most available space.
This can be achieved with the following request:
PUT active-logs-1/_settings
{
  "index.blocks.write": true,
  "index.routing.allocation.require._name": "some_node_name"
}
        
The allocation setting ensures that at least one copy of each shard will be moved to the node with name some_node_name. It won’t move ALL the shards — a replica shard can’t be allocated to the same node as its primary — but it will ensure that at least one primary or replica of each shard will move.
Once the index has finished relocating (use the cluster health API to check), issue the following request to shrink the index:
POST active-logs-1/_shrink/inactive-logs-1
        
This will kick off the shrink process. As long as your filesystem supports hard links, the shrink will be almost instantaneous. If your filesystem doesn’t support hard links, well, you’ll have to wait while all the segment files are copied from one index to another…
You can monitor the shrink process with the cat recovery API or with the cluster health API:
GET _cluster/health/inactive-logs-1?wait_for_status=yellow
        
As soon as it is done, you can remove the search-logs alias from the old index and add it to the new:
POST _aliases
{
  "actions": [
    {
      "remove": {
        "index": "active-logs-1",
        "alias": "search-logs"
      }
    },
    {
      "add": {
        "index": "inactive-logs-1",
        "alias": "search-logs"
      }
    }
  ]
}
        

Saving space

Our index has been reduced to a single shard, but it still contains the same number of segment files as before, and the best_compression setting hasn’t kicked in yet because we haven’t made any writes. We can improve the situation by force-merging our single-shard index down to a single segment, as follows:
POST inactive-logs-1/_forcemerge?max_num_segments=1
        
This request will create a single new segment to replace the multiple segments that existed before. Also, because Elasticsearch has to write a new segment, the best_compression setting will kick in and the segment will be written with deflate compression.
There is no point in running a force-merge on both the primary and replica shard, which is why our template for inactive log indices set the number_of_replicas to 0. Now that force-merge has finished, we can increase the number of replicas to gain redundancy:
PUT inactive-logs-1/_settings
{ "number_of_replicas": 1 }
        
Once the replica has been allocated — use the cluster health API with ?wait_for_status=green to check — we can be sure that we have a redundant copy of our data and we can safely delete the active-logs-1 index:
DELETE active-logs-1
        

Deleting old indices

With the old index-per-day pattern, it was easy to decide which old indices could be dropped. With the rollover pattern, it isn’t quite as obvious which index contains data from which time period.
Fortunately, the field stats API makes this easy to determine. We just need to look for all indices where the highest value in the @timestamp field is older than our cutoff:
GET search-logs/_field_stats?level=indices
{
  "fields": ["@timestamp"],
  "index_constraints": {
    "@timestamp": {
      "max_value": {
        "lt": "2016/07/03",
        "format": "yyyy/MM/dd"
      }
    }
  }
}
        
Any indices returned by the above request can be deleted.

Future enhancements

With the rollover, shrink, force-merge, and field-stats APIs, we have provided you with the primitives to manage time-based indices efficiently.
Still, there are a number of steps which could be automated to make life simpler. These steps are not easy to automate within Elasticsearch because we need to be able to inform somebody when things don’t go as planned. This is the role of a utility or application built on top of Elasticsearch.
Expect to see a workflow based on the above provided by the Curator index management tool, as well as a nice UI in X-Pack which can take advantage of the scheduling and notification features to provide a simple, reliable index management tool.

from: https://www.elastic.co/cn/blog/managing-time-based-indices-efficiently