# Manifold Blog

When searching for an optimal solution to a problem, one faces design decisions about the appropriate architecture and approach. This post discusses one such problem, data synchronization from one store to another, in order to highlight key decision points. We contrast two approaches and pose questions that can help inform the design decisions. The approaches we look at are: replicating source actions (insert, update, delete) at the destination data store, and replicating the state of the source store in the destination store.

## The Problem

Given a source-of-truth data store (the source) and a read-only copy (the destination), how does one keep the destination consistent with the source? Examples of this type of problem include maintaining database read replicas and updating a search index.

To define the problem more precisely, we make the following assumptions:
• Source and destination data stores are not necessarily the same (e.g., source can be a MySQL database and destination may be an Elasticsearch index).
• Data will be transformed in some fashion when copied from source to the destination (e.g., we may denormalize source data by joining two tables into a single table in the destination store).
• The synchronization must be done in near real-time. As a practical matter, this means that we cannot copy the entire source data store on a periodic basis, but need to keep the destination store up to date incrementally.
• To make the process possible, we assume that the source data store will send an event whenever a piece of data is modified.

### Example

Consider a source data store that is a MySQL database with one table (for simplicity), and suppose we want to keep an Elasticsearch index up to date with the source DB. In the source we have a pets table:
`id   | name | kind  | dob`
`--------------------------------`
`1    | Rex  | dog   | 2018-01-01`
`2    | Joe  | snake | 2017-03-02`
...which will be replicated in the Elasticsearch pets index as:
`{ "id": 1, "name": "Rex", "kind": "dog", "dob": "2018-01-01" },`
`{ "id": 2, "name": "Joe", "kind": "snake", "dob": "2017-03-02" }`
If a change statement (INSERT, UPDATE, or DELETE) is executed in the source DB, we will receive an event with the old and new record values. For example, if we update dob in record 1, we should get an event:
`{`
`  event_type: UPDATE,`
`  before_values: { id: 1, name: Rex, kind: dog, dob: 2018-01-01 },`
`  after_values: { id: 1, name: Rex, kind: dog, dob: 2018-03-17 }`
`}`
With this information, we can update the record in the Elasticsearch index:
`PUT pets/_doc/1`
`{ "name": "Rex", "kind": "dog", "dob": "2018-03-17" }`

### Formal Problem Definition

A source system S is defined as a set of records, and a destination system D likewise. We write s for a state of the source, i.e., the set of records it holds at a given moment, and d for a state of the destination.

There is a transformation T: S → D between the source and the destination. A destination state d is said to be consistent with the source state s if d = T(s).

For a source system, there is a set of actions A. The effect of an action a in A on the state s is denoted as a new state a(s).

Suppose we have a sequence of actions a1, a2, ..., an transforming the state s to the state s' = an(...(a2(a1(s)))...). This transition, call it X(s, s'), can be represented as the sequence of actions (a1, ..., an), as a difference between the states (s' - s), or as a combination of both.

The synchronization problem can be defined as follows. For a transition X(s, s') of the source, find an action b (or a sequence of actions) that maps the destination state d = T(s) to T(s'), i.e., b(d) = T(s').
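
To make the notation concrete, here is a minimal sketch. It assumes a state is a dict of records keyed by id and that T simply drops the dob field; `transform`, `rename`, and `is_consistent` are illustrative names, not part of any library.

```python
from typing import Callable, Dict

Record = Dict[str, str]
State = Dict[int, Record]  # a state is a set of records, keyed by record id


def transform(s: State) -> State:
    """T: map a source state to a destination state (assumed here: drop `dob`)."""
    return {rid: {k: v for k, v in rec.items() if k != "dob"} for rid, rec in s.items()}


def rename(rid: int, name: str) -> Callable[[State], State]:
    """Build an action a; a(s) is a new state in which record `rid` is renamed."""
    def action(s: State) -> State:
        new_state = {k: dict(v) for k, v in s.items()}
        new_state[rid]["name"] = name
        return new_state
    return action


def is_consistent(s: State, d: State) -> bool:
    """A destination state d is consistent with a source state s if d = T(s)."""
    return d == transform(s)


s = {1: {"name": "Rex", "kind": "dog", "dob": "2018-01-01"}}
d = transform(s)
a = rename(1, "Max")  # action on the source
b = rename(1, "Max")  # in this simple case the same action works on the destination
s_next, d_next = a(s), b(d)
```

Here `is_consistent(s_next, d_next)` holds, which is exactly the condition b(d) = T(s'). For less trivial transformations, finding b is the hard part.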

## Replicating Actions

In this approach, for every source change event, we execute a corresponding action that updates the destination data store. In the example above, if we receive an insert event:
`{  event_type: INSERT,  values: { id: 3, name: Primrose, kind: cat, dob: 2015-03-01 }}`
...we can update the index by executing:
`PUT pets/_doc/3`
`{ "name": "Primrose", "kind": "cat", "dob": "2015-03-01" }`

Formally speaking, given X(s, s') = a (or equivalently s' = a(s)), we need to find a' such that T(s') = a'(T(s)) or equivalently T(a(s)) = a'(T(s)).
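
As a rough sketch of this approach, the handler below maps each event type to a destination action. A plain dict stands in for the Elasticsearch index, and the names `replicate_action` and `to_document` are assumptions made for illustration; the event shapes follow the examples in this post.

```python
from typing import Any, Dict

Index = Dict[int, Dict[str, Any]]  # stand-in for the destination index


def to_document(row: Dict[str, Any]) -> Dict[str, Any]:
    """Transformation for one row: the document is the row without its id."""
    return {k: v for k, v in row.items() if k != "id"}


def replicate_action(index: Index, event: Dict[str, Any]) -> None:
    """Apply the destination action that corresponds to one source event."""
    kind = event["event_type"]
    if kind == "INSERT":
        row = event["values"]
        index[row["id"]] = to_document(row)
    elif kind == "UPDATE":
        row = event["after_values"]
        index[row["id"]] = to_document(row)
    elif kind == "DELETE":
        index.pop(event["values"]["id"], None)
    else:
        raise ValueError(f"unknown event type: {kind}")


index: Index = {}
replicate_action(index, {
    "event_type": "INSERT",
    "values": {"id": 3, "name": "Primrose", "kind": "cat", "dob": "2015-03-01"},
})
```

Note that the handler never touches the source: everything it writes comes from the event itself.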

## Replicating State

In this approach, we use the source change event merely as a trigger to query the source data and update the destination store based on the result of that query. In our example, we would get the event:
`{  event_type: INSERT,  values: { id: 3, name: Primrose, kind: cat, dob: 2015-03-01 }}`
...query the source using:
`SELECT * FROM pets WHERE id = 3`
...then use the query result to update the destination store by executing:
`PUT pets/_doc/3`
`{ "name": "Primrose", "kind": "cat", "dob": "2015-03-01" }`
More formally, given X(s, s') = s' - s, we need to find action a' such that a'(T(s)) = T(s').
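
A minimal sketch of the same flow, assuming an in-memory SQLite database as a stand-in for MySQL and a dict as a stand-in for the index (`replicate_state` is a hypothetical name). The event is used only to learn which row to re-read.

```python
import sqlite3
from typing import Any, Dict

Index = Dict[int, Dict[str, Any]]  # stand-in for the destination index


def replicate_state(conn: sqlite3.Connection, index: Index, event: Dict[str, Any]) -> None:
    """Use the event only as a trigger: re-read the row and upsert the result."""
    values = event.get("values") or event.get("after_values")
    pet_id = values["id"]
    row = conn.execute(
        "SELECT id, name, kind, dob FROM pets WHERE id = ?", (pet_id,)
    ).fetchone()
    if row is None:
        index.pop(pet_id, None)  # the row no longer exists in the source
    else:
        index[pet_id] = {"name": row[1], "kind": row[2], "dob": row[3]}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pets (id INTEGER PRIMARY KEY, name TEXT, kind TEXT, dob TEXT)")
conn.execute("INSERT INTO pets VALUES (3, 'Primrose', 'cat', '2015-03-01')")
index: Index = {}
replicate_state(conn, index, {"event_type": "INSERT", "values": {"id": 3}})
```

In this sketch the event only needs to carry the record id; the document content always comes from the current source state.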

## Contrasting the Two Approaches

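The two approaches look similar, and the differences are only apparent when we look at the implementation architecture. A generic architecture for such a synchronization flow has five components: the Source Data Store, an Event Generator, an Event Queue, an Event Processor, and the Destination Data Store, with changes flowing through them in that order.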
We assume that the Event Generator is either part of the Source Data Store or tightly coupled with it, so that each change generates a single event and the events are generated in the same order the changes are executed in the Source Data Store.

The event generated by the Event Generator can be either the action performed on the source store (e.g., an INSERT or UPDATE statement) or the change in the store's state (e.g., a list of modified rows).

The Event Queue serves as a buffer, so that events can be generated by the Event Generator as fast as they occur, regardless of how fast they are processed by the Event Processor.

Finally, the Event Processor updates the Destination Data Store using one of the above strategies.
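
The flow can be sketched in a few lines, assuming an in-process `queue.Queue` as the Event Queue and a single worker thread as the Event Processor. A real deployment would use a durable queue; `run_pipeline` and `run_processor` are illustrative names only.

```python
import queue
import threading
from typing import Any, Callable, Dict, List, Optional

Event = Dict[str, Any]


def run_processor(events: "queue.Queue[Optional[Event]]", handle: Callable[[Event], None]) -> None:
    """Event Processor: consume events in arrival order until a None sentinel."""
    while True:
        event = events.get()
        if event is None:
            break
        handle(event)


def run_pipeline(source_events: List[Event], handle: Callable[[Event], None]) -> None:
    """Wire Event Generator -> Event Queue -> Event Processor."""
    events: "queue.Queue[Optional[Event]]" = queue.Queue()  # Event Queue (buffer)
    worker = threading.Thread(target=run_processor, args=(events, handle))
    worker.start()
    for event in source_events:  # Event Generator: one event per change, in order
        events.put(event)
    events.put(None)  # signal that no more events will arrive
    worker.join()


processed: List[int] = []
run_pipeline([{"id": 1}, {"id": 2}, {"id": 3}], lambda e: processed.append(e["id"]))
```

With a single consumer the events are handled in the order they were generated; `handle` is where one of the two replication strategies plugs in.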

Let's contrast the two approaches by looking at the following aspects.

### Events

What kind of events are we able to receive from the source? Each event describes a transition from some state s to a state s' of the source. As described in our formal treatment of the problem above, this can be in terms of actions or in terms of state differences.

Examples of action representation are verbatim SQL statements executed in the source system or HL7 SIU messages (see https://corepointhealth.com/resource-center/hl7-resources/hl7-siu-message/).
`DELETE FROM pets WHERE dob < '2000-01-01'`
On the other hand, the state difference tells us in detail which rows (in the database example) have been modified.
`{  event_type: DELETE,  values: { id: 3, name: Shiva, kind: cat, dob: 1999-03-01 }}`
Clearly, the former makes it difficult to infer what part of the source state was affected by the change. It is, however, much more compact in cases where the change affects many rows. For a MySQL discussion of the differences, see https://dev.mysql.com/doc/refman/8.0/en/replication-sbr-rbr.html.

### Performance

In a naive implementation, the Event Processor will process each event serially in the same order it was generated by the Event Generator. This implies that the events must be processed (on average) as fast as they are generated—which may not always be possible, effectively putting a cap on the system's throughput. Typical factors that you should consider are:
• Writes to the Destination Data Store may be slower than writes to the Source Data Store. For example, the Destination Data Store may be read-optimized at the expense of writes (e.g., Amazon Redshift).
• Event processing may be non-trivial and require extra time. For example, the processing may include calling an additional service.
• Writing to the destination store may not be parallelizable. This may be due to Destination Data Store limitations or in order to preserve semantics (see the replicating actions example below).
Replicating state is generally more expensive than replicating actions, as it requires additional queries to the Source Data Store. However, for complex transformations between the source and the destination, this may be a good tradeoff.

To get around the problem of keeping up with an incoming stream of events, we may want to optimize the system by:
• processing events in batches, as it is often more efficient to process multiple records at once than one-by-one; or
• processing events in parallel.
The latter approach will inevitably result in events being processed in a different order than they were generated in. Replicating the state is generally more resilient to processing order than replicating actions.
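
Batching combines naturally with state replication, because repeated events for the same record collapse into a single read. The sketch below assumes SQLite as a stand-in source and a dict as the destination; `process_batch` is a hypothetical helper.

```python
import sqlite3
from typing import Any, Dict, List

Index = Dict[int, Dict[str, Any]]  # stand-in for the destination store


def process_batch(conn: sqlite3.Connection, index: Index, events: List[Dict[str, Any]]) -> int:
    """Replicate state for a whole batch with a single source query.

    Returns the number of distinct records refreshed.
    """
    ids = sorted({event["id"] for event in events})  # repeated ids collapse
    if not ids:
        return 0
    placeholders = ",".join("?" for _ in ids)
    rows = conn.execute(
        f"SELECT id, name FROM pets WHERE id IN ({placeholders})", ids
    ).fetchall()
    current = {pet_id: {"name": name} for pet_id, name in rows}
    for pet_id in ids:
        if pet_id in current:
            index[pet_id] = current[pet_id]
        else:
            index.pop(pet_id, None)  # the row was deleted in the meantime
    return len(ids)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pets (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO pets VALUES (?, ?)", [(7, "Jack"), (8, "Molly")])
index: Index = {9: {"name": "Gone"}}
refreshed = process_batch(conn, index, [{"id": 7}, {"id": 8}, {"id": 7}, {"id": 9}])
```

Four events result in one query and three refreshed records, at the cost of waiting for the batch to fill.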

### Processing order

Consider the following changes made to the source data:
`UPDATE pets SET name='Jill' WHERE id = 7`
`UPDATE pets SET name='Jack' WHERE id = 7`
We expect the resulting state to be that the cat with record id 7 is named 'Jack'. However, if we parallelize processing of the events generated by these updates, depending on the order we process them, we will either get:
`{ "name": "Jack", "kind": "cat", "dob": "2015-03-01" }`
...or:
`{ "name": "Jill", "kind": "cat", "dob": "2015-03-01" }`
If we, however, replicate the state instead of the actions, the following is a possible sequence that shows the approach's invariance with respect to the execution order.
`-- first update`
`UPDATE pets SET name='Jill' WHERE id = 7`
`-- second update`
`UPDATE pets SET name='Jack' WHERE id = 7`
`-- process second event`
`SELECT * FROM pets WHERE id = 7`
`PUT pets/_doc/7`
`{ "name": "Jack", "kind": "cat", "dob": "2015-03-01" }`
`-- process first event`
`SELECT * FROM pets WHERE id = 7`
`PUT pets/_doc/7`
`{ "name": "Jack", "kind": "cat", "dob": "2015-03-01" }`
Even though the events have been processed in reverse order, the resulting state is correct. The only downside is that we have made the same change to the Destination Data Store twice. This can be mitigated by batching up updates, thus trading latency for throughput.
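
The difference can be checked mechanically. The sketch below tries every processing order for the two update events, assuming (as in the discussion so far) that each event is processed atomically and that both updates have already been committed in the source. The function names are illustrative.

```python
from itertools import permutations
from typing import Any, Dict, Sequence

Event = Dict[str, Any]

# Source state after both updates have been committed.
source = {7: {"name": "Jack", "kind": "cat", "dob": "2015-03-01"}}

events = [
    {"id": 7, "after_values": {"name": "Jill", "kind": "cat", "dob": "2015-03-01"}},
    {"id": 7, "after_values": {"name": "Jack", "kind": "cat", "dob": "2015-03-01"}},
]


def final_name_replicating_actions(order: Sequence[Event]) -> str:
    """Write whatever each event carries; the last event processed wins."""
    index: Dict[int, Dict[str, Any]] = {}
    for event in order:
        index[event["id"]] = dict(event["after_values"])
    return index[7]["name"]


def final_name_replicating_state(order: Sequence[Event]) -> str:
    """Ignore the event payload; write the current source state instead."""
    index: Dict[int, Dict[str, Any]] = {}
    for event in order:
        index[event["id"]] = dict(source[event["id"]])
    return index[7]["name"]


action_results = {final_name_replicating_actions(p) for p in permutations(events)}
state_results = {final_name_replicating_state(p) for p in permutations(events)}
```

Replicating actions can end with either name depending on the order, while replicating state always ends with the name currently in the source.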

Note: In the above discussion, we made an assumption that processing each event is atomic. This assumption is rarely true in the real world. For example, the event processing above can have the following sequence:
`-- first update`
`UPDATE pets SET name='Jill' WHERE id = 7`
`-- (start) process first event`
`SELECT * FROM pets WHERE id = 7`
`-- second update`
`UPDATE pets SET name='Jack' WHERE id = 7`
`-- process second event`
`SELECT * FROM pets WHERE id = 7`
`PUT pets/_doc/7`
`{ "name": "Jack", "kind": "cat", "dob": "2015-03-01" }`
`-- (finish) process first event`
`PUT pets/_doc/7`
`{ "name": "Jill", "kind": "cat", "dob": "2015-03-01" }`
We can prevent this sequence by locking the source row we read (so that processing the second event will block until we finish processing the first event), but locking is generally counterproductive when trying to increase throughput through parallel processing.

An alternative method is to partition the source state so that events that are processed in parallel refer to different parts of the state (e.g., disjoint sets of records). For example, one can shard the space based on a set of keys in the source tables, such as tenant or user ID. Such partitioning may be difficult when replicating actions, because it is much harder to determine that two actions in the source system affect disjoint sets of records.
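
One way to sketch such partitioning is to route each event to a partition chosen by a stable hash of its sharding key, so that events for the same key stay in order within one partition while different partitions run in parallel. The `tenant_id` field and both function names are assumptions for illustration.

```python
import zlib
from collections import defaultdict
from typing import Any, Dict, List

Event = Dict[str, Any]


def partition_for(key: str, num_partitions: int) -> int:
    """Stable hash: the same key always lands in the same partition.

    zlib.crc32 is used because Python's built-in hash() of a str is
    randomized per process by default.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_events(
    events: List[Event], num_partitions: int, key_field: str = "tenant_id"
) -> Dict[int, List[Event]]:
    """Group events by partition, preserving their relative order."""
    partitions: Dict[int, List[Event]] = defaultdict(list)
    for event in events:
        partitions[partition_for(str(event[key_field]), num_partitions)].append(event)
    return dict(partitions)


events = [
    {"tenant_id": "a", "seq": 1},
    {"tenant_id": "b", "seq": 2},
    {"tenant_id": "a", "seq": 3},
]
parts = partition_events(events, num_partitions=4)
```

Each partition can then be handed to its own worker, which processes its events serially.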

### Idempotency

Replicating state has an additional advantage, as is clear from the above example: it is idempotent. The same event can be processed multiple times with no ill effect. This allows the Event Queue to relax the delivery guarantee from exactly once to at least once (Amazon SQS standard queues are an example of such a queue).

The same property makes error handling easier: an event whose processing failed partway through can simply be retried.
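
The following sketch shows why a retry is harmless here. It assumes a hypothetical handler that crashes after writing but before acknowledging the event, so the queue redelivers it; `process_with_retry` and `flaky_handle` are illustrative names.

```python
from typing import Any, Callable, Dict

Event = Dict[str, Any]

source = {7: {"name": "Jack"}}  # current source state
index: Dict[int, Dict[str, Any]] = {}  # stand-in for the destination store
calls = {"count": 0}


def flaky_handle(event: Event) -> None:
    """Replicate state, then fail once to simulate a crash before the ack."""
    index[event["id"]] = dict(source[event["id"]])  # the write itself succeeds
    calls["count"] += 1
    if calls["count"] == 1:
        raise RuntimeError("crashed before acknowledging the event")


def process_with_retry(handle: Callable[[Event], None], event: Event, max_attempts: int = 3) -> int:
    """Retry a failed event and return the number of attempts used.

    This is only safe because the handler is idempotent.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            handle(event)
            return attempt
        except RuntimeError:
            if attempt == max_attempts:
                raise
    raise ValueError("max_attempts must be at least 1")


attempts = process_with_retry(flaky_handle, {"id": 7})
```

The event is handled twice, yet the destination ends up in the same state as if it had been handled once.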

### Transformations

Another difference between the approaches is in how they handle transformation between the source and destination when they are not 1-1. For example, say the source contains one more table, owners, and the pets table has a foreign key owner_id into the owners table. In our destination data store, we want to flatten this structure by repeating the owner's information in each pet document. For example, if John owns two pets, the destination records may look like:
`{ "pet_name": "Joe", "kind": "snake", "owner_name": "John", "city": "Boston", ... }`
`{ "pet_name": "Rex", "kind": "dog", "owner_name": "John", "city": "Boston", ... }`
In this scenario, we want to change the owner's city from Boston to Cambridge. The update would be:
`UPDATE owners SET city = 'Cambridge' WHERE id = 33`
...and the corresponding event:
`{`
`  event_type: UPDATE,`
`  table: owners,`
`  before_values: { name: John, city: Boston, ... },`
`  after_values: { name: John, city: Cambridge, ... }`
`}`
In the first approach (replicating actions), we would need to translate the action above into a corresponding action that updates both documents. Note that finding corresponding actions may be difficult or impossible. Thus, the complexity of the transformation is a key decision point in determining which strategy to use.

However, if we use the state replication approach, we can simply do:
`SELECT * FROM pets JOIN owners ON pets.owner_id = owners.id WHERE owners.id = 33`
...and update the destination based on the response of the query:
`PUT pets/_doc/1`
`{ "pet_name": "Rex", "kind": "dog", "owner_name": "John", "city": "Cambridge", ... }`
`PUT pets/_doc/2`
`{ "pet_name": "Joe", "kind": "snake", "owner_name": "John", "city": "Cambridge", ... }`
Note: in both cases, depending on the transformation, a single change in the source may result in a change to a large number of records/documents in the destination store.
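
A sketch of the state replication path for this denormalized case, again assuming SQLite in place of MySQL and a dict in place of the index. `refresh_pets_of_owner` and the document field names are assumptions for illustration.

```python
import sqlite3
from typing import Any, Dict

Index = Dict[int, Dict[str, Any]]  # stand-in for the destination index


def refresh_pets_of_owner(conn: sqlite3.Connection, index: Index, owner_id: int) -> int:
    """Re-query the join for one owner and rewrite every affected pet document.

    Returns the number of documents written.
    """
    rows = conn.execute(
        "SELECT pets.id, pets.name, pets.kind, owners.name, owners.city "
        "FROM pets JOIN owners ON pets.owner_id = owners.id "
        "WHERE owners.id = ?",
        (owner_id,),
    ).fetchall()
    for pet_id, pet_name, kind, owner_name, city in rows:
        index[pet_id] = {
            "pet_name": pet_name,
            "kind": kind,
            "owner_name": owner_name,
            "city": city,
        }
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE owners (id INTEGER PRIMARY KEY, name TEXT, city TEXT);
    CREATE TABLE pets (id INTEGER PRIMARY KEY, name TEXT, kind TEXT, owner_id INTEGER);
    INSERT INTO owners VALUES (33, 'John', 'Boston');
    INSERT INTO pets VALUES (1, 'Rex', 'dog', 33), (2, 'Joe', 'snake', 33);
""")
index: Index = {}
conn.execute("UPDATE owners SET city = 'Cambridge' WHERE id = 33")
updated = refresh_pets_of_owner(conn, index, 33)
```

The handler does not need to know which owner column changed; it only needs the owner id from the event to find the affected documents.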

Another consideration is whether the transformation loses some information from the source. When mapping external systems, we may not have access to the complete internal representation. When faced with this kind of lossy transformation, keeping the destination exactly synchronized with the source by replicating actions is impossible.

### Deletions

Replicating state when records are deleted is a bit more difficult, given that we cannot query the state of what is not there. In this case, we need to find the image in the Destination Store corresponding to the deleted source records and delete those destination records.
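
One possible sketch of finding that image: the delete event still tells us which source record disappeared, so we can look up the destination documents derived from it. The example assumes, hypothetically, that each pet document also stores its `owner_id` and that events carry a `table` field; `delete_image` is an illustrative name.

```python
from typing import Any, Dict, List

Index = Dict[int, Dict[str, Any]]  # stand-in for the destination store


def delete_image(index: Index, event: Dict[str, Any]) -> List[int]:
    """Delete the destination documents derived from a deleted source record.

    Returns the ids of the documents that were removed.
    """
    deleted_id = event["values"]["id"]
    if event["table"] == "pets":
        doc_ids = [deleted_id]  # one pet row maps to one document
    elif event["table"] == "owners":
        # One owner row maps to every document that embeds that owner.
        doc_ids = [doc_id for doc_id, doc in index.items() if doc.get("owner_id") == deleted_id]
    else:
        raise ValueError(f"unknown table: {event['table']}")
    return [doc_id for doc_id in doc_ids if index.pop(doc_id, None) is not None]


index: Index = {
    1: {"pet_name": "Rex", "owner_id": 33},
    2: {"pet_name": "Joe", "owner_id": 33},
    3: {"pet_name": "Primrose", "owner_id": 40},
}
removed = delete_image(index, {"event_type": "DELETE", "table": "owners", "values": {"id": 33}})
```

Whether deleting an owner should remove the pet documents or merely blank out the owner fields depends on the transformation; this sketch assumes the former.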

### Transactions

Finally, we should consider how the system treats transactions. If the source or the destination is not transactional, this may be a moot point. However, assume that both systems are transactional (e.g., relational databases). Ideally, we would like to ensure that the destination is never in a state that does not correspond to some state of the source.

The strictest requirement is that if the source passes through the states s1, s2, ..., sn, the destination passes through exactly the corresponding states T(s1), T(s2), ..., T(sn).

We can relax this constraint to allow the destination to skip some intermediate states: if the source transitions through states s1, s2, ..., sn, then the destination should transition through some subsequence of T(s1), T(s2), ..., T(sn). This relaxation is useful when we want to process multiple changes in batches or in parallel.
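
Both requirements can be stated as simple checks over an observed history of destination states, which can be handy in tests. This is a sketch with illustrative names; states are represented by opaque labels.

```python
from typing import Hashable, Sequence


def is_strict_history(observed: Sequence[Hashable], expected: Sequence[Hashable]) -> bool:
    """Strict: the destination visits exactly T(s1), ..., T(sn), in order."""
    return list(observed) == list(expected)


def is_relaxed_history(observed: Sequence[Hashable], expected: Sequence[Hashable]) -> bool:
    """Relaxed: the destination visits a subsequence of T(s1), ..., T(sn).

    Intermediate states may be skipped, but the order must be preserved.
    """
    remaining = iter(expected)
    # Each observed state must be found in what is left of the expected sequence.
    return all(any(state == candidate for candidate in remaining) for state in observed)


expected = ["T(s1)", "T(s2)", "T(s3)"]
skipped_one = ["T(s1)", "T(s3)"]
out_of_order = ["T(s3)", "T(s1)"]
```

A history that skips T(s2) passes the relaxed check but not the strict one, while an out-of-order history fails both.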

## Conclusion

Both replicating state and replicating actions can be viable options when we need to keep data in sync. While replicating actions is simpler and more efficient, it is better suited to simple transformations and an established, fixed sharding strategy.

Replicating state instead of actions has the advantage that it is more tolerant of processing order and is idempotent. This yields a system that is easier to scale and more robust.

Here are some key questions to ask:

What events are pushed out by the source system (state or actions)?

If state is pushed, replicating state may be easier than reverse engineering actions. Likewise, if actions are pushed, replicating actions may be easier. Can you modify the source system to better suit your case?

Can you efficiently query the source system to obtain state?

If you cannot, the only option may be to replicate actions.

How complex is the transformation?

As a rule of thumb, simple transformations (as in one-to-one) will steer us towards replicating actions, whereas even modestly complex ones will favor replicating state.

Can the source state (rows) be effectively partitioned?

While this question may not necessarily favor one approach or the other, it is an important question to ask when you consider performance. If performance is important, and one approach makes parallelizing the processes easier, that's an advantage.

Is transactional integrity important?

Is the source (or the destination) transactional? Can one combine multiple transactions into one? Can one break up a single transaction? Generally, it is easier to respect transaction boundaries when replicating actions.

## Epilogue: Sometimes there are no good answers

My favorite example is replicating the state of multiple calendar systems (specifically electronic health records, or EHRs) into a common representation, which I've had to wrestle with in the past. Each EHR has a different representation of the calendar, and mapping it to a common representation is non-trivial and generally loses some information. The actions, such as booking and cancelling appointments, have different effects on the state of each source system. EHRs report actions (booking, cancelling, etc.) via HL7, and mostly do not offer an efficient way to query their state (i.e., the calendar). Based on our discussion above, the complex transformation argues for the replicating state approach. However, the inability to query state forces us to use action replication. It's a tough problem.

Topics: Data engineering