前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Zookeeper客户端之Kazoo源码剖析

Zookeeper客户端之Kazoo源码剖析

作者头像
tunsuy
发布于 2022-10-27 01:57:55
发布于 2022-10-27 01:57:55
1.2K00
代码可运行
举报
运行总次数:0
代码可运行

启动ZK连接

调用方可以如下创建一个zk连接:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
zookeeper = KazooClient(hosts=get_address(address), timeout=timeout,
                            client_id=client_id, handler=handler,
                            default_acl=default_acl, auth_data=auth_data,
                            read_only=read_only,
                            randomize_hosts=randomize_hosts,
                            connection_retry=con_retry,
                            command_retry=command_retry, logger=logger)
  
try:
  scheme, credential = kazooACL.get_auth_acl()
  zookeeper.start(timeout=90)
  zookeeper.add_auth(scheme, credential)
  zookeeper.default_acl = (kazooACL.get_set_acl_val(),)
  return zookeeper
except Exception:
  zookeeper.stop()
    zookeeper.close()
  raise

KazooClient类

上面首先实例化了一个KazooClient对象, 实例化KazooClient对象的时候,生成了一个ConnectionHandler实例

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
self._connection = ConnectionHandler(
            self, self._conn_retry.copy(), logger=self.logger)

初始化了这个连接对象,进入该连接对象定义:kazoo.protocol.connection.py

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class ConnectionHandler(object):
    """Zookeeper connection handler"""
    def __init__(self, client, retry_sleeper, logger=None):
        self.client = client
        self.handler = client.handler
        self.retry_sleeper = retry_sleeper
        self.logger = logger or log

        # Our event objects
        self.connection_closed = client.handler.event_object()
        self.connection_closed.set()
        self.connection_stopped = client.handler.event_object()
        self.connection_stopped.set()
        self.ping_outstanding = client.handler.event_object()

        self._read_sock = None
        self._write_sock = None

        self._socket = None
        self._xid = None
        self._rw_server = None
        self._ro_mode = False
        self._ro = False

        self._connection_routine = None

        self.sasl_cli = None

启动zk连接

再启动ZKClient的时候,也就是调用了该ConnectionHandler对象的start方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def start(self):
  """Start the connection up"""
  if self.connection_closed.is_set():
    rw_sockets = self.handler.create_socket_pair()
    self._read_sock, self._write_sock = rw_sockets
    self.connection_closed.clear()
  if self._connection_routine:
    raise Exception("Unable to start, connection routine already "
            "active.")
  self._connection_routine = self.handler.spawn(self.zk_loop)

这里是新起了一个协程进行执行轮训:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def zk_loop(self):
  """Main Zookeeper handling loop"""
  self.logger.log(BLATHER, 'ZK loop started')

  self.connection_stopped.clear()

  retry = self.retry_sleeper.copy()
  try:
    while not self.client._stopped.is_set():
      # If the connect_loop returns STOP_CONNECTING, stop retrying
      if retry(self._connect_loop, retry) is STOP_CONNECTING:
        break
  except RetryFailedError:
    self.logger.warning("Failed connecting to Zookeeper "
              "within the connection retry policy.")
  finally:
    self.connection_stopped.set()
    self.client._session_callback(KeeperState.CLOSED)
    self.logger.log(BLATHER, 'Connection stopped')

def _connect_loop(self, retry):
  # Iterate through the hosts a full cycle before starting over
  status = None
  host_ports = self._expand_client_hosts()

  # Check for an empty hostlist, indicating none resolved
  if len(host_ports) == 0:
    return STOP_CONNECTING

  for host, port in host_ports:
    if self.client._stopped.is_set():
      status = STOP_CONNECTING
      break
    status = self._connect_attempt(host, port, retry)
    if status is STOP_CONNECTING:
      break

  if status is STOP_CONNECTING:
    return STOP_CONNECTING
  else:
    raise ForceRetryError('Reconnecting')

