This is the third blog, Kafka Message Ordering Part 3, in a four-part series on how to maintain message ordering in Kafka. I highly recommend going through both of the previous blogs on Kafka Message Ordering first, where I covered:
- Factors which can disrupt message ordering at producer level – Kafka Message Ordering Part 1
- Factors responsible for altering message ordering at infrastructure layer and consumer side – Kafka Message Ordering Part 2
The objective of this Kafka Message Ordering Part 3 blog is to present a solution for real-time, ordered message ingestion on a big data technology stack. We all know about the small files problem with the NameNode in Hadoop: the NameNode tracks metadata for every file, so Hadoop is meant for a small number of large files rather than a large number of small files. Real-time ingestion of messages coming over Kafka (assuming message sizes of up to a couple of megabytes) puts us in the same boat, with many small files (one per message), unless we have a framework that can handle record/message-level CRUD operations. Two data stores are used quite often in the big data space and provide this CRUD functionality: Apache HBase and Apache Kudu.
Also, as discussed in the first blog, Kafka Message Ordering Part 1, there are broadly two types of message ordering issues:
- Individual messages/events arrive out of order
- Individual events are grouped together and then divided into manageable chunks of messages, and these chunks can then arrive at the consumer out of order
The solution for both of these scenarios follows the same architecture, whether it is built on HBase or Kudu.
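To make the second scenario concrete, here is a minimal Python sketch of reassembling chunked events regardless of arrival order. It is only an illustration under assumptions of my own: the chunk fields (event_id, seq, total, payload) and the reassemble function are hypothetical names, not part of Kafka or HBase. In HBase terms you could picture the event id as the row key and each chunk as its own column, but that mapping is also an assumption.

```python
def reassemble(chunks):
    """Rebuild events from chunks that may arrive in any order.

    Each chunk is (event_id, seq, total, payload); all four fields are
    hypothetical and stand in for whatever metadata the producer attaches.
    """
    pending = {}    # event_id -> {seq: payload} for incomplete events
    complete = {}   # event_id -> reassembled payload
    for event_id, seq, total, payload in chunks:
        parts = pending.setdefault(event_id, {})
        parts[seq] = payload  # a duplicate chunk simply overwrites itself
        if len(parts) == total:
            # All chunks are present: join them by sequence number.
            complete[event_id] = "".join(parts[i] for i in range(total))
            del pending[event_id]
    return complete, pending


# Chunks of event e1 arrive as 2, 0, 1; event e2 is still missing a chunk.
arrived = [
    ("e1", 2, 3, "ef"),
    ("e2", 0, 2, "xy"),
    ("e1", 0, 3, "ab"),
    ("e1", 1, 3, "cd"),
]
complete, pending = reassemble(arrived)
print(complete)  # {'e1': 'abcdef'}
print(pending)   # {'e2': {0: 'xy'}}
```

Because each chunk is stored under its sequence number, the order in which chunks arrive does not affect the reassembled result.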
Apache HBase: Despite some drawbacks, HBase is a fantastic tool for real-time use cases. In our scenario we want to handle messages that arrive out of sequence at the consumer side and ensure that we always see the correct state of the data. Assuming the below architecture:
If we are using HBase as the data store here, it is quite easy to manage CRUD operations at the record and even the cell level. For illustrative purposes I am using the HBase shell, but in real-time implementations using the API is recommended. For the sensor setup from the first blog, suppose we create a table ‘sensordata’ with one column family ‘seabed’, with a command along the lines of create 'sensordata', 'seabed', as:
If you describe it:
Let’s now store data in a column of the seabed column family at timestamp 1, with a put of the form put 'sensordata', 'pk1', 'seabed:reading', 'HighTide', 1,
where sensordata is the table name, seabed is the column family name, pk1 is the row key, reading is a column in the seabed column family, HighTide is the value of the reading column for the given row key, and 1 is the timestamp (kept simple for our example). If we query this row:
Similarly adding a couple more rows and querying them:
Now imagine that the latest data, generated at t10 (denoted by timestamp value 10 in the example below), arrives and is processed out of sequence, ahead of other events that were generated and sent before it. Note carefully that at any point in time, due to the default nature of HBase, we always get the latest data from each cell for a given row, even if that data was processed first:
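The same behaviour can be mimicked outside HBase with a minimal Python sketch. This is a toy in-memory model and not the HBase API: the VersionedTable class is hypothetical, and the LowTide and MidTide readings are made-up values alongside the HighTide reading used above. It only illustrates that a read returns the version with the highest timestamp, whatever the write order was.

```python
from collections import defaultdict


class VersionedTable:
    """Toy in-memory stand-in for an HBase table (not the HBase API)."""

    def __init__(self):
        # (row_key, column) -> {timestamp: value}
        self._cells = defaultdict(dict)

    def put(self, row_key, column, value, timestamp):
        # Writing the same timestamp again overwrites that version.
        self._cells[(row_key, column)][timestamp] = value

    def get(self, row_key, column):
        # Return (timestamp, value) of the newest version, or None.
        versions = self._cells.get((row_key, column))
        if not versions:
            return None
        latest_ts = max(versions)
        return latest_ts, versions[latest_ts]


table = VersionedTable()
# Arrival order differs from event order: the t10 event is processed first.
arrivals = [(10, "LowTide"), (1, "HighTide"), (3, "MidTide"), (2, "HighTide")]
for ts, reading in arrivals:
    table.put("pk1", "seabed:reading", reading, ts)

latest = table.get("pk1", "seabed:reading")
print(latest)  # (10, 'LowTide')
```

Note that this only works because the writer supplies the event's own timestamp. If the timestamp were assigned at write time, the last message processed would win instead.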
As you have seen above, HBase alleviates a lot of concerns around message/event ordering thanks to its built-in time-based data management, which works even at the column level. So changes in message ordering at any previous layer pose no threat to data integrity. HBase will show the correct and complete latest snapshot of the data for all successfully processed messages.
For any incoming message we just need to store the message attributes (as columns in a table/column family), using the event's own timestamp as the cell timestamp as in the examples above. When we retrieve the data for a given key, we always get the latest among the processed messages for that key. This behavior can be extended to any message processing framework that can log data/messages in HBase. Always remember: if the requirement is to show real-time trends as well as their deviation from historical data, it is better to limit the data kept in HBase and continuously move it to HDFS storage. Historical trends can then be computed daily via batch jobs and merged on demand, in real time, with the datasets in HBase.
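The per-column side of this can be sketched in a few lines of Python. Again this is an illustrative model under assumptions of my own, not HBase client code: the message layout (key, ts, attributes), the depth_m attribute, and the apply_message helper are all hypothetical. Unlike HBase, which can retain several versions per cell, this sketch keeps only the newest one.

```python
def apply_message(snapshot, message):
    """Merge one message into the snapshot; the newest timestamp wins per column."""
    row = snapshot.setdefault(message["key"], {})
    for column, value in message["attributes"].items():
        current = row.get(column)  # (timestamp, value) or None
        if current is None or message["ts"] >= current[0]:
            row[column] = (message["ts"], value)
    return snapshot


# Messages listed in processing order; their event timestamps are out of order.
messages = [
    {"key": "pk1", "ts": 5, "attributes": {"reading": "LowTide"}},
    {"key": "pk1", "ts": 2, "attributes": {"reading": "HighTide", "depth_m": 40}},
    {"key": "pk1", "ts": 4, "attributes": {"depth_m": 42}},
]

snapshot = {}
for message in messages:
    apply_message(snapshot, message)

# Drop the timestamps to get the current view of the row.
current_view = {col: val for col, (ts, val) in snapshot["pk1"].items()}
print(current_view)  # {'reading': 'LowTide', 'depth_m': 42}
```

Each column resolves independently, so the late message at timestamp 2 cannot overwrite the newer reading.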
We will continue our discussion in the next blog, Kafka Message Ordering Part 4, where we will cover the alternate solution for Kafka message ordering issues.