测试环境
系统 | 版本 |
---|---|
centos | 7 |
mongodb | 4.4.6 |
php | 7.4 |
生成认证文件
openssl rand -base64 756 > mongo.key
#修改mongo.key文件的权限
chmod 600 mongo.key
启动容器
docker run -it -d --name mongo-relicat-set -p 27017:27017 -v `pwd`/mongo.key:/mongo.key --restart always -e MONGO_INITDB_ROOT_USERNAME=root -e MONGO_INITDB_ROOT_PASSWORD=Admin@123 mongo:4.4.6 mongod --replSet rs0 --keyFile /mongo.key
控制台1
$ mongo # 进入mongo控制台
rs0:PRIMARY> use admin # 使用admin数据库
switched to db admin
rs0:PRIMARY> db.auth('root','Admin@123') # 用户名密码认证
1 # 1表示认证成功
rs0:PRIMARY> use test # 切换到test数据库
switched to db test
rs0:PRIMARY> db.watch([],{maxAwaitTimeMS:60000}) # 监听当前数据库test的所有变动
控制台2
$ mongo # 进入mongo控制台
rs0:PRIMARY> use admin # 使用admin数据库
switched to db admin
rs0:PRIMARY> db.auth('root','Admin@123') # 用户名密码认证
1 # 1表示认证成功
rs0:PRIMARY> use test # 切换到test数据库
switched to db test
rs0:PRIMARY> db.users.insertMany(
[
{ username:"user01", name:"1"},
{ username:"user02", name:"2"}
]
)
# 结果如下,成功新增两条数据
{
"acknowledged" : true,
"insertedIds" : [
ObjectId("6319a9203df851bacd5446b6"),
ObjectId("6319a9203df851bacd5446b7")
]
}
此时控制台1的效果如下
rs0:PRIMARY> db.watch([],{maxAwaitTimeMS:30000})
{ "_id" : { "_data" : "826319AACA000000012B022C0100296E5A1004B4A3BF6163714E6EBED5BBD210175F7346645F696400646319AACA2A72408C1964CAD40004" }, "operationType" : "insert", "clusterTime" : Timestamp(1662626506, 1), "fullDocument" : { "_id" : ObjectId("6319aaca2a72408c1964cad4"), "username" : "user01", "name" : "1" }, "ns" : { "db" : "test", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("6319aaca2a72408c1964cad4") } }
{ "_id" : { "_data" : "826319AACA000000022B022C0100296E5A1004B4A3BF6163714E6EBED5BBD210175F7346645F696400646319AACA2A72408C1964CAD50004" }, "operationType" : "insert", "clusterTime" : Timestamp(1662626506, 2), "fullDocument" : { "_id" : ObjectId("6319aaca2a72408c1964cad5"), "username" : "user02", "name" : "2" }, "ns" : { "db" : "test", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("6319aaca2a72408c1964cad5") } }
安装扩展
pecl install mongodb
在php.ini文件中添加如下内容
extension=mongodb.so
可用php -m
检查配置是否生效
composer安装 mongodb/mongodb 库
composer require mongodb/mongodb
示例代码:
change_stream_demo01.php
<?php
include_once 'vendor/autoload.php';
$client = new MongoDB\Client(
'mongodb://192.168.1.100:27017',
[
'username' => 'root',
'password' => 'Admin@123',
'authSource' => 'admin',
]
);
$options = [
'maxAwaitTimeMS' => 30000,
'fullDocument' => 'updateLookup'
];
//最后处理的一条记录,如果有记录,从记录的位置开始
if ($lastContent = @file_get_contents('last.txt')) {
$options['resumeAfter'] = new MongoDB\Model\BSONDocument(json_decode($lastContent, true));
}
//$client->selectCollection('watch','users')->watch([],$options); //监听指定集合的变动
//监听数据库的变动
$changeStream = $client->selectDatabase('watch')
->watch([], $options);
while (true) {
$changeStream->next();//如果没有变动数据,阻塞进程,最长时间为 maxAwaitTimeMS 设置的值
if (!$changeStream->valid()) {
continue;
}
$row = $changeStream->current();
var_dump(json_encode($row));
$item = json_decode(json_encode($row), true);
//保存处理成功的最后一条日志
file_put_contents('last.txt', json_encode($item['_id']));
}
运行php change_stream_demo01.php
后,在指定集合中新增两条数据,效果如下:
$ php change_stream_demo01.php
string(391) "{"_id":{"_data":"826319AC79000000012B022C0100296E5A1004B4A3BF6163714E6EBED5BBD210175F7346645F696400646319AC792A72408C1964CAD60004"},"operationType":"insert","clusterTime":{"$timestamp":{"t":1662626937,"i":1}},"fullDocument":{"_id":{"$oid":"6319ac792a72408c1964cad6"},"username":"user01","name":"1"},"ns":{"db":"test","coll":"users"},"documentKey":{"_id":{"$oid":"6319ac792a72408c1964cad6"}}}"
string(391) "{"_id":{"_data":"826319AC79000000022B022C0100296E5A1004B4A3BF6163714E6EBED5BBD210175F7346645F696400646319AC792A72408C1964CAD70004"},"operationType":"insert","clusterTime":{"$timestamp":{"t":1662626937,"i":2}},"fullDocument":{"_id":{"$oid":"6319ac792a72408c1964cad7"},"username":"user02","name":"2"},"ns":{"db":"test","coll":"users"},"documentKey":{"_id":{"$oid":"6319ac792a72408c1964cad7"}}}"
参考链接
MongoDB 4.2 内核解析 - Change Stream
MongoDB Change Stream之一——上手及初体验 - 腾讯云开发者社区-腾讯云
db.watch() 官网说明
MongoDB PHP库