java之kafka AdminClient API 等待节点分配超时

lovecherry 阅读:73 2024-06-20 12:54:19 评论:0

我是 Kafka 的新手,正在尝试使用 AdminClient用于管理在我的本地机器上运行的 Kafka 服务器的 API。我的设置与 quick start 中的完全相同Kafka 文档的部分。唯一的区别是我没有创建任何主题。

我在此设置上运行任何 shell 脚本都没有问题,但是当我尝试运行以下 java 代码时:

public class ProducerMain{ 
 
    public static void main(String[] args) { 
        Properties props = new Properties(); 
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,  
            "localhost:9092"); 
 
 
        try(final AdminClient adminClient =  
              KafkaAdminClient.create(props)){ 
 
            try { 
                final NewTopic newTopic = new NewTopic("test", 1,  
                    (short)1); 
 
                final CreateTopicsResult createTopicsResult =  
                    adminClient.createTopics(  
                         Collections.singleton(newTopic)); 
 
                createTopicsResult.all().get(); 
 
            }catch (InterruptedException | ExecutionException e) { 
                e.printStackTrace(); 
            } 
        } 
    } 
} 

错误: TimeoutException: Timed out waiting for a node assignment
Exception in thread "main" java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. 
at ProducerMain.main(ProducerMain.java:41) 
    <br>Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. 
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) 
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) 
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) 
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258) 
at ProducerMain.main(ProducerMain.java:38) 
<br>Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. 

我在网上搜索了有关问题可能是什么的指示,但到目前为止一无所获。任何建议都是受欢迎的,因为我已经走到了尽头。

请您参考如下方法:

听起来你的经纪人不健康......

这段代码工作正常

public class Main { 
 
    static final Logger logger = LoggerFactory.getLogger(Main.class); 
 
    public static void main(String[] args) { 
        Properties properties = new Properties(); 
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
        properties.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, "local-test"); 
        properties.setProperty(AdminClientConfig.RETRIES_CONFIG, "3"); 
 
        try (AdminClient client = AdminClient.create(properties)) { 
            final CreateTopicsResult res = client.createTopics( 
                    Collections.singletonList( 
                            new NewTopic("foo", 1, (short) 1) 
                    ) 
            ); 
            res.all().get(5, TimeUnit.SECONDS); 
        } catch (InterruptedException | ExecutionException | TimeoutException e) { 
            logger.error("unable to create topic", e); 
        } 
    } 
} 

我可以在代理日志中看到该主题已创建


标签:java
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号