连接状态变更

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def _connect_attempt(self, host, port, retry):
  client = self.client
  KazooTimeoutError = self.handler.timeout_exception
  close_connection = False

  self._socket = None

  # Were we given a r/w server? If so, use that instead
  if self._rw_server:
    self.logger.log(BLATHER,
            "Found r/w server to use, %s:%s", host, port)
    host, port = self._rw_server
    self._rw_server = None

  if client._state != KeeperState.CONNECTING:
    client._session_callback(KeeperState.CONNECTING)

  try:
    self._xid = 0
    read_timeout, connect_timeout = self._connect(host, port)
    read_timeout = read_timeout / 1000.0
    connect_timeout = connect_timeout / 1000.0
    retry.reset()
    self.ping_outstanding.clear()
    with self._socket_error_handling():
      while not close_connection:
        # Watch for something to read or send
        jitter_time = random.randint(0, 40) / 100.0
        # Ensure our timeout is positive
        timeout = max([read_timeout / 2.0 - jitter_time,
                 jitter_time])
        s = self.handler.select([self._socket, self._read_sock],
                    [], [], timeout)[0]

        if not s:
          if self.ping_outstanding.is_set():
            self.ping_outstanding.clear()
            raise ConnectionDropped(
              "outstanding heartbeat ping not received")
          self._send_ping(connect_timeout)
        elif s[0] == self._socket:
          response = self._read_socket(read_timeout)
          close_connection = response == CLOSE_RESPONSE
        else:
          self._send_request(read_timeout, connect_timeout)
    self.logger.info('Closing connection to %s:%s', host, port)
    client._session_callback(KeeperState.CLOSED)
    return STOP_CONNECTING
  except (ConnectionDropped, KazooTimeoutError) as e:
    if isinstance(e, ConnectionDropped):
      self.logger.warning('Connection dropped: %s', e)
    else:
      self.logger.warning('Connection time-out: %s', e)
    if client._state != KeeperState.CONNECTING:
      self.logger.warning("Transition to CONNECTING")
      client._session_callback(KeeperState.CONNECTING)
  except AuthFailedError:
    retry.reset()
    self.logger.warning('AUTH_FAILED closing')
    client._session_callback(KeeperState.AUTH_FAILED)
    return STOP_CONNECTING
  except SessionExpiredError:
    retry.reset()
    self.logger.warning('Session has expired')
    client._session_callback(KeeperState.EXPIRED_SESSION)
  except RWServerAvailable:
    retry.reset()
    self.logger.warning('Found a RW server, dropping connection')
    client._session_callback(KeeperState.CONNECTING)
  except Exception:
    self.logger.exception('Unhandled exception in connection loop')
    raise
  finally:
    if self._socket is not None:
      self._socket.close()

当连接的状态变化的时候,都会调用client._session_callback回调方法

监听器

zk的客户端持有一个监听器的集合属性

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class KazooClient


self.state_listeners = set()

增加listener

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def add_listener(self, listener):
  """Add a function to be called for connection state changes.

  This function will be called with a
  :class:`~kazoo.protocol.states.KazooState` instance indicating
  the new connection state on state transitions.

  .. warning::

    This function must not block. If its at all likely that it
    might need data or a value that could result in blocking
    than the :meth:`~kazoo.interfaces.IHandler.spawn` method
    should be used so that the listener can return immediately.

  """
  if not (listener and callable(listener)):
    raise ConfigurationError("listener must be callable")
  self.state_listeners.add(listener)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
执行listener

从上面的分析我们知道,在client跟zk建立连接之后,client会监控session的状态

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def _session_callback(self, state):
  if state == self._state:
    return

  # Note that we don't check self.state == LOST since that's also
  # the client's initial state
  dead_state = self._state in LOST_STATES
  self._state = state

  # If we were previously closed or had an expired session, and
  # are now connecting, don't bother with the rest of the
  # transitions since they only apply after
  # we've established a connection
  if dead_state and state == KeeperState.CONNECTING:
    self.logger.log(BLATHER, "Skipping state change")
    return

  if state in (KeeperState.CONNECTED, KeeperState.CONNECTED_RO):
    self.logger.info("Zookeeper connection established, "
             "state: %s", state)
    self._live.set()
    self._make_state_change(KazooState.CONNECTED)
  elif state in LOST_STATES:
    self.logger.info("Zookeeper session lost, state: %s", state)
    self._live.clear()
    self._make_state_change(KazooState.LOST)
    self._notify_pending(state)
    self._reset()
  else:
    self.logger.info("Zookeeper connection lost")
    # Connection lost
    self._live.clear()
    self._notify_pending(state)
    self._make_state_change(KazooState.SUSPENDED)
    self._reset_watchers()

