Storm KafkaSpout does not get key (only value)

Using org.apache.kafka.clients.producer.*, I am trying to send Kafka messages to a Storm KafkaSpout with key:Long and value:String.

When I check the created record before sending, key and value are both set, but at the receiving KafkaSpout only the values arrive. The key comes out as nothing or a tab character.

Does anybody know of such an issue?

My producer looks like:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaDataProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
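The construction of the producer itself is not in my snippets; from these properties it is the standard one-liner (a sketch; this producer is the same instance used in the send code below):

import org.apache.kafka.clients.producer.KafkaProducer;

// Sketch: create the producer from the properties above; "producer"
// here stands in for the instance referenced in the send code below.
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);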

The record is created and sent by

final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, key, value);
RecordMetadata metadata = producer.send(record).get();

On the CLI I receive the messages with:

kafka-console-consumer.sh --topic taxi --from-beginning --property print.key=true --property key.separator=" : " --bootstrap-server kafka1:9092
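Since the key is written with LongSerializer, the console consumer prints the key's raw 8 bytes, which are mostly unprintable (a key of 9, for example, ends in byte 0x09, which is a tab). Telling the console consumer to decode the key as a long makes it readable (assuming a console consumer version that supports --key-deserializer):

kafka-console-consumer.sh --topic taxi --from-beginning --property print.key=true --property key.separator=" : " --key-deserializer org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server kafka1:9092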

EDIT1: The Storm Kafka consumer looks like this:

Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");

// Note: KafkaSpoutConfig.builder(bootstrapServers, topics) defaults to a
// StringDeserializer for both key and value.
KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
    .builder("PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092,PLAINTEXT://kafka3:9092", TOPIC)
    .setProp(props)
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
    .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
    .setOffsetCommitPeriodMs(100)
    .build();

// builder is the TopologyBuilder of the topology
builder.setSpout("kafka_spout", new KafkaSpout<>(spoutConfig), 1);
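With the default record translator the spout emits tuples with the fields topic, partition, offset, key and value, so the downstream bolt reads the key roughly like this (a minimal sketch, not my actual bolt):

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Object key = tuple.getValueByField("key");      // this is what arrives empty
        String value = tuple.getStringByField("value"); // the value arrives fine
        System.out.println("key=" + key + " value=" + value);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: no output fields
    }
}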

EDIT2: The producer's data flow is: select from the database and add each row to an ArrayList:

private static ArrayList<Pair<Long, String>> selectData(String start, String end) {
    Statement statement = null;
    ResultSet resultSet;
    ArrayList<Pair<Long, String>> results = new ArrayList<>();

    try {
        if (conn != null) {
            statement = conn.createStatement();
        }

        resultSet = statement.executeQuery("SELECT data1, data2, data3, data4 FROM " +
                "tdrive " +
                "where date_time between '" + start + "' and '" + end + "' " +
                "order by date_time asc;");

        while (resultSet.next()) {
            int id = resultSet.getInt("data1");
            // value format: "id;timestamp;long;lat"
            String result = id
                    + ";" + resultSet.getTimestamp("data2")
                    + ";" + resultSet.getDouble("data3")
                    + ";" + resultSet.getDouble("data4");

            // Pair exposes getValue0()/getValue1() (e.g. org.javatuples.Pair);
            // the id doubles as the Kafka record key.
            results.add(new Pair<>((long) id, result));
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return results;
}
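Aside: the concatenated query could equally be written with a PreparedStatement, which avoids quoting issues with the timestamp strings (a sketch against the same assumed schema):

String sql = "SELECT data1, data2, data3, data4 FROM tdrive " +
        "WHERE date_time BETWEEN ? AND ? ORDER BY date_time ASC";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
    ps.setString(1, start);
    ps.setString(2, end);
    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            // ... build the Pair entries exactly as above ...
        }
    }
}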

After the data have been stored in the ArrayList, everything is sent via Kafka:

for (Pair<Long, String> data : selectData(covertTime(selectStartTime), covertTime(selectEndTime))) {
    String result = data.getValue1();
    produceMessage(data.getValue0(), result);
}
producer.flush();

produceMessage looks like this:

private static void produceMessage(long key, String value) {
    long time = System.currentTimeMillis();

    try {
        final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, key, value);

        RecordMetadata metadata = producer.send(record).get();

        long elapsedTime = System.currentTimeMillis() - time;
        System.out.printf("sent record(key=%s value=%s) " +
                        "meta(partition=%d, offset=%d) time=%d\n", // key:id, value:"id;timestamp;long;lat"
                record.key(), record.value(), metadata.partition(),
                metadata.offset(), elapsedTime);

    } catch (Exception e) {
        System.err.println(e);
    }
}
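Note: producer.send(record).get() blocks on every record; the usual non-blocking variant passes a callback to send() instead (a sketch of standard producer usage, not my measured code):

// Sketch: asynchronous send; the callback runs once the broker acks,
// and producer.flush() after the loop still drains everything.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("sent key=%d partition=%d offset=%d%n",
                record.key(), metadata.partition(), metadata.offset());
    }
});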

I hope that, even after EDIT2, this is not too much code.

Thank you in advance

Tags: java, apache-kafka, apache-storm
