Ran into a problem today, http://stackoverflow.com/quest… exactly the same as this person's question: the Kafka producer hangs in send() for one minute, precisely one minute, and then the call returns without throwing any exception.
I'm even more certain of this than the original poster, because I logged immediately before and after the send() call.
I found a record of the same problem here: http://blog.chinaunix.net/xmlr… His investigation concluded that the hosts were not configured, but mine are.
So I turned up Kafka's own logging and got the following:
[2017-02-24 21:42:46,080][ INFO] ProducerConfig values:
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	buffer.memory = 33554432
	ssl.truststore.password = null
	batch.size = 16384
	ssl.keymanager.algorithm = SunX509
	receive.buffer.bytes = 32768
	ssl.cipher.suites = null
	ssl.key.password = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.provider = null
	sasl.kerberos.service.name = null
	max.in.flight.requests.per.connection = 5
	sasl.kerberos.ticket.renew.window.factor = 0.8
	bootstrap.servers = [BJ-KTDBTEST01:9092, BJ-KTDBTEST02:9092, BJ-KTDBTEST03:9092]
	client.id =
	max.request.size = 1048576
	acks = 1
	linger.ms = 0
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	metadata.fetch.timeout.ms = 60000
	ssl.endpoint.identification.algorithm = null
	ssl.keystore.location = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	ssl.truststore.location = null
	ssl.keystore.password = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	block.on.buffer.full = false
	metrics.sample.window.ms = 30000
	metadata.max.age.ms = 300000
	security.protocol = PLAINTEXT
	ssl.protocol = TLS
	sasl.kerberos.min.time.before.relogin = 60000
	timeout.ms = 30000
	connections.max.idle.ms = 540000
	ssl.trustmanager.algorithm = PKIX
	metric.reporters = []
	compression.type = none
	ssl.truststore.type = JKS
	max.block.ms = 60000
	retries = 0
	send.buffer.bytes = 131072
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	reconnect.backoff.ms = 50
	metrics.num.samples = 2
	ssl.keystore.type = JKS
 [org.apache.kafka.clients.producer.ProducerConfig::logAll::165::http-nio-8081-exec-62]
[2017-02-24 21:42:46,119][ WARN] The configuration request.required.acks = 0 was supplied but isn't a known config. [org.apache.kafka.clients.producer.ProducerConfig::logUnused::173::http-nio-8081-exec-62]
[2017-02-24 21:42:46,121][ WARN] The configuration metadata.broker.list = BJ-KTDBTEST01:9092,BJ-KTDBTEST02:9092,BJ-KTDBTEST03:9092 was supplied but isn't a known config. [org.apache.kafka.clients.producer.ProducerConfig::logUnused::173::http-nio-8081-exec-62]
[2017-02-24 21:42:46,121][ WARN] The configuration key.serializer.class = kafka.serializer.StringEncoder was supplied but isn't a known config. [org.apache.kafka.clients.producer.ProducerConfig::logUnused::173::http-nio-8081-exec-62]
[2017-02-24 21:42:46,121][ WARN] The configuration serializer.class = kafka.serializer.StringEncoder was supplied but isn't a known config. [org.apache.kafka.clients.producer.ProducerConfig::logUnused::173::http-nio-8081-exec-62]
[2017-02-24 21:42:46,123][ INFO] Kafka version : 0.9.0.0 [org.apache.kafka.common.utils.AppInfoParser::<init>::82::http-nio-8081-exec-62]
[2017-02-24 21:42:46,123][ INFO] Kafka commitId : fc7243c2af4b2b4a [org.apache.kafka.common.utils.AppInfoParser::<init>::83::http-nio-8081-exec-62]
[2017-02-24 21:42:46,129][ WARN] Error registering AppInfo mbean [org.apache.kafka.common.utils.AppInfoParser::registerAppInfo::59::http-nio-8081-exec-62]
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_67]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_67]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_67]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_67]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_67]
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_67]
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57) ~[kafka-clients-0.9.0.0.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:314) [kafka-clients-0.9.0.0.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194) [kafka-clients-0.9.0.0.jar:na]
	at com.quantex.data.dao.KafkaDao.<init>(KafkaDao.java:71) [classes/:na]
	at com.quantex.data.tpyappreport.PostPBServlet.<clinit>(PostPBServlet.java:45) [classes/:na]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) [na:1.7.0_67]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) [na:1.7.0_67]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [na:1.7.0_67]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:526) [na:1.7.0_67]
	at java.lang.Class.newInstance(Class.java:374) [na:1.7.0_67]
	at org.apache.catalina.core.DefaultInstanceManager.newInstance(DefaultInstanceManager.java:119) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardWrapper.loadServlet(StandardWrapper.java:1102) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardWrapper.allocate(StandardWrapper.java:828) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:135) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [catalina.jar:8.0.33]
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [catalina.jar:8.0.33]
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [catalina.jar:8.0.33]
	at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [catalina.jar:8.0.33]
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522) [catalina.jar:8.0.33]
	at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1095) [tomcat-coyote.jar:8.0.33]
	at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:672) [tomcat-coyote.jar:8.0.33]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1502) [tomcat-coyote.jar:8.0.33]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1458) [tomcat-coyote.jar:8.0.33]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-util.jar:8.0.33]
	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
This log didn't point to anything obvious. After staring at it for a while with no leads, I changed the Kafka broker list from hostnames to IP addresses on a whim, and the problem went away.
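The fix amounts to replacing the hostnames in the broker list with IPs. A minimal sketch of what the producer properties would look like, as I understand it in hindsight; only 10.42.221.40 appears elsewhere in this post, the other two addresses are placeholders, and the lowered max.block.ms is my own addition rather than part of the original fix (the config dump above shows max.block.ms = 60000 and metadata.fetch.timeout.ms = 60000, which line up suspiciously well with the exactly-one-minute hang, suggesting send() was blocking on the metadata fetch):

```java
import java.util.Properties;

public class ProducerProps {
    public static Properties build() {
        Properties props = new Properties();
        // IPs instead of hostnames: this is the change that made the hang go away.
        // 10.42.221.40 is from the post; the other two addresses are placeholders.
        props.setProperty("bootstrap.servers",
                "10.42.221.40:9092,10.42.221.41:9092,10.42.221.42:9092");
        // Serializers as shown in the ProducerConfig dump above.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("acks", "1");
        // Default is 60000 ms, which matches the observed one-minute block in send().
        // Lowering it (my assumption) makes a metadata-fetch failure surface quickly
        // instead of silently blocking the caller for a full minute.
        props.setProperty("max.block.ms", "5000");
        return props;
    }
}
```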
But the hostnames are definitely configured in my /etc/hosts, and pinging them works. So I started looking at how Java implements hostname resolution (getHostByAddr); this post, http://blog.arganzheng.me/post… says it consults the /etc/hosts file, which makes the failure even stranger.
I wrote a small verification program and ran it on the machine:
import java.net.InetAddress;
import java.net.UnknownHostException;

public class App {
    public static void main(String[] args) {
        System.out.println("Hello World!");

        // Forward lookup: hostname -> IP
        System.out.println("Resolving the hostname to an IP address");
        String dottedQuadIpAddress = null;
        try {
            dottedQuadIpAddress = InetAddress.getByName("BJ-KTDBTEST01").getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        System.out.println("IP address: " + dottedQuadIpAddress);

        // Reverse lookup: IP -> hostname
        System.out.println("Resolving the IP address to a hostname");
        InetAddress[] addresses = null;
        try {
            addresses = InetAddress.getAllByName("10.42.221.40");
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        if (addresses != null) { // guard against a failed lookup
            for (int i = 0; i < addresses.length; i++) {
                System.out.println("Hostname: " + addresses[i].getHostName());
            }
        }
    }
}
This code returned instantly, which made things even more puzzling: it suggests the problem is specific to how Kafka resolves between hostnames and IPs.
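One caveat about a timing test like the one above (my own note, not something I verified at the time): the JVM caches successful InetAddress lookups (controlled by the networkaddress.cache.ttl security property), so within one process only the first lookup of a name actually exercises the resolver. A minimal timing sketch, with localhost standing in for the broker hostname:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class DnsTimer {
    // Returns the elapsed milliseconds for one lookup, or -1 if it failed.
    public static long timeLookup(String host) {
        long start = System.nanoTime();
        try {
            InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            return -1; // resolution failed
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // "localhost" is a stand-in for the real broker hostname (e.g. BJ-KTDBTEST01).
        long first = timeLookup("localhost");   // may hit the actual resolver
        long second = timeLookup("localhost");  // usually served from the JVM's cache
        System.out.println("first=" + first + "ms second=" + second + "ms");
    }
}
```

So a fast second run of the verification program doesn't rule out a slow first resolution inside the Kafka client.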
Try setting:
props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "0");
It may not be an IP-vs-hostname problem at all.
Yeah, possibly. I didn't dig deeper at the time, and the original environment is long gone now.
I ran into the same problem. Check whether the IPs and ports in your application's Kafka config match the configuration of the Kafka cluster.
Yes. The puzzling part at the time was why the hostnames didn't work.