每个状态的变更都调用了_make_state_change方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def _make_state_change(self, state):
  # skip if state is current
  if self.state == state:
    return

  self.state = state

  # Create copy of listeners for iteration in case one needs to
  # remove itself
  for listener in list(self.state_listeners):
    try:
      remove = listener(state)
      if remove is True:
        self.remove_listener(listener)
    except Exception:
      self.logger.exception("Error in connection state listener")

该方法遍历所有的listener,然后执行每个回调方法,比如,DataWatcher的get_data方法

Watcher

DataWatcher

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class DataWatch(object):
  def __init__(self, client, path, func=None, *args, **kwargs):
        """Create a data watcher for a path

        :param client: A zookeeper client.
        :type client: :class:`~kazoo.client.KazooClient`
        :param path: The path to watch for data changes on.
        :type path: str
        :param func: Function to call initially and every time the
                     node changes. `func` will be called with a
                     tuple, the value of the node and a
                     :class:`~kazoo.client.ZnodeStat` instance.
        :type func: callable

        """
        self._client = client
        self._path = path
        self._func = func
        self._stopped = False
        self._run_lock = client.handler.lock_object()
        self._version = None
        self._retry = KazooRetry(max_tries=None,
                                 sleep_func=client.handler.sleep_func)
        self._include_event = None
        self._ever_called = False
        self._used = False

        if args or kwargs:
            warnings.warn('Passing additional arguments to DataWatch is'
                          ' deprecated. ignore_missing_node is now assumed '
                          ' to be True by default, and the event will be '
                          ' sent if the function can handle receiving it',
                          DeprecationWarning, stacklevel=2)

        # Register our session listener if we're going to resume
        # across session losses
        if func is not None:
            self._used = True
            self._client.add_listener(self._session_watcher)
            self._get_data()

在实例化该watch的时候,就会通过_get_data方法通知下所有的客户端执行

增加watcher

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@_ignore_closed
def _get_data(self, event=None):
  # Ensure this runs one at a time, possible because the session
  # watcher may trigger a run
  with self._run_lock:
    if self._stopped:
      return

    initial_version = self._version

    try:
      data, stat = self._retry(self._client.get,
                   self._path, self._watcher)
    except NoNodeError:
      data = None

这里调用了KazooClient的get方法,进而调用get_async

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def get_async(self, path, watch=None):
  """Asynchronously get the value of a node. Takes the same
  arguments as :meth:`get`.

  :rtype: :class:`~kazoo.interfaces.IAsyncResult`

  """
  if not isinstance(path, string_types):
    raise TypeError("Invalid type for 'path' (string expected)")
  if watch and not callable(watch):
    raise TypeError("Invalid type for 'watch' (must be a callable)")

  async_result = self.handler.async_result()
  self._call(GetData(_prefix_root(self.chroot, path), watch),
         async_result)
  return async_result

这里将path、watch封装成了一个对象GetData,也就是下面的request

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def _call(self, request, async_object):
  """Ensure there's an active connection and put the request in
  the queue if there is.

  Returns False if the call short circuits due to AUTH_FAILED,
  CLOSED, EXPIRED_SESSION or CONNECTING state.

  """

  if self._state == KeeperState.AUTH_FAILED:
    async_object.set_exception(AuthFailedError())
    return False
  elif self._state == KeeperState.CLOSED:
    async_object.set_exception(ConnectionClosedError(
      "Connection has been closed"))
    return False
  elif self._state in (KeeperState.EXPIRED_SESSION,
             KeeperState.CONNECTING):
    async_object.set_exception(SessionExpiredError())
    return False

  self._queue.append((request, async_object))

  # wake the connection, guarding against a race with close()
  write_sock = self._connection._write_sock
  if write_sock is None:
    async_object.set_exception(ConnectionClosedError(
      "Connection has been closed"))
  try:
    write_sock.send(b'\0')
  except:
    async_object.set_exception(ConnectionClosedError(
      "Connection has been closed"))

