Zookeeper 是一个分布式的、开源的协调服务,用在分布式应用程序中。它提出了一组简单的原语,分布式应用程序可以基于这些原语之上构建更高层的分布式服务用于实现同步、配置管理、分组和命名等。Zookeeper 设计的容易进行编程,它使用一种类似于文件系统的目录树结构的数据模型,以 java 方式运行,有 java 和 c 的绑定(binding)。
分布式系统中的协调服务总所周知地难于正确实现,尤其容易产生诸如争用条件 (race conditions)、死锁(deadlock) 等错误。Zookeeper 背后的动机就是减轻分布式应用程序从头做起实现协调服务的难度。
Zookeeper 会维护一个具有层次关系的数据结构,它非常类似于一个标准的文件系统,如下图所示:
Zookeeper 这种数据结构有如下这些特点:
简洁的API Zookeeper 的设计目标之一就是提供简单的编程接口。于是,它只提供了以下的操作:
如小米公司的米聊,其后台就采用了ZooKeeper作为分布式服务的统一协作系统。而阿里公司的开发人员也广泛使用ZooKeeper,并对其进行了适当修改,开源了一款TaoKeeper软件,以适应自身业务需要。另外还包括Apache HBase、Apache Kafka、Facebook Message等产品也都使用了ZooKeeper。
那么当Node.js应用作为整个异构分布式系统中的一环,需要作为客户端去操作ZooKeeper Server上的znode时,应该如何实现? 说实话,上文介绍了这么多ZooKeeper的原理,其实作为客户端只需要单纯的把znode作为文件来操作就好,并且可以监听znode的改变,十分方便。本文只描述怎样使用Node.js实现ZooKeeper客户端角色。
node-zookeeper是ZooKeeper的一个Node.js客户端实现,这个模块是基于ZooKeeper原生提供的C API来实现的。
下载 npm install zookeeper
栗子
var ZooKeeper = require ("zookeeper");
var zk = new ZooKeeper({
connect: "localhost:8888" // zk server的服务器地址和监听的端口号,timeout: 200000 // 以毫秒为单位
,debug_level: ZooKeeper.ZOO_LOG_LEVEL_WARN
,host_order_deterministic: false
});
zk.connect(function (err) {
if(err) throw err;
console.log ("zk session established, id=%s", zk.client_id);
zk.a_create ("/node.js1", "some value", ZooKeeper.ZOO_SEQUENCE | ZooKeeper.ZOO_EPHEMERAL, function (rc, error, path) {
if (rc != 0) {
console.log ("zk node create result: %d, error: '%s', path=%s", rc, error, path);
} else {
console.log ("created zk node %s", path);
process.nextTick(function () {
zk.close ();
});
}
});
});
其中:
ZOO_LOG_LEVEL_ERROR
, ZOO_LOG_LEVEL_WARN
, ZOO_LOG_LEVEL_INFO
, ZOO_LOG_LEVEL_DEBUG
常见API:
'use strict'
const ZooKeeper = require('zookeeper');
const logger = require('../logger/index.js'); // 打日志的工具
const Promise = require('bluebird');
const _ = require('lodash');
let node_env = process.env.NODE_ENV ? process.env.NODE_ENV: 'development';
let connect = node_env === 'development' ? 'zktest.imweb.com:8888' : 'zk.imweb.oa.com:8888';
let timeout = 200000; // 单位毫秒
let path = node_env === 'development' ? '/zk_test/blackList' : '/zk/blackList';
let debug_level = ZooKeeper.ZOO_LOG_LEVEL_WARN;
let host_order_deterministic = false;
let defaultInitOpt = {
connect,
timeout,
debug_level,
host_order_deterministic
};
class ZK {
constructor(opt) {
this.opt = opt;
this._initZook();
}
_initZook() {
this.zookeeper = new ZooKeeper(this.opt.initOpt || defaultInitOpt);
}
/**
* [get zookeeper blackList]
* @return {[type]} [description]
*/
get() {
return new Promise((resolve, reject) => {
let self = this;
self.zookeeper.connect(function(error) {
if (error) {
reject(error);
return;
}
console.log('zk session established, id=%s', self.zookeeper.client_id);
self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
if (rc !== 0) {
console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
reject(err);
} else {
logger.info('get zk node: ' + data)
resolve(data);
}
process.nextTick(() => {self.zookeeper.close();});
})
});
});
}
/**
* [set zookeeper black_list]
* @param {object} opt:
* {
* 380533076: {
* "anchor_uin": 380533076,
* "expired_time": 1462876279
* },
* 380533077: {
* "anchor_uin": 380533077,
* "expired_time": 1462876279
* },
* }
*/
set(opt) {
let zkData = null;
let self = this;
return new Promise((resolve, reject) => {
self.zookeeper.connect(function(err) {
if (err) {
reject(err);
return;
}
console.log('zk session established, id=%s', self.zookeeper.client_id);
self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
if (rc !== 0) {
console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
reject(error);
} else {
console.log('get zk node %s', data);
console.log('stat: ', stat);
console.log('data: ', typeof data);
try {
zkData = JSON.parse(data);
} catch (e) {
reject(e);
return;
}
zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
_.extend(zkData.data, opt);
let currVersion = stat.version;
try {
zkData = JSON.stringify(zkData);
} catch (e) {
reject(e);
return;
}
self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
if (rc !== 0) {
console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
reject(error);
} else {
logger.info('set zk node succ!');
resolve(stat);
}
process.nextTick(function() {
self.zookeeper.close();
});
})
}
})
});
});
}
/**
* [delete zookeeper znode]
* @param {array} keys [要删除的黑名单的QQ号]
* @return {[type]} [description]
*/
delete(keys) {
let zkData = null;
let self = this;
return new Promise((resolve, reject) => {
self.zookeeper.connect(function(err) {
if (err) {
reject(err);
return;
}
console.log('zk session established, id=%s', self.zookeeper.client_id);
self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
if (rc !== 0) {
console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
reject(error);
} else {
console.log('get zk node %s', data);
console.log('stat: ', stat);
console.log('data: ', typeof data);
try {
zkData = JSON.parse(data);
} catch (e) {
reject(e);
return;
}
zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
for (let key of keys) {
delete zkData.data[key];
}
let currVersion = stat.version; // 只对这个znode被读取时的这个ersion,否则会抛错。
try {
zkData = JSON.stringify(zkData);
} catch (e) {
reject(e);
return;
}
self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
if (rc !== 0) {
console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
reject(error);
} else {
logger.info('set zk node succ!');
resolve(stat);
}
process.nextTick(function() {
self.zookeeper.close();
});
})
}
})
});
})
}
/**
* [add description]
* @param {[type]} opt [description]
*/
add(opt) {
// zookeeper只能以覆盖的方式set
return this.set(opt);
}
clear() {
let zkData = null;
let self = this;
return new Promise((resolve, reject) => {
self.zookeeper.connect(function(err) {
if (err) {
reject(err);
return;
}
console.log('zk session established, id=%s', self.zookeeper.client_id);
self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
if (rc !== 0) {
console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
reject(error);
} else {
console.log('stat: ', stat);
zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
zkData.data = '';
let currVersion = stat.version;
try {
zkData = JSON.stringify(zkData);
} catch (e) {
reject(e);
return;
}
self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
if (rc !== 0) {
console.log('zk node clear result: %d, error: "%s", stat=%s', rc, error, stat);
reject(error);
} else {
logger.info('clear zk node succ!');
resolve(stat);
}
process.nextTick(function() {
self.zookeeper.close();
});
})
}
})
});
});
}
}
module.exports = ZK;