add:mongo util
parent
129ab6569d
commit
1c2fd3c988
@ -0,0 +1,57 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
import logging
|
||||||
|
from typing import TYPE_CHECKING, Optional, Dict, Tuple
|
||||||
|
from pymongo import MongoClient
|
||||||
|
from pymongo import UpdateOne
|
||||||
|
from pymongo.errors import DuplicateKeyError, BulkWriteError
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from pymongo.database import Database
|
||||||
|
from pymongo.collection import Collection
|
||||||
|
from pymongo.results import InsertManyResult, BulkWriteResult
|
||||||
|
|
||||||
|
|
||||||
|
def update_document(filter_query: dict = None, update_data: dict = None, replace: bool = True) -> Tuple[dict, dict]:
|
||||||
|
update_query = {}
|
||||||
|
if not update_data:
|
||||||
|
return {}, {}
|
||||||
|
|
||||||
|
for key, val in update_data.items():
|
||||||
|
if replace:
|
||||||
|
update_query.setdefault(
|
||||||
|
"$set", {}
|
||||||
|
).update(
|
||||||
|
{key: val}
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
if isinstance(val, list):
|
||||||
|
update_query.setdefault(
|
||||||
|
"$addToSet", {}
|
||||||
|
).update({
|
||||||
|
key: {"$each": val}
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
update_query.setdefault(
|
||||||
|
"$set", {}
|
||||||
|
).update(
|
||||||
|
{key: val}
|
||||||
|
)
|
||||||
|
return filter_query, update_query
|
||||||
|
|
||||||
|
|
||||||
|
class MongoDBUtils:
|
||||||
|
def __init__(self, mongo_uri, mongo_db):
|
||||||
|
self.mongo_uri = mongo_uri
|
||||||
|
self.mongo_db = mongo_db
|
||||||
|
self.client: MongoClient = None
|
||||||
|
self.db: Database = None
|
||||||
|
|
||||||
|
def _insert2db(self, items, tablename, ordered: bool = False, **kwargs) -> InsertManyResult:
|
||||||
|
collection: Collection = self.db.get_collection(tablename)
|
||||||
|
result: InsertManyResult = collection.insert_many(items, ordered=ordered, **kwargs)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _update2db(self, items, tablename, ordered: bool = False, **kwargs) -> BulkWriteResult:
|
||||||
|
collection: Collection = self.db.get_collection(tablename)
|
||||||
|
bulk_results: BulkWriteResult = collection.bulk_write(items, ordered=ordered, **kwargs)
|
||||||
|
return bulk_results
|
||||||
Loading…
Reference in New Issue