这里将watcher放入了队列中。

执行watcher

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def _send_request(self, read_timeout, connect_timeout):
  """Called when we have something to send out on the socket"""
  client = self.client
  try:
    request, async_object = client._queue[0]
  except IndexError:
    # Not actually something on the queue, this can occur if
    # something happens to cancel the request such that we
    # don't clear the socket below after sending
    try:
      # Clear possible inconsistence (no request in the queue
      # but have data in the read socket), which causes cpu to spin.
      self._read_sock.recv(1)
    except OSError:
      pass
    return

  # Special case for testing, if this is a _SessionExpire object
  # then throw a SessionExpiration error as if we were dropped
  if request is _SESSION_EXPIRED:
    raise SessionExpiredError("Session expired: Testing")
  if request is _CONNECTION_DROP:
    raise ConnectionDropped("Connection dropped: Testing")

  # Special case for auth packets
  if request.type == Auth.type:
    xid = AUTH_XID
  else:
    self._xid = (self._xid % 2147483647) + 1
    xid = self._xid

  self._submit(request, connect_timeout, xid)
  client._queue.popleft()
  self._read_sock.recv(1)
  client._pending.append((request, async_object, xid))

handler

kazoo.handlers.threading 默认为SequentialThreadingHandler 顺序执行回调的线程执行器,为每一个回调时间创建队列,它们分为两个队列,一个用于监视事件,一个用于异步结果完成回调 每种队列类型都有一个线程工作程序,该工作程序将回调事件从队列中拉出并按客户端看到的顺序运行。这种拆分有助于确保在Zookeeper客户端执行期间连接断开的情况下,watch回调不会阻止会话重建。监视和完成回调应避免阻塞行为。如果需要阻止,请生成一个新线程并立即返回,以便继续进行回调。

队列

1、callback_queue

2、completion_queue

放入队列

当连接的socket通道中收到请求时

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def _read_socket(self, read_timeout):
  """Called when there's something to read on the socket"""
  client = self.client

  header, buffer, offset = self._read_header(read_timeout)
  if header.xid == PING_XID:
    self.logger.log(BLATHER, 'Received Ping')
    self.ping_outstanding.clear()
  elif header.xid == AUTH_XID:
    self.logger.log(BLATHER, 'Received AUTH')

    request, async_object, xid = client._pending.popleft()
    if header.err:
      async_object.set_exception(AuthFailedError())
      client._session_callback(KeeperState.AUTH_FAILED)
    else:
      async_object.set(True)
  elif header.xid == WATCH_XID:
    self._read_watch_event(buffer, offset)

这里根据请求header里面的xid判断,当xid为WATCH_XID的时候

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def _read_watch_event(self, buffer, offset):
  client = self.client
  watch, offset = Watch.deserialize(buffer, offset)
  path = watch.path

  self.logger.debug('Received EVENT: %s', watch)

  watchers = []

  if watch.type in (CREATED_EVENT, CHANGED_EVENT):
    watchers.extend(client._data_watchers.pop(path, []))
  elif watch.type == DELETED_EVENT:
    watchers.extend(client._data_watchers.pop(path, []))
    watchers.extend(client._child_watchers.pop(path, []))
  elif watch.type == CHILD_EVENT:
    watchers.extend(client._child_watchers.pop(path, []))
  else:
    self.logger.warn('Received unknown event %r', watch.type)
    return

  # Strip the chroot if needed
  path = client.unchroot(path)
  ev = WatchedEvent(EVENT_TYPE_MAP[watch.type], client._state, path)

  # Last check to ignore watches if we've been stopped
  if client._stopped.is_set():
    return

  # Dump the watchers to the watch thread
  for watch in watchers:
    client.handler.dispatch_callback(Callback('watch', watch, (ev,)))

    
def dispatch_callback(self, callback):
  self.callback_queue.put(lambda: callback.func(*callback.args))

从这里我们就可以看到,将watcher放到了callback_queue队列中

取出队列

