diff --git a/science_article_cssci/science_article_cssci/pipelines.py b/science_article_cssci/science_article_cssci/pipelines.py
index 8ebc9bb..949e80e 100644
--- a/science_article_cssci/science_article_cssci/pipelines.py
+++ b/science_article_cssci/science_article_cssci/pipelines.py
@@ -28,6 +28,7 @@ if TYPE_CHECKING:
 
 mongo_logger = logging.getLogger('pymongo')
 mongo_logger.setLevel(logging.WARNING)
+logging.getLogger('kafka').setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
 
 
@@ -253,7 +254,7 @@ class KafkaPipeline:
         future = self.producer.send(
             topic=self.topic,
             value=d,
-            headers=[{'source_type': b'cssci'}]
+            headers=[('source_type', b'cssci')]
         )
         future.add_callback(self.on_send_success)
         future.add_callback(self.on_send_success)
@@ -270,6 +271,10 @@ class KafkaPipeline:
     def build2kafka(self, item: dict) -> dict:
         dd = dict(
             id=item.get("third_id"),
-            **item.get('detailed')
+            # NOTE(review): school_id and updated_time are hard-coded constants -- confirm intended.
+            school_id="999",
+            **(item.get('detailed') or {}),
+            updated_time="2025-11-01 09:01:56"
         )
+        dd.pop("references", None)
         return dd
