Kafka send() stalls for 60 seconds

I ran into a problem today that is exactly the same as the one in http://stackoverflow.com/quest… : the Kafka producer blocks inside send() for one minute, precisely one minute, and then the function returns without throwing any exception.

I am even more certain of this than the original poster, because I logged immediately before and after the send() call.

I found the same problem recorded here: http://blog.chinaunix.net/xmlr… That author eventually traced it to a missing hosts entry, but my hosts are configured.

So I enabled the Kafka client's own logging and got the following:

[2017-02-24 21:42:46,080][ INFO]ProducerConfig values: 
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	buffer.memory = 33554432
	ssl.truststore.password = null
	batch.size = 16384
	ssl.keymanager.algorithm = SunX509
	receive.buffer.bytes = 32768
	ssl.cipher.suites = null
	ssl.key.password = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.provider = null
	sasl.kerberos.service.name = null
	max.in.flight.requests.per.connection = 5
	sasl.kerberos.ticket.renew.window.factor = 0.8
	bootstrap.servers = [BJ-KTDBTEST01:9092, BJ-KTDBTEST02:9092, BJ-KTDBTEST03:9092]
	client.id = 
	max.request.size = 1048576
	acks = 1
	linger.ms = 0
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	metadata.fetch.timeout.ms = 60000
	ssl.endpoint.identification.algorithm = null
	ssl.keystore.location = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	ssl.truststore.location = null
	ssl.keystore.password = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	block.on.buffer.full = false
	metrics.sample.window.ms = 30000
	metadata.max.age.ms = 300000
	security.protocol = PLAINTEXT
	ssl.protocol = TLS
	sasl.kerberos.min.time.before.relogin = 60000
	timeout.ms = 30000
	connections.max.idle.ms = 540000
	ssl.trustmanager.algorithm = PKIX
	metric.reporters = []
	compression.type = none
	ssl.truststore.type = JKS
	max.block.ms = 60000
	retries = 0
	send.buffer.bytes = 131072
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	reconnect.backoff.ms = 50
	metrics.num.samples = 2
	ssl.keystore.type = JKS
[org.apache.kafka.clients.producer.ProducerConfig::logAll::165::http-nio-8081-exec-62]
[2017-02-24 21:42:46,119][ WARN]The configuration request.required.acks = 0 was supplied but isn't a known config.[org.apache.kafka.clients.producer.ProducerConfig::logUnused::173::http-nio-8081-exec-62]
[2017-02-24 21:42:46,121][ WARN]The configuration metadata.broker.list = BJ-KTDBTEST01:9092,BJ-KTDBTEST02:9092,BJ-KTDBTEST03:9092 was supplied but isn't a known config.[org.apache.kafka.clients.producer.ProducerConfig::logUnused::173::http-nio-8081-exec-62]
[2017-02-24 21:42:46,121][ WARN]The configuration key.serializer.class = kafka.serializer.StringEncoder was supplied but isn't a known config.[org.apache.kafka.clients.producer.ProducerConfig::logUnused::173::http-nio-8081-exec-62]
[2017-02-24 21:42:46,121][ WARN]The configuration serializer.class = kafka.serializer.StringEncoder was supplied but isn't a known config.[org.apache.kafka.clients.producer.ProducerConfig::logUnused::173::http-nio-8081-exec-62]
[2017-02-24 21:42:46,123][ INFO]Kafka version : 0.9.0.0[org.apache.kafka.common.utils.AppInfoParser::<init>::82::http-nio-8081-exec-62]
[2017-02-24 21:42:46,123][ INFO]Kafka commitId : fc7243c2af4b2b4a[org.apache.kafka.common.utils.AppInfoParser::<init>::83::http-nio-8081-exec-62]
[2017-02-24 21:42:46,129][ WARN]Error registering AppInfo mbean[org.apache.kafka.common.utils.AppInfoParser::registerAppInfo::59::http-nio-8081-exec-62]
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_67]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_67]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_67]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_67]
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_67]
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_67]
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57) ~[kafka-clients-0.9.0.0.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:314) [kafka-clients-0.9.0.0.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194) [kafka-clients-0.9.0.0.jar:na]
	at com.quantex.data.dao.KafkaDao.<init>(KafkaDao.java:71) [classes/:na]
	at com.quantex.data.tpyappreport.PostPBServlet.<clinit>(PostPBServlet.java:45) [classes/:na]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) [na:1.7.0_67]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) [na:1.7.0_67]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [na:1.7.0_67]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:526) [na:1.7.0_67]
	at java.lang.Class.newInstance(Class.java:374) [na:1.7.0_67]
	at org.apache.catalina.core.DefaultInstanceManager.newInstance(DefaultInstanceManager.java:119) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardWrapper.loadServlet(StandardWrapper.java:1102) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardWrapper.allocate(StandardWrapper.java:828) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:135) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [catalina.jar:8.0.33]
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [catalina.jar:8.0.33]
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [catalina.jar:8.0.33]
	at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616) [catalina.jar:8.0.33]
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [catalina.jar:8.0.33]
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522) [catalina.jar:8.0.33]
	at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1095) [tomcat-coyote.jar:8.0.33]
	at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:672) [tomcat-coyote.jar:8.0.33]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1502) [tomcat-coyote.jar:8.0.33]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1458) [tomcat-coyote.jar:8.0.33]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-util.jar:8.0.33]
	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
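
Incidentally, the WARN lines above do reveal one thing: the producer is being handed configuration keys from the old 0.8 Scala producer (metadata.broker.list, serializer.class, request.required.acks), which the new 0.9 Java client does not recognize and merely warns about. That is harmless here, but for reference, a cleaned-up configuration using only new-client keys would look roughly like this (broker addresses, acks, and serializers copied from the config dump above):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class ProducerConfigSketch {
    public static Producer<String, byte[]> newProducer() {
        Properties props = new Properties();
        // Replaces the legacy metadata.broker.list.
        props.put("bootstrap.servers", "BJ-KTDBTEST01:9092,BJ-KTDBTEST02:9092,BJ-KTDBTEST03:9092");
        // Replaces the legacy request.required.acks.
        props.put("acks", "1");
        // Replace the legacy serializer.class / key.serializer.class.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<String, byte[]>(props);
    }
}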

The log itself does not point at anything obviously wrong. After running out of ideas, I changed the Kafka broker list from hostnames to IP addresses just to see what would happen, and the problem went away. One detail worth noting: the stall is exactly one minute, which lines up with max.block.ms = 60000 in the config dump above; in the 0.9 client that is the longest send() will block waiting for topic metadata.
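
A quick way to confirm that the time is going into the metadata fetch rather than the actual network send is to time a bare metadata call: partitionsFor() blocks on the same metadata fetch that send() does. A minimal sketch; the topic name is a hypothetical placeholder:

import java.util.List;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;

public class MetadataProbe {
    // With the hostname-based broker list the 60 s should show up here,
    // before any record is ever sent. "my-test-topic" is a placeholder.
    public static void probe(Producer<String, byte[]> producer) {
        long start = System.currentTimeMillis();
        List<PartitionInfo> partitions = producer.partitionsFor("my-test-topic");
        System.out.println("partitionsFor returned " + partitions.size()
                + " partitions in " + (System.currentTimeMillis() - start) + " ms");
    }
}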

But my /etc/hosts really does have entries for those hostnames, and pinging them works fine, so I started looking into how Java implements getHostByAddr. According to http://blog.arganzheng.me/post… the JVM does consult /etc/hosts, which makes this even harder to explain.

I wrote a small verification program and ran it on the machine:

import java.net.InetAddress;
import java.net.UnknownHostException;

public class App
{
    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );

        // Forward lookup: hostname -> IP, the direction the broker list needs
        System.out.println("Resolving hostname to IP address");
        try {
            String dottedQuadIpAddress = InetAddress.getByName("BJ-KTDBTEST01").getHostAddress();
            System.out.println("IP address: " + dottedQuadIpAddress);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }

        // Reverse lookup: IP -> hostname (getHostName triggers the actual lookup)
        System.out.println("Resolving IP address to hostname");
        try {
            InetAddress[] addresses = InetAddress.getAllByName("10.42.221.40");
            for (InetAddress address : addresses) {
                System.out.println("Hostname: " + address.getHostName());
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }
}

This code returned instantly, which is even more puzzling: plain JDK name resolution is fine in both directions, so whatever lookup Kafka performs between hostname and IP must be going wrong somewhere else.
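
I have not found the root cause yet, but there is at least a way to keep a resolution or metadata problem from pinning a Tomcat worker thread for a full minute: lower max.block.ms, which defaults to the 60000 ms visible in the config dump and bounds how long send() may block waiting for metadata (on timeout the 0.9 client throws a TimeoutException instead of hanging). A sketch; the 5-second value is an arbitrary choice of mine:

import java.util.Properties;

public class FailFastConfig {
    public static Properties withFailFast(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Default is 60000 ms, which matches the observed stall exactly.
        // 5000 ms here is an arbitrary example value, not a recommendation.
        props.put("max.block.ms", "5000");
        return props;
    }
}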

4 thoughts on “Kafka send() stalls for 60 seconds”

  1. I ran into the same problem. Check whether the IP and port in your program's Kafka config file match the configuration of the Kafka cluster.
