我正在用MongoDB作为数据存储创建一种后台作业队列系统。在生成工人处理作业之前,我如何“监听”MongoDB集合的插入?
是否需要每隔几秒钟轮询一次,以查看与上次相比是否有任何更改,或者是否有一种方法可以让脚本等待插入的发生?
这是一个PHP项目,我正在工作,但请随意回答在Ruby或语言不可知。
我正在用MongoDB作为数据存储创建一种后台作业队列系统。在生成工人处理作业之前,我如何“监听”MongoDB集合的插入?
是否需要每隔几秒钟轮询一次,以查看与上次相比是否有任何更改,或者是否有一种方法可以让脚本等待插入的发生?
这是一个PHP项目,我正在工作,但请随意回答在Ruby或语言不可知。
当前回答
许多这些答案只会给你新的记录,而不是更新和/或非常低效
要做到这一点,唯一可靠、高效的方法是在本地db: oplog上创建一个可尾游标。rs集合得到所有的变化到MongoDB和做什么,你会。(MongoDB甚至在内部或多或少地支持复制!)
oplog包含内容的解释: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/
一个Node.js库的例子,它提供了一个关于oplog可以做什么的API: https://github.com/cayasso/mongo-oplog
其他回答
这里有一个工作的java示例。
MongoClient mongoClient = new MongoClient();
DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");
DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
System.out.println("== open cursor ==");
Runnable task = () -> {
System.out.println("\tWaiting for events");
while (cur.hasNext()) {
DBObject obj = cur.next();
System.out.println( obj );
}
};
new Thread(task).start();
键是这里给出的QUERY OPTIONS。
你也可以改变查找查询,如果你不需要每次都加载所有的数据。
BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation
DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
你的想法听起来很像触发器。MongoDB不支持任何触发器,但是有些人已经使用一些技巧“滚出了自己的”触发器。这里的关键是oplog。
当您在Replica Set中运行MongoDB时,所有的MongoDB操作都会被记录到一个操作日志(称为oplog)。oplog基本上只是对数据所做修改的一个运行列表。Replicas Sets的功能是监听这个oplog上的更改,然后在本地应用这些更改。
这听起来熟悉吗?
我不能在这里详细说明整个过程,这是几页的文档,但你需要的工具是可用的。
首先是对oplog的一些评论 -简要描述 -本地集合的布局(其中包含oplog)
您还需要利用可尾游标。这将为您提供一种侦听更改的方法,而不是轮询更改。注意,复制使用可尾游标,因此这是一个受支持的特性。
实际上,与其观察输出,不如使用mongoose schema提供的中间件在插入新内容时注意到它
您可以捕捉插入新文档的事件,并在插入完成后执行一些操作
MongoDB 3.6版现在包含了变更流,它本质上是OpLog之上的API,允许触发/通知类用例。
下面是一个Java示例的链接: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/
一个NodeJS的例子可能是这样的:
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
.then(function(client){
let db = client.db('MyStore')
let change_streams = db.collection('products').watch()
change_streams.on('change', function(change){
console.log(JSON.stringify(change));
});
});
许多这些答案只会给你新的记录,而不是更新和/或非常低效
要做到这一点,唯一可靠、高效的方法是在本地db: oplog上创建一个可尾游标。rs集合得到所有的变化到MongoDB和做什么,你会。(MongoDB甚至在内部或多或少地支持复制!)
oplog包含内容的解释: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/
一个Node.js库的例子,它提供了一个关于oplog可以做什么的API: https://github.com/cayasso/mongo-oplog