From 1c2fd3c98805dae5e0252995d305b106945dce9a Mon Sep 17 00:00:00 2001 From: zhaoxiangpeng <1943364377@qq.com> Date: Wed, 5 Nov 2025 16:38:07 +0800 Subject: [PATCH] add:mongo util --- .../science_article_add/db_utils/__init__.py | 0 .../science_article_add/db_utils/mongo.py | 57 +++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 science_article_add/science_article_add/db_utils/__init__.py create mode 100644 science_article_add/science_article_add/db_utils/mongo.py diff --git a/science_article_add/science_article_add/db_utils/__init__.py b/science_article_add/science_article_add/db_utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/science_article_add/science_article_add/db_utils/mongo.py b/science_article_add/science_article_add/db_utils/mongo.py new file mode 100644 index 0000000..b94207d --- /dev/null +++ b/science_article_add/science_article_add/db_utils/mongo.py @@ -0,0 +1,57 @@ +from __future__ import annotations +import logging +from typing import TYPE_CHECKING, Optional, Dict, Tuple +from pymongo import MongoClient +from pymongo import UpdateOne +from pymongo.errors import DuplicateKeyError, BulkWriteError + +if TYPE_CHECKING: + from pymongo.database import Database + from pymongo.collection import Collection + from pymongo.results import InsertManyResult, BulkWriteResult + + +def update_document(filter_query: dict = None, update_data: dict = None, replace: bool = True) -> Tuple[dict, dict]: + update_query = {} + if not update_data: + return {}, {} + + for key, val in update_data.items(): + if replace: + update_query.setdefault( + "$set", {} + ).update( + {key: val} + ) + else: + if isinstance(val, list): + update_query.setdefault( + "$addToSet", {} + ).update({ + key: {"$each": val} + }) + else: + update_query.setdefault( + "$set", {} + ).update( + {key: val} + ) + return filter_query, update_query + + +class MongoDBUtils: + def __init__(self, mongo_uri, mongo_db): + self.mongo_uri = mongo_uri + self.mongo_db = mongo_db + self.client: MongoClient = None + self.db: Database = None + + def _insert2db(self, items, tablename, ordered: bool = False, **kwargs) -> InsertManyResult: + collection: Collection = self.db.get_collection(tablename) + result: InsertManyResult = collection.insert_many(items, ordered=ordered, **kwargs) + return result + + def _update2db(self, items, tablename, ordered: bool = False, **kwargs) -> BulkWriteResult: + collection: Collection = self.db.get_collection(tablename) + bulk_results: BulkWriteResult = collection.bulk_write(items, ordered=ordered, **kwargs) + return bulk_results