在实例化KazooClient的时候

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# Record the handler strategy used
self.handler = handler if handler else SequentialThreadingHandler()
if inspect.isclass(self.handler):
  raise ConfigurationError("Handler must be an instance of a class, "
               "not the class: %s" % self.handler)

这里支持调用方自定义Handler并传入进来,默认采用SequentialThreadingHandler处理器

自动KazooClient的时候

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def start(self, timeout=15):
  """Initiate connection to ZK.

  :param timeout: Time in seconds to wait for connection to
          succeed.
  :raises: :attr:`~kazoo.interfaces.IHandler.timeout_exception`
       if the connection wasn't established within `timeout`
       seconds.

  """
  event = self.start_async()
  event.wait(timeout=timeout)
  if not self.connected:
    # We time-out, ensure we are disconnected
    self.stop()
    raise self.handler.timeout_exception("Connection time-out")

  if self.chroot and not self.exists("/"):
    warnings.warn("No chroot path exists, the chroot path "
            "should be created before normal use.")


def start_async(self):
  """Asynchronously initiate connection to ZK.

  :returns: An event object that can be checked to see if the
        connection is alive.
  :rtype: :class:`~threading.Event` compatible object.

  """
  # If we're already connected, ignore
  if self._live.is_set():
    return self._live

  # Make sure we're safely closed
  self._safe_close()

  # We've been asked to connect, clear the stop and our writer
  # thread indicator
  self._stopped.clear()
  self._writer_stopped.clear()

  # Start the handler
  self.handler.start()

  # Start the connection
  self._connection.start()
  return self._live

这里会调用handler的start方法,也就是SequentialThreadingHandler的start方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def start(self):
  """Start the worker threads."""
  with self._state_change:
    if self._running:
      return

    # Spawn our worker threads, we have
    # - A callback worker for watch events to be called
    # - A completion worker for completion events to be called
    for queue in (self.completion_queue, self.callback_queue):
      w = self._create_thread_worker(queue)
      self._workers.append(w)
    self._running = True
    python2atexit.register(self.stop)

这里我们就看到,会对队列中的每一个watcher回调方法,启动一个协程进行执行。

KazooRetry

当kazoo与zk服务端的请求中,出现以下几种异常时会一直重试

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
RETRY_EXCEPTIONS = (
        ConnectionLoss,
        OperationTimeoutError,
        ForceRetryError
)

EXPIRED_EXCEPTIONS = (
        SessionExpiredError,
)

其中,session过期异常时可选的重试,根据实例化的参数决定

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
self.retry_exceptions = self.RETRY_EXCEPTIONS
self.interrupt = interrupt
if ignore_expire:
  self.retry_exceptions += self.EXPIRED_EXCEPTIONS
  
def __call__(self, func, *args, **kwargs):
  """Call a function with arguments until it completes without
  throwing a Kazoo exception

  :param func: Function to call
  :param args: Positional arguments to call the function with
  :params kwargs: Keyword arguments to call the function with

  The function will be called until it doesn't throw one of the
  retryable exceptions (ConnectionLoss, OperationTimeout, or
  ForceRetryError), and optionally retrying on session
  expiration.

  """
  self.reset()

  while True:
    try:
      if self.deadline is not None and self._cur_stoptime is None:
        self._cur_stoptime = time.time() + self.deadline
      return func(*args, **kwargs)
    except ConnectionClosedError:
      raise
    except self.retry_exceptions:
      # Note: max_tries == -1 means infinite tries.
      if self._attempts == self.max_tries:
        raise RetryFailedError("Too many retry attempts")
      self._attempts += 1
      sleeptime = random.randint(0, 1 + int(self._cur_delay))

      if self._cur_stoptime is not None and \
         time.time() + sleeptime >= self._cur_stoptime:
        raise RetryFailedError("Exceeded retry deadline")

      if self.interrupt:
        while sleeptime > 0:
          # Break the time period down and sleep for no
          # longer than 0.1 before calling the interrupt
          if sleeptime < 0.1:
            self.sleep_func(sleeptime)
            sleeptime -= sleeptime
          else:
            self.sleep_func(0.1)
            sleeptime -= 0.1
          if self.interrupt():
            raise InterruptedError()
      else:
        self.sleep_func(sleeptime)
      self._cur_delay = min(self._cur_delay * self.backoff,
                  self.max_delay)

