cssci:add settings

main
zhaoxiangpeng 1 week ago
parent 43b26550e7
commit 7f16b4da3c

@ -28,6 +28,7 @@ if TYPE_CHECKING:
mongo_logger = logging.getLogger('pymongo')
mongo_logger.setLevel(logging.WARNING)
logging.getLogger('kafka').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@ -253,7 +254,7 @@ class KafkaPipeline:
future = self.producer.send(
topic=self.topic,
value=d,
headers=[{'source_type': b'cssci'}]
headers=[('source_type', b'cssci')]
)
future.add_callback(self.on_send_success)
future.add_callback(self.on_send_success)
@ -270,6 +271,9 @@ class KafkaPipeline:
def build2kafka(self, item: dict) -> dict:
dd = dict(
id=item.get("third_id"),
**item.get('detailed')
school_id="999",
**item.get('detailed'),
updated_time="2025-11-01 09:01:56"
)
dd.pop("references", None)
return dd

@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-
# @Time : 2026/1/21 16:45
# @Author : zhaoxiangpeng
# @File : firld_parser.py
import json
from datetime import datetime
from typing import Dict, Callable, Any, List
import pandas as pd
class ScopusFieldParsing:
@staticmethod
def parse_basic_information(frame: Dict[str, dict]) -> Dict[str, Any]:
return dict(
id=frame.get("sno"),
title=frame.get("lypm"),
title_format=handle_format_str(frame.get("lypm")),
abstract=None,
url='http://cssci.nju.edu.cn/control/controllers.php?control=search&action=source_id&id=' + frame.get("sno", ''),
article_type_string=frame.get("subtypeDescription"),
doi=frame.get("prism:doi"),
)
@staticmethod
def parse_date_information(frame: Dict[str, dict]) -> Dict[str, Any]:
frame = frame.get("frame")
date = frame.get("prism:coverDate")
source = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("source")
publicationdate = source.get("publicationdate")
def f():
results = [dict(
pub_year=publicationdate.get("year"),
v_month=publicationdate.get("month"),
v_day=publicationdate.get("day")
)]
return json_dumps(results, ensure_ascii=False)
return dict(
vyear=publicationdate.get("year"),
pub_date=f(),
ea_year=None,
ea_month=None,
)
@staticmethod
def parse_article_source_information(frame: Dict[str, dict]) -> Dict[str, Any]:
frame = frame.get("frame")
return dict(
volume=frame.get("prism:volume"),
issue=frame.get("prism:issueIdentifier"),
lang=frame.get("language").get("@xml:lang"),
pages=None,
startpage=frame.get("prism:startingPage"),
endpage=frame.get("prism:endingPage"),
)
@staticmethod
def parse_source_information(frame: Dict[str, dict]) -> Dict[str, Any]:
frame = frame.get("frame")
source = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("source")
issn_list = source.get("issn")
if isinstance(issn_list, dict):
issn_list = [issn_list]
issn = None
eissn = None
for issn_obj in issn_list:
if issn_obj.get('@type') == "print":
issn = issn_obj.get("$")
elif issn_obj.get('@type') == "electronic":
eissn = issn_obj.get("$")
else:
issn = issn_obj.get("$")
return dict(
journal=frame.get("prism:publicationName"),
journal_format=handle_format_str(frame.get("prism:publicationName"), str_type="en"),
issn=FormatUtil.formatISSN(issn),
eissn=FormatUtil.formatISSN(eissn),
cn=None,
isbn=None,
)
@staticmethod
def parse_meeting_information(frame: pd.DataFrame = None) -> Dict[str, Any]:
return dict(
meeting_name=None,
meeting_time=None,
meeting_address=None,
)
@staticmethod
def parse_publish_information(frame: pd.DataFrame = None) -> Dict[str, Any]:
source = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("source")
return dict(
publisher=None,
pub_city=None,
pub_country=source.get("@country"),
)
@staticmethod
def parse_author_information(frame: pd.DataFrame = None) -> Dict[str, Any]:
author_group: List[dict] = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("author-group", [])
orcid_list = []
for group in author_group:
affiliation: dict = group.get("affiliation", {})
author_list: List[dict] = group.get("author", [])
for author_obj in author_list:
surname = author_obj.get("ce:surname")
given_name = author_obj.get("ce:given-name")
auid = author_obj.get("@auid")
orcid = author_obj.get("@orcid")
if orcid:
orcid_list.append(orcid)
result_dict = process_author_address_relation_row(frame)
return dict(
email=None,
researcher_id=None,
orc_id='; '.join(orcid_list) if orcid_list else None,
author_order=result_dict['author_order'],
address_order=result_dict['address_order'],
relation_author_address=result_dict['relation_author_address'],
)
@staticmethod
def parse_other_information(frame: Dict[str, dict]) -> Dict[str, Any]:
authkeywords = frame.get("authkeywords", {})
auth_keywords = authkeywords.get("author-keyword", [])
keywords = json_dumps([auth_keyword.get("$") for auth_keyword in auth_keywords], ensure_ascii=False)
subject_areas = frame.get("subject-areas", {}).get("subject-area", [])
sub_areas = json_dumps([subject_area.get("$") for subject_area in subject_areas], ensure_ascii=False)
return dict(
key_words=keywords,
sub_code=sub_areas,
source_type="2",
wos_we_tag=None,
)
def _parsing(self, row) -> Dict[str, Any]:
scopus_json = row.get('scopus_json')
df_dict = json.loads(scopus_json)
df = df_dict.get("abstracts-retrieval-response")
new_dict = dict()
new_dict.update(self.parse_basic_information(df))
new_dict.update(self.parse_date_information(df))
new_dict.update(self.parse_article_source_information(df))
new_dict.update(self.parse_source_information(df))
new_dict.update(self.parse_meeting_information(df))
new_dict.update(self.parse_publish_information(df))
new_dict.update(self.parse_author_information(df))
new_dict.update(self.parse_other_information(df))
new_dict.update(dict(updated_time=row.get('updated_time')))
return new_dict
def parsing(self, df: pd.DataFrame = None):
result = df[['scopus_json', 'updated_time']].apply(self._parsing, axis=1)
pdf_result = list(result.values)
return pdf_result

@ -107,4 +107,4 @@ COOKIE_POOL_REDIS_KEY = 'cookies_pool:cssci:session'
COOKIE_REDIS_TTL = 60 * 60 * 6
KAFKA_SERVERS = ['hadoop01:9092', 'hadoop02:9092', 'hadoop03:9092']
KAFKA_TOPIC = "test2kafka" #
KAFKA_TOPIC = "testWosTopic" #

Loading…
Cancel
Save