我正在用MongoDB作为数据存储创建一种后台作业队列系统。在生成工人处理作业之前,我如何“监听”MongoDB集合的插入?
是否需要每隔几秒钟轮询一次,以查看与上次相比是否有任何更改,或者是否有一种方法可以让脚本等待插入的发生?
这是一个PHP项目,我正在工作,但请随意回答在Ruby或语言不可知。
我正在用MongoDB作为数据存储创建一种后台作业队列系统。在生成工人处理作业之前,我如何“监听”MongoDB集合的插入?
是否需要每隔几秒钟轮询一次,以查看与上次相比是否有任何更改,或者是否有一种方法可以让脚本等待插入的发生?
这是一个PHP项目,我正在工作,但请随意回答在Ruby或语言不可知。
当前回答
有一套很棒的服务叫做MongoDB Stitch。查看stitch函数/触发器。注意,这是一个基于云的付费服务(AWS)。在您的例子中,对于插入,您可以调用用javascript编写的自定义函数。
其他回答
在3.6允许使用数据库之后,以下数据库触发类型:
事件驱动触发器——用于自动更新相关文档、通知下游服务、传播数据以支持混合工作负载、数据完整性和审计 计划触发器——对于计划数据检索、传播、归档和分析工作负载非常有用
登录到您的Atlas帐户,选择触发器界面,并添加新的触发器:
展开每个部分以了解更多设置或详细信息。
看看这个:改变流
2018年1月10日——3.6版
*编辑:我写了一篇关于如何做到这一点的文章https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76
https://docs.mongodb.com/v3.6/changeStreams/
这是mongodb 3.6中的新功能 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10
$ mongod --version
db version v3.6.2
为了使用changeStreams,数据库必须是一个复制集
关于复制集的更多信息: https://docs.mongodb.com/manual/replication/
默认情况下,数据库将是“独立的”。
如何将一个独立的副本集:https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/
下面的示例是一个实际应用程序,说明如何使用它。 *特别针对Node。
/* file.js */
'use strict'
module.exports = function (
app,
io,
User // Collection Name
) {
// SET WATCH ON COLLECTION
const changeStream = User.watch();
// Socket Connection
io.on('connection', function (socket) {
console.log('Connection!');
// USERS - Change
changeStream.on('change', function(change) {
console.log('COLLECTION CHANGED');
User.find({}, (err, data) => {
if (err) throw err;
if (data) {
// RESEND ALL USERS
socket.emit('users', data);
}
});
});
});
};
/* END - file.js */
有用的链接: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set https://docs.mongodb.com/manual/tutorial/change-streams-example
https://docs.mongodb.com/v3.6/tutorial/change-streams-example http://plusnconsulting.com/post/MongoDB-Change-Streams
或者,你可以使用标准的Mongo findupdate方法,在回调中,在回调运行时触发EventEmitter事件(在Node中)。
应用程序或体系结构中侦听此事件的任何其他部分都将收到更新通知,并将任何相关数据发送到那里。这是实现Mongo通知的一个非常简单的方法。
MongoDB有所谓的上限集合和可尾游标,允许MongoDB将数据推送到侦听器。
有上限的集合本质上是一个固定大小且只允许插入的集合。下面是创建一个的样子:
db.createCollection("messages", { capped: true, size: 100000000 })
MongoDB可Tailable游标(原文由Jonathan H. Wage撰写)
Ruby
coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
if doc = cursor.next_document
puts doc
else
sleep 1
end
end
PHP
$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
if ($cursor->hasNext()) {
$doc = $cursor->getNext();
print_r($doc);
} else {
sleep(1);
}
}
Python(作者:罗伯特·斯图尔特)
from pymongo import Connection
import time
db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
try:
doc = cursor.next()
print doc
except StopIteration:
time.sleep(1)
Perl (by Max)
use 5.010;
use strict;
use warnings;
use MongoDB;
my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
if (defined(my $doc = $cursor->next))
{
say $doc;
}
else
{
sleep 1;
}
}
额外的资源:
Ruby/Node.js教程,指导您创建一个应用程序,侦听MongoDB capped collection中的插入。
一篇详细讨论可尾标的文章。
PHP、Ruby、Python和Perl使用可尾游标的示例。
有一套很棒的服务叫做MongoDB Stitch。查看stitch函数/触发器。注意,这是一个基于云的付费服务(AWS)。在您的例子中,对于插入,您可以调用用javascript编写的自定义函数。