Lock

kazoo提供了zookeeper作为分布式锁的实现,也就是下面的Lock类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class Lock(object):
    """Kazoo Lock

    Example usage with a :class:`~kazoo.client.KazooClient` instance:

    .. code-block:: python

        zk = KazooClient()
        zk.start()
        lock = zk.Lock("/lockpath", "my-identifier")
        with lock:  # blocks waiting for lock acquisition
            # do something with the lock

    Note: This lock is not *re-entrant*. Repeated calls after already
    acquired will block.

    This is an exclusive lock. For a read/write lock, see :class:`WriteLock`
    and :class:`ReadLock`.

    """

AsyncResult

条件锁

条件同步机制是指:一个线程等待特定条件,而另一个线程发出特定条件满足的信号。解释条件同步机制的一个很好的例子就是生产者/消费者(producer/consumer)模型。生产者随机的往列表中“生产”一个随机整数,而消费者从列表中“消费”整数。

优先队列

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 有文化的技术人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
Zookeeper客户端kazoo的watch流程详解
根据我在zk的通信协议中提到的,GetData请求的参数中如果watch为1,则表示客户端希望收到zk的数据监控回调 而这里就是带了watch=1.
tunsuy
2022/10/27
6960
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】
Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。
阿东
2023/09/02
6790
【Zookeeper】Apach Curator 框架源码分析:初始化过程(一)【Ver 4.3.0】
zookeeper使用详解(命令、客户端、源码)
  zookeeper我们常用来做分布式协调中间件,很多时候我们都接触不到它的原理和用法,我对他的了解也仅限于知道它可以做分布式协调、配置管理、分布式锁,并且有个watch节点监听常常能听到。接下来我要系统的学下zookeeper的功能和原理,一起走进zookeeper的世界
老梁
2019/09/10
2.5K0
zookeeper使用详解(命令、客户端、源码)
zookeeper源码分析(2)-客户端启动流程
客户端的入口,负责启动整个客户端。持有ClientCnxn和ZKWatchManager的实例,提供了客户端对节点操作的方法。
Monica2333
2020/06/22
9540
python codis集群客户端(二) - 基于zookeeper对实例创建与摘除
 在这一篇中我们实现了不通过zk来编写codis集群proxys的api, 如果codis集群暴露zk给你的话,那么就方便了,探活和故障摘除与恢复codis集群都给你搞定了,你只需要监听zookeeper中实例的状态就好了。 下面看我的实现。 1、CodisByZKPool.py 这里通过zk读取并初始化pool_shards,简单说一下如何故障摘除和恢复 1)我们监听zk中节点状态改变,当发现某个实例对应的节点状态变化了,比如DELETE了,那么我们认为这个实例挂了,我们就会重新_create_pool刷
