add: wos liteapi data increment
parent b14a503758
commit f977b8ad51
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$" />
    <orderEntry type="jdk" jdkName="pydevenv" jdkType="Python SDK" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
  <component name="PyDocumentationSettings">
    <option name="format" value="PLAIN" />
    <option name="myDocStringFormat" value="Plain" />
  </component>
</module>
@@ -0,0 +1,12 @@
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from science_article_add.scripts.get_db_task import TaskManager

tm = TaskManager()
process = CrawlerProcess(get_project_settings())

task = tm.get_task_from_mysql()

process.crawl('wos_latest_increment', task_obj=task)
process.start()
@@ -0,0 +1,20 @@
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html

import scrapy


class ScienceArticleAddItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    third_id = scrapy.Field()
    updated_at = scrapy.Field()


class WosLiteAddItem(ScienceArticleAddItem):
    year = scrapy.Field()
    query_ids = scrapy.Field()
    school_ids = scrapy.Field()
    task_ids = scrapy.Field()
@@ -0,0 +1,113 @@
# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html

from scrapy import signals

# useful for handling different item types with a single interface
from itemadapter import ItemAdapter


class ScienceArticleAddSpiderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.

        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.

        # Must return an iterable of Request, or item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.

        # Should return either None or an iterable of Request or item objects.
        pass

    async def process_start(self, start):
        # Called with an async iterator over the spider start() method or the
        # matching method of an earlier spider middleware.
        async for item_or_request in start:
            yield item_or_request

    def spider_opened(self, spider):
        spider.logger.info("Spider opened: %s" % spider.name)


class ScienceArticleAddDownloaderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the downloader middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_request(self, request, spider):
        # Called for each request that goes through the downloader
        # middleware.

        # Must either:
        # - return None: continue processing this request
        # - or return a Response object
        # - or return a Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called
        return None

    def process_response(self, request, response, spider):
        # Called with the response returned from the downloader.

        # Must either:
        # - return a Response object
        # - return a Request object
        # - or raise IgnoreRequest
        return response

    def process_exception(self, request, exception, spider):
        # Called when a download handler or a process_request()
        # (from other downloader middleware) raises an exception.

        # Must either:
        # - return None: continue processing this exception
        # - return a Response object: stops process_exception() chain
        # - return a Request object: stops process_exception() chain
        pass

    def spider_opened(self, spider):
        spider.logger.info("Spider opened: %s" % spider.name)


class WosLiteApiXkeyDownloaderMiddleware:
    """Injects the WOS Lite API key into the headers of every outgoing request."""

    async def process_request(self, request, spider):
        key_param = {
            'X-ApiKey': '941a216f25cbef0f80ee4ba58a08ef1e19dee7a4'
        }
        # Update the existing Headers object in place and return None so the
        # request keeps flowing through the middleware chain; returning a
        # Request here would only reschedule it and it would never be downloaded.
        request.headers.update(key_param)
        return None
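The middleware above hardcodes the API key in the source. A small variant, not part of this commit, that reads the key from a crawler setting instead, so it can live in settings.py or the environment; the setting name WOS_API_KEY and the class name are assumptions made for illustration.

# Hypothetical variant: read the key from a setting instead of hardcoding it.
class WosLiteApiXkeySettingsMiddleware:
    def __init__(self, api_key):
        self.api_key = api_key

    @classmethod
    def from_crawler(cls, crawler):
        # WOS_API_KEY is an assumed setting name, not one defined by this commit.
        return cls(api_key=crawler.settings.get("WOS_API_KEY"))

    def process_request(self, request, spider):
        # Set the header and let the request continue through the chain.
        if self.api_key:
            request.headers['X-ApiKey'] = self.api_key
        return None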
@@ -0,0 +1,59 @@
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
import pymongo


class ScienceAddBufferPipeline:
    def __init__(self, buffer_max_size: int = 100):
        self.buffer = []
        self.buffer_size = 0
        self.buffer_max_size = buffer_max_size

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
        )

    def process_item(self, item, spider):
        self.buffer.append(item)
        self.buffer_size = len(self.buffer)  # keep the counter in sync with the buffer
        return item

    def close_spider(self, spider):
        self.buffer.clear()


class ScienceArticleAddPipeline(ScienceAddBufferPipeline):
    def __init__(self, mongo_uri, mongo_db, buffer_max_size):
        super().__init__(buffer_max_size=buffer_max_size)
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.client = None
        self.db = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
            buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        super().process_item(item, spider)
        if self.buffer_size >= self.buffer_max_size:
            # NOTE: as committed, the buffered items are only dropped here;
            # nothing is written to MongoDB yet.
            self.buffer.clear()
        return item
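ScienceArticleAddPipeline opens a MongoDB connection but, as committed, never writes the buffered items; it only clears the buffer once the threshold is reached. A minimal sketch, not part of this commit, of what a flush-to-Mongo subclass could look like; the collection name wos_lite_add and the choice of upserting on third_id are assumptions made for illustration.

# Hypothetical extension: flush the buffer to MongoDB instead of discarding it.
import pymongo
from itemadapter import ItemAdapter


class ScienceArticleAddMongoFlushPipeline(ScienceArticleAddPipeline):
    COLLECTION = "wos_lite_add"  # assumed collection name

    def _flush(self):
        if not self.buffer:
            return
        ops = [
            pymongo.UpdateOne(
                {"third_id": ItemAdapter(item)["third_id"]},  # upsert keyed on the WOS UT id
                {"$set": ItemAdapter(item).asdict()},
                upsert=True,
            )
            for item in self.buffer
        ]
        self.db[self.COLLECTION].bulk_write(ops, ordered=False)
        self.buffer.clear()

    def process_item(self, item, spider):
        # Buffer directly here so the parent's drop-on-threshold logic is bypassed.
        self.buffer.append(item)
        if len(self.buffer) >= self.buffer_max_size:
            self._flush()
        return item

    def close_spider(self, spider):
        self._flush()
        self.client.close()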
@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# @Time : 2025/9/15 14:36
# @Author : zhaoxiangpeng
# @File : get_db_task.py

import pymysql

FULL_QUERY = """
SELECT r.%(org_id)s, r.%(org_name)s, r.%(query_id)s, q.%(content)s, q.%(source_type)s
FROM task_search_strategy AS q JOIN relation_org_query AS r ON r.query_id = q.id
WHERE q.id = %(q_id)s
"""
STRATEGY_FIELDS = ['org_id', 'org_name', 'query_id', 'content', 'source_type']


class TaskManager:
    def __init__(self):
        self.client: pymysql.Connection = pymysql.connect(host='43.140.203.187', port=3306,
                                                          database='science_data_dept', user='science-data-dept',
                                                          passwd='datadept1509', )

    def get_task_from_mysql(self):
        cursor = self.client.cursor()
        record_fields = ['id', 'batch_date', 'query_id', 'task_condition', 'is_done']
        sql = "select %(fields)s from task_batch_record" % {'fields': ', '.join(record_fields)}
        try:
            cursor.execute(sql)
            result = cursor.fetchone()
            task_record_dic = dict(zip(record_fields, result))
            fill = dict(zip(STRATEGY_FIELDS, STRATEGY_FIELDS))
            fill.update(q_id=task_record_dic.get("query_id"))
            cursor.execute(
                FULL_QUERY % fill,
            )
            result = cursor.fetchone()
            task_dic = dict(zip(STRATEGY_FIELDS, result))
            task_dic.update(task_record_dic)
        except Exception as exc:
            raise exc
        else:
            print(task_dic)
            return task_dic
        finally:
            cursor.close()


if __name__ == '__main__':
    tm = TaskManager()
    tm.get_task_from_mysql()
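FULL_QUERY above interpolates both the column names and the q_id value into the SQL text with %-formatting. Only q_id is a runtime value, so here is a small sketch, under that assumption and not part of the commit, of the same lookup with the value bound through pymysql's parameter binding; fetch_strategy is an illustrative helper name.

# Illustrative only: same strategy lookup with query_id bound as a parameter.
FULL_QUERY_PARAM = """
SELECT r.org_id, r.org_name, r.query_id, q.content, q.source_type
FROM task_search_strategy AS q JOIN relation_org_query AS r ON r.query_id = q.id
WHERE q.id = %s
"""

STRATEGY_FIELDS = ['org_id', 'org_name', 'query_id', 'content', 'source_type']


def fetch_strategy(cursor, query_id):
    """Run the lookup with query_id passed as a bound parameter, not formatted in."""
    cursor.execute(FULL_QUERY_PARAM, (query_id,))
    row = cursor.fetchone()
    return dict(zip(STRATEGY_FIELDS, row)) if row else None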
@@ -0,0 +1,94 @@
# Scrapy settings for science_article_add project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
#     https://docs.scrapy.org/en/latest/topics/settings.html
#     https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#     https://docs.scrapy.org/en/latest/topics/spider-middleware.html

BOT_NAME = "science_article_add"

SPIDER_MODULES = ["science_article_add.spiders"]
NEWSPIDER_MODULE = "science_article_add.spiders"

ADDONS = {}


# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "science_article_add (+http://www.yourdomain.com)"

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

# Concurrency and throttling settings
#CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 1
DOWNLOAD_DELAY = 1

# Disable cookies (enabled by default)
#COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False

# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
#    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
#    "Accept-Language": "en",
#}

# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
#    "science_article_add.middlewares.ScienceArticleAddSpiderMiddleware": 543,
#}

# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
RETRY_ENABLED = True
RETRY_TIMES = 2  # retry failed requests up to 2 times (3 attempts in total)
# RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 400, 403, 404]  # adds some common error codes
#DOWNLOADER_MIDDLEWARES = {
#    "science_article_add.middlewares.ScienceArticleAddDownloaderMiddleware": 543,
#}

# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
#    "scrapy.extensions.telnet.TelnetConsole": None,
#}

# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
#ITEM_PIPELINES = {
#    "science_article_add.pipelines.ScienceArticleAddPipeline": 300,
#}
MONGO_URI = "mongodb://root:123456@192.168.1.211:27017/"
MONGO_DATABASE = "science2"

REDIS_URL = 'redis://:kcidea1509@192.168.1.211:6379/10'

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = "httpcache"
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"

# Set settings whose default value is deprecated to a future-proof value
FEED_EXPORT_ENCODING = "utf-8"
@@ -0,0 +1,4 @@
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.
@@ -0,0 +1,138 @@
import math
from urllib.parse import urlencode
from copy import deepcopy
import scrapy

from science_article_add.items import WosLiteAddItem
from science_article_add.models import wos_model as model
from science_article_add.configs import wos as config
from science_article_add.utils import tools


def calculate_next_page(next_page: int = 1, page_size: int = 100):
    return (next_page - 1) * page_size + 1


class WosLatestIncrementSpider(scrapy.Spider):
    name = "wos_latest_increment"
    # allowed_domains = ["wos-api.clarivate.com"]
    # start_urls = ["https://wos-api.clarivate.com/api/woslite"]
    custom_settings = dict(
        DOWNLOADER_MIDDLEWARES={
            "science_article_add.middlewares.WosLiteApiXkeyDownloaderMiddleware": 500
        },
        ITEM_PIPELINES={
            "science_article_add.pipelines.ScienceAddBufferPipeline": 300,
        }
    )

    def __init__(self, task_obj):
        scrapy.Spider.__init__(self)
        self.task_obj = task_obj
        self.record_id = task_obj['id']
        self.org_id = task_obj['org_id']
        self.org_name = task_obj['org_name']
        self.query_id = task_obj['query_id']
        self.query_content = task_obj['content']
        self.query_condition = task_obj['task_condition']

    async def start(self):
        full_query = self.query_content
        if self.query_condition is not None:
            full_query = '%(query)s %(condition)s' % {'query': self.query_content, 'condition': self.query_condition}
        yield scrapy.Request(url=config.WOS_LITE_QUERY_FIRST_API + '?' + urlencode(model.lite_base_model(usr_query=full_query)),
                             dont_filter=True,
                             meta={'query': full_query, 'PAGE': 1})

    async def parse(self, response, **kwargs):
        meta = response.meta
        request = response.request
        task_query_id = self.query_id
        task_org_id = self.org_id
        task_record_id = self.record_id
        self.logger.debug('%s: %s' % ('parse', meta))

        resp_result = response.json()

        query_result = resp_result.get('QueryResult')
        datas = resp_result.get('Data')

        query_id = query_result.get('QueryID')
        records_found = query_result.get('RecordsFound')
        max_page = math.ceil(records_found / 100)
        meta_copy: dict = deepcopy(meta)
        meta_copy.update({'MAX_PAGE': max_page})
        meta_copy.update({'TOTAL': records_found})
        meta_copy.update({'QUERY_ID': query_id})
        meta_copy.update({'next_page': meta['PAGE'] + 1})
        meta_copy.update({'PAGE': meta['PAGE'] + 1})
        meta_copy.update({'first_record': calculate_next_page(meta_copy['next_page'])})

        for data in datas:
            add_item = WosLiteAddItem()
            # prefer the explicitly specified year when storing the record
            to_db_year = meta.get("search_year")
            if not to_db_year:
                publish_year = data.get("Source", {}).get("Published.BiblioYear", [])
                if publish_year:
                    to_db_year = tools.str2int(publish_year[0])
            add_item["third_id"] = data.get('UT')
            add_item["year"] = to_db_year
            add_item["query_ids"] = [task_query_id]
            add_item["school_ids"] = [task_org_id]
            add_item["task_ids"] = [task_record_id]
            yield add_item

        yield scrapy.Request(
            url=config.WOS_LITE_QUERY_API + f'/{query_id}',
            body=model.lite_query_model(**meta_copy),
            meta=meta_copy,
            callback=self.parse_query_api,
        )

    async def parse_query_api(self, response, **kwargs):
        meta = response.meta
        task_query_id = self.query_id
        task_org_id = self.org_id
        task_record_id = self.record_id
        self.logger.debug("""
        %s
        %s""" % ('parse_query_api', meta))

        resp_result = response.json()
        query_id = meta.get('QUERY_ID')

        datas = resp_result.get('Data', [])
        if len(datas):
            for data in datas:
                add_item = WosLiteAddItem()
                # prefer the explicitly specified year when storing the record
                to_db_year = meta.get("search_year")
                if not to_db_year:
                    publish_year = data.get("Source", {}).get("Published.BiblioYear", [])
                    if publish_year:
                        to_db_year = tools.str2int(publish_year[0])
                add_item["third_id"] = data.get('UT')
                add_item["year"] = to_db_year
                add_item["query_ids"] = [task_query_id]
                add_item["school_ids"] = [task_org_id]
                add_item["task_ids"] = [task_record_id]
                yield add_item
        else:
            # build the SQL condition from the task record so the outcome can be recorded
            update_state_condition = ('batch_date = "%(batch_date)s"\n'
                                      'query_id = %(query_id)s\n'
                                      'task_condition = "%(task_condition)s"') % self.task_obj
            self.logger.warning('no more data\n%s' % update_state_condition)
            return
        if meta['first_record'] < meta['TOTAL']:
            meta_copy = deepcopy(meta)
            meta_copy.update({'next_page': meta['PAGE'] + 1})
            meta_copy.update({'PAGE': meta['PAGE'] + 1})
            meta_copy.update({'first_record': calculate_next_page(meta_copy['next_page'])})
            yield scrapy.Request(
                url=config.WOS_LITE_QUERY_API + f'/{query_id}',
                body=model.lite_query_model(**meta_copy),
                meta=meta_copy,
                callback=self.parse_query_api,
            )
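The spider imports science_article_add.configs.wos and science_article_add.models.wos_model, neither of which appears in this diff. A rough sketch of what they might contain, inferred from how the spider calls them and from the documented WOS Lite API parameters (databaseId, usrQuery, count, firstRecord); every value and function body below is an assumption, not the committed code. Note the spider passes the result of lite_query_model as the request body.

# science_article_add/configs/wos.py -- assumed contents
WOS_LITE_QUERY_FIRST_API = "https://wos-api.clarivate.com/api/woslite"      # initial search
WOS_LITE_QUERY_API = "https://wos-api.clarivate.com/api/woslite/query"      # paging by QueryID


# science_article_add/models/wos_model.py -- assumed contents
from urllib.parse import urlencode


def lite_base_model(usr_query: str, count: int = 100, first_record: int = 1) -> dict:
    """Parameters for the first WOS Lite search request."""
    return {
        "databaseId": "WOS",
        "usrQuery": usr_query,
        "count": count,
        "firstRecord": first_record,
    }


def lite_query_model(first_record: int = 1, count: int = 100, **_meta) -> str:
    """One page of an existing query; extra response.meta keys are ignored."""
    return urlencode({"firstRecord": first_record, "count": count})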
@@ -0,0 +1,8 @@
def str2int(val, replace=0):
    try:
        val = int(val)
    except ValueError:
        val = replace
    except TypeError:
        val = replace
    return val
@@ -0,0 +1,11 @@
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html

[settings]
default = science_article_add.settings

[deploy]
#url = http://localhost:6800/
project = science_article_add