@@ -41,6 +41,7 @@ class MongoPipeline(MongoDBUtils):
         super().__init__(mongo_uri, mongo_db)
         self.stats: StatsCollector = stats
         self.insert_failure_update_enable = True
+        self.duplicate_cover_enable = False  # overwrite duplicate items
 
     @classmethod
     def from_crawler(cls, crawler: Crawler):
@@ -71,7 +72,8 @@ class MongoPipeline(MongoDBUtils):
             logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
             d.pop("_id", None)
             [d.pop(k, None) for k in key_pattern.keys()]
-            up_result = collection.update_one(filter=key_value, update={"$set": d}, upsert=True)
+            update_q = build_update_query(d, replace=self.duplicate_cover_enable)
+            up_result = collection.update_one(filter=key_value, update=update_q, upsert=True)
             self.stats.inc_value("item2db_updated/{}".format(item_type))
         except Exception:
             raise
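
Note: the diff calls `build_update_query()` but its definition is not part of these hunks. As a rough sketch only (the real helper may differ), the new `replace` flag presumably decides whether fields of an already-stored document are overwritten on upsert; a minimal illustration under that assumption:

```python
# Hypothetical sketch of build_update_query -- the actual implementation is
# not shown in this diff; it only illustrates the assumed semantics of the
# `replace` flag driven by self.duplicate_cover_enable.
def build_update_query(doc: dict, replace: bool = False) -> dict:
    """Build the `update` argument for collection.update_one(..., upsert=True)."""
    if replace:
        # replace=True: behave like the old code path and overwrite the
        # stored fields with the freshly scraped values.
        return {"$set": doc}
    # replace=False (the default, since duplicate_cover_enable is False):
    # only write the fields when the upsert inserts a new document, leaving
    # existing (duplicate) documents untouched.
    return {"$setOnInsert": doc}
```

Under this reading, leaving `duplicate_cover_enable` at `False` keeps previously stored items intact when a duplicate is scraped again, while setting it to `True` restores the old `$set` overwrite behaviour.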