用户1225216
2018/03/05
1.8K2
Python 基于python操纵zookeeper介绍
kazoo-2.6.1-py2.py3-none-any.whl(windows)
授客
2019/09/10
1.3K0
Python 基于python操纵zookeeper介绍
Kazoo Python Zookeeper 选主
本文讲述基于zookeeper选主与故障切换的方法。我们的例子使用的是python。 使用的库是kazoo,安装方式 pip install kazoo  应用场景: 多个实例部署,但不是“去中心化”的部署方式; 有且只有一个节点作为master,履行master的职责,在例子中是注册调度器; 其他实例作为slave,不提供调度功能,但是在master节点挂掉之后,可以重新进行选主调度。 1、注册调度器 我们只给出伪代码,简单的打印调度器注册结果。 # -*- coding:utf-8 -*- # 调度
用户1225216
2018/03/05
1.9K2
【Zookeeper】Apach Curator 框架源码分析:初始化过程
Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。和ZK的原生客户端相比,Curator的抽象层次要更高,同时简化了ZK的常用功能开发量,比如Curator自带连接重试、反复注册Watcher、NodeExistsException 异常处理等等。
阿东
2023/07/10
1.5K0
【Zookeeper】Apach Curator 框架源码分析:初始化过程
深入分析 Watcher 机制的实现原理(三)客户端接收服务端处理完成的响应及事件触发
服 务 端 处 理 完 成 以 后 , 会 通 过NettyServerCnxn.sendResponse 发送返回的响应信息, 客户端会在 ClientCnxnSocketNetty.messageReceived 接收服务端的返回
周杰伦本人
2022/10/25
1.2K0
深入分析 Watcher 机制的实现原理(三)客户端接收服务端处理完成的响应及事件触发
深入浅出Zookeeper源码(四):Watch实现剖析
用过zookeeper的同学都知道watch是一个非常好用的机制,今天我们就来看看它的实现原理。
泊浮目
2023/12/25
2370
Zookeeper的通信协议详解
基于TCP/IP协议,zk实现了自己的通信协议来完成客户端与服务端,服务端与服务端之间的网络通信,zk的通信协议整体上的设计非常简单,
tunsuy
2022/10/27
1K0
zookeeper源码分析(9)-Curator相关介绍
zookeeper常用的Java客户端有三种:zookeeper原生的、Apache Curator、开源的zkclient。Curator官网上这么说
Monica2333
2020/06/22
2.4K0
Zookeeper Python调用
安装:pip install kazoo 1、链接 from kazoo.client import KazooClient
用户5760343
2019/10/29
7180
Zookeeper 通知更新可靠吗? 解读源码找答案!
遇到Keepper通知更新无法收到的问题,思考节点变更通知的可靠性,通过阅读源码解析了解到zk Watch的注册以及触发的机制,本地调试运行模拟zk更新的不可靠的场景以及得出相应的解决方案。
特鲁门
2018/07/17
3.4K12
Zookeeper入门(三)—使用CuratorFramework操作节点并添加监视器
在上一篇文章ZooKeeper入门(二)中笔者讲解了分布式协调中间件ZooKeeper的常用命令并使用Curator客户端实现了一个简单的配置中心功能。本文的目的就是带领读者朋友们一起学习如何在SpringBoot项目中使用Curator客户端对ZooKeeper节点进行简单的增删改查并对节点设置Watcher监视器等实践,让大家掌握使用Curator客户端对ZooKeeper进行基础的操作。
用户3587585
2022/11/22
3.7K0
深入分析 Watcher 机制的实现原理(一)客户端注册watcher
在创建一个 ZooKeeper 客户端对象实例时,我们通过new Watcher()向构造方法中传入一个默认的 Watcher, 这 个 Watcher 将作为整个 ZooKeeper 会话期间的默认Watcher,会一直被保存在客户端 ZKWatchManager 的defaultWatcher 中
周杰伦本人
2022/10/25
9930
深入分析 Watcher 机制的实现原理(一)客户端注册watcher
Python与ZooKeeper集群连接
由于项目的需要,需要学习Python客户端连接ZooKeeper集群,并实现创建临时节点、获得指定的路径下的信息、监听子节点变化的功能。
py3study
2020/01/08
1.8K0
Netty + ZooKeeper 实现简单的服务注册与发现
最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理。
fengzhizi715
2019/06/17
1.4K0
Zookeeper详解(十):Pytho
关于Watcher,网上很多帖子都是通过装饰器的方式实现的,其实我上面的方式和装饰器是一样的,只是形式不同罢了。功能都能实现,只是用装饰器有时候会不方便。
py3study
2020/01/07
8750
进阶分布式系统架构系列:Zookeeper 监听机制
当今时代,发布订阅场景到处可见,像微信中的公众号消息订阅,或者网购场景下库存消息的订阅通知等等,这些都是属于发布订阅的场景。
民工哥
2023/08/25
4890
进阶分布式系统架构系列:Zookeeper 监听机制
推荐阅读
相关推荐
Zookeeper客户端kazoo的watch流程详解
更多 >
目录
  • 启动ZK连接
    • KazooClient类
    • 启动zk连接
    • 连接状态变更
  • 监听器
    • 增加listener
  • Watcher
    • DataWatcher
    • 增加watcher
    • 执行watcher
  • handler
    • 队列
      • 放入队列
      • 取出队列
  • KazooRetry
  • Lock
  • AsyncResult
    • 条件锁
  • 优先队列
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档