使用MongoDB创建Outgoing-Webhook,以便将数据高效地发送到ElasticSearch
首先ElasticSearch(+ Kibana)在数据可视化方面,特别是日志分析领域非常受欢迎。
然而,仅仅可视化日志是不够的。
我们也希望用ElasticSearch(+ Kibana)时髦地将保存在MongoDB中的所有数据进行可视化。
现在许多人可能会使用River插件将Mongo-ES(Kibana)进行连接,但遗憾的是,从ElasticSearch2.0开始,这个插件已经被废弃了。
所以,我们将向您介绍MongoDB – ElasticSearch的整合,它充分利用了MongoDB的特点,使其更简单、更简洁,并实时且高效地运行。

蒙哥DB决定因素是 OpLog。 shì OpLog.)
操作日志就像Oracle的REDO日志和MySQL的二进制日志一样,MongoDB也有一个称为OpLog的事务日志。
与其他日志不同的是,MongoDB的OpLog记录的是“由操作引起的数据更新(差异)历史记录”。
与其他日志不同的是,MongoDB的OpLog记录的是“由操作引起的数据更新(差异)历史记录”。
总之,无论操作了多少数据,只有实际发生了更新(差异)的地方才会被记录在操作日志中。
这使得使用起来非常方便!
OpLog的設置OpLog虽然非常方便,但在安装后默认为关闭状态。由于是为了复制而设定的,因此需要设置复制集,但即使只有一台配置也可以。
设置步骤
编辑mongod.conf文件。副本名称设为repltest。
replSet=repltest
我們將從Mongo Shell中進行副本設定。
$ mongo
> config = {_id: 'repltest', members: [
{_id: 0, host: '127.0.0.1:27017'}]
};
> rs.initiate(config);
> exit
重启。如果顺利的话,Mongo Shell的提示符将会改变。
$ mongo
repltest:PRIMARY>
由于OpLog存储在本地数据库中,我们将确认一下。
repltest:PRIMARY> use local
repltest:PRIMARY> show collections
me
oplog.rs ← ありました
startup_log
system.indexes
system.replset
如果本地安装有MongoDB和ElasticSearch的话。代码全部
https://github.com/exabugs/mongo-es
node index.js
- 编辑mongod.conf文件。副本名称设为repltest。
replSet=repltest
- 我們將從Mongo Shell中進行副本設定。
$ mongo
> config = {_id: 'repltest', members: [
{_id: 0, host: '127.0.0.1:27017'}]
};
> rs.initiate(config);
> exit
- 重启。如果顺利的话,Mongo Shell的提示符将会改变。
$ mongo
repltest:PRIMARY>
- 由于OpLog存储在本地数据库中,我们将确认一下。
repltest:PRIMARY> use local
repltest:PRIMARY> show collections
me
oplog.rs ← ありました
startup_log
system.indexes
system.replset
如果本地安装有MongoDB和ElasticSearch的话。代码全部
https://github.com/exabugs/mongo-es
node index.js
node index.js
监视在端口27017上运行的MongoDB的所有集合,并在有更新时通知在端口9200上运行的ElasticSearch。
索引名称为”MongoDB的数据库名.集合名”。

重要的要点 de
MongoDB の OpLog監視部分が以下になります。
– tailable オプションでカーソルを開きます。
– 接続が切れても、err.tailable が true なら、引き続き監視可能です。
– 本当に切れた場合は、find からやり直します。
– 更新データが無い場合の待ち時間は1秒にしています。
function loop(oplog, ts, callback) {
var condition = {ts: {$gt: ts}};
var option = {tailable: true};
oplog.find(condition, option, function (err, cursor) {
function processItem(err, op) {
if (op) {
// 更新処理
update(oplog.s.db, op, function (err) {
if (err) {
log(err.message);
setTimeout(function () {
loop(oplog, ts, callback);
}, WAIT);
} else {
log("Update ElasticSearch");
ts = op.ts;
fs.writeFileSync(posfile, ts); // どこまで処理したか記憶する
setImmediate(function () {
cursor.next(processItem);
});
}
});
} else if (err && err.tailable) {
// tailable=true なら引き続き監視可能
log(err.message);
setTimeout(function () {
cursor.next(processItem);
}, WAIT);
} else {
// 本当に切れた場合(MongoDB再起動等)は、findからやり直し
log(err.message);
setTimeout(function () {
loop(oplog, ts, callback);
}, WAIT);
}
}
cursor.next(processItem);
});
}
function loop(oplog, ts, callback) {
var condition = {ts: {$gt: ts}};
var option = {tailable: true};
oplog.find(condition, option, function (err, cursor) {
function processItem(err, op) {
if (op) {
// 更新処理
update(oplog.s.db, op, function (err) {
if (err) {
log(err.message);
setTimeout(function () {
loop(oplog, ts, callback);
}, WAIT);
} else {
log("Update ElasticSearch");
ts = op.ts;
fs.writeFileSync(posfile, ts); // どこまで処理したか記憶する
setImmediate(function () {
cursor.next(processItem);
});
}
});
} else if (err && err.tailable) {
// tailable=true なら引き続き監視可能
log(err.message);
setTimeout(function () {
cursor.next(processItem);
}, WAIT);
} else {
// 本当に切れた場合(MongoDB再起動等)は、findからやり直し
log(err.message);
setTimeout(function () {
loop(oplog, ts, callback);
}, WAIT);
}
}
cursor.next(processItem);
});
}
ElasticSearch を更新する部分が以下になります。
– 簡単のために、フィールド単位での差分更新はしていません。
– MongoDBの1コレクションはElasticSearchの1インデックスに対応。
– typeは一律で’default’という名前にしています。
function update(db, op, callback) {
var tags = op.ns.split(/^([^.]+)\./);
var coll = db.db(tags[1]).collection(tags[2]);
var _id = op.o._id || op.o2._id;
async.waterfall([
function (next) {
if (op.op === "d") {
next(null, _id, "DELETE", null);
} else {
coll.findOne({_id: _id}, function (err, obj) {
obj && (delete obj._id);
next(err, _id, "PUT", obj);
});
}
},
function (_id, method, obj, next) {
var options = {
uri: [esearch_url, op.ns, "default", _id].join("/"),
method: method,
json: obj
};
request(options, function (err) {
next(err);
});
}
], function (err) {
callback(err);
});
}
总结
-
MongoDB の OpLog は、変更の差分だけを伝えてくれるので、処理に無駄がありません。
- MongoDB の OpLog は、変更の差分だけを伝えてくれるので、処理に無駄がありません。
-
- OpLog を使って、MongoDB の全コレクションを ElasticSearch に、更新時リアルタイムに反映させています。
-
- ログだけでなく、データそのものがカジュアルに可視化できるのは便利で気持ち良いです。
- OpLog のフックは、トリガー、または、Outgoing-Webhook として応用できます。
请留意
-
OpLog は Capped-Collection (サイズが固定) です。常に先端を追っていれば問題ありませんが、何日も前のログは、なくなっている可能性があります。
- OpLog は Capped-Collection (サイズが固定) です。常に先端を追っていれば問題ありませんが、何日も前のログは、なくなっている可能性があります。
- 本プログラムでは、OpLogの処理ポインタをローカルファイルに記憶するようにしています。nodeやMongoDBの障害/再起動には耐えますが、マスタが切り替わった場合の処理を省いています。マスタを判断(localDBに接続できること)し、処理ポインタはネットワーク上のファイル、あるいは、MongoDBのコレクションに記憶する必要があります。