mongodb使用docker搭建replicaSet集群与变更监听(最新推荐)(mongodb python)奔走相告

随心笔谈2年前发布 admin
205 0 0

文章摘要

该文章主要介绍了使用MongoDB实现对特定集合(`myTestCollection`)的监控功能。文章详细描述了如何通过`MongoDB`的异步操作(使用`CompletableFuture`)监控集合中的数据变更,包括插入(`insert`)、更新(`update`)、删除(`delete`)等操作。代码中使用了`changeStream`来获取变化流,并通过`FullDocument`和`FullDocumentBeforeChange`来记录每次修改前后的文档。此外,文章还包含了一个数据变更的测试部分,演示了如何手动向集合中添加、更新和删除文档,并通过日志打印监控结果。代码还提到了如何在集合中启用或禁用`changeStreamPreAndPostImages`功能,以确保监控功能的正确性。

package io.github.puhaiyang;

import com.google.common.collect.Lists;
import com.mongodb.client.*;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;

public class MongodbWatchTestMain {

public static void main(String[] args) throws Exception {
String uri=”mongodb://admin:123456@192.168.1.11:27017,192.168.1.12:27017/?replicaSet=haiyangReplset”;
MongoClient mongoClient=MongoClients.create(uri);
MongoDatabase mongoDatabase=mongoClient.getDatabase(“my-test-db”);
String myTestCollectionName=”myTestCollection”;
//获取出collection
MongoCollection<Document> mongoCollection=initCollection(mongoDatabase, myTestCollectionName);
//进行watch
CompletableFuture.runAsync(() -> {
while (true) {
List<Bson> pipeline=Lists.newArrayList(
Aggregates.match(Filters.in(“ns.coll”, myTestCollectionName)),
Aggregates.match(Filters.in(“operationType”, Arrays.asList(“insert”, “update”, “replace”, “delete”)))
);
ChangeStreamIterable<Document> changeStream=mongoDatabase.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP)
.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);

changeStream.forEach(event -> {
String collectionName=Objects.requireNonNull(event.getNamespace()).getCollectionName();
System.out.println(“——–> event:” + event.toString());
});
}
});

//数据变更测试
{
Thread.sleep(3_000);
InsertOneResult insertResult=mongoCollection.insertOne(new Document(“test”, “sample movie document”));
System.out.println(“Success! Inserted document id: ” + insertResult.getInsertedId());
UpdateResult updateResult=mongoCollection.updateOne(new Document(“test”, “sample movie document”), Updates.set(“field2”, “sample movie document update”));
System.out.println(“Updated ” + updateResult.getModifiedCount() + ” document.”);
DeleteResult deleteResult=mongoCollection.deleteOne(new Document(“field2”, “sample movie document update”));
System.out.println(“Deleted ” + deleteResult.getDeletedCount() + ” document.”);
}

new Scanner(System.in).next();
}

private static MongoCollection<Document> initCollection(MongoDatabase mongoDatabase, String myTestCollectionName) {
ArrayList<Document> existsCollections=mongoDatabase.listCollections().into(new ArrayList<>());
Optional<Document> existsCollInfoOpl=existsCollections.stream().filter(doc -> StringUtils.equals(myTestCollectionName, doc.getString(“name”))).findFirst();
existsCollInfoOpl.ifPresent(collInfo -> {
//确保开启了changeStreamPreAndPost
Document changeStreamPreAndPostImagesEnable=collInfo.get(“options”, Document.class).get(“changeStreamPreAndPostImages”, Document.class);
if (changeStreamPreAndPostImagesEnable !=null && !changeStreamPreAndPostImagesEnable.getBoolean(“enabled”)) {
Document mod=new Document();
mod.put(“collMod”, myTestCollectionName);
mod.put(“changeStreamPreAndPostImages”, new Document(“enabled”, true));
mongoDatabase.runCommand(mod);
}
});
if (!existsCollInfoOpl.isPresent()) {
CreateCollectionOptions collectionOptions=new CreateCollectionOptions();
//创建collection时开启ChangeStreamPreAndPostImages
collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
mongoDatabase.createCollection(myTestCollectionName, collectionOptions);
}
return mongoDatabase.getCollection(myTestCollectionName);
}
}

© 版权声明

相关文章