Architect's Growth Path: An In-Depth Look at Kafka Connect Error Handling and Dead Letter Queues

Kafka Connect is part of Apache Kafka and is a powerful framework for building streaming pipelines between Kafka and other technologies. It can be used to stream data into Kafka from a variety of sources (including databases, message queues, and flat files), and to stream data from Kafka out to targets such as document stores, NoSQL databases, relational databases, and object storage.

The real world is not perfect and errors are inevitable, so it is best if a Kafka pipeline can handle them as gracefully as possible. A common scenario is receiving messages on a topic that do not match the expected serialization format (for example, JSON where Avro was expected, or vice versa). Since Apache Kafka 2.0, Kafka Connect has included error-handling options, among them the ability to route messages to a dead letter queue, a common technique in building data pipelines.

In this article, we will introduce several common modes of dealing with problems and explain how to implement them.

Stop immediately after failure

Sometimes you may want to stop processing immediately when an error occurs. It may be that the bad data is caused by an upstream problem that must be resolved upstream, in which case there is no point in continuing to try to process other messages.

This is the default behavior of Kafka Connect, and it can also be specified explicitly with the following configuration:

errors.tolerance = none

In this example, the connector is configured to read JSON data from a topic and write it to a plain text file. Note that the FileStreamSinkConnector is used here for demonstration purposes only and is not recommended for production.

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "file_sink_01",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "test_topic_json",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "file": "/data/file_sink_01.txt"
  }
}'

Because some of the JSON messages in the topic are invalid, the connector terminates immediately and enters the FAILED state:

$ curl -s "http://localhost:8083/connectors/file_sink_01/status" | \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_01","FAILED"]

Checking the Kafka Connect worker log shows that the error has been recorded and the task has been terminated:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('b' (code 98)): was expecting double-quote to start field name
 at [Source: (byte[])"{brokenjson-:"bar 1"}"; line: 1, column: 3]

To fix the pipeline, the problem messages on the source topic need to be resolved; unless told otherwise, Kafka Connect will not simply skip invalid messages. If the failure is due to a configuration error (for example, the wrong serialization converter was specified), it is best to correct the configuration and restart the connector. However, if the messages really are invalid for this topic, then a way is needed to keep them from blocking the processing of all the other, valid messages.
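To make the fail-fast semantics concrete, here is a minimal Python sketch of what errors.tolerance = none amounts to. This is illustrative only — Kafka Connect performs this logic inside the worker, not in user code, and the helper name is ours:

```python
import json

def process_fail_fast(raw_messages):
    """Mimic errors.tolerance=none: the first bad record aborts the task.

    Illustrative only; Connect wraps the converter error in a
    ConnectException and moves the task to the FAILED state.
    """
    written = []
    for raw in raw_messages:
        try:
            written.append(json.loads(raw))
        except json.JSONDecodeError as e:
            # No skipping: stop immediately, leaving earlier records written
            raise RuntimeError(f"task FAILED on record {raw!r}: {e}") from e
    return written
```

With a mix of valid and invalid JSON, processing stops at the first invalid record, mirroring the FAILED state seen above.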

Silently ignore invalid messages

If you just want the processing to continue:

errors.tolerance = all

In practice, it is roughly as follows:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "file_sink_05",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "test_topic_json",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "file": "/data/file_sink_05.txt",
    "errors.tolerance": "all"
  }
}'

After starting the connector (against the same source topic, which contains a mix of valid and invalid messages), it keeps running:

$ curl -s "http://localhost:8083/connectors/file_sink_05/status" | \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_05","RUNNING"]

Now, even though there are invalid messages on the source topic, no errors appear in the Kafka Connect worker output, and the valid messages are written to the output file as expected:

$ head data/file_sink_05.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
...

Can data loss be detected?

With errors.tolerance = all configured, Kafka Connect ignores invalid messages and by default does not log that it discarded them. If you settle on errors.tolerance = all, you need to think carefully about whether and how you will find out about the message loss that is actually occurring. In practice this means monitoring and alerting on the available metrics, and/or logging the failed messages themselves.

The easiest way to determine whether any messages have been discarded is to compare the number of messages on the source topic with the number written to the destination:

$ kafkacat -b localhost:9092 -t test_topic_json -o beginning -C -e -q -X enable.partition.eof=true | wc -l
     150
$ wc -l data/file_sink_05.txt
     100 data/file_sink_05.txt

This approach is not very elegant, but it does show that messages have been lost; since nothing is written to the log, the user would otherwise be completely unaware of it.
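The count comparison above can be captured in a trivial helper (a hypothetical function, for illustration only):

```python
def dropped_record_count(source_count, sink_count):
    """Estimate silently dropped records by differencing counts.

    Mirrors the kafkacat-vs-wc check: 150 records on the source topic
    but only 100 lines in the sink file means 50 were discarded.
    """
    if sink_count > source_count:
        raise ValueError("sink cannot contain more records than the source")
    return source_count - sink_count
```

In practice such a check would be scheduled periodically and fed into alerting, since a nonzero result is the only signal that silent discarding is happening.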

A more reliable method is to use JMX metrics to actively monitor and alert on the error message rate:

The metrics show that errors have occurred, but not which messages are affected, and for some users that may be enough. But if you want to know what the discarded messages actually were, rather than effectively writing them to /dev/null, that is exactly where the concept of the dead letter queue comes in.

Route the message to the dead letter queue

Kafka Connect can be configured to send messages that it cannot process (such as the deserialization errors seen above) to a separate Kafka topic: the dead letter queue. Valid messages are processed as normal and the pipeline keeps running. Invalid messages can then be inspected from the dead letter queue and ignored, or fixed and reprocessed, as needed.
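As a rough mental model of this routing (a sketch only — Connect implements it inside the worker, and the function name is ours), records that fail conversion are diverted to a dead letter list while valid ones continue to the sink:

```python
import json

def route_with_dlq(raw_messages):
    """Toy model of errors.tolerance=all plus a dead letter queue:
    parseable records go to the sink, the rest go to the DLQ topic.
    """
    sink, dlq = [], []
    for raw in raw_messages:
        try:
            sink.append(json.loads(raw))
        except json.JSONDecodeError:
            dlq.append(raw)  # the original record is preserved verbatim
    return sink, dlq
```

The key property is that the failed record is kept byte-for-byte, so it can later be inspected, fixed, and replayed.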

The dead letter queue can be enabled by the following configuration:

errors.tolerance = all
errors.deadletterqueue.topic.name = <dead letter queue topic name>

If you are running on a single-node Kafka cluster, you also need to set errors.deadletterqueue.topic.replication.factor = 1; its default value is 3.

A connector configuration with these options looks roughly as follows:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "file_sink_02",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "test_topic_json",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "file": "/data/file_sink_02.txt",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq_file_sink_02",
    "errors.deadletterqueue.topic.replication.factor": 1
  }
}'

Using the same source topic as before, with its mix of valid and invalid JSON data, the new connector runs stably:

$ curl -s "http://localhost:8083/connectors/file_sink_02/status" | \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_02","RUNNING"]

The valid records in the source topic are written to the target file:

$ head data/file_sink_02.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
...

The pipeline keeps running normally, and there is now data in the dead letter queue topic, as the metrics show:

Examining the topic itself also shows it:

ksql> LIST TOPICS;

 Kafka Topic      | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------
 dlq_file_sink_02 | false      | 1          | 1                  | 0         | 0
 test_topic_json  | false      | 1          | 1                  | 1         | 1
----------------------------------------------------------------------------------------------

ksql> PRINT 'dlq_file_sink_02' FROM BEGINNING;
Format:STRING
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 1"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 2"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 3"}
...

As the output shows, each message carries its timestamp (1/24/19 5:16:03 PM UTC), its key (NULL), and then its value. The value is invalid JSON, {foo:"bar 1"} (foo should also be quoted), so the JsonConverter threw an exception while processing it, and the message ended up on the dead letter topic.
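This can be confirmed with any strict JSON parser; for example, in Python:

```python
import json

# {foo:"bar 1"} is not valid JSON because the field name is unquoted;
# a strict parser rejects it, just as Connect's JsonConverter does.
def is_valid_json(s):
    try:
        json.loads(s)
        return True
    except json.JSONDecodeError:
        return False
```

The properly quoted form {"foo":"bar 1"} parses fine; the unquoted one fails with "Expecting property name enclosed in double quotes".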

But it is only by looking at the message that we can tell it is invalid JSON, and even then we are only guessing at why it was rejected. To determine the actual reason why Kafka Connect treated the message as invalid, there are two options:

  • The message header of the dead letter queue;
  • The log of the worker node of the Kafka connector.

They will be introduced separately below.

Recording the reason for failure: message headers

Message headers are additional metadata stored alongside the key, value, and timestamp of a Kafka message, introduced in Kafka 0.11. Kafka Connect can write information about the reason a message was rejected into the headers of the message itself. This approach is better than writing to a log file because it links the reason directly to the message.

Configure the following parameters to include the rejection reason in the message header of the dead letter queue:

errors.deadletterqueue.context.headers.enable = true

The configuration example is roughly as follows:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "file_sink_03",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "test_topic_json",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "file": "/data/file_sink_03.txt",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq_file_sink_03",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true
  }
}'

As before, the connector can operate normally (because errors.tolerance=all is configured).

$ curl -s "http://localhost:8083/connectors/file_sink_03/status" | \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_03","RUNNING"]

Valid messages from the source topic are written to the target file as normal:

$ head data/file_sink_03.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
...

Any consumer tool can be used to inspect the messages on the dead letter queue (KSQL was used above), but here kafkacat is used, and the reason for that will become apparent shortly. The simplest invocation looks like this:

kafkacat -b localhost:9092 -t dlq_file_sink_03
% Auto-selecting Consumer mode (use -P or -C to override)
{foo:"bar 1"}
{foo:"bar 2"}

But kafkacat can do much more, showing far more than just the message itself:

kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 -f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'

This command takes the last message (-o-1, an offset of the last message), reads just one message (-c1), and formats it with the -f parameter to make it easier to understand:

Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 34
Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
[...]

It is also possible to display just the headers and, with a simple trick to split them up, see the details of the problem more clearly:

$ kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 -f '%h' | tr ',' '\n'
__connect.errors.topic=test_topic_json
__connect.errors.partition=0
__connect.errors.offset=94
__connect.errors.connector.name=file_sink_03
__connect.errors.task.id=0
__connect.errors.stage=VALUE_CONVERTER
__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter
__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException
__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error:
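When inspecting dead letter queue records programmatically, the comma-separated key=value output that kafkacat's %h format produces can be split into a dictionary. A naive sketch (the helper name is ours; it has the same limitation as the tr trick above — values containing commas, such as the stacktrace, would be truncated, so filter those out in real use):

```python
def parse_connect_error_headers(header_str):
    """Split kafkacat %h output (comma-separated key=value pairs)
    into a dict of __connect.errors.* headers."""
    headers = {}
    for pair in header_str.split(','):
        key, sep, value = pair.partition('=')
        if sep:  # skip fragments with no '=' (e.g. comma-split values)
            headers[key.strip()] = value
    return headers
```

With the headers in a dict, the source topic and offset can be pulled out directly for replay or inspection.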

Every message Kafka Connect processes comes from a source topic and a specific position (offset) within it, and the headers state both exactly. So they can be used to go back and inspect the original message on the original topic when needed. Since the dead letter queue already holds a copy of the message, this check is more of a belt-and-braces practice.

Based on the detailed information obtained from the message header above, you can check the source message again:

__connect.errors.topic=test_topic_json
__connect.errors.offset=94

Plugging these values into kafkacat's -t (topic) and -o (offset) parameters gives:

$ kafkacat -b localhost:9092 -C -t test_topic_json -o94 -f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Topic: %t\n'

Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 94
Topic: test_topic_json

Compared with the message on the dead letter queue above, it is exactly the same, even down to the timestamp. The only differences are the topic, the offset, and the headers.

Recording the reason for failure: the log

The second option for recording the reason a message was rejected is to write it to the log. Depending on the installation, Kafka Connect writes either to stdout or to a log file. Either way, a chunk of verbose output is generated for each failed message. To enable this:

errors.log.enable = true

By also setting errors.log.include.messages = true, metadata about the message itself is included in the output. This metadata includes some of the same items as the headers described above, including the source message's topic and offset. Note that it does not include the message key or value itself.

The connector configuration at this time is as follows:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "file_sink_04",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "test_topic_json",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "file": "/data/file_sink_04.txt",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}'

The connector can run successfully:

$ curl -s "http://localhost:8083/connectors/file_sink_04/status" | \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_04","RUNNING"]

Valid records from the source topic get written to the target file:

$ head data/file_sink_04.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
...

Looking at the Kafka Connect worker log now, there is an error record for each failed message:

ERROR Error encountered in task file_sink_04-0. Executing stage 'VALUE_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
[...]
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
 at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]

You can see the error itself, as well as the information related to the error:

{topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}

As shown above, the topic and offset can be plugged into tools such as kafkacat to inspect the message on the source topic. Depending on the exception thrown, the source message itself may also appear in the log:

Caused by: org.apache.kafka.common.errors.SerializationException:
 at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]
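When trawling worker logs programmatically, the source coordinates can be pulled out of the LogReporter line with a regular expression. A sketch (the helper and pattern are ours, matched against the log format shown above):

```python
import re

# Matches the "consumed record is {...}" fragment written by LogReporter.
RECORD_RE = re.compile(
    r"topic='(?P<topic>[^']+)', partition=(?P<partition>\d+), "
    r"offset=(?P<offset>\d+)"
)

def source_coordinates(log_line):
    """Return (topic, partition, offset) from a Connect error log line,
    or None if the line does not contain record coordinates."""
    m = RECORD_RE.search(log_line)
    if not m:
        return None
    return (m.group("topic"), int(m.group("partition")), int(m.group("offset")))
```

The extracted coordinates can then be fed straight into kafkacat's -t/-p/-o parameters to retrieve the original record.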

Process messages from the dead letter queue

Having set up a dead letter queue, how should those "dead letters" be handled? Because it is just a Kafka topic, standard Kafka tooling applies, the same as for any other topic. As seen above, kafkacat can inspect the headers, and it is equally good for general inspection of message contents and metadata. And, depending on the reason for rejection, the messages can also be replayed.

One scenario is a connector using the Avro converter on a topic that also contains JSON messages (which are therefore written to the dead letter queue), perhaps because, for legacy reasons, producers of both JSON and Avro formats are writing to the source topic. That should eventually be fixed upstream, but pragmatically, right now, all of the data flowing through the pipeline just needs to reach the sink.

1. Start with the initial sink reading from the source topic, deserializing with Avro and routing failures to the dead letter queue:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "file_sink_06__01-avro",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "test_topic_avro",
    "file": "/data/file_sink_06.txt",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq_file_sink_06__01",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.retry.delay.max.ms": 60000,
    "errors.retry.timeout": 300000
  }
}'

2. Create a second sink, taking the first sink's dead letter queue as its source topic, and attempt to deserialize the records as JSON. Here, value.converter, key.converter, the source topic name, and the dead letter queue name must all be changed (the latter to avoid recursion, in case this connector also needs to route messages to a dead letter queue).

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "file_sink_06__02-json",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "dlq_file_sink_06__01",
    "file": "/data/file_sink_06.txt",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq_file_sink_06__02",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.retry.delay.max.ms": 60000,
    "errors.retry.timeout": 300000
  }
}'

You can verify it now.

1. Send 20 Avro messages to the source topic, and confirm that 20 messages are read and processed by the original Avro sink:

2. Then send 8 JSON messages; these 8 messages go to the dead letter queue and are processed by the JSON sink:

3. Now send 5 malformed JSON messages, after which both sinks show failed messages. Two things confirm this:

  1. The number of messages the Avro sink routes to its dead letter queue no longer matches the number of JSON messages successfully processed;
  2. Messages are now being sent to the JSON sink's own dead letter queue.

Monitoring the dead letter queue with KSQL

In addition to monitoring the dead letter queue via JMX, KSQL's aggregation capabilities can be used to write a simple streaming application that monitors the rate at which messages are written to the queues:

-- Register a stream for each dead letter queue topic.
CREATE STREAM dlq_file_sink_06__01 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__01', VALUE_FORMAT='DELIMITED');
CREATE STREAM dlq_file_sink_06__02 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__02', VALUE_FORMAT='DELIMITED');

-- Consume data from the beginning of the topic
SET 'auto.offset.reset' = 'earliest';

-- Create a monitoring stream with extra columns
--  that can be used for subsequent aggregate queries
CREATE STREAM DLQ_MONITOR WITH (VALUE_FORMAT='AVRO') AS \
      SELECT 'dlq_file_sink_06__01' AS SINK_NAME, \
             'Records: ' AS GROUP_COL, \
             MSG \
      FROM dlq_file_sink_06__01;

-- Inject messages from the second dead letter queue
--  into the same monitoring stream
INSERT INTO DLQ_MONITOR \
      SELECT 'dlq_file_sink_06__02' AS SINK_NAME, \
             'Records: ' AS GROUP_COL, \
             MSG \
      FROM dlq_file_sink_06__02;

-- Create an aggregate view of the number of messages
--  in each dead letter queue per one-minute window
CREATE TABLE DLQ_MESSAGE_COUNT_PER_MIN AS \
      SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS START_TS, \
             SINK_NAME, \
             GROUP_COL, \
             COUNT(*) AS DLQ_MESSAGE_COUNT \
      FROM DLQ_MONITOR \
             WINDOW TUMBLING (SIZE 1 MINUTE) \
      GROUP BY SINK_NAME, \
             GROUP_COL;

This aggregate table can be queried interactively. The following shows the number of messages in each dead letter queue in one minute:

ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_MESSAGE_COUNT_PER_MIN;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
2019-02-01 03:12:00 | dlq_file_sink_06__01 | 5
2019-02-01 02:56:00 | dlq_file_sink_06__02 | 5
2019-02-01 03:12:00 | dlq_file_sink_06__02 | 5

Because a Kafka topic underlies this table, it can be routed to any desired monitoring dashboard and can also be used to drive alerts. Suppose a few error messages are acceptable, but more than five in one minute is a problem that needs attention:

CREATE TABLE DLQ_BREACH AS \
      SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT \
      FROM DLQ_MESSAGE_COUNT_PER_MIN \
      WHERE DLQ_MESSAGE_COUNT > 5;

Now there is a DLQ_BREACH topic that an alerting service can subscribe to; whenever a message arrives, an appropriate action (such as a notification) can be triggered:

ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_BREACH;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
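The same breach check can be expressed outside KSQL, for example in a plain consumer of the aggregate topic. A sketch using the counts shown above (the function name is ours):

```python
def dlq_breaches(counts_per_minute, threshold=5):
    """Replicate the DLQ_BREACH filter in plain Python: keep only the
    per-minute windows whose message count exceeds the threshold.

    counts_per_minute: list of (start_ts, sink_name, count) tuples.
    """
    return [row for row in counts_per_minute if row[2] > threshold]
```

Applied to the five windows listed above, only the two counts of 9 and 8 exceed the threshold, matching the DLQ_BREACH output.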

Where does Kafka Connect not provide error handling?

Where Kafka Connect does and does not handle errors, by connector lifecycle stage, is shown in the following table:

Connector lifecycle stage | Description                                                                                | Handles errors?
--------------------------|--------------------------------------------------------------------------------------------|----------------
Start                     | When first started, the connector performs required initialization, such as connecting to the data store | No
Poll (source connector)   | Reading messages from the source data store                                                | No
Convert                   | Reading/writing data from/to the Kafka topic and serializing/deserializing JSON/Avro, etc. | Yes
Transform                 | Applying any configured single message transforms                                          | Yes
Put (sink connector)      | Writing messages to the target data store                                                  | No

Note that the source connector does not have a dead letter queue.
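Because source connectors have no built-in dead letter queue, some teams implement the pattern by hand in the producing application. A minimal, hypothetical sketch with injected producer callables (none of these names come from Kafka Connect; a real implementation would use an actual Kafka producer client):

```python
def produce_with_manual_dlq(records, transform, send_main, send_dlq):
    """Hand-rolled dead letter queue for the source side: if transforming
    a record fails, send the raw record to a DLQ topic instead of
    aborting the whole batch.

    send_main / send_dlq are injected producer callables (hypothetical).
    Returns (succeeded, failed) counts for monitoring.
    """
    ok = failed = 0
    for rec in records:
        try:
            send_main(transform(rec))
            ok += 1
        except Exception:
            send_dlq(rec)  # preserve the raw record for later inspection
            failed += 1
    return ok, failed
```

As with Connect's sink-side DLQ, the essential property is that one bad record does not stop the pipeline, and the failure count is available for alerting.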

Error handling configuration process

Regarding the configuration of connector error handling, you can work through the following decisions step by step:

  • If any failed message indicates a serious upstream problem, stop immediately with errors.tolerance = none (the default);
  • If the pipeline should keep running despite bad messages, set errors.tolerance = all;
  • To avoid silent data loss, monitor the JMX error metrics and route failed messages to a dead letter queue (errors.deadletterqueue.topic.name);
  • To record why messages failed, enable the context headers (errors.deadletterqueue.context.headers.enable = true) and/or logging (errors.log.enable = true).

Summary

Handling errors is an important part of any stable and reliable data pipeline. Depending on how the data is being used, there are two options. If any error in the pipeline's messages is unacceptable and indicates a serious upstream problem, then processing should stop immediately (this is Kafka Connect's default behavior).

On the other hand, if the data is just being streamed to storage for analysis or non-critical processing, then keeping the pipeline running stably matters more, as long as errors are not propagated. In that case you can define how errors are handled; the recommended approach is to use a dead letter queue and closely monitor the available JMX metrics from Kafka Connect.
