from __future__ import annotations import logging from typing import TYPE_CHECKING, Optional, Dict, Tuple from pymongo import MongoClient from pymongo import UpdateOne from pymongo.errors import DuplicateKeyError, BulkWriteError if TYPE_CHECKING: from pymongo.database import Database from pymongo.collection import Collection from pymongo.results import InsertManyResult, BulkWriteResult def update_document(filter_query: dict = None, update_data: dict = None, replace: bool = True) -> Tuple[dict, dict]: update_query = {} if not update_data: return {}, {} for key, val in update_data.items(): if replace: update_query.setdefault( "$set", {} ).update( {key: val} ) else: if isinstance(val, list): update_query.setdefault( "$addToSet", {} ).update({ key: {"$each": val} }) else: update_query.setdefault( "$set", {} ).update( {key: val} ) return filter_query, update_query class MongoDBUtils: def __init__(self, mongo_uri, mongo_db): self.mongo_uri = mongo_uri self.mongo_db = mongo_db self.client: MongoClient = None self.db: Database = None def _insert2db(self, items, tablename, ordered: bool = False, **kwargs) -> InsertManyResult: collection: Collection = self.db.get_collection(tablename) result: InsertManyResult = collection.insert_many(items, ordered=ordered, **kwargs) return result def _update2db(self, items, tablename, ordered: bool = False, **kwargs) -> BulkWriteResult: collection: Collection = self.db.get_collection(tablename) bulk_results: BulkWriteResult = collection.bulk_write(items, ordered=ordered, **kwargs) return bulk_results