[kafka.utils.ZkUtils$]-[INFO]:68 - I wrote this conflicted ephemeral node [{"version":1,"subscription":{"commodity_service_topic_sm_qty_online":3},"pattern":"static","timestamp":"1441834851437"}] at /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-79231a0f a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry
product_search_indexing.idc1.fn| 2015-09-11 00:00:58.814 [kafka.utils.ZkUtils$]-[INFO]:68 - conflict in /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-947eb885 data: {"version":1,"subscription":{"commodity_service_topic_sm_price_online":3},"pattern":"static","timestamp":"1441834849616"} stored data: {"version":1,"subscription":{"commodity_service_topic_sm_price_online":3},"pattern":"static","timestamp":"1441834849365"}
product_search_indexing.idc1.fn| 2015-09-11 00:00:58.815 [kafka.utils.ZkUtils$]-[INFO]:68 - I wrote this conflicted ephemeral node [{"version":1,"subscription":{"commodity_service_topic_sm_price_online":3},"pattern":"static","timestamp":"1441834849616"}] at /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-947eb885 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry
product_search_indexing.idc1.fn| 2015-09-11 00:00:59.018 [kafka.utils.ZkUtils$]-[INFO]:68 - conflict in /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-14fc69b2 data: {"version":1,"subscription":{"FAVORITE-Topic-1":3},"pattern":"static","timestamp":"1441834850311"} stored data: {"version":1,"subscription":{"FAVORITE-Topic-1":3},"pattern":"static","timestamp":"1441834849799"}
product_search_indexing.idc1.fn| 2015-09-11 00:00:59.019 [kafka.utils.ZkUtils$]-[INFO]:68 - I wrote this conflicted ephemeral node [{"version":1,"subscription":{"FAVORITE-Topic-1":3},"pattern":"static","timestamp":"1441834850311"}] at /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-14fc69b2 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry
2015-09-11 11:33:00

4 Answers

The root cause is probably ZK session expiry. Do you have long GCs on the broker? You may need to tune your GC or increase the ZK session timeout.
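For Kafka's old ZooKeeper-based consumer, the session timeout is controlled by `zookeeper.session.timeout.ms`. A minimal sketch of raising it in the consumer properties (the values here are illustrative, not recommendations):

```properties
# consumer.properties -- illustrative values, tune to your GC pause profile
zookeeper.session.timeout.ms=30000
zookeeper.connection.timeout.ms=30000
```

The timeout needs to comfortably exceed the worst-case GC pause; otherwise ZooKeeper expires the session mid-pause and triggers the re-registration path shown in the logs above.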
2015-09-11 11:35:14

I have seen the issue reported in the original problem description in our QA environment (3 ZooKeeper, 3 Kafka, and several application-specific nodes) several times now. I have not tested any configurations where 2 nodes try to claim the same broker id. The problem is triggered when the system is under stress (high I/O and CPU load) and the ZooKeeper connections become unstable. When this happens, Kafka threads can get stuck trying to register Broker nodes and application threads get stuck trying to register Consumer nodes. One way to recover is to restart the impacted nodes. As an experiment I also tried deleting the blocking ZooKeeper nodes (hours later, when the system was under no stress). When I did so, createEphemeralPathExpectConflictHandleZKBug would finally complete processing the current Expire message (i.e. add the node) and break out of its loop, but then immediately reenter the loop when it tried to process the next Expire message. The few times I tested this approach I had to delete the node dozens of times before the problem would clear itself - in other words, there were dozens of Expire messages waiting to be processed. Obviously I am looking into this issue from a configuration point of view (avoiding the unstable connection issue), but this Kafka error behavior concerns me.

I have reproduced it (somewhat artificially) in a dev environment as follows:

1) Start one ZooKeeper and one Kafka node.
2) Set a thread breakpoint in KafkaHealthCheck.scala:

    def handleNewSession() {
      info("re-registering broker info in ZK for broker " + brokerId)
-->   register()
      info("done re-registering broker")
      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))

3) Pause Kafka.
4) Wait for ZooKeeper to expire the first session and drop the ephemeral node.
5) Unpause Kafka.
6) Kafka reconnects with ZooKeeper, receives an Expire, and establishes a second session.
7) Breakpoint hit and event thread paused before handling the first Expire.
8) Pause Kafka again.
9) Wait for ZooKeeper to expire the second session and delete the ephemeral node (again).
10) Remove breakpoint, unpause Kafka, and finally release the event thread.
11) Kafka reconnects with ZooKeeper, receives a second Expire, and establishes a third session.
12) Kafka registers an ephemeral node triggered by the first expire (which triggered the second session), but ZooKeeper associates it with the third session.
13) Kafka tries to register an ephemeral node triggered by the second expire, but ZooKeeper already has a stable node.
14) Kafka assumes this node will go away soon, sleeps, and then retries.
15) The node is associated with a valid session and therefore does not go away, so Kafka remains stuck in the retry loop.
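The stuck state in steps 12-15 can be modeled with a small simulation. This is a sketch, not Kafka's actual code: `FakeZk`, `register`, and the session-id fields are hypothetical stand-ins for ZooKeeper and for the retry loop in createEphemeralPathExpectConflictHandleZKBug.

```java
// Sketch of why the retry loop spins forever once the conflicting ephemeral
// node is owned by the *current* live session. All names are hypothetical.
public class StuckRetryLoopSketch {

    static class FakeZk {
        Long nodeOwnerSession;      // session owning the node; null = node absent
        final long currentSession;  // the session this client is now on

        FakeZk(Long nodeOwnerSession, long currentSession) {
            this.nodeOwnerSession = nodeOwnerSession;
            this.currentSession = currentSession;
        }

        // ZooKeeper drops an ephemeral node only when its owner session dies.
        // A node owned by the current live session never goes away.
        void maybeExpire() {
            if (nodeOwnerSession != null && nodeOwnerSession != currentSession) {
                nodeOwnerSession = null;
            }
        }

        boolean create() {
            if (nodeOwnerSession != null) return false; // NodeExists
            nodeOwnerSession = currentSession;
            return true;
        }
    }

    // The looping register: back off and retry while the node exists.
    // Returns the attempt count, or -1 when we give up (models the hang).
    static int register(FakeZk zk, int maxRetries) {
        for (int attempts = 1; attempts <= maxRetries; attempts++) {
            if (zk.create()) return attempts;
            zk.maybeExpire(); // "backoff for this node to be deleted ... and retry"
        }
        return -1;
    }

    public static void main(String[] args) {
        // Step 12 of the repro: the node was created under the third
        // (current) session, so it never expires and the loop is stuck.
        FakeZk stuck = new FakeZk(3L, 3L);
        assert register(stuck, 100) == -1;

        // A node left over from a dead session clears after one expiry.
        FakeZk ok = new FakeZk(2L, 3L);
        assert register(ok, 100) == 2;
        System.out.println("ok");
    }
}
```

The key property is in `maybeExpire`: the conflicting node's lifetime is tied to the session that created it, so the "sleep and retry" strategy only terminates when that session is a dead one.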

I have tested this with the latest code in trunk and noted the same behavior (the code looks pretty similar).

I have coded up a potential patch for this issue based on the following principles:

  1. Ensure that stale nodes are removed in main when the node starts
    • For Brokers this means removing nodes with the same host name and port; otherwise fail to start (the existing checker logic)
    • For Consumer nodes, don't worry about stale nodes - the way they are named should prevent this from ever happening.
  2. In main, add the initial node, which should now always work with no looping required - a direct call to createEphemeralPath
  3. Create an EphemeralNodeMonitor class that contains:
    • IZkDataListener
    • IZkStateListener
  4. The users of this class provide a path to monitor and a closure that defines what to do when the node is not found
  5. When the state listener is notified about a new session it checks to see if the node is already gone:
    • Yes, call the provided function
    • No, ignore the event
  6. When the data listener is notified of a deletion it does the same thing
  7. Both the Broker and Consumer registration use this new class in the same way they currently use their individual state listeners. The only change in behavior is to call createEphemeralPath directly (and avoid the looping code).
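A rough sketch of the proposed EphemeralNodeMonitor, using a fake in-memory client in place of zkclient's IZkStateListener/IZkDataListener wiring; all names apart from the listener-method analogues are hypothetical, not code from the actual patch.

```java
// Sketch of the EphemeralNodeMonitor idea: one existence check shared by the
// new-session and node-deleted callbacks, with a caller-supplied closure that
// re-creates the node. FakeZkClient is a hypothetical in-memory stand-in.
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

public class EphemeralNodeMonitorSketch {

    static class FakeZkClient {
        final Set<String> nodes = new HashSet<>();
        boolean exists(String path) { return nodes.contains(path); }
        void createEphemeralPath(String path) { nodes.add(path); } // direct create, no loop
        void delete(String path) { nodes.remove(path); }
    }

    static class EphemeralNodeMonitor {
        final FakeZkClient zk;
        final String path;
        final Consumer<String> onMissing; // what to do when the node is gone

        EphemeralNodeMonitor(FakeZkClient zk, String path, Consumer<String> onMissing) {
            this.zk = zk; this.path = path; this.onMissing = onMissing;
        }

        // IZkStateListener.handleNewSession analogue.
        void handleNewSession() { recreateIfMissing(); }

        // IZkDataListener.handleDataDeleted analogue.
        void handleDataDeleted() { recreateIfMissing(); }

        private void recreateIfMissing() {
            // Node gone: run the closure. Node still present: ignore the
            // event; if it belonged to a previous session, a delete
            // notification will arrive later and re-trigger this check.
            if (!zk.exists(path)) onMissing.accept(path);
        }
    }

    public static void main(String[] args) {
        FakeZkClient zk = new FakeZkClient();
        String path = "/consumers/group/ids/consumer-1";
        EphemeralNodeMonitor monitor =
            new EphemeralNodeMonitor(zk, path, zk::createEphemeralPath);

        zk.createEphemeralPath(path);  // initial registration in main
        monitor.handleNewSession();    // node still there: event ignored
        assert zk.exists(path);

        zk.delete(path);               // ZooKeeper finally drops the node
        monitor.handleDataDeleted();   // delete event triggers re-create
        assert zk.exists(path);
        System.out.println("ok");
    }
}
```

Because both callbacks reduce to the same idempotent existence check, events can arrive in any order without blocking the event thread in a retry loop.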

Since all this work should be done in the event thread I don't think there are any race conditions and no other nodes should be adding these nodes (or we have a serious configuration issue that should have been detected at startup).

One assumption is that we will always receive at least one more event (expire and/or delete) after the node is really deleted by ZooKeeper. I think that is a valid assumption (ZooKeeper can't send the delete until the node is gone). If the node was present when we process the final Expire message, then we should get notified of a delete if that node was actually related to a previous session. I wonder if we could get away with just monitoring node deletions, but that seems risky. The only change in behavior should be that if the expire is received before the node is actually deleted, then the event loop is not blocked and can process other messages while waiting for the delete event.

Note: I have not touched the leader election / controller node code (the third user of the createEphemeralPathExpectConflictHandleZKBug code). That still uses the looping code. I did apply the KAFKA-1451 patch to our build.

If there is any interest in the code I can provide a patch of what I have so far. I would very much like to get feedback. I was not sure of the protocol for submitting patches for comment.

2015-09-11 11:38:28
I think the main issue here is that when there is a ZooKeeper session timeout, the zkClient will retry writing data that may already have been committed to ZK even though the write appeared to fail. This issue is the same as the one causing KAFKA-1382, but I think their fixes would be different.
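The log excerpt at the top illustrates this: the stored data and the retried data differ only in the timestamp field. A sketch (with a simplified, hypothetical payload builder, not Kafka's code) of why the retried write registers as a conflict:

```java
// Sketch: a retried registration write after a session timeout re-stamps the
// payload, so a byte-for-byte comparison against the stored node sees a
// "conflict" even though the subscription content is identical.
public class RetryConflictSketch {
    // Hypothetical builder mimicking the consumer registration JSON shape.
    static String payload(String topic, long timestamp) {
        return "{\"version\":1,\"subscription\":{\"" + topic + "\":3},"
             + "\"pattern\":\"static\",\"timestamp\":\"" + timestamp + "\"}";
    }

    // Strip the timestamp field to compare the parts that actually matter.
    static String withoutTimestamp(String json) {
        return json.replaceAll("\"timestamp\":\"\\d+\"", "");
    }

    public static void main(String[] args) {
        // Timestamps taken from the log excerpt above.
        String stored  = payload("commodity_service_topic_sm_price_online", 1441834849365L);
        String retried = payload("commodity_service_topic_sm_price_online", 1441834849616L);

        // The naive equality check in the create path sees a conflict...
        assert !stored.equals(retried);
        // ...even though the registration content is identical.
        assert withoutTimestamp(stored).equals(withoutTimestamp(retried));
        System.out.println("ok");
    }
}
```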
2015-09-11 11:37:07
Check whether broker.id is set to the same value for consumers on the same server.
2015-09-11 11:39:10

