A Brief Introduction to Apache™ Kafka
Kafka is a stream processing service in which Producers publish records to topics, and Consumers read and process those records. A Kafka topic is a timestamped, replicated log of the published messages. Topics can be partitioned for increased storage capacity and for improved parallelism.
As depicted in Figure 1, producer processes publishing to the same topic can choose to (a) write their message to a specific partition (blue arrows) or (b) allow Kafka to distribute/load balance messages across the available partitions (orange arrows). Not counting Kafka’s replicated copies, a published message appears in only one partition.
On the consumer side, a consumer may or may not belong to a consumer group. When consumers are not group members, every subscribed consumer can read each published message. This is shown by the arrows labelled (c) in Figure 1.
Conversely, each member of a consumer group is assigned one or more partitions to read. When the group has more consumers than the topic has partitions, each assigned consumer reads exactly one partition (arrow (d)); once all partitions are assigned, any remaining consumers get no assignment and sit idle until needed. On the other hand, when topic partitions outnumber the consumers in the group, a consumer is assigned more than one partition. At any given time, a partition of a topic is assigned to at most one member of each subscribing group. Kafka records which message was the last one handed out from each partition, and a new subscriber begins reading from that point unless it repositions to an earlier location in the message log.
A Kafka Consumer for Trafodion
This blog examines the techniques available for putting Kafka data into a Trafodion database. It also shows why a commonly used method is not necessarily the best performing.
The Kafka consumer is a Java program that “reads” messages published to Kafka, formats the data as needed, and “writes” it via JDBC to a Trafodion database table. The Kafka handling in the program can be described simply as follows.
- Instantiating a Kafka class, such as KafkaConsumer, to supply input data to the program
- Setting property values for this class, such as topic name, starting position, broker location, timeout limits, zookeeper location, etc.
- Reading (polling for) messages from the topic
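The three steps above can be sketched as follows. This is a minimal sketch, not the blog's actual program: the broker host, group id, and topic name are placeholders, and the kafka-clients calls are shown in comments so the snippet stays self-contained.

```java
import java.util.Properties;

class ConsumerSetup {
    // Hypothetical property settings for the consumer described above;
    // host names, group id, and topic are placeholders.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost:9092");  // broker location
        props.put("group.id", "trafodion-loader");         // consumer group
        props.put("enable.auto.commit", "false");          // we commit offsets ourselves
        props.put("auto.offset.reset", "earliest");        // starting position if no saved offset
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        // With the kafka-clients library on the classpath, the read loop would be:
        //   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //   consumer.subscribe(Collections.singletonList("blogtopic"));
        //   while (true) {
        //       ConsumerRecords<String, String> records = consumer.poll(1000); // 1s timeout
        //       for (ConsumerRecord<String, String> r : records) {
        //           /* format fields, write via JDBC */
        //       }
        //   }
        System.out.println(props.getProperty("group.id"));
    }
}
```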
Kafka messages are organized by topic, and each topic has one or more storage partitions to allow for increased capacity. For each partition, Kafka keeps track of the highest message offset that has been successfully delivered; that recorded offset is where message delivery resumes when a new consumer process restarts on the partition. Thus, the combination of (topic, partition number, offset) becomes a restart point for a partition. The offset value actually saved is controlled by a KafkaConsumer property, enable.auto.commit, or by an explicit call to KafkaConsumer.commitSync(). For more details, see https://kafka.apache.org/documentation/.
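As a minimal illustration of such a restart point, here is a small class of our own devising (Kafka itself tracks this state for you via committed offsets; the names below are not Kafka's):

```java
// Illustrates the (topic, partition, offset) restart point described above.
// Class and field names are ours, purely for illustration.
class RestartPoint {
    final String topic;
    final int partition;
    long nextOffset;   // offset of the next message to deliver

    RestartPoint(String topic, int partition, long nextOffset) {
        this.topic = topic;
        this.partition = partition;
        this.nextOffset = nextOffset;
    }

    // Record that messages up to and including 'offset' were processed,
    // mirroring what KafkaConsumer.commitSync() persists on our behalf.
    void commit(long offset) {
        nextOffset = offset + 1;
    }
}
```

A restarted consumer would resume reading the partition at nextOffset.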
Next, we’ll look at how to put the Kafka messages into a Trafodion table.
A Trafodion Table
A Trafodion table is a relational SQL table mapped onto an HBase table. For this exercise, we used a table defined by the following DDL.
To partition a table, either a PRIMARY KEY or a STORE BY clause is required; see the Trafodion SQL Reference Manual, http://trafodion.apache.org/docs/sql_reference/Trafodion_SQL_Reference_Manual.pdf.
The five columns in the row correspond to the five fields of the Kafka message published for this exercise.
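A hypothetical version of such a DDL, with made-up column names and sizes (the table name blogdata is the one used later by the consumer program), might look like this:

```sql
CREATE TABLE blogdata
  ( ts      TIMESTAMP NOT NULL
  , field1  CHAR(30) NOT NULL
  , field2  CHAR(30)
  , field3  CHAR(30)
  , field4  CHAR(30)
  , PRIMARY KEY (ts, field1)
  );
```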
In today’s IT environments Kafka might run on one cluster, Trafodion on another, and the Kafka consumer might be on yet another cluster. A four-node cluster was used in these tests. CDH 5.9 and EsgynDB v2.3 ran on all nodes of the cluster. Kafka 2.11 was installed on one node. The Java consumer program ran on another node.
Trafodion SQL supports both INSERT and UPSERT DML syntax. INSERT adds a new row; UPSERT updates a matching row if one exists and otherwise adds the row. Those are the two techniques we compared. We will also explore an INSERT-SELECT technique, employing a user-defined function, that might be suitable in some situations.
DML statements such as INSERT and UPSERT change the database, and in JDBC those changes are protected by transactions. By default, each DML statement executes as a separate transaction, unless the connection is set to end a transaction only on an explicit request: a commit(). For example, by default, 10 separate INSERTs run as 10 transactions, with each new row visible immediately after its INSERT. With autoCommit=off, the same 10 INSERTs update the table, but the 10 rows become visible only when commit() is called after the tenth INSERT. Put another way: with 10 separate transactions, if the program fails before the sixth INSERT, the five previously inserted rows remain visible; with autoCommit=off, none of the rows survive the failure. Committing a group of records at a time is known as batching in JDBC.
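A sketch of what batched UPSERTs with autoCommit=off might look like over JDBC. The statement text assumes the five-column blogdata table; the numFlushes helper is ours, included only to show the commit arithmetic.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class BatchUpsert {
    // Number of executeBatch()/commit() cycles needed for n rows
    // at a given batch size (last batch may be partial).
    static int numFlushes(int n, int batchSize) {
        return (n + batchSize - 1) / batchSize;
    }

    // Hypothetical loader: each row is a String[5] of already-formatted
    // Kafka message fields destined for the blogdata table.
    static void load(Connection conn, Iterable<String[]> rows, int batchSize)
            throws SQLException {
        conn.setAutoCommit(false);  // group many rows into one transaction
        String sql = "UPSERT INTO blogdata VALUES (?, ?, ?, ?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            int pending = 0;
            for (String[] r : rows) {
                for (int i = 0; i < 5; i++) ps.setString(i + 1, r[i]);
                ps.addBatch();
                if (++pending == batchSize) {
                    ps.executeBatch();
                    conn.commit();   // rows become visible here, not per statement
                    pending = 0;
                }
            }
            if (pending > 0) {       // flush the final partial batch
                ps.executeBatch();
                conn.commit();
            }
        }
    }
}
```

With 300,000 rows and a batch size of 4,000, this commits 75 times instead of 300,000.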
These techniques form the basis of the tests. Sample usage of each is shown in the following snippets of code.
In actual practice, Kafka message arrival rates run the gamut from streaming applications to trickle feeds. To see the differences in data ingest techniques, a high volume of data would enable a better comparison. For this, 300,000 144-byte messages were published to a single topic partition. The same Kafka messages were used for each test run.
The target Trafodion table was as described above.
The simple question was: Which performs better, INSERT or UPSERT? Table 1 shows the settings and their results.
Timings for inserting the rows one at a time are included as a baseline reference. The benefits of sending blocks of records to the database versus one by one are obvious.
Clearly, UPSERT in batches performs better than INSERT at any given blocking factor. The best elapsed time was 7.8 seconds to insert 300,000 rows. Your statistics will differ depending on message size, number of table columns, number of records, etc., but UPSERT will still outperform INSERT. In our case, a batch size of 4000 gave the best performance. Finding the “sweet spot” setting for your application will be an iterative process.
Elapsed time to insert all the messages was captured programmatically. The time included the timeout value of one second for the Kafka poll timeout, which occurred when the message stream was exhausted. Adjusting for this would have only made the UPSERT statistics look better.
Why such a difference in performance? SQL semantics for INSERT require checking for an existing duplicate row before the new row can be inserted, and because Trafodion tables are HBase tables, the HBase interface for this operation handles just one row at a time. UPSERT, on the other hand, overwrites any existing row, so no check is necessary. Furthermore, the HBase interface behind UPSERT accepts a set of rows.
A general survey of the Trafodion techniques for data loading can be found here: https://cwiki.apache.org/confluence/display/TRAFODION/Data+Loading; or in the Trafodion Load and Transform guide, http://trafodion.apache.org/docs/load_transform/index.html#introduction-insert-types.
A comment about JDBC batching is needed here. For these tests, the Kafka data had already been created, so there was no delay in getting records to fill a batch; introducing arrival latency would not have helped our efficiency timings. In practice, a Kafka data stream might have an irregular arrival rate, and whatever your batch size setting, rows wait in an uncommitted batch until the batch limit is reached. This latency can be mitigated in a couple of ways: reduce the batch size so commits happen more frequently, or, after a certain number of Kafka poll timeouts (i.e., polls returning no data), commit the rows in the current batch; in essence, dynamically adjust the batch size based on the Kafka input load.
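One way to sketch that flush policy (the class, names, and thresholds here are ours, not from the blog's consumer program):

```java
// Sketch of the mitigation described above: flush a full batch as usual,
// but also commit a partial batch after too many empty Kafka polls.
class FlushPolicy {
    final int batchSize;       // normal flush threshold
    final int maxEmptyPolls;   // flush a partial batch after this many empty polls
    int pending = 0;           // rows added to the JDBC batch since last commit
    int emptyPolls = 0;        // consecutive polls that returned no records

    FlushPolicy(int batchSize, int maxEmptyPolls) {
        this.batchSize = batchSize;
        this.maxEmptyPolls = maxEmptyPolls;
    }

    // Called once per Kafka poll; n = number of records the poll returned.
    // Returns true when the caller should executeBatch() and commit().
    boolean onPoll(int n) {
        if (n == 0) {
            emptyPolls++;
            if (pending > 0 && emptyPolls >= maxEmptyPolls) {
                pending = 0;
                emptyPolls = 0;
                return true;   // input has gone quiet: commit the partial batch
            }
            return false;
        }
        emptyPolls = 0;
        pending += n;
        if (pending >= batchSize) {
            pending = 0;
            return true;       // full batch
        }
        return false;
    }
}
```

The consumer's poll loop would call onPoll() after every poll and, when it returns true, call executeBatch() and commit().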
UPSERT syntax has another variant – UPSERT USING LOAD.
As stated in the Trafodion SQL Reference Manual, http://trafodion.apache.org/docs/sql_reference/Trafodion_SQL_Reference_Manual.pdf, this operation occurs without a transaction and is used to put data into an empty table. The typical incarnation of this statement looks as follows.
UPSERT USING LOAD INTO target_table SELECT * FROM source_table
How would this apply in a Kafka scenario? What would be the source table?
Trafodion’s User-Defined Functions (UDF) architecture enables this. A user-coded Java procedure (Table Mapping UDF, or TMUDF) follows prescribed coding protocols to produce a virtual set of rows and columns, that is, a table, that can be used anywhere a table is allowed in an SQL query. For this specific case, we’d need a TMUDF to take Kafka messages with message fields and turn them into rows of data columns. Read more about Trafodion’s UDF architecture here: https://cwiki.apache.org/confluence/display/TRAFODION/Tutorial%3A+The+object-oriented+UDF+interface.
Since a functioning Kafka consumer program had already been written, the bulk of that coding was already done. The database handling aspects would not be needed in the UDF, only the Kafka-specific parts, with some modifications to conform to the UDF programming model. Thus, a TMUDF for reading a Kafka topic partition was created.
We won't go over the details of the UDF logic; suffice it to say that the 300,000 messages in the Kafka log are read and 300,000 rows are produced, each containing five columns. The UPSERT statement used by the consumer program looks like this.
UPSERT USING LOAD INTO blogdata SELECT * FROM udf(kafread( …))
Where the argument list supplies the values used by the UDF code for the Kafka connection.
When this statement was executed in the consumer program, the results were as shown in Table 2.
Using the UDF for this exercise is an atomic operation: it's all or none. Unlike the iterative, controlled logic of row or batch inserts, which can be interrupted and resumed, an INSERT-SELECT through the UDF cannot be restarted from where it left off.
Thus, while speed to load might be acceptable, considerations for recovery and restart are not in its favor – Kafka message queue positioning information cannot be passed back to the caller by the UDF.
For a Kafka application where the message rates are low, INSERTing or UPSERTing a message at a time to the Trafodion database would probably be good enough. But, for high message rates, the UPSERT implementation yields higher performance than INSERT. Even though the use of JDBC batches can improve INSERT times, the real advantage for UPSERT comes from its set-oriented processing. Old habits might die hard, but instead of thinking INSERT for inserting data into a database, don’t forget about UPSERT.
Source code for the consumer program and the Kafka UDF is available here: https://github.com/esgyn/code-examples/tree/master/kafkablog.