1 year ago

#70353

test-img

r_k

KSQL+Streams+Kubernetes Timeout while waiting for command topic consumer to process command

I'm new in k8s and KSQL, so I would be appreciate for any suggestion. We have an 1 KSQL Server as ETL,3 Kafka,3 Zookeeper and 1 Kafka-Connect Pods in k8s.

While creating new stream we receive an error (I replaced IP and sensitive strings):

 '@type': 'statement_error',
      error_code: 50000,
      message: "Could not write the statement 'CREATE OR REPLACE STREAM <STREAM_NAME> (\n" +
        '  VARCHAR,\n' +
        '  VARCHAR,\n' +
        '  VARCHAR\n' +
        ') WITH (\n' +
        "  KAFKA_TOPIC = 'source.<topic_name>',\n" +
        "  VALUE_FORMAT = 'JSON',\n" +
        '  PARTITIONS = 3,\n' +
        '  REPLICAS = 2\n' +
        ");' into the command topic.\n" +
        'Caused by: Timeout while waiting for command topic consumer to process command\n' +
        '\ttopic',

I checked a Zookeeper, all 3 Brokers available:

[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","EXTERNAL":"PLAINTEXT"},
"endpoints":["PLAINTEXT://test-dw-cp-kafka-0.test-dw-cp-kafka-headless.test:9092","EXTERNAL://10.21.13.134:31090"],
"jmx_port":5555,"port":9092,"host":"test-dw-cp-kafka-0.test-dw-cp-kafka-headless.test","version":5,"timestamp":"1641997599570"}
[zk: localhost:2181(CONNECTED) 2]
[zk: localhost:2181(CONNECTED) 2]
[zk: localhost:2181(CONNECTED) 2] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","EXTERNAL":"PLAINTEXT"},
"endpoints":["PLAINTEXT://test-dw-cp-kafka-1.test-dw-cp-kafka-headless.test:9092","EXTERNAL://10.21.9.15:31091"],
"jmx_port":5555,"port":9092,"host":"test-dw-cp-kafka-1.test-dw-cp-kafka-headless.test","version":5,"timestamp":"1642487507887"}
[zk: localhost:2181(CONNECTED) 3]
[zk: localhost:2181(CONNECTED) 3]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","EXTERNAL":"PLAINTEXT"},
"endpoints":["PLAINTEXT://test-dw-cp-kafka-2.test-dw-cp-kafka-headless.test:9092","EXTERNAL://10.21.12.83:31092"],
"jmx_port":5555,"port":9092,"host":"test-dw-cp-kafka-2.test-dw-cp-kafka-headless.test","version":5,"timestamp":"1642512834413"}

Here the k8s POD's info:

kubectl -n test get pods | egrep 'ksql|kafka|zoo'
test-dw-cp-kafka-0                                       2/2     Running     0          12d
test-dw-cp-kafka-1                                       2/2     Running     0          12d
test-dw-cp-kafka-2                                       2/2     Running     0          12d
test-dw-cp-kafka-connect-5ccc9fd88d-lq62r                2/2     Running     3          12d
test-dw-cp-kafka-rest-5745fdf7bf-zgcx8                   2/2     Running     2          12d
test-dw-zookeeper-0                                      1/1     Running     0          12d
test-dw-zookeeper-1                                      1/1     Running     0          12d
test-dw-zookeeper-2                                      1/1     Running     0          12d
cp-ksql-server-test-7cdb75b7b-vvg9c                      2/2     Running     0          6d1h

From KSQL Server I see error:

WARN [Consumer clientId=_confluent-ksql-dw-etl-fa28b13f-771c-4b75-80b1-f09034a6fb9f-StreamThread-2-consumer, groupId=_confluent-ksql-dw-etl] Error connecting to node test-dw-cp-kafka-1.test-dw-cp-kafka-headless.test:9092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException:

I can provide all the necessary information on demand

kubernetes

etl

apache-kafka-streams

confluent-platform

ksqldb

0 Answers

Your Answer

Accepted video resources