Geek_b809ff
胡老师,请教一个问题。用命令行消费是ok的,但是用API消费,在调用了consumer.poll(1000) 方法后就没任何反应了,请问有可能是什么问题?具体实现代码如下,用了线程池
public void start() {
try {
int threadCoreNumber = 5;
int threadMaxNumber = 10;
//启用线程池
executor = new ThreadPoolExecutor(threadCoreNumber, threadMaxNumber, 1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(500), new ThreadPoolExecutor.CallerRunsPolicy());
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
//从kafka中读取消息
ConsumerRecords<String, String> records = consumer.poll(1000);
//自动提交
for (ConsumerRecord<String, String> record : records) {
logger.info(String.format("[consumer][thread:%s] receive message from [Topic:%s -> partition:%s -> offset:%s], message key:%s ,value:%s",
Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value()));
executor.submit(new SaleMngConsumer(record));
}
}
} catch (Exception e) {
logger.info("djfs",e);
//ignore if shutdown
}finally {
logger.info("kafka consumer is close ......");
consumer.close();
}
}
});
thread.start();
} catch (Exception e) {
executor.shutdown();
}
}
作者回复: 同一个consumer实例是在多个线程间共享的吗?KafkaConsumer不是线程安全的,你应该不被允许这么做的