[kafka.utils.ZkUtils$]-[INFO]:68 - I wrote this conflicted ephemeral node [{"version":1,"subscription":{"commodity_service_topic_sm_qty_online":3},"pattern":"static","timestamp":"1441834851437"}] at /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-79231a0f a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry
product_search_indexing.idc1.fn| 2015-09-11 00:00:58.814 [kafka.utils.ZkUtils$]-[INFO]:68 - conflict in /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-947eb885 data: {"version":1,"subscription":{"commodity_service_topic_sm_price_online":3},"pattern":"static","timestamp":"1441834849616"} stored data: {"version":1,"subscription":{"commodity_service_topic_sm_price_online":3},"pattern":"static","timestamp":"1441834849365"}
product_search_indexing.idc1.fn| 2015-09-11 00:00:58.815 [kafka.utils.ZkUtils$]-[INFO]:68 - I wrote this conflicted ephemeral node [{"version":1,"subscription":{"commodity_service_topic_sm_price_online":3},"pattern":"static","timestamp":"1441834849616"}] at /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-947eb885 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry
product_search_indexing.idc1.fn| 2015-09-11 00:00:59.018 [kafka.utils.ZkUtils$]-[INFO]:68 - conflict in /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-14fc69b2 data: {"version":1,"subscription":{"FAVORITE-Topic-1":3},"pattern":"static","timestamp":"1441834850311"} stored data: {"version":1,"subscription":{"FAVORITE-Topic-1":3},"pattern":"static","timestamp":"1441834849799"}
product_search_indexing.idc1.fn| 2015-09-11 00:00:59.019 [kafka.utils.ZkUtils$]-[INFO]:68 - I wrote this conflicted ephemeral node [{"version":1,"subscription":{"FAVORITE-Topic-1":3},"pattern":"static","timestamp":"1441834850311"}] at /consumers/search_IntoHbase/ids/search_IntoHbase_product_search_indexing.idc1.fn-1441776715521-14fc69b2 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry
2015-09-11 11:33:00

4 Answers

The root cause is probably a ZooKeeper session expiring. Do you have long GC pauses on the broker? You may need to tune your GC or increase the ZooKeeper session timeout.
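A minimal sketch of where the ZooKeeper timeouts are raised on the old high-level consumer (the registrations in the log above are consumer ids), assuming the 0.8.x consumer API; the connect string, group id, and timeout values are purely illustrative:

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    object TimeoutConfigSketch {
      // Illustrative values only - the right numbers depend on your measured GC pauses.
      val props = new Properties()
      props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181")   // placeholder ensemble
      props.put("group.id", "search_IntoHbase")
      props.put("zookeeper.session.timeout.ms", "15000")             // 0.8.x default is 6000 ms
      props.put("zookeeper.connection.timeout.ms", "15000")
      val consumerConfig = new ConsumerConfig(props)
    }

The broker side has an equivalent zookeeper.session.timeout.ms setting in server.properties.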
2015-09-11 11:35:14

I have seen the issue reported in the original problem description several times now in our QA environment (3 ZooKeeper, 3 Kafka and several application-specific nodes). I have not tested any configuration where 2 nodes try to claim the same broker id. The problem is triggered when the system is under stress (high I/O and CPU load) and the ZooKeeper connections become unstable. When this happens, Kafka threads can get stuck trying to register Broker nodes and application threads get stuck trying to register Consumer nodes. One way to recover is to restart the impacted nodes.

As an experiment I also tried deleting the blocking ZooKeeper nodes (hours later, when the system was under no stress). When I did so, createEphemeralPathExpectConflictHandleZKBug would finally finish processing the current Expire message (i.e. add the node) and break out of its loop, but then immediately re-enter the loop when it tried to process the next Expire message. The few times I tested this approach I had to delete the node dozens of times before the problem cleared itself - in other words, there were dozens of Expire messages waiting to be processed. Obviously I am looking into this issue from a configuration point of view (avoiding the unstable connections), but this Kafka error behavior concerns me.
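For context, the loop these clients get stuck in is ZkUtils.createEphemeralPathExpectConflictHandleZKBug. A simplified, paraphrased sketch of its shape (not the exact Kafka source, which also compares the stored data with what the caller expected before backing off) shows why a conflicting node that belongs to a live session keeps the caller spinning:

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkNodeExistsException

    object EphemeralRetrySketch {
      // Hypothetical simplified loop. The real ZkUtils code also reads the stored
      // data and only backs off when it matches what this client tried to write.
      def registerWithBackoff(zk: ZkClient, path: String, data: String, backoffMs: Long): Unit = {
        var registered = false
        while (!registered) {
          try {
            zk.createEphemeral(path, data)   // throws if the node already exists
            registered = true
          } catch {
            case _: ZkNodeExistsException =>
              // Assume the existing node is a leftover of our own expired session and
              // will be removed by ZooKeeper shortly. If it actually belongs to a live
              // session (the scenario described above), this loop never terminates and
              // the event thread stays blocked here.
              Thread.sleep(backoffMs)
          }
        }
      }
    }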

I have reproduced it (somewhat artificially) in a dev environment as follows:

1) Start one ZooKeeper and one Kafka node.
2) Set a thread breakpoint in KafkaHealthcheck.scala:

    def handleNewSession() {
      info("re-registering broker info in ZK for broker " + brokerId)
-->   register()
      info("done re-registering broker")
      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
    }

3) Pause Kafka.
4) Wait for ZooKeeper to expire the first session and drop the ephemeral node.
5) Unpause Kafka.
6) Kafka reconnects with ZooKeeper, receives an Expire, and establishes a second session.
7) Breakpoint hit and event thread paused before handling the first Expire.
8) Pause Kafka again.
9) Wait for ZooKeeper to expire the second session and delete the ephemeral node (again).
10) Remove breakpoint, unpause Kafka, and finally release the event thread.
11) Kafka reconnects with ZooKeeper, receives a second Expire, and establishes a third session.
12) Kafka registers an ephemeral node triggered by the first Expire (which triggered the second session), but ZooKeeper associates it with the third session.
13) Kafka tries to register an ephemeral node triggered by the second Expire, but ZooKeeper already has a stable node.
14) Kafka assumes this node will go away soon, sleeps, and then retries.
15) The node is associated with a valid session and therefore does not go away, so Kafka remains stuck in the retry loop (see the diagnostic sketch below).

I have tested this with the latest code in trunk and noted the same behavior (the code looks pretty similar).
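One way to confirm that the stuck node really is owned by a live session (and so will never be dropped) is to compare its ephemeralOwner against the client's current session id. A hypothetical diagnostic sketch using the plain ZooKeeper client; the connect string and consumer path are placeholders:

    import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

    object StuckNodeCheck {
      def main(args: Array[String]): Unit = {
        // Substitute the real ensemble address and the consumer id path from the log output.
        val zk = new ZooKeeper("zk1:2181", 30000, new Watcher {
          def process(event: WatchedEvent): Unit = ()   // no-op default watcher
        })
        val path = "/consumers/search_IntoHbase/ids/<consumer-id>"
        val stat = zk.exists(path, false)
        if (stat == null)
          println("node is gone - registration should be able to recreate it")
        else
          println(s"ephemeralOwner=0x${stat.getEphemeralOwner.toHexString}, " +
                  s"my sessionId=0x${zk.getSessionId.toHexString}")
        zk.close()
      }
    }

If the owner is a session other than the client's own and that session is still alive, ZooKeeper has no reason to delete the node and the backoff loop cannot make progress.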

I have coded up a potential 0.8.1.1 patch for this issue based on the following principles:

  1. Ensure that when the node starts, stale nodes are removed in main
    • For Brokers this means removing nodes with the same host name and port, otherwise failing to start (the existing checker logic)
    • For Consumer nodes don't worry about stale nodes - the way they are named should prevent this from ever happening.
  2. In main add the initial node which should now always work with no looping required - direct call to createEphemeralPath
  3. Create an EphemeralNodeMonitor class that contains:
    • IZkDataListener
    • IZkStateListener
  4. The users of this class provide a path to monitor and a closure that defines what to do when the node is not found
  5. When the state listener is notified about a new session it checks to see if the node is already gone:
    • Yes, call the provided function
    • No, ignore the event
  6. When the data listener is notified of a deletion it does the same thing
  7. Both the Broker and Consumer registration use this new class in the same way they currently use their individual state listeners. The only change in behavior is to call createEphemeralPath directly (and avoid the looping code); a rough sketch of the class follows this list.
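A rough sketch of what such a monitor might look like, assuming the zkclient listener interfaces (IZkDataListener, IZkStateListener) that Kafka 0.8.x already uses; the class shape and names are my reading of points 3 to 6 above, not the author's actual patch:

    import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
    import org.apache.zookeeper.Watcher.Event.KeeperState

    class EphemeralNodeMonitor(zkClient: ZkClient, path: String, recreate: () => Unit) {

      private val dataListener = new IZkDataListener {
        def handleDataChange(dataPath: String, data: Object): Unit = ()   // nothing to do
        def handleDataDeleted(dataPath: String): Unit = recreate()        // node gone: re-register
      }

      private val stateListener = new IZkStateListener {
        def handleStateChanged(state: KeeperState): Unit = ()
        def handleNewSession(): Unit = {
          // After an Expire, re-register only if the old ephemeral node is already
          // gone; otherwise wait for the delete notification from the data listener.
          if (!zkClient.exists(path)) recreate()
        }
        // Present in newer zkclient versions; a harmless extra method in older ones.
        def handleSessionEstablishmentError(error: Throwable): Unit = ()
      }

      def start(): Unit = {
        zkClient.subscribeDataChanges(path, dataListener)
        zkClient.subscribeStateChanges(stateListener)
      }
    }

The recreate closure would simply call createEphemeralPath for the broker or consumer path being monitored.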

Since all this work should be done in the event thread, I don't think there are any race conditions, and no other node should be adding these nodes (or we have a serious configuration issue that should have been detected at startup).

One assumption is that we will always receive at least one more event (expire and/or delete) after the node is really deleted by ZooKeeper. I think that is a valid assumption (ZooKeeper can't send the delete until the node is gone). If the node was still present when we process the final Expire message, then we should get notified of a delete if that node actually belonged to a previous session. I wonder if we could get away with just monitoring node deletions, but that seems risky. The only change in behavior should be that if the Expire is received before the node is actually deleted, then the event loop is not blocked and can process other messages while waiting for the delete event.

Note: I have not touched the leader election / controller node code (the third user of the createEphemeralPathExpectConflictHandleZKBug code). That still uses the looping code. I did apply the KAFKA-1451 patch to our 0.8.1.1 build.

If there is any interest in the code I can provide a patch of what I have so far. I would very much like to get feedback. I was not sure of the protocol for submitting patches for comment.

2015-09-11 11:38:28
I think the main issue here is that when there is a ZooKeeper session timeout, the zkClient retries a write that may already have been committed to ZK, and the retry then fails. This issue is the same as the one causing KAFKA-1382, but I think their fixes would be different.
2015-09-11 11:37:07
Check whether broker.id is set to the same value when consuming on the same server.
2015-09-11 11:39:10