0 Comments

And we’re back

Its time to ring in the new year with some more chatter about our data synchronization algorithm, which honestly, has turned into a pretty complex beast.

The last time I wrote about the algorithm, I explained how we flipped the original implementation of the differencing scanfrom bottom up, to top down. This change represented a hearty improvement over the original implementation, because the data at the “top” of the table (i.e. when ordered by Row Version descending) is far more likely to change than the data at the “bottom”, at least for our clients anyway.

In that post though, I pointed out an inefficiency in the differencing scan, regardless of the direction”":

There is still an inherent inefficiency in the design of the scan, such that it will continue to do extra work for no benefit. Consider the situation where the scan has started and has dealt with 4 chunks. Its a bit down in the table, and the change it originally wanted to find was in chunk 5. In the meantime, the user has deleted data in chunk 2. The process will have to finish its scan of the entire table before it returns to the top and finds that particular change, which is just wasteful. Instead, it should be able to determine if there are any differences in the remainder of the data set (i.e. from the bottom of the current chunk down) and then use that information to decide to reset its position to the top again. We haven’t actually done this bit yet, because it came up after the deployed the first version to our users.

That inherent inefficiency is the topic for today.

Wait, Come Back, You’re Going In The Wrong Direction!

As mentioned above, the problem lies in the fact that once a difference is detected (i.e. one or more missing rows locally or remotely), the scan will just keep going until it finds and fixes said difference (or it hits the “end” of its scan and resets to the “top”). Of course, time does not stop while the scan is happening, and because it happens over a period of hours, with many relatively isolated executions of the algorithm, other changes are still being made to the underlying data.

Additions and modifications are handled by an entirely different process, so we don’t have to worry about them, but hard deletions can still occur at will.

When such a deletion occurs in the section that has already been scanned, but before the original difference that triggered the scan has been located and fixed, the algorithm can get stuck scanning the remainder of the data set for no real benefit. This actually isn’t a huge issue when the data set is small, or even medium sized, as you can just eat the inefficiency and wait. Eventually it will all work itself out.

Unfortunately, as the data set gets larger, the chances of a deletion occurring in the scanned section increases. Also, since changes are more likely to occur at the “top” of the data set, and the differencing scan works top-down, the sections that were most recently scanned are actually the most likely to contain differences. As a result, the algorithm spends longer and longer doing work for no benefit, so the inefficiency gets worse and worse.

Each chunk comparison requires the data to be present in memory in the remote database as well (its not an index-only query), so every pointless comparison actually results in more reads, which means more IOPS, which is why we started optimizing in the first place!

Long story short, we have to do something about it.

4665, 4666, 4667! There Are 4667 Rows Both Locally And Remotely, Ah Ah Ah

The good news is that this particular problem is not overly difficult to solve. It will still take some effort, but its not some fundamental flaw in the algorithm.

Every time a chunk is completed as part of the differencing scan, we can use a simple count to see whether or not the remaining differences (if there are any) are above or below the current location.

Locally this is trivial, just a query to the DB for count where < current range end.

Getting the same information from the remote requires the introduction of a new endpoint to the API though:

/v1/customers/{customerId}/databases/{databaseId}/tables/{tableName}/count{?aboveRowVersion=123&belowRowVersion=456}

Slightly generalised from our specific use case, but it basically lets you get a count of records:

  • above a certain row version
  • below a certain row version
  • in between two boundary row versions

For the differencing scan, we only really care about the “below a certain row version” use case, to get a count and compare it to the same count from the local data.

If the count is the same, we can safely exit early and flip back to the top of the data set, resetting the differencing scan so that it can pick up the more recent changes.

If the count is different, we just keep going down and repeat the process once we’ve actioned the next chunkl.

Nice and easy.

Of course, there are still some edge cases. Its possible (though not likely) get into a situation where the counts are the same but the data is actually different (a particular combination of locally deleted data and data that was never uploaded successfully) which can throw the whole thing for a bit of a loop, but that could have happened regardless, so we’re still in a better place than we would otherwise be.

Conclusion

Its been a bit of a hard slog trying to optimise the data synchronization algorithm, and we’re still not really there. Not only that, the algorithm itself has become more and more complex over time (obviously), and is getting pretty hard to reason about.

Annoyingly enough, we haven’t run across that one magical improvement that changes everything. Its very much been a kind of “death by a thousand cuts” sort of thing, with tens of small optimizations that alleviate the issue slightly. The improvement in this post is a good example of that sort of thing, and pretty much boils down to “don’t do something you don’t have to do”, which isn’t exactly ground-breaking.

