|
|
|
@ -19,7 +19,7 @@ from pymongo.errors import (
|
|
|
|
DuplicateKeyError,
|
|
|
|
DuplicateKeyError,
|
|
|
|
BulkWriteError
|
|
|
|
BulkWriteError
|
|
|
|
)
|
|
|
|
)
|
|
|
|
from science_article_wos.items import WosIdRelationItem, WosArticleTodoIdItem, WosCitedNumberItem
|
|
|
|
from science_article_wos.items import ArticleItem, WosArticleItem, WosIdRelationItem, WosArticleTodoIdItem, WosCitedNumberItem
|
|
|
|
from science_article_wos.db_utils.mongo import MongoDBUtils, update_document, build_update_query
|
|
|
|
from science_article_wos.db_utils.mongo import MongoDBUtils, update_document, build_update_query
|
|
|
|
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
@ -139,6 +139,15 @@ class MongoPipeline(MongoDBUtils):
|
|
|
|
return 'items_null_table'
|
|
|
|
return 'items_null_table'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Article2MongoPipeline(MongoPipeline):
    """Pipeline that hands ``ArticleItem`` instances to the inherited update path."""

    def process_item(self, item, spider):
        """Persist article items and pass every item on unchanged.

        Args:
            item: The scraped item currently travelling through the pipeline.
            spider: The spider that produced ``item``.

        Returns:
            The same ``item`` object, so later pipelines still receive it.
        """
        # Determine the item type: anything that is not an article is
        # forwarded without touching the database.
        if not isinstance(item, ArticleItem):
            return item

        super().process_item_update(item, spider=spider)
        return item
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CitedRelation2MongoPipeline(MongoPipeline):
|
|
|
|
class CitedRelation2MongoPipeline(MongoPipeline):
|
|
|
|
def process_item(self, item, spider):
|
|
|
|
def process_item(self, item, spider):
|
|
|
|
# 确定Item类型
|
|
|
|
# 确定Item类型
|
|
|
|
@ -206,3 +215,75 @@ class DupTodoBySciencePipeline(DupTodoPipeline):
|
|
|
|
self.inc_item_dropped_count("exists")
|
|
|
|
self.inc_item_dropped_count("exists")
|
|
|
|
return True
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class VerifyDataIntegrity:
    """Scrapy pipeline that checks a crawl batch against the items actually scraped.

    When the spider opens, the ids the spider intends to fetch are collected.
    Every ``ArticleItem`` that passes through is recorded as successful. When
    the spider closes, the ``todo_ids_wos`` collection is updated: successful
    ids are deleted or flagged ``state=1``, and ids that never arrived are
    flagged ``state=-1``.
    """

    def __init__(self, mongo_uri, mongo_db):
        """Store the connection settings; no connection is opened here.

        Args:
            mongo_uri: Value handed to ``MongoClient`` by ``init_db``.
            mongo_db: Name of the database selected by ``init_db``.
        """
        # When True, finished ids are deleted from the todo collection
        # instead of being flagged with state=1.
        self.successful_delete = False
        # ids the spider announced for this run (filled in open_spider).
        self.batch_ids = set()
        # ids of the article items seen by process_item, in arrival order.
        self.successful = []
        self.logger = logging.getLogger(__name__)

        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        # Both are populated by init_db() and reset by close_spider().
        self.client: "MongoClient | None" = None
        self.db = None

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from the crawler settings.

        Reads ``MONGO_URI`` and ``MONGO_DATABASE`` (default ``"items"``).
        """
        settings = crawler.settings
        return cls(
            mongo_uri=settings.get("MONGO_URI"),
            mongo_db=settings.get("MONGO_DATABASE", "items"),
        )

    def init_db(self):
        """Open the MongoDB client and select the configured database."""
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def open_spider(self, spider):
        """Collect the ids of the current batch and connect to MongoDB.

        Only batch entries whose ``field`` is ``"UT"`` (the default when the
        key is absent) contribute their ``third_id``.
        """
        for batch in spider.get_batch_ids():
            if batch.get("field", "UT") == "UT":
                self.batch_ids.add(batch.get("third_id"))
        self.init_db()

    def process_item(self, item, spider):
        """Record the ``third_id`` of every article item, then pass the item on.

        Returns:
            The unchanged ``item``.
        """
        adapter = ItemAdapter(item)
        if isinstance(item, ArticleItem):
            unique_id = adapter.get("third_id")
            self.successful.append(unique_id)
            if self.successful_delete:
                self.batch_ids.discard(unique_id)
        return item

    def close_spider(self, spider):
        """Write the verification result to ``todo_ids_wos`` and disconnect.

        Successful ids are deleted (``successful_delete``) or set to
        ``state=1``; ids of the batch that never arrived are set to
        ``state=-1``. The client is closed even if a database call raises.
        """
        if self.db is None:
            # open_spider never reached init_db(); nothing can be verified.
            self.logger.warning("MongoDB was never initialised; skipping integrity verification")
            return

        try:
            # Materialise once: the same list is logged and used as filter.
            failure = list(self.batch_ids - set(self.successful))
            coll = self.db.get_collection("todo_ids_wos")

            if self.successful:
                done_filter = {"third_id": {"$in": self.successful}}
                if self.successful_delete:
                    coll.delete_many(filter=done_filter)
                    self.logger.info("Successfully deleted %d articles", len(self.successful))
                else:
                    coll.update_many(filter=done_filter, update={"$set": {"state": 1}})
                    self.logger.info("Successfully updated %d articles", len(self.successful))

            if failure:
                self.logger.warning("未下载到: %s", failure)
                coll.update_many(filter={"third_id": {"$in": failure}}, update={"$set": {"state": -1}})
            else:
                self.logger.info("Successfully verified: %s", "下载完整无异常")
        finally:
            # Release the connection opened by init_db(); it was leaked before.
            if self.client is not None:
                self.client.close()
            self.client = None
            self.db = None

    def spider_end(self):
        """Combine the search queries and write the result to the database.

        NOTE(review): not implemented yet. The record below is built and
        discarded, so calling this method currently has no effect.
        """
        # NOTE(review): "qeury_id" and "perfact" look like typos of
        # "query_id" / "perfect"; confirm against the target schema before
        # this record is actually persisted.
        dict(
            content="",
            qeury_id="",
            records_found=0,
            perfact=1,
            state=1,
            reason=""
        )
|
|
|
|
|
|
|
|
|
|
|
|
|