Why Did We Start Working on Streaming Replication
I was in a harbor waiting for a ferry when I suddenly got pulled into a support request to troubleshoot a stalling cluster. Talking with the DBA, it soon turned out that his cluster was executing a background batch job for maintenance. This maintenance job executed a large number of data changes at a low pace in a single transaction. The cluster hang started soon after the batch job had committed on the first node. The hang was caused by this one big transaction taking a long time to apply on the secondary nodes. The cluster was apparently choking on too big a bite.
The DBA was eager to start killing potentially harmful connections, but in a cluster this is hardly ever a good idea, and in this situation it would not even have been effective. Our options were to shut down all but the first node and rejoin the nodes one by one via State Snapshot Transfer (SST), or alternatively just wait. Based on what we knew about the maintenance transaction, the expected downtime from waiting was tolerable. The customer's problem was solved by waiting, but the total toll was 27 minutes of cluster stall. When I boarded the ferry, it was obvious that something had to be done to prevent large transactions from choking clusters.
Metrics on the Problem
After the trip, back in my laboratory, I started simulating the problem seen at the customer site, and finally came up with a test scenario reproducing the turbulence caused by a large transaction in a cluster. In the scenario, the cluster processes a constant flow of well-behaving small transactions, which saturates the transaction throughput at a stable level. Then we submit a large transaction to the first node and monitor how long it takes to execute there and what happens after it replicates.
But enough talking; see for yourself what happens when a large transaction hits the first node in the cluster. In this exercise a two-node cluster is installed on my laptop, which also has background media apps running to keep the field engineer entertained during the otherwise monotonous benchmarking work ahead. So this is not a perfect deployment for stability, and the metrics show some fluctuation, but overall the impact of the large transaction on transaction throughput is clearly visible in the recorded metrics.
The benchmark uses a very small database (4 tables, 1K rows), where 8 client connections submit random single-row autocommit updates to the first cluster node. Despite the small data set, these updates are mostly non-conflicting, and the cluster can execute this load at an average of 2.3K transactions per second. These connections simulate the normal payload transaction load in the cluster.
There is also a separate table with 2.5M rows, and to simulate a large transaction, the benchmark launches a single update statement that updates all the rows in the big table. Running the large update alone in an idle cluster takes ~20 secs.
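As a sketch of this load, the two kinds of statements could look roughly like the following. Table and column names are hypothetical, since the text does not give the actual schema:

```sql
-- Payload: each of the 8 connections loops over random
-- single-row autocommit updates against the 1K-row tables
UPDATE payload_table SET counter = counter + 1
 WHERE id = FLOOR(1 + RAND() * 1000);

-- Large transaction: one statement updating all 2.5M rows
UPDATE big_table SET counter = counter + 1;
```

The payload updates pick rows at random, which is why they are mostly non-conflicting even on a small data set.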
Here is a visualization of what happens when the large update is submitted to the cluster while it simultaneously has to process the payload.
Here you can see that the payload transaction load is initially quite stable at the average 2.3K trx/sec level (with one dip down to the 1.3K trx/sec level, though). The large transaction is submitted at the “Huge trx Begin” mark, and its execution continues in the first node for 47 secs, until the “Huge Trx Replication” mark in the chart. During the large transaction's execution phase, cluster transaction throughput drops to the ~2K trx/sec level. This dip is simply due to the limited HW capacity of the test cluster (both nodes compete for the same CPU cycles, and the large transaction takes its share of the first node's processing capacity). In a high-performance cluster deployment, the effect of the large transaction during its execution phase in the first node is generally not seen.
At the “Huge trx replication” mark, the large transaction has completed its execution phase, and it replicates one write set containing all the update row events. The large transaction also commits in the first node at this point, and from the perspective of the application that sent it, all looks good now. The large transaction was executed in 47 secs, which was more or less the expected execution time, and it has successfully committed in the first node.
However, the actual problem only begins now. When the write set of the large transaction arrives at the secondary node, it is applied among all the other payload transactions' write sets. The secondary node is configured with 8 applier threads, so it has enough resources for applying these non-conflicting write sets in parallel. But as the cluster has a strict commit order policy, all write sets must queue for commit ordering after the applying phase. Because applying the large transaction's write set takes much longer than applying the reasonably sized payload transactions, one of the secondary node's replication applier threads is applying the large transaction while the other 7 appliers have finished applying payload transactions and are waiting for commit order behind it. In the chart, you can see that this applying phase lasts 21 secs. No transaction can commit during this time, on any cluster node; the cluster remains stalled.
A quick reminder, though: the Galera replication library can be configured to allow out-of-order committing via the repl.commit_order provider option. But for data safety, this should never be done. Data safety is the first and foremost requirement for a database cluster, after all.
In the chart, the “Huge trx end” mark shows when the large transaction finally committed in the secondary node, and the cluster returns to the normal payload transaction processing state. Transaction throughput settles back to the 2.3K trx/sec rate.
Limiting Max Transaction Size
The problem of a cluster choking under large transactions was anticipated very early in the development of Galera, many years back. Protecting the cluster from the effects of large transactions was one design goal, and two approaches were developed for it. With the wsrep_max_ws_size variable, a cluster can be configured to reject too-big transactions. This variable is enforced on the MySQL/MariaDB server side, where the size of the accumulating replication events (a.k.a. binlog events) is monitored; if the wsrep_max_ws_size limit is exceeded, the transaction is aborted with a warning. This is an effective method for protecting the cluster, but a rather crude way to do it. The large transaction is allowed to proceed up to the transaction size limit and is then rolled back. This means a lot of wasted resources, and nothing got done after all. The default value for wsrep_max_ws_size is 2G, which is too high to protect the cluster from choking.
The Galera replication library has a somewhat similar parameter: repl.max_ws_size. This limits the total size of the replicated write set, i.e. the combined size of the replication events and all the metadata packed into the write set. The default for repl.max_ws_size is also 2G, so the tolerance is very large here as well.
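As a sketch, both limits could be tightened like this; the 10MB value is illustrative, not a recommendation from the text:

```sql
-- Server-side limit on accumulated replication events (default 2G).
-- 10MB here is an illustrative value, not a tuning recommendation.
SET GLOBAL wsrep_max_ws_size = 10485760;

-- Galera provider-side limit on the total write set size;
-- provider options are passed as a string of key=value pairs.
SET GLOBAL wsrep_provider_options = 'repl.max_ws_size=10485760';
```

A transaction that grows past either limit is rolled back with a warning, as described above.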
The LOAD DATA command is one potential source of large transactions, and to support e.g. migrations to Galera Cluster, we have developed a special method for splitting LOAD DATA transactions into small INSERT blocks. This was another attempt to protect clusters from potentially harmful large transactions. LOAD DATA splitting can be configured with the wsrep_load_data_splitting variable.
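A minimal sketch of enabling the splitting; the file path and table name are hypothetical:

```sql
-- With splitting enabled, the server commits the loaded rows in
-- smaller blocks instead of one huge transaction.
SET GLOBAL wsrep_load_data_splitting = ON;

-- Hypothetical file and table, for illustration only
LOAD DATA INFILE '/tmp/import.csv' INTO TABLE t1;
```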
Enter Streaming Replication
Streaming Replication is the technology that finally solves the large transaction problem. In a nutshell, the architecture looks quite simple:
- A large transaction is split into small fragments, which are replicated as separate write sets.
- This transaction fragmentation happens during the transaction execution time in the first node.
- Fragment write sets are applied by a streaming replication transaction in secondary nodes, but not committed. The streaming replication transaction remains alive, to wait for more fragments to arrive for applying.
- At commit time, the last fragment write set is marked with a “commit flag”, which triggers the final commit in the secondary nodes.
With this, secondary nodes do not get a single large transaction write set to digest; instead they chew through the transaction in reasonably sized fragments. Each fragment is fast to apply and will not block other transactions in the commit phase. The following images show how this happens.
Here a large transaction starts to execute in the first node. As long as wsrep_trx_fragment_size is not exceeded, the transaction remains in the first node only.
As soon as wsrep_trx_fragment_size is exceeded, a write set is composed of the replication events accumulated for the fragment and replicated to the cluster. The receiving nodes launch a special streaming replication transaction (“SR trx” in the image), which applies the events in the fragment and remains alive in the node.
When the client commits the transaction, the last fragment is replicated with a special commit flag. This informs the receiving nodes that the streaming replication transaction must be committed after applying the last fragment.
With this, the cluster has turned into a distributed transaction processing engine. The large transaction is processed in all cluster nodes at the same time. Secondary nodes follow the execution state of the first node, trailing by practically one fragment's worth of replication events.
A Brief History of Streaming Replication Development
We started the development of Streaming Replication with MySQL version 5.6 in 2015. The basic functionality for splitting transactions and applying them via long-term streaming replication handler transactions was rather straightforward to develop, but as usual, the devil lurks in the details. Synchronous database clusters may lose nodes and/or join new nodes at any time, and it must be possible to bring long-term streaming replication transactions back to life and catch them up to the correct state on newly joining nodes. This was probably the hardest development challenge to overcome.
We developed streaming replication support first for MySQL, but for the first public releases we migrated the solution to the MariaDB side, targeting the 10.3 release in 2017. We missed the 10.3 train, mostly because our change set was too big to review within the 10.3 timeframe. But for the 10.4 release, one year later, we were well prepared, and streaming replication finally got released. Streaming replication depends on the new replication API (wsrep API version 26), a.k.a. Galera 4. MariaDB versions 10.3 and earlier use Galera 3 replication, and versions 10.4 and later use the new Galera 4 replication; this is a rather big leap in terms of replication functionality.
On the MySQL side, we merged all Galera 4 functionality into the 8.0 release series, which was released in 2020. MySQL 5.7 and earlier use Galera 3, and we have no plans to backport Galera 4 to 5.7.
Using Streaming Replication
Configuring streaming replication is rather simple; there are only two configuration variables for tuning:
- wsrep_trx_fragment_unit specifies which unit is used for the fragment size metric. Possible values are:
- Statement – Executed SQL statements in the transaction are counted
- Bytes – The size of accumulated replication events, in bytes, is monitored
- wsrep_trx_fragment_size sets the threshold (in the chosen unit) for the maximum fragment size
The recommended configuration is to set wsrep_trx_fragment_unit = bytes and wsrep_trx_fragment_size to a value high enough that only abnormal, potentially harmful large transactions get split.
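A minimal session-level sketch of this, assuming a hypothetical big_table and an illustrative 10MB fragment size:

```sql
-- Fragment only this session's transactions, measured in bytes
SET SESSION wsrep_trx_fragment_unit = 'bytes';
SET SESSION wsrep_trx_fragment_size = 10485760;  -- 10MB, illustrative

-- Hypothetical large update; replicated as a stream of fragments
UPDATE big_table SET counter = counter + 1;

-- A fragment size of 0 switches fragmentation off again
SET SESSION wsrep_trx_fragment_size = 0;
```

Scoping the setting to the session keeps the regular payload transactions of other connections unfragmented.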
Streaming replication maintains one system table in the mysql schema, mysql.wsrep_streaming_log, with the following columns:
| Column    | Description                                            |
|-----------|--------------------------------------------------------|
| node_uuid | UUID of the source node of the SR transaction          |
| trx_id    | Transaction ID in the source node                      |
| seqno     | Replication sequence number of the replicated fragment |
| flags     | Execution mode flags                                   |
| frag      | Actual fragment contents                               |
Streaming replication manages this table, and end users have no privileges to modify it. The information in this table can be used for monitoring streaming replication progress in the cluster. E.g. “SELECT node_uuid, trx_id, seqno FROM mysql.wsrep_streaming_log” would tell the current situation of ongoing streaming replication transactions. The pair (node_uuid, trx_id) identifies a single streaming replication transaction and the node it is running on, and every fragment replicated so far for that transaction shows up with a separate sequence number in the output.
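Building on that, a condensed per-transaction view could be sketched like this; the aggregation is my own illustration, not from the original text:

```sql
-- One row per ongoing streaming transaction: the node it runs on,
-- how many fragments have been replicated, and the latest seqno
SELECT node_uuid,
       trx_id,
       COUNT(*)   AS fragments,
       MAX(seqno) AS last_seqno
  FROM mysql.wsrep_streaming_log
 GROUP BY node_uuid, trx_id;
```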
Large Transaction Simulation with Streaming Replication
Experimenting with the large transaction simulation under streaming replication shows how a large transaction is processed with different fragment sizes.
In this experiment, wsrep_trx_fragment_size is set to 10K, i.e. after the accumulated size of the replication event set exceeds 10,000 bytes, a new fragment write set is replicated. The “Huge trx begin” and “Huge trx end” marks show when the large transaction begins processing and commits in the first node. The total execution time is 157 secs, and there is no hanging at all.
Remember that in the non-fragmenting experiment, the elapsed time experienced by the application was 47 secs, and the total time experienced by the cluster was 108 secs.
During the large transaction's execution time, total transaction throughput drops to the 1.7K trx/sec level.
The total number of fragments replicated is 18,875, so instead of one replication round we now have close to 20K replication rounds for this single transaction. These extra replication rounds extend the transaction's execution time considerably. On the other hand, the secondary node follows the state of the transaction quite closely behind the first node, and the overall transaction throughput level is quite stable, and not much lower than the first node's throughput during the large transaction's execution in the non-streaming-replication experiment.
In this experiment, wsrep_trx_fragment_size is set to 100K, i.e. after the accumulated size of the replication event set exceeds 100,000 bytes, a new fragment write set is replicated. The “Huge trx begin” and “Huge trx end” marks show when the large transaction begins processing and commits in the first node. The total execution time is 75 seconds, and there is no hanging at all.
During the large transaction's execution time, total transaction throughput drops to the 1K trx/sec level. The total number of fragments replicated is 1,883, a lot fewer than with the 10K fragment size, but total transaction throughput has also dropped somewhat compared to that experiment.
In this experiment, wsrep_trx_fragment_size is set to 1M, i.e. after the accumulated size of the replication event set exceeds 1,000,000 bytes, a new fragment write set is replicated. The “Huge trx begin” and “Huge trx end” marks show when the large transaction begins processing and commits in the first node. The total execution time is 55 seconds, and there is no hanging at all.
During the large transaction's execution time, total transaction throughput drops to the 0.7K trx/sec level. The total number of fragments replicated is now 188.
Recommended Streaming Replication Use Case
The first implementation of streaming replication is conservative and targets correctness rather than performance. As seen in the simulation, small fragment sizes easily multiply the amount of replication, and handling fragment metadata causes additional storage and CPU load for the cluster. If the fragment size is configured so low that regular-sized transactions get fragmented, the cluster would suffer considerable performance degradation. Therefore our recommended use case for streaming replication is to split only the harmfully large transactions in the cluster workload. The fragmentation unit should be bytes, and the fragment size high enough not to split the regular, well-behaving payload transactions. The actual size depends on the cluster capacity and the exact nature of the transaction load, but I would expect a fragment size limit in the magnitude of 10–50MB to be suitable for many use cases.
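Following that recommendation, a cluster-wide configuration could be sketched as follows; 20MB is an illustrative pick from the suggested 10–50MB range:

```sql
-- Split only abnormally large transactions; well-behaving payload
-- transactions stay below the threshold and are never fragmented.
SET GLOBAL wsrep_trx_fragment_unit = 'bytes';
SET GLOBAL wsrep_trx_fragment_size = 20971520;  -- 20MB, illustrative
```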
The max write set size variables are still enforced and can be used, but once streaming replication is configured, it no longer makes much sense to use these limits.
LOAD DATA splitting is still enforced by wsrep_load_data_splitting configuration, but it is now implemented through streaming replication.
Foundation for Further Development
Streaming replication implements a totally new way of controlling transactions in the cluster, and this technology can be used as a foundation for other new replication features. The first such add-on feature we have worked on is XA transaction support in clusters. Note that both MariaDB and MySQL already have XA transaction support, but it is only safe in stand-alone server use. Trying an XA transaction in a cluster would lead to very confusing results. Now, with the help of streaming replication, we can make the whole cluster operate as an XA transaction resource manager.
Our work on XA transaction support was launched as soon as the streaming replication framework had stabilized, but again, the actual release of this feature was delayed somewhat. The first pull request for XA transaction support was pushed for MariaDB 10.4 together with streaming replication, but it did not pass the reviews in that round. Next we tried to get XA into MariaDB 10.5, but again missed the train: MariaDB 10.5 got a fundamental refactoring of native MariaDB XA transaction support, which conflicted with our cluster-wide XA transaction support. Now the MariaDB 10.6 code freeze is quite near, and it looks very promising that cluster XA support will finally get released in 10.6. Our MySQL version of this will come later, for the MySQL 8.0 series.
Streaming replication development was a big effort, and it has turned out to be an effective method for protecting the cluster from harmfully large transactions. The stalling effect of large transactions is serious enough that it is better to take this precaution, or, if nothing else, at least to configure wsrep_max_ws_size low enough to abort transactions growing too big; the 2GB default is apparently too high.
Our current work in this area is focusing now on optimizing the streaming replication performance and experimenting with XA transaction use cases.
Codership welcomes questions and comments in the codership-team Google Groups discussion.