Storm KafkaSpout does not get key (only value)
Using org.apache.kafka.clients.producer.* I am sending Kafka messages with key:Long and value:String to a Storm KafkaSpout.
When I inspect the created record before sending, key and value are both set, but at the receiving KafkaSpout only the values arrive; the key comes through as nothing or a tab character.
Has anybody seen this issue?
My producer looks like:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaDataProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
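For completeness, the producer itself is built from these properties. A minimal sketch of the whole setup (createProducer is just how I wrap it here; BOOTSTRAP_SERVERS points at the same kafka1..kafka3 brokers as in the spout config below):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Creates a producer that serializes keys as Long and values as String.
private static Producer<Long, String> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaDataProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<>(props);
}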
The record is created and sent by:
final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, key, value);
RecordMetadata metadata = producer.send(record).get();
On the CLI I can consume the messages, with the key printed in front of each value, using:

kafka-console-consumer.sh --topic taxi --from-beginning --property print.key=true --property key.separator=" : " --bootstrap-server kafka1:9092
EDIT1: The Storm Kafka consumer (the spout configuration) looks like this:
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
        .builder("PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092,PLAINTEXT://kafka3:9092,", TOPIC)
        .setProp(props)
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
        .setOffsetCommitPeriodMs(100)
        .build();
builder.setSpout("kafka_spout", new KafkaSpout<>(spoutConfig), 1);
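To see what actually arrives, the spout feeds a simple printing bolt, roughly like the sketch below (PrintBolt is simplified for this question; the field names "key" and "value" are the ones the spout's default RecordTranslator emits, together with "topic", "partition" and "offset"):

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

// Prints every tuple coming from the KafkaSpout. With the default
// RecordTranslator each tuple carries the fields
// "topic", "partition", "offset", "key", "value".
public class PrintBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println("key=" + tuple.getValueByField("key")
                + " value=" + tuple.getValueByField("value"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, emits nothing
    }
}

It is wired in with

builder.setBolt("print_bolt", new PrintBolt(), 1).shuffleGrouping("kafka_spout");

and this is where the key comes out empty (or as a tab) while the value is fine.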
EDIT2: The producer prepares its data like this: select from the database and add the rows to an ArrayList:
private static ArrayList<Pair<Long, String>> selectData(String start, String end) {
    ArrayList<Pair<Long, String>> results = new ArrayList<>();
    if (conn == null) {
        return results;
    }
    try {
        Statement statement = conn.createStatement();
        // note: concatenating parameters into SQL is open to injection;
        // a PreparedStatement would be safer
        ResultSet resultSet = statement.executeQuery("SELECT data1, data2, data3, data4 FROM " +
                "tdrive " +
                "where date_time between '" + start + "' and '" + end + "' " +
                "order by date_time asc;");
        while (resultSet.next()) {
            int id = resultSet.getInt("data1");
            // value format: "id;timestamp;long;lat"
            String result = id +
                    ";" + resultSet.getTimestamp("data2") +
                    ";" + resultSet.getDouble("data3") +
                    ";" + resultSet.getDouble("data4");
            results.add(new Pair<>((long) id, result));
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return results;
}
After the data have been stored in the ArrayList, everything is sent via Kafka:
for (Pair<Long, String> data : selectData(covertTime(selectStartTime), covertTime(selectEndTime))) {
    produceMessage(data.getValue0(), data.getValue1());
}
producer.flush();
produceMessage looks like this:
private static void produceMessage(long key, String value) {
    long time = System.currentTimeMillis();
    try {
        final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, key, value);
        RecordMetadata metadata = producer.send(record).get();
        long elapsedTime = System.currentTimeMillis() - time;
        // key:id, value:"id;timestamp;long;lat"
        System.out.printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d) time=%d\n",
                record.key(), record.value(), metadata.partition(), metadata.offset(), elapsedTime);
    } catch (Exception e) {
        System.err.println(e);
    }
}
I hope EDIT2 hasn't added too much code. Thank you in advance.
java
apache-kafka
apache-storm