This is the final blog, Kafka Message Ordering Part 4, in the series on understanding and solving message ordering issues. To understand the intent and maintain continuity, I recommend going through the previous blogs, where I covered:
- Factors which can disrupt the producer message order – Kafka Message Ordering Part 1
- Factors responsible for altering message ordering at infrastructure layer and consumer side – Kafka Message Ordering Part 2
- Architecting a solution for message ordering issues using HBase – Kafka Message Ordering Part 3
Certain technology stack decisions (say, Spark 2 as the message processing tool on the Cloudera platform) can make it impossible to use the HBase-based solution highlighted in the previous blog. In this blog, Kafka Message Ordering Part 4, I cover an alternative solution to the same message ordering problem using Apache Kudu (available on the Cloudera platform). We will address the problem at both the producer and the consumer side.
Producer side: Here is the list of activities that producer jobs need to take care of before sending messages out for consumption by downstream systems-
- We are targeting a near real time solution wherein events are grouped together in one message (say 400 events per message) before transmitting them over to the consumer.
- An event represents a record in source which has been updated, added or deleted.
- A record at source can change multiple times in a single message window i.e. multiple events being generated and wrapped in a single message.
- A single message can have at most 400 events / source records.
- Every event has a unique event id, i.e. if a record pk1 changes three times then three events are generated with three unique event ids (these could be timestamps for simplicity).
- The latest generated event would have greatest eventid (be it a sequence or timestamp)
- Every event carries two additional metadata columns: the first is the event id, and the second is the operation type (insert, update or delete).
- If there were 900 events generated in one message window then a total of 3 messages would be sent (2 with 400 events/records and one with 100 events).
- Every message window has a unique message id, which is preserved when a message is split. So in the case of the above point, where the restriction on events per message forces three messages to be created, all three of them would still carry the same message id.
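The producer-side rules above can be sketched in Python. This is a minimal illustration, not the original implementation: the function names, the use of a UUID for the message id, and a nanosecond timestamp for the event id are all my assumptions (the blog only says event ids could be timestamps or sequences).

```python
import time
import uuid

MAX_EVENTS_PER_MESSAGE = 400  # per-message event limit from the example

def make_event(key, operation, payload):
    """Wrap one source-record change as an event.

    A nanosecond timestamp serves as a simple increasing event id, as the
    blog suggests (a sequence number would work equally well).
    """
    return {"eventid": time.time_ns(), "key": key,
            "operation": operation, **payload}

def build_messages(events):
    """Split one message window's events into messages of at most 400 events.

    All splits of the same window share one message id, so 900 events yield
    3 messages (400 + 400 + 100) carrying identical message ids.
    """
    message_id = uuid.uuid4().hex  # one id per window, shared by all splits
    return [
        {"messageid": message_id,
         "events": events[i:i + MAX_EVENTS_PER_MESSAGE]}
        for i in range(0, len(events), MAX_EVENTS_PER_MESSAGE)
    ]
```

With 900 events, `build_messages` returns three messages of 400, 400 and 100 events, all with the same `messageid`, matching the splitting rule described above.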
This is how a sample set of events looks:
As you can see, the record with primary key A is updated twice, the record with primary key B is deleted, and the record with primary key C is updated once. The source then has to add the additional metadata tags to each message and record, which looks something like:
In JSON format both messages could look like:
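As a hedged illustration of that JSON (the field names, id values, and the `pincode` column are my assumptions, not taken from the original figure), the four events for keys A, B and C might serialize as two messages like this:

```python
import json

# Illustrative only: A updated twice, B deleted, C updated once,
# spread across two messages with distinct message ids.
messages = [
    {"messageid": 101, "events": [
        {"eventid": 1, "key": "A", "operation": "U", "pincode": "110001"},
        {"eventid": 2, "key": "B", "operation": "D"},
    ]},
    {"messageid": 102, "events": [
        {"eventid": 3, "key": "A", "operation": "U", "pincode": "110002"},
        {"eventid": 4, "key": "C", "operation": "U", "pincode": "560001"},
    ]},
]
print(json.dumps(messages, indent=2))
```

Each event carries the two metadata columns described earlier (event id and operation type), and every event in a message shares that message's id.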
Consumer Side: At the consumer level-
- Tables are designed with these metadata columns (message id and event id) as part of a composite primary key so that we can hold multiple source records with the same source primary key. Since event ids differ for each event, they let us store the same record multiple times, once for each change at source (which could be updates to one or more columns of the record).
- Insert everything as-is, including the metadata, into Kudu, with just one change: a record is marked active if it was added or updated at source, but inactive if it was deleted. This is how it looks in the Kudu table:
- Create a non-materialized view on top of this table that picks the latest active, non-duplicated record for each primary key, i.e. the non-deleted record with the highest message id and event id. This gives exactly what we want: a correct snapshot of the data based on the processed messages, irrespective of the order in which the messages or events were processed:
The SQL for this view could look like (the view needs a name distinct from the underlying table):

```sql
CREATE VIEW IF NOT EXISTS db.table_view AS
SELECT
  KEY,
  PINCODE
FROM (
  SELECT ROW_NUMBER() OVER (PARTITION BY KEY
                            ORDER BY MESSAGEID DESC, EVENTID DESC) AS MESSAGE_ROW_NUMBER,
         db.table.*
  FROM db.table
) TAB
WHERE MESSAGE_ROW_NUMBER = 1 AND OPERATION = 'A';
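The view's deduplication logic can be prototyped outside Kudu. Below is a sketch using Python's built-in SQLite (whose `ROW_NUMBER` window function mirrors Impala's); the `'A'`/`'D'` operation codes follow the blog's active/deleted convention, while the table and column names are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE events (key TEXT, messageid INTEGER, eventid INTEGER,
                     operation TEXT, pincode TEXT);
-- A updated twice, B deleted, C updated once (out-of-order arrival is fine)
INSERT INTO events VALUES
  ('A', 1, 3, 'A', '110002'),
  ('B', 1, 2, 'D', NULL),
  ('A', 1, 1, 'A', '110001'),
  ('C', 1, 4, 'A', '560001');
""")

# Latest active row per key: highest (messageid, eventid), operation still 'A'
snapshot = conn.execute("""
SELECT key, pincode FROM (
  SELECT ROW_NUMBER() OVER (PARTITION BY key
                            ORDER BY messageid DESC, eventid DESC) AS rn,
         events.*
  FROM events
)
WHERE rn = 1 AND operation = 'A'
ORDER BY key
""").fetchall()
print(snapshot)  # B is gone (deleted); A shows only its latest pincode
```

Even though A's older update was inserted after its newer one, the snapshot contains exactly one row for A (the latest) and none for the deleted B, demonstrating the order-independence the view provides.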
- And finally, keep compacting this table: since we insert everything, only the latest record for each primary key is relevant. The table in step #2 above contains four rows, but the correct and complete snapshot of the source data is represented by the view in step #3, which has only two rows. This end-user view gets faster as we hold less redundant data. The SQL for compacting the Kudu table could be:
```sql
DELETE FROM db.table
WHERE CONCAT(KEY, MESSAGEID, EVENTID) IN (
  SELECT CONCAT(KEY, MESSAGEID, EVENTID)
  FROM (
    SELECT KEY, MESSAGEID, EVENTID, OPERATION,
           ROW_NUMBER() OVER (PARTITION BY KEY
                              ORDER BY MESSAGEID DESC, EVENTID DESC) AS MESSAGE_ROW_NUMBER
    FROM db.table
  ) TAB
  WHERE MESSAGE_ROW_NUMBER > 1 OR OPERATION = 'D'
);
```
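The compaction delete can be sketched the same way in SQLite (using `||` in place of Impala's `CONCAT`; data and names mirror the earlier sample, and the concatenation is purely a composite-key match as in the blog's SQL):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE events (key TEXT, messageid INTEGER, eventid INTEGER,
                     operation TEXT, pincode TEXT);
INSERT INTO events VALUES
  ('A', 1, 1, 'A', '110001'),
  ('A', 1, 3, 'A', '110002'),
  ('B', 1, 2, 'D', NULL),
  ('C', 1, 4, 'A', '560001');
""")

# Drop every row that is not the latest for its key, plus deleted rows.
conn.execute("""
DELETE FROM events
WHERE key || '-' || messageid || '-' || eventid IN (
  SELECT key || '-' || messageid || '-' || eventid FROM (
    SELECT key, messageid, eventid, operation,
           ROW_NUMBER() OVER (PARTITION BY key
                              ORDER BY messageid DESC, eventid DESC) AS rn
    FROM events
  )
  WHERE rn > 1 OR operation = 'D'
)
""")
remaining = conn.execute(
    "SELECT key, pincode FROM events ORDER BY key").fetchall()
print(remaining)  # only the latest active row per key survives
```

After compaction the table holds exactly the two rows the view would return, so the view reads the minimum possible data.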
As you can see, message ordering is a tricky problem, but it can be handled elegantly with proper tool selection and configuration at various levels. I hope you enjoyed reading this blog series on message ordering and will leverage these concepts to solve related problems in your own use cases. Do let me know if you have any doubts, need more information, or have any suggestions on this topic.