Don’t get me wrong, the process is much better than its ever been, but we’re still seeing very similar patterns in read IOPS, which is problematic.

It might be that the algorithm itself just doesn’t scale as well as we want it to, and that we might need to flip to a fundamentally different approach. Perhaps something that hooks into deeper into the customer data and notifies us of creations/updates/deletions as they occurs, rather than us having to poll for the same information.

Still, that sort of change is not something to be embarked on lightly, even disregarding the fallacy of sunk cost.

0 Comments

I learned a new thing about AWS, instance retirement and auto scaling groups a few weeks ago.

I mean, lets be honest, the amount of things I don’t know dwarfs the amount of things I do know, but this one in particular was surprising. At the time the entire event was incredibly confusing, and no-one at my work really knew what was going on, but later on, with some new knowledge, it all made perfect sense.

I’m Getting Too Old For This

Lets go back a step though.

Sometimes AWS needs to murder one of your EC2 instances. As far as I know, this tends to happen when AWS detects failures in the underlying hardware, and they schedule the instance to be “retired”, notifying the account holder as appropriate. Of course, this assumes that AWS notices the failure before it becomes an issue, so if you’re really unlucky, sometimes you don’t get a warning and an EC2 instance just disappears into the ether.

The takeaway here, is that you should never rely on just one EC2 instance for any critical functions. This is one of the reasons why auto scaling groups are so useful, because you specify a template for the instances instead of just making one by itself. Of course, if your instances are accumulating important state, then you’ve still got a problem if one goes poof.

Interestingly enough, when you stop (not terminate, just stop) an EC2 instance that is owned by an auto scaling group, the auto scaling group tends to murder it and spin up another one, because it thinks the instance has gone bad and needs to be replaced.

Anyway, I was pretty surprised when AWS scheduled two of the Elasticsearch data nodes in our ELK stack for retirement and:

  1. The nodes hung around in a stopped state, i.e. they didn’t get replaced, even though they were owned by an auto scaling group
  2. AWS didn’t trigger the CloudWatch alarm on the Elasticsearch load balancer that is supposed to detect unhealthy instances, even though the stopped instances were clearly marked unhealthy

After doing some soul searching, I can explain the first point somewhat.

The second point still confuses me though.

You’re Suspended! Hand In Your Launch Configuration And Get Out Of Here

It turns out that when AWS schedules an instance for retirement, it doesn’t necessarily mean the instance is actually going to disappear forever. Well, it won’t if you’re using an EBS volume at least. If you’re just using an instance store volume you’re pretty boned, but those are ephemeral anyway, so you should really know better.

Once the instance is “retired” (i.e. stopped), you can just start it up again. It will migrate to some new (healthy) hardware, and off it goes, doing whatever the hell it was doing in the first place.

However, as I mentioned earlier, if you stop an EC2 instance owned by an auto scaling group, the auto scaling group will detect it as a failure, terminate the instance and spin up a brand new replacement.

Now, this sort of reaction can be pretty dangerous, especially when AWS is the one doing the shutdown(as opposed to the account holder), so AWS does the nice thing and suspends the terminate and launch processes of the auto scaling group, just to be safe.

Of course, the assumption here is that the account holder knows that the processes have been suspended and that some instances are being retired, and they go and restart the stopped instances, resume the auto scaling processes and continues on with their life, singing merrily to themselves.

Until this happened to us, I did not even know that suspending auto scaling group processes was an option, let alone that AWS would do it for me. When we happened to notice that two of our Elasticsearch data nodes had become unavailable through Octopus Deploy, I definitely was not the “informed account holder” in the equation, and instead went on an adventure trying to figure out what the hell was going on.

I tried terminating the stopped nodes, in the hopes that they would be replaced, but because the processes were suspended, I got nothing. I tried raising the number of desired instances, but again, the processes were suspended, so nothing happened.

In the end, I created a secondary auto scaling group using the same Launch Configuration and got it to spin up a few instances, which then joined the cluster and helped to settle everything down.

It wasn’t until the next morning that cooler heads prevailed and I got a handle on what was actually happening that we cleaned everything up properly.

Conclusion

This was one of those cases where AWS was definitely doing the right thing (helping people to avoid data loss because of an underlying failure out of their control), but a simple lack of knowledge on our part caused a bit of a kerfuffle.

The ironic thing is that if AWS had simply terminated the EC2 instances (which I’ve seen happen before) the cluster would have self-healed and rebalanced perfectly well (as long as only a few nodes were terminated of course).

Like I said earlier, I still don’t know why we didn’t get a CloudWatch alarm when the instances were stopped, as they were definitely marked as “unhealthy” in the Load Balancer. We didn’t even realise that something had gone wrong until someone noticed that the data nodes were reporting as unavailable in Octopus Deploy, and that happened purely by chance.

Granted, we still had four out of six data nodes in service, and we run our shards with one primary and two replicas, so we weren’t exactly in the danger zone, but we were definitely approaching it.

Maybe its time to try and configure an alarm on the cluster health.

That’s always nice and colourful.

0 Comments

We’ve had full control over our Elasticsearch field mappings for a while now, but to be honest, once we got the initial round of mappings out of the way, we haven’t really had to deal with too many deployments. Sure, they happen every now and then, but its not something we do every single day.

In the intervening time period, we’ve increased the total amount of data that we store in the ELK stack, which has had an unfortunate side effect when it comes to the deployment of more mappings.

It tends to take the Elasticsearch cluster down.

Is The Highway To Hell Paved With Good Intentions?

When we originally upgraded the ELK stack and introduced the deployment of the Elasticsearch configuration it was pretty straightforward. The deployment consisted of only those settings related to the cluster/node, and said settings wouldn’t be applied until the node was restarted anyway, so it was an easy decision to just force a node restart whenever the configuration was deployed.

When deploying to multiple nodes, the first attempt at orchestration just deployed sequentially, one node at a time, restarting each one and then waiting for the node to rejoin the cluster before moving on.

This approach….kind of worked.

It was good for when a fresh node was joining the cluster (i.e. after an auto scale) or when applying simple configuration changes (like log settings), but tended to fall apart when applying major configuration changes or when the cluster did not already exist (i.e. initial creation).

The easy solution was to just deploy to all nodes at the same time, which pretty much guaranteed a small downtime while the cluster reassembled itself.

Considering that we weren’t planning on deploying core configuration changes all that often, this seemed like a decent enough compromise.

Then we went and included index templates and field mappings into the configuration deployment.

Each time we deployed a new field mapping the cluster would go down for a few moments, but would usually come good again shortly after. Of course, this was when we still only had a weeks worth of data in the cluster, so it was pretty easy for it to crunch through all of the indexes and shards and sort them out when it came back online.

Now we have a little over a months worth of data, and every time the cluster goes down it takes a fair while to come back.

That’s real downtime for no real benefit, because most of the time we’re just deploying field mappings, which can actually just be updated using the HTTP API, no restart required.

Dirty Deployments, Done Dirt Cheap

This situation could have easily been an opportunity to shift the field mappings into a deployment of their own, but I still had the same problem as I did the first time I had to make this decision – what’s the hook for the deployment when spinning up a new environment?

In retrospect the answer is probably “the environment is up and has passed its smoke test”, but that didn’t occur to me until later, so we went in a different direction.

What if we didn’t always have to restart the node on a configuration deployment?

We really only deploy three files that could potentially require a node restart:

  • The core Elasticsearch configuration file (/etc/elasticsearch/elasticsearch.yml)
  • The JVM options file (/etc/elasticsearch/jvm.options)
  • The log4j2 configuration file (/etc/elasticsearch/log4j2.properties)

If none of those files have changed, then we really don’t need to do a node restart, which means we can just move ahead with the deployment of the field mappings.

No fuss, no muss.

Linux is pretty sweet in this regard (well, at least the Amazon Linux baseline is) in that it provides a diff command that can be used to easily compare two files.

It was a relatively simple matter to augment the deployment script with some additional logic, like below:

… more script up here

 

temporary_jvm_options="/tmp/elk-elasticsearch/jvm.options" destination_jvm_options="/etc/elasticsearch/jvm.options" echo "Mutating temporary jvm.options file [$temporary_jvm_options] to contain appropriate memory allocation and to fix line endings" es_memory=$(free -m | awk '/^Mem:/ { print int($2/2) }') || exit 1 sed -i "s/@@ES_MEMORY@@/$es_memory/" $temporary_jvm_options || exit 1 sed -i 's/\r//' $temporary_jvm_options || exit 1 sed -i '1 s/^\xef\xbb\xbf//' $temporary_jvm_options || exit 1 echo "Diffing $temporary_jvm_options and $destination_jvm_options" diff --brief $temporary_jvm_options $destination_jvm_options jvm_options_changed=$?

 

… more script down here

 

if [[ $jvm_options_changed == 1 || $configuration_file_changed == 1 || $log_config_file_changed == 1 ]]; then
    echo "Configuration change detected, will now restart elasticsearch service."
    sudo service elasticsearch restart || exit 1
else
    echo "No configuration change detected, elasticsearch service will not be restarted."
fi

No more unnecessary node restarts, no more unnecessary downtime.

Conclusion

This is another one of those cases that seems incredibly obvious in retrospect, but I suppose everything does. Its still almost always better to go with the naive solution at first, and then improve, rather than try to deal with everything up front. Its better to focus on making something easy to adapt in the face of unexpected issues than to try and engineer some perfect unicorn.

Regardless, with no more risk that the Elasticsearch cluster will go down for an unspecified amount of time whenever we just deploy field mapping updates, we can add new fields with impunity (it helps that we figured out how to reindex).

Of course, as with anything, there are still issues with the deployment logic:

  • Linking the index template/field mapping deployment to the core Elasticsearch configuration was almost certainly a terrible mistake, so we’ll probably have to deal with that eventually.
  • The fact that a configuration deployment can still result in the cluster going down is not great, but to be honest, I can’t really think of a better way either. You could deploy the configuration to the master nodes first, but that leaves you in a tricky spot if it fails (or if the configuration is a deep enough change to completely rename or otherwise move the cluster). You might be able to improve the logic to differentiate between “first time” and “additional node”, but you still have the problem of dealing with major configuration changes. Its all very complicated and honestly we don’t really do configuration deployments enough to spend time solving that particularly problem.
  • The index template/field mapping deployment technically occurs once on every node, simultaneously. For something that can be accomplished by a HTTP call, this is pretty wasteful (though doesn’t have any obvious negative side effects).

There’s always room for improvement.

0 Comments

It’s been a little while since I wrote about our data synchronization algorithm, but it still gets a fair bit of space in my mind on a day to day basis.

Most recently, we put some effort into optimizing it in the face of rising IOPS usage at the database level, which worked well enough to stabilize the system in the face of what we wanted it to do at the time.

Then we told it to sync more data.

Specifically, one of the two biggest data sets in our software which covers the storage of repeatable tasks, like appointments, phone calls, inspections and so on.

Not only is this table one of the largest in raw size, it also features the most churn AND is one of the only tables to feature hard deletes, a challenge that we had to build an entire secondary sync process for.

Funnily enough, it actually worked pretty well at first, but everything started to fray at the edges as we started syncing more and more clients. Same sort of thing as before, pressure on IOPS at the database level, mostly as a result of reads.

With the memory of the last round of optimizations fresh in our minds, it was time to enter the breach once more.

Will To Survive

We prodded at the system for a bit, analysing the traffic patterns to try and determine what might be causing the high read IOPS this time. Being that we’d already investigated similar sorts of things recently, we knew our way around it pretty well, so it wasn’t long before we noticed something suspicious.

Lets go back a step though.

The core data synchronization algorithm hasn’t changed all that much from the last time I wrote about it, even in the face of the optimizations.

Get Local Count/Version/Timestamp
Get Remote Count/Version/Timestamp
If Local Count/Version/Timestamp == Remote Count/Version/Timestamp
    Do Nothing and Exit
If Local Version/Timestamp == Remote Version/Timestamp BUT Local Count != Remote Count
    Calculate [BATCH SIZE] Using Historical Data
    Get Last Local Position
    Get Next [BATCH SIZE] Local Rows from last position
    Get Min & Max Version in Batch
    Query Remote for Manifest Between Min/Max Local Version
    Create Manifest from Local Batch
    Compare
        Find Remote Not in Local
            Delete from Remote
        Find Local Not in Remote
            Upload to Remote
If Local Version/Timestamp > Remote Version/Timestamp
    Calculate [BATCH SIZE] Using Historical Data
    Get Next [BATCH SIZE] Local Rows > Remote Version
    Upload to Remote
        Record Result for [BATCH SIZE] Tuning
        If Failure & Minimum [BATCH SIZE], Skip Ahead
If Local Version/Timestamp < Remote Version/Timestamp
    Find Remote > Local Version
    Delete from Remote

We’ve tweaked the mechanisms by which each evaluation of the algorithm determines its local and remote versions (there’s some caching on both the server and client side now, and each request to put/remote data returns new meta information from the server for efficiency) but that’s about it.

This time when we analysed the traffic, we noticed that there was a lot of calls to get table chunk manifests, an important part of the diff checker component of the algorithm, which is primarily meant to remove rows that have been deleted locally from the remote (i.e. hard deletes rather than the much easier to handle soft deletes, which are really just updates).

The problem with there being so many manifest calls is that they are quite expensive, especially for large tables.

Each manifest call requires a database query that can only use the first two components of the clustered index (which is client segmentation data), but must then scan through the remaining data in order to get the “chunk” needed for analysis. This can mean a scan of millions of rows, which is quite intense on read IOPS, because all of that data needs to be brought into memory in order to be evaluated.

But why was the number of manifest calls a problem now?

Sneaky Corporation

It all ties back into the data that we were syncing this time. As I mentioned at the beginning of this article, not only is it a huge table, it also features hard-deletes as the norm, rather than as exceptions. These few things added together create a bit of a perfect storm with regards to diff checker activity, which is why the system was starting to strain as a result of the massive increase in manifest calls.

Conceptually the diff checker part of the algorithm is a batched scan. It starts on one end of the data set, and gets chunks of data sequentially from both locations (local and remote), analysing and reacting as appropriate until it either gets back in sync or it reaches the other end, when it then wraps back around to it’s start point again.

Here’s where we made a mistake that seems obvious in hindsight.

In almost all cases, changes (even hard deletes) are more likely to occur at the top end of the data set, rather than the bottom.

Our scan?

Starts at the bottom and works its way up.

Thus, any time something was deleted from close to the top of the data set, the diff checker would have to scan through the entire table before finding and fixing it. This means more reads (to get the manifest chunks) which means higher IOPS.

This One Goes To Eleven

It really didn’t take a huge amount of effort to flip the diff checker algorithm to go top down instead of bottom up.

In fact, because of the way in which we’d built the code in the first place (using the strategy pattern to implement the actual chunking algorithm) it was as simple as writing a new class and binding it up instead of the old one via our dependency injection container.

The top down logic is basically the same as it is for bottom up, just inverted.

Starting the lower boundary of the last chunk (or the very top of the table if the state was reset), get the next [BATCH SIZE] chunk of rows from the local in a downwards direction. Using that chunk, find the extremities (top and bottom) and ask the remote for an equivalent chunk. Contrast and compare, react accordingly and then remember where you were up to for next time. If the algorithm reaches a point where the local and remote are identical, reset all state and do nothing.

Nothing super fancy, and an extremely obvious performance optimization in retrospect considering how much more likely changes are to occur at the top end rather than the bottom.

Such Strange Things

There are two tricksy bits though, one specific the top down approach and the other that was present in the bottom up approach, but is more obvious when working top down.

  1. When you get the local chunk, and you get less than the batch size, you know that you’re probably at the bottom of the table. If you follow the same rules for asking the remote at this point, you risk leaving data in the remote that is not in the local if its right at the bottom. Instead, when you reach this point, you have to infer a lower boundary of 0 to make sure you get everything.
  2. There is still an inherent inefficiency in the design of the scan, such that it will continue to do extra work for no benefit. Consider the situation where the scan has started and has dealt with 4 chunks. Its a bit down in the table, and the change it originally wanted to find was in chunk 5. In the meantime, the user has deleted data in chunk 2. The process will have to finish its scan of the entire table before it returns to the top and finds that particular change, which is just wasteful. Instead, it should be able to determine if there are any differences in the remainder of the data set (i.e. from the bottom of the current chunk down) and then use that information to decide to reset its position to the top again. We haven’t actually done this bit yet, because it came up after the deployed the first version to our users.

The first issue is a bit of a deal breaker (because it means the process can result in an incomplete data synchronization) so we’re lucky that we noticed it during testing and fixed it right away.

The second issue is not as bad because the process still does what it is supposed to do, just slower. Not great from an optimization point of view, so we’ll still fix it, but not exactly a critical issue.

Conclusion

Like I said a little bit earlier, doing the diff check scan from top to bottom is incredibly obvious in retrospect. All the changes are at the top, why would we start at the bottom of a potentially huge data set and waste time and effort analysing a bunch of data that is not likely to be relevant? Of course, that wasn’t actually obvious until we did some analysis on top of other optimizations, but it really does feel like a bit of a boneheaded mistake.

The good news is that when we implemented the changes as described (algorithm from bottom up to top down) we saw a dramatic decrease in traffic, because the diff checker was running fewer times before it found and fixed the changes.

The bad news is that due to point number two above (the diff checker doesn’t know that it should start from the top again when there are more recent changes) the IOPS usage didn’t really change all that much at all.

Still, the algorithm continues to get better and better and we continue to be able to synchronize more and more data without having to pay for additional infrastructure to support it.

Those are some good engineering feels.