diff --git a/science_article_cssci/science_article_cssci/scripts/firld_parser.py b/science_article_cssci/science_article_cssci/scripts/firld_parser.py
new file mode 100644
index 0000000..a0027c9
--- /dev/null
+++ b/science_article_cssci/science_article_cssci/scripts/firld_parser.py
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+# @Time : 2026/1/21 16:45
+# @Author : zhaoxiangpeng
+# @File : firld_parser.py
+import json
+from datetime import datetime
+from typing import Dict, Callable, Any, List
+import pandas as pd
+
+# NOTE(review): handle_format_str, FormatUtil and
+# process_author_address_relation_row are used below but are neither defined
+# nor imported in this file -- TODO import them from the project's helpers.
+
+
+class ScopusFieldParsing:
+    """Flatten one parsed JSON record into a dict of article fields.
+
+    Each ``parse_*`` static method returns one group of output fields;
+    ``_parsing`` merges all groups for a single row.
+    """
+
+    @staticmethod
+    def parse_basic_information(frame: Dict[str, dict]) -> Dict[str, Any]:
+        """Return the id, title, URL, article type and DOI fields."""
+        return dict(
+            id=frame.get("sno"),
+            title=frame.get("lypm"),
+            title_format=handle_format_str(frame.get("lypm")),
+            abstract=None,
+            # ``or ''`` also covers a key that is present but None, which would
+            # otherwise break the string concatenation.
+            url='http://cssci.nju.edu.cn/control/controllers.php?control=search&action=source_id&id=' + str(frame.get("sno") or ''),
+            article_type_string=frame.get("subtypeDescription"),
+            doi=frame.get("prism:doi"),
+        )
+
+    @staticmethod
+    def parse_date_information(frame: Dict[str, dict]) -> Dict[str, Any]:
+        """Return the publication year and a JSON-encoded publication date."""
+        # Missing levels fall back to empty dicts so absent data yields None
+        # fields instead of an AttributeError.
+        frame = frame.get("frame") or {}
+        source = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("source") or {}
+        publicationdate = source.get("publicationdate") or {}
+
+        results = [dict(
+            pub_year=publicationdate.get("year"),
+            v_month=publicationdate.get("month"),
+            v_day=publicationdate.get("day")
+        )]
+
+        return dict(
+            vyear=publicationdate.get("year"),
+            pub_date=json.dumps(results, ensure_ascii=False),
+            ea_year=None,
+            ea_month=None,
+        )
+
+    @staticmethod
+    def parse_article_source_information(frame: Dict[str, dict]) -> Dict[str, Any]:
+        """Return the volume, issue, language and page fields."""
+        frame = frame.get("frame") or {}
+        return dict(
+            volume=frame.get("prism:volume"),
+            issue=frame.get("prism:issueIdentifier"),
+            lang=(frame.get("language") or {}).get("@xml:lang"),
+            pages=None,
+            startpage=frame.get("prism:startingPage"),
+            endpage=frame.get("prism:endingPage"),
+        )
+
+    @staticmethod
+    def parse_source_information(frame: Dict[str, dict]) -> Dict[str, Any]:
+        """Return the journal name and its print / electronic ISSN fields."""
+        frame = frame.get("frame") or {}
+        source = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("source") or {}
+        # ``issn`` may be a single dict or absent; normalise both to a list.
+        issn_list = source.get("issn") or []
+        if isinstance(issn_list, dict):
+            issn_list = [issn_list]
+        issn = None
+        eissn = None
+        for issn_obj in issn_list:
+            if issn_obj.get('@type') == "electronic":
+                eissn = issn_obj.get("$")
+            else:
+                # "print" and any other type are both stored as the ISSN.
+                issn = issn_obj.get("$")
+
+        return dict(
+            journal=frame.get("prism:publicationName"),
+            journal_format=handle_format_str(frame.get("prism:publicationName"), str_type="en"),
+            issn=FormatUtil.formatISSN(issn),
+            eissn=FormatUtil.formatISSN(eissn),
+            cn=None,
+            isbn=None,
+        )
+
+    @staticmethod
+    def parse_meeting_information(frame: Dict[str, dict] = None) -> Dict[str, Any]:
+        """Return the meeting fields, all of which are always None."""
+        return dict(
+            meeting_name=None,
+            meeting_time=None,
+            meeting_address=None,
+        )
+
+    @staticmethod
+    def parse_publish_information(frame: Dict[str, dict] = None) -> Dict[str, Any]:
+        """Return the publisher fields; only ``pub_country`` is populated."""
+        frame = {} if frame is None else frame
+        source = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("source") or {}
+        return dict(
+            publisher=None,
+            pub_city=None,
+            pub_country=source.get("@country"),
+        )
+
+    @staticmethod
+    def parse_author_information(frame: Dict[str, dict] = None) -> Dict[str, Any]:
+        """Return the ORCID list and the author/address ordering fields."""
+        frame = {} if frame is None else frame
+        author_group: List[dict] = frame.get("item", {}).get("bibrecord", {}).get("head", {}).get("author-group") or []
+        orcid_list = []
+        for group in author_group:
+            for author_obj in group.get("author", []):
+                orcid = author_obj.get("@orcid")
+                if orcid:
+                    orcid_list.append(orcid)
+        result_dict = process_author_address_relation_row(frame)
+
+        return dict(
+            email=None,
+            researcher_id=None,
+            orc_id='; '.join(orcid_list) if orcid_list else None,
+            author_order=result_dict['author_order'],
+            address_order=result_dict['address_order'],
+            relation_author_address=result_dict['relation_author_address'],
+        )
+
+    @staticmethod
+    def parse_other_information(frame: Dict[str, dict]) -> Dict[str, Any]:
+        """Return JSON-encoded author keywords and subject areas."""
+        auth_keywords = frame.get("authkeywords", {}).get("author-keyword", [])
+        keywords = json.dumps([auth_keyword.get("$") for auth_keyword in auth_keywords], ensure_ascii=False)
+        subject_areas = frame.get("subject-areas", {}).get("subject-area", [])
+        sub_areas = json.dumps([subject_area.get("$") for subject_area in subject_areas], ensure_ascii=False)
+        return dict(
+            key_words=keywords,
+            sub_code=sub_areas,
+            source_type="2",
+            wos_we_tag=None,
+        )
+
+    def _parsing(self, row) -> Dict[str, Any]:
+        """Parse one row's ``scopus_json`` string into a flat field dict."""
+        scopus_json = row.get('scopus_json')
+        df_dict = json.loads(scopus_json)
+        df = df_dict.get("abstracts-retrieval-response")
+        new_dict = dict()
+        new_dict.update(self.parse_basic_information(df))
+        new_dict.update(self.parse_date_information(df))
+        new_dict.update(self.parse_article_source_information(df))
+        new_dict.update(self.parse_source_information(df))
+        new_dict.update(self.parse_meeting_information(df))
+        new_dict.update(self.parse_publish_information(df))
+        new_dict.update(self.parse_author_information(df))
+        new_dict.update(self.parse_other_information(df))
+        new_dict.update(dict(updated_time=row.get('updated_time')))
+        return new_dict
+
+    def parsing(self, df: pd.DataFrame = None):
+        """Run ``_parsing`` on every row of *df* and return the results as a list."""
+        result = df[['scopus_json', 'updated_time']].apply(self._parsing, axis=1)
+        pdf_result = list(result.values)
+        return pdf_result
diff --git a/science_article_cssci/science_article_cssci/settings.py b/science_article_cssci/science_article_cssci/settings.py
index 5bc6a08..e9c32e5 100644
--- a/science_article_cssci/science_article_cssci/settings.py
+++ b/science_article_cssci/science_article_cssci/settings.py
@@ -107,4 +107,4 @@ COOKIE_POOL_REDIS_KEY = 'cookies_pool:cssci:session'
 COOKIE_REDIS_TTL = 60 * 60 * 6
 
 KAFKA_SERVERS = ['hadoop01:9092', 'hadoop02:9092', 'hadoop03:9092']
-KAFKA_TOPIC = "test2kafka" #
+KAFKA_TOPIC = "testWosTopic" #