You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

96 lines
2.7 KiB
Python

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
from typing import Union
# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
import pymongo
from pymongo.errors import DuplicateKeyError
class DoubanBookPipeline:
    """Default no-op pipeline: hands every item through unchanged."""

    def process_item(self, item, spider):
        # Nothing to transform here; downstream pipelines see the item as-is.
        return item
def standard_list_or_str(list_or_str: Union[list, str, int]) -> Union[str, int, None]:
    """Normalise a scraped field into a single clean scalar value.

    Behaviour by input type:
      - falsy (None, '', [], 0): returns None
      - int: returned unchanged
      - str: stripped of surrounding whitespace
      - list: each element stripped, a leaked stylesheet rule removed,
        empty entries dropped, remainder joined with '; '
      - anything else: returned untouched (pass-through)
    """
    if not list_or_str:
        return None
    if isinstance(list_or_str, int):
        return list_or_str
    if isinstance(list_or_str, str):
        return list_or_str.strip()
    if isinstance(list_or_str, list):
        cleaned = []
        for text in list_or_str:
            text = text.strip()
            # Douban book-intro pages sometimes leak this CSS rule into the text.
            text = text.replace('.intro p{text-indent:2em;word-break:normal;}', '')
            if text:
                cleaned.append(text)
        return '; '.join(cleaned)
    # Unrecognised type: pass through untouched.
    return list_or_str
class DoubanBookInfoStandard:
    """Runs every populated field of a scraped item through standard_list_or_str."""

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        # Normalise each field in place on the original item object.
        for field in adapter.keys():
            item[field] = standard_list_or_str(adapter.get(field))
        return item
class ToCSVPipeline:
    """Keeps an append-mode CSV file open for the lifetime of the spider.

    NOTE(review): the original called ``self.f.write()`` with no argument
    (an immediate TypeError when the spider opened) and returned None from
    ``process_item``, which makes Scrapy drop every item for later
    pipelines. Both are fixed here; actual row serialisation was never
    implemented and is left as a TODO.
    """

    def open_spider(self, spider):
        print('爬虫开始')
        # Append mode so rows from earlier runs are preserved.
        self.f = open('data_douban_top250.csv', 'a', encoding='utf-8')

    def close_spider(self, spider):
        # getattr guard: open_spider may not have run if startup failed.
        if getattr(self, 'f', None):
            self.f.close()
        print('爬虫结束')

    def process_item(self, item, spider):
        # TODO(review): write the item as a CSV row via the csv module.
        # Return the item so downstream pipelines still receive it.
        return item
class MongoPipeline:
    """Stores each scraped item in MongoDB; duplicate keys are logged, not fatal."""

    # Target collection inside the configured database.
    collection_name = "data_douban_top250"

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from Scrapy settings (MONGO_URI / MONGO_DATABASE)."""
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
        )

    def open_spider(self, spider):
        # One client per spider run; released again in close_spider.
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        # Guard: open_spider may never have run if startup failed.
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()

    def process_item(self, item, spider):
        """Insert the item; on a duplicate key, log a warning and pass the item on.

        Any other exception propagates naturally (the original block's
        ``except Exception: raise`` was a no-op and has been removed).
        """
        try:
            self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
        except DuplicateKeyError as dup_err:
            # Unique-index collisions are expected on re-runs; keep crawling.
            spider.logger.warning(dup_err)
        return item