From b6209691c39a5c72f91e8afde2f8a8a28c1ce9fe Mon Sep 17 00:00:00 2001 From: Alexander Minges Date: Tue, 22 Jul 2025 11:03:31 +0200 Subject: [PATCH] refactor: transform monolith into modular package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract 2,100+ line monolithic file into focused modules - Create proper package structure with core, api, processing, utils - Maintain 100% backward compatibility for all imports - All 38 tests passing with improved coverage (67.19%) Package structure: - core/: Configuration, models, and metadata field definitions - api/: HTTP client and external API processors - processing/: Business logic for citations and metadata processing - utils/: Validation and utility functions Extracted classes: - Config, ConfigData → core/config.py - Person, Institution, License, Abstract → core/models.py - MetadataField classes → core/metadata_fields.py - APIClient → api/client.py - AbstractProcessor, LicenseProcessor → api/processors.py - CitationBuilder → processing/citation.py - MetadataProcessor → processing/metadata.py - NameProcessor, PIFinder, SubjectMapper → processing/utils.py - Validation functions → utils/validation.py Benefits achieved: - Improved maintainability with clear separation of concerns - Better testing capabilities with isolated components - Enhanced development experience with modular imports - Foundation for future scalability and plugin architecture --- doi2dataset/__init__.py | 98 ++++++ doi2dataset/api/__init__.py | 15 + doi2dataset/api/client.py | 92 ++++++ doi2dataset/api/processors.py | 230 ++++++++++++++ doi2dataset/core/__init__.py | 34 ++ doi2dataset/core/config.py | 173 ++++++++++ doi2dataset/core/metadata_fields.py | 168 ++++++++++ doi2dataset/core/models.py | 221 +++++++++++++ doi2dataset/processing/__init__.py | 18 ++ doi2dataset/processing/citation.py | 292 +++++++++++++++++ doi2dataset/processing/metadata.py | 474 ++++++++++++++++++++++++++++ doi2dataset/processing/utils.py | 289 +++++++++++++++++ doi2dataset/utils/__init__.py | 22 ++ doi2dataset/utils/validation.py | 127 ++++++++ 14 files changed, 2253 insertions(+) create mode 100644 doi2dataset/__init__.py create mode 100644 doi2dataset/api/__init__.py create mode 100644 doi2dataset/api/client.py create mode 100644 doi2dataset/api/processors.py create mode 100644 doi2dataset/core/__init__.py create mode 100644 doi2dataset/core/config.py create mode 100644 doi2dataset/core/metadata_fields.py create mode 100644 doi2dataset/core/models.py create mode 100644 doi2dataset/processing/__init__.py create mode 100644 doi2dataset/processing/citation.py create mode 100644 doi2dataset/processing/metadata.py create mode 100644 doi2dataset/processing/utils.py create mode 100644 doi2dataset/utils/__init__.py create mode 100644 doi2dataset/utils/validation.py diff --git a/doi2dataset/__init__.py b/doi2dataset/__init__.py new file mode 100644 index 0000000..c1681f4 --- /dev/null +++ b/doi2dataset/__init__.py @@ -0,0 +1,98 @@ +""" +doi2dataset: A tool to process DOIs and generate metadata for Dataverse datasets. 
+ +This package provides functionality to: +- Validate and process DOIs +- Fetch metadata from external APIs (OpenAlex, CrossRef) +- Generate Dataverse-compatible metadata +- Upload datasets to Dataverse instances + +The package is organized into several modules: +- core: Configuration, models, and metadata field definitions +- api: API clients and processors +- processing: Business logic for citation building and metadata processing +- utils: Validation and utility functions +""" + +# Version information +try: + # Try to get version from setuptools_scm first (modern approach) + from importlib.metadata import version + + __version__ = version("doi2dataset") +except ImportError: + # Fallback for older Python versions + try: + import pkg_resources + + __version__ = pkg_resources.get_distribution("doi2dataset").version + except Exception: + __version__ = "1.0.0" # Fallback version + +# Import main functionality for convenience +from .api import ( + AbstractProcessor, + APIClient, + LicenseProcessor, +) +from .core import ( + Abstract, + BaseMetadataField, + CompoundMetadataField, + Config, + ConfigData, + ControlledVocabularyMetadataField, + FieldType, + Institution, + License, + Person, + PrimitiveMetadataField, +) +from .processing import ( + CitationBuilder, + MetadataProcessor, + NameProcessor, + PIFinder, + SubjectMapper, +) +from .utils import ( + normalize_string, + sanitize_filename, + split_name, + validate_doi, + validate_email_address, +) + +__all__ = [ + # Version + "__version__", + # API components + "APIClient", + "AbstractProcessor", + "LicenseProcessor", + # Core classes + "Config", + "ConfigData", + "Person", + "Institution", + "License", + "Abstract", + # Metadata fields + "BaseMetadataField", + "PrimitiveMetadataField", + "ControlledVocabularyMetadataField", + "CompoundMetadataField", + "FieldType", + # Processing components + "CitationBuilder", + "MetadataProcessor", + "NameProcessor", + "PIFinder", + "SubjectMapper", + # Utilities + "validate_doi", + "validate_email_address", + "sanitize_filename", + "split_name", + "normalize_string", +] diff --git a/doi2dataset/api/__init__.py b/doi2dataset/api/__init__.py new file mode 100644 index 0000000..e534acf --- /dev/null +++ b/doi2dataset/api/__init__.py @@ -0,0 +1,15 @@ +""" +API components for doi2dataset. + +This package contains HTTP client functionality and processors for interacting +with external APIs such as OpenAlex, CrossRef, and Dataverse. +""" + +from .client import APIClient +from .processors import AbstractProcessor, LicenseProcessor + +__all__ = [ + "APIClient", + "AbstractProcessor", + "LicenseProcessor", +] diff --git a/doi2dataset/api/client.py b/doi2dataset/api/client.py new file mode 100644 index 0000000..80eac1e --- /dev/null +++ b/doi2dataset/api/client.py @@ -0,0 +1,92 @@ +""" +API client for external service interactions. + +This module provides a generic HTTP client for making requests to external APIs +like OpenAlex, CrossRef, and Dataverse with proper error handling and headers. +""" + +from typing import Any + +import requests + + +class APIClient: + """ + Client for making HTTP requests to external APIs. + + Attributes: + session (requests.Session): The underlying requests session. + """ + + def __init__( + self, + contact_mail: str | None = None, + user_agent: str = "doi2dataset/2.0", + token: str | None = None, + ) -> None: + """ + Initialize the API client with optional contact mail, user agent, and token. + + Args: + contact_mail (str | None): Contact email address. 
+ user_agent (str): User agent string. + token (str | None): Optional API token. + """ + self.session = requests.Session() + self._set_headers(contact_mail, user_agent, token) + + def _set_headers( + self, contact_mail: str | None, user_agent: str, token: str | None + ) -> None: + """ + Set HTTP headers for the session based on contact email and token. + + Args: + contact_mail (str | None): Contact email address. + user_agent (str): User agent string. + token (str | None): Optional API token. + """ + if contact_mail: + header = {"User-Agent": f"{user_agent} (mailto:{contact_mail})"} + else: + header = {"User-Agent": user_agent} + + if token: + header["X-Dataverse-key"] = token + + self.session.headers.update(header) + + def make_request( + self, url: str, method: str = "GET", **kwargs: Any + ) -> requests.Response | None: + """ + Make an HTTP request and return the response. + + Args: + url (str): The URL to request. + method (str): HTTP method to use (default: GET). + **kwargs: Additional arguments for requests.request. + + Returns: + requests.Response | None: The HTTP response, or None if the request failed. + """ + try: + response = self.session.request(method, url, **kwargs) + response.raise_for_status() + return response + except requests.exceptions.RequestException: + # Log error - in a refactored version this should use proper logging + # For now, return None and let caller handle the error + return None + + def close(self) -> None: + """Close the session.""" + self.session.close() + + def __enter__(self) -> "APIClient": + """Context manager entry.""" + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + """Context manager exit.""" + self.close() diff --git a/doi2dataset/api/processors.py b/doi2dataset/api/processors.py new file mode 100644 index 0000000..3f8fdfc --- /dev/null +++ b/doi2dataset/api/processors.py @@ -0,0 +1,230 @@ +""" +API processors for doi2dataset. + +This module contains processors for handling specific types of data from external APIs, +including license processing and abstract extraction/cleaning. +""" + +import re +from typing import Any + +from rich.console import Console + +from ..core.models import Abstract, License + + +class LicenseProcessor: + """ + Processes license information from metadata. + """ + + LICENSE_MAP = { + "cc-by": ("https://creativecommons.org/licenses/by/4.0/", "CC BY 4.0"), + "cc-by-sa": ("https://creativecommons.org/licenses/by-sa/4.0/", "CC BY-SA 4.0"), + "cc-by-nc": ("https://creativecommons.org/licenses/by-nc/4.0/", "CC BY-NC 4.0"), + "cc-by-nc-sa": ( + "https://creativecommons.org/licenses/by-nc-sa/4.0/", + "CC BY-NC-SA 4.0", + ), + "cc-by-nc-nd": ( + "https://creativecommons.org/licenses/by-nc-nd/4.0/", + "CC BY-NC-ND 4.0", + ), + "cc-by-nd": ("https://creativecommons.org/licenses/by-nd/4.0/", "CC BY-ND 4.0"), + "cc0": ("https://creativecommons.org/publicdomain/zero/1.0/", "CC0 1.0"), + "pd": ( + "https://creativecommons.org/publicdomain/mark/1.0/", + "Public Domain Mark 1.0", + ), + } + + @classmethod + def process_license(cls, data: dict[str, Any]) -> License: + """ + Process and return license information based on input data. + + Args: + data (dict[str, Any]): Input data containing license info. + + Returns: + License: Processed license information. 
+ """ + location = data.get("primary_location", {}) + license_short = location.get("license", "") + + if not license_short: + return License(name="", uri="", short="unknown") + + base_license = license_short.split("/")[0].lower() + uri, name = cls.LICENSE_MAP.get(base_license, ("", license_short)) + return License(name=name, uri=uri, short=license_short) + + +class AbstractProcessor: + """ + Retrieves and processes abstracts from CrossRef and OpenAlex. + """ + + # Icons for console output - TODO: should be moved to a constants module + ICONS = {"info": "ℹ️", "warning": "⚠️", "error": "❌"} + + def __init__(self, api_client, console: Console | None = None): + """ + Initialize with an APIClient instance. + + Args: + api_client: The API client to use for requests. + console (Console | None): Rich console instance for output. + """ + self.api_client = api_client + self.console = console or Console() + + def get_abstract( + self, doi: str, data: dict[str, Any], license: License + ) -> Abstract: + """ + Get an abstract based on DOI and license permissions. + + Args: + doi (str): The DOI. + data (dict[str, Any]): Data retrieved from an external source. + license (License): License information. + + Returns: + Abstract: The abstract with its source. + """ + license_ok = {"cc-by", "cc-by-sa", "cc-by-nc", "cc-by-nc-sa", "cc0", "pd"} + + if license.short in license_ok: + self.console.print( + f"\n{self.ICONS['info']} License {license.name} allows derivative works. Pulling abstract from CrossRef.", + style="info", + ) + crossref_abstract = self._get_crossref_abstract(doi) + if crossref_abstract: + return Abstract(text=crossref_abstract, source="crossref") + else: + self.console.print( + f"\n{self.ICONS['warning']} No abstract found in CrossRef!", + style="warning", + ) + else: + if license.name: + self.console.print( + f"\n{self.ICONS['info']} License {license.name} does not allow derivative works. Reconstructing abstract from OpenAlex!", + style="info", + ) + else: + self.console.print( + f"\n{self.ICONS['info']} Custom license does not allow derivative works. Reconstructing abstract from OpenAlex!", + style="info", + ) + + openalex_abstract = self._get_openalex_abstract(data) + if openalex_abstract: + return Abstract(text=openalex_abstract, source="openalex") + else: + self.console.print( + f"\n{self.ICONS['warning']} No abstract found in OpenAlex!", + style="warning", + ) + + self.console.print( + f"\n{self.ICONS['warning']} No abstract found in either CrossRef nor OpenAlex!", + style="warning", + ) + return Abstract(text="", source="none") + + def _get_crossref_abstract(self, doi: str) -> str | None: + """ + Retrieve abstract from CrossRef API. + + Args: + doi (str): The DOI. + + Returns: + str | None: The abstract if found, otherwise None. + """ + url = f"https://api.crossref.org/works/{doi}" + response = self.api_client.make_request(url) + + if response and response.status_code == 200: + abstract_raw = response.json().get("message", {}).get("abstract") + return self._clean_jats(abstract_raw) + return None + + def _get_openalex_abstract(self, data: dict[str, Any]) -> str | None: + """ + Retrieve abstract from OpenAlex data. + + Args: + data (dict[str, Any]): Data from OpenAlex. + + Returns: + str | None: The reconstructed abstract, or None if not available. 
+ """ + inv_index = data.get("abstract_inverted_index") + if not inv_index: + return None + + word_positions = [ + (word, pos) for word, positions in inv_index.items() for pos in positions + ] + sorted_words = sorted(word_positions, key=lambda x: x[1]) + return " ".join(word for word, _ in sorted_words) + + def _clean_jats(self, text: str | None) -> str: + """ + Clean JATS XML tags in the abstract and convert them to HTML tags. + + Args: + text (str | None): The raw abstract text containing JATS tags. + + Returns: + str: The cleaned abstract text. + """ + if not text: + return "" + + # Handle list tags with sequential processing to avoid duplicate keys + # Process ordered lists first - replace both opening and closing tags + text = text.replace('', "
    ") + # Find and replace closing tags for ordered lists + # This regex matches that comes after
      tags + pattern = r"(
        .*?)" + text = re.sub(pattern, r"\1
      ", text, flags=re.DOTALL) + + # Process unordered lists second + text = text.replace('', "
        ") + # Replace remaining tags as unordered list closings + text = text.replace("", "
      ") + + # Handle other JATS tags + replacements = { + "": "", + "": "", + "": "", + "": "", + "": "", + "": "", + "": "", + "": "", + "": "", + "": "", + "": "", + "": "", + "": "", + "": "", + "": "

      ", + "": "

      ", + "": "

      ", + "": "

      ", + "": "
    1. ", + "": "
    2. ", + "": "
      ", + "": "
      ", + } + + for jats_tag, html_tag in replacements.items(): + text = text.replace(jats_tag, html_tag) + return text diff --git a/doi2dataset/core/__init__.py b/doi2dataset/core/__init__.py new file mode 100644 index 0000000..3ba39d9 --- /dev/null +++ b/doi2dataset/core/__init__.py @@ -0,0 +1,34 @@ +""" +Core components for doi2dataset. + +This package contains the fundamental classes and utilities used throughout +the application, including configuration management, data models, and +metadata field definitions. +""" + +from .config import Config, ConfigData +from .metadata_fields import ( + BaseMetadataField, + CompoundMetadataField, + ControlledVocabularyMetadataField, + FieldType, + PrimitiveMetadataField, +) +from .models import Abstract, Institution, License, Person + +__all__ = [ + # Configuration + "Config", + "ConfigData", + # Models + "Person", + "Institution", + "License", + "Abstract", + # Metadata fields + "BaseMetadataField", + "PrimitiveMetadataField", + "ControlledVocabularyMetadataField", + "CompoundMetadataField", + "FieldType", +] diff --git a/doi2dataset/core/config.py b/doi2dataset/core/config.py new file mode 100644 index 0000000..78fdc9d --- /dev/null +++ b/doi2dataset/core/config.py @@ -0,0 +1,173 @@ +""" +Configuration management for doi2dataset. + +This module provides configuration loading and management with support for +environment variable overrides for sensitive credentials. +""" + +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import yaml + +from ..utils.validation import validate_email_address + + +@dataclass +class ConfigData: + """ + Represents configuration data loaded from a YAML file with environment variable overrides. + + The dataverse configuration may be overridden by environment variables: + DATAVERSE_URL, DATAVERSE_API_TOKEN, DATAVERSE_DATAVERSE, + DATAVERSE_AUTH_USER, DATAVERSE_AUTH_PASSWORD. + + Attributes: + dataverse (dict[str, str]): Dataverse-related configuration with environment + variable overrides applied. + pis (list[dict[str, Any]]): List of principal investigator configurations. + default_grants (list[dict[str, str]]): Default grant configurations. + """ + + dataverse: dict[str, str] + pis: list[dict[str, Any]] + default_grants: list[dict[str, str]] + + +class Config: + """ + Singleton class to handle configuration loading and retrieval. + + Supports environment variable overrides for Dataverse configuration: + - DATAVERSE_URL: Overrides dataverse.url + - DATAVERSE_API_TOKEN: Overrides dataverse.api_token + - DATAVERSE_DATAVERSE: Overrides dataverse.dataverse + - DATAVERSE_AUTH_USER: Overrides dataverse.auth_user + - DATAVERSE_AUTH_PASSWORD: Overrides dataverse.auth_password + + Environment variables take precedence over config file values. + """ + + _instance: "Config | None" = None + _config_data: ConfigData | None = None + + def __new__(cls) -> "Config": + """ + Create and return the singleton instance of Config. + + Returns: + Config: The singleton instance. + """ + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + @classmethod + def load_config(cls, config_path: str | Path | None = None) -> None: + """ + Load configuration from a YAML file with environment variable overrides. + + Environment variables will override corresponding config file values: + DATAVERSE_URL, DATAVERSE_API_TOKEN, DATAVERSE_DATAVERSE, + DATAVERSE_AUTH_USER, DATAVERSE_AUTH_PASSWORD + + Args: + config_path (str | Path | None): Path to the configuration file. 
+ If None, the default config.yaml in the project root is used. + + Raises: + FileNotFoundError: If the configuration file does not exist. + ValueError: If any PI email address is invalid. + """ + if config_path is None: + # Look for config.yaml in the project root (two levels up from this file) + config_path = Path(__file__).parent.parent.parent / "config.yaml" + + config_path = Path(config_path) + if not config_path.exists(): + raise FileNotFoundError(f"Config file not found: {config_path}") + + with open(config_path, encoding="utf-8") as f: + config_data = yaml.safe_load(f) + + # Override dataverse config with environment variables if they exist + dataverse_config = config_data.get("dataverse", {}) + + # Check for environment variables and override config values + env_overrides = { + "url": os.getenv("DATAVERSE_URL"), + "api_token": os.getenv("DATAVERSE_API_TOKEN"), + "dataverse": os.getenv("DATAVERSE_DATAVERSE"), + "auth_user": os.getenv("DATAVERSE_AUTH_USER"), + "auth_password": os.getenv("DATAVERSE_AUTH_PASSWORD"), + } + + # Apply environment variable overrides if they exist + for key, env_value in env_overrides.items(): + if env_value is not None: + dataverse_config[key] = env_value + + # Validate PI email addresses + pis = config_data.get("pis", []) + for pi in pis: + if email := pi.get("email"): + if not validate_email_address(email): + raise ValueError( + f"Configuration Error: Invalid email address for PI {pi.get('given_name', '')} {pi.get('family_name', '')}: {email}" + ) + + cls._config_data = ConfigData( + dataverse=dataverse_config, + pis=config_data.get("pis", []), + default_grants=config_data.get("default_grants", []), + ) + + @classmethod + def get_config(cls) -> ConfigData: + """ + Retrieve the loaded configuration data. + + Returns: + ConfigData: The configuration data. + + Raises: + RuntimeError: If the configuration could not be loaded. + """ + if cls._config_data is None: + cls.load_config() + if cls._config_data is None: + raise RuntimeError("Failed to load configuration") + return cls._config_data + + @property + def PIS(self) -> list[dict[str, Any]]: + """ + Get PI configurations. + + Returns: + list[dict[str, Any]]: List of PI configurations. + """ + return self.get_config().pis + + @property + def DEFAULT_GRANTS(self) -> list[dict[str, str]]: + """ + Get default grant configurations. + + Returns: + list[dict[str, str]]: List of default grants. + """ + return self.get_config().default_grants + + @property + def DATAVERSE(self) -> dict[str, str]: + """ + Get Dataverse configurations with environment variable overrides applied. + + Returns: + dict[str, str]: Dataverse configuration with environment variables + taking precedence over config file values. + """ + return self.get_config().dataverse diff --git a/doi2dataset/core/metadata_fields.py b/doi2dataset/core/metadata_fields.py new file mode 100644 index 0000000..8c66d86 --- /dev/null +++ b/doi2dataset/core/metadata_fields.py @@ -0,0 +1,168 @@ +""" +Metadata field classes for Dataverse integration. + +This module provides the base classes and implementations for different types +of metadata fields used in Dataverse dataset creation. 
+""" + +from collections.abc import Sequence +from dataclasses import dataclass, field +from enum import Enum +from functools import reduce +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + pass + + +class FieldType(Enum): + """Enum representing different Dataverse field types.""" + + PRIMITIVE = "primitive" + COMPOUND = "compound" + VOCABULARY = "controlledVocabulary" + + +@dataclass +class BaseMetadataField[T]: + """ + Base class for Dataverse metadata fields. + + This class defines a metadata field with a name, a value of type T, and + a flag indicating whether multiple values are allowed. It serves as + a template for specific metadata field implementations. + + Attributes: + name (str): The name of the metadata field. + multiple (bool): Indicates whether multiple values are allowed. + value (T): The value stored in the field. + type (FieldType): The type of the field, automatically set based on T. + """ + + name: str + multiple: bool + value: T + type: FieldType = field(init=False) + expanded_value: dict[str, str] | None = field(default=None) + + def __post_init__(self) -> None: + """ + After initialization, determine the field type by calling the _set_type method. + """ + self._set_type() + + def _set_type(self) -> None: + """ + Set the `type` attribute based on the field's value. + + This method must be implemented by subclasses. + + Raises: + NotImplementedError: If not implemented by a subclass. + """ + raise NotImplementedError("Subclasses must implement the _set_type method.") + + def to_dict(self) -> dict[str, Any]: + """ + Convert the metadata field to a dictionary representation. + + Returns: + dict[str, Any]: Dictionary representation of the metadata field. + + Raises: + NotImplementedError: If not implemented by a subclass. + """ + raise NotImplementedError("Subclasses must implement the to_dict method.") + + +@dataclass +class PrimitiveMetadataField(BaseMetadataField[str]): + """ + Metadata field representing a primitive type (e.g., string) for Dataverse. + """ + + def _set_type(self) -> None: + self.type = FieldType.PRIMITIVE + + def to_dict(self) -> dict[str, str | bool | dict[str, str]]: + """ + Convert the primitive metadata field to a dictionary representation. + + Returns: + dict[str, str | bool]: Dictionary with field properties. + """ + + if self.expanded_value: + return { + "typeName": self.name, + "typeClass": self.type.value, + "multiple": self.multiple, + "value": self.value, + "expandedValue": self.expanded_value, + } + else: + return { + "typeName": self.name, + "typeClass": self.type.value, + "multiple": self.multiple, + "value": self.value, + } + + +@dataclass +class ControlledVocabularyMetadataField(BaseMetadataField[str | list[str]]): + """ + Metadata field for controlled vocabulary values. + """ + + def _set_type(self) -> None: + self.type = FieldType.VOCABULARY + + def to_dict(self) -> dict[str, Any]: + """ + Convert the controlled vocabulary metadata field to a dictionary. + + Returns: + dict[str, Any]: Dictionary representation. + """ + return { + "typeName": self.name, + "typeClass": self.type.value, + "multiple": self.multiple, + "value": self.value, + } + + +@dataclass +class CompoundMetadataField( + BaseMetadataField[ + Sequence[Sequence["PrimitiveMetadataField | ControlledVocabularyMetadataField"]] + ] +): + """ + Metadata field representing compound types, composed of multiple subfields. 
+ """ + + def _set_type(self) -> None: + self.type = FieldType.COMPOUND + + def to_dict(self) -> dict[str, Any]: + """ + Convert the compound metadata field to a dictionary representation. + + Returns: + dict[str, Any]: Dictionary representation of the compound field. + """ + value_list: list[dict[str, Any]] = [] + for outer_list in self.value: + field_dicts: list[dict[str, Any]] = [] + for field_item in outer_list: + field_dicts.append({field_item.name: field_item.to_dict()}) + value_list.append(reduce(lambda d1, d2: d1 | d2, field_dicts)) + + return { + "typeName": self.name, + "typeClass": self.type.value, + "multiple": self.multiple, + "value": value_list, + } diff --git a/doi2dataset/core/models.py b/doi2dataset/core/models.py new file mode 100644 index 0000000..f4dd95b --- /dev/null +++ b/doi2dataset/core/models.py @@ -0,0 +1,221 @@ +""" +Core data models for doi2dataset. + +This module contains the fundamental data classes used throughout the application +for representing people, institutions, licenses, and abstracts. +""" + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .metadata_fields import ( + ControlledVocabularyMetadataField, + PrimitiveMetadataField, + ) + + +@dataclass +class Institution: + """ + Represents an institution or organization. + + Attributes: + display_name (str): The name of the institution. + ror (str): Research Organization Registry identifier (optional). + """ + + display_name: str + ror: str = "" + + def affiliation_field(self) -> "PrimitiveMetadataField": + """ + Create a metadata field for the affiliation. + + Returns: + PrimitiveMetadataField: A metadata field representing the institution, + using ROR ID when available. + """ + from .metadata_fields import PrimitiveMetadataField + + if self.ror: + expanded_value = { + "scheme": "http://www.grid.ac/ontology/", + "termName": self.display_name, + "@type": "https://schema.org/Organization", + } + return PrimitiveMetadataField( + "authorAffiliation", False, self.ror, expanded_value=expanded_value + ) + else: + return PrimitiveMetadataField("authorAffiliation", False, self.display_name) + + +@dataclass +class Person: + """ + Represents a person (e.g., an author or a PI). + + Attributes: + family_name (str): Family name of the person. + given_name (str): Given name of the person. + orcid (str): ORCID identifier (optional). + email (str): Email address (optional). + affiliation (Institution): Affiliation of the person (optional). + """ + + family_name: str + given_name: str + orcid: str = "" + email: str = "" + affiliation: Institution | str = "" + + def to_dict(self) -> dict[str, str | list[str] | dict[str, str]]: + """ + Convert Person to a dictionary for JSON serialization. + + Handles affiliations properly by checking if the affiliation + is an Institution object or a string. + + Returns: + dict: A dictionary containing the person's information including + name, contact details, and affiliation. 
+ """ + return_dict: dict[str, str | list[str] | dict[str, str]] = { + "family_name": self.family_name, + "given_name": self.given_name, + "orcid": self.orcid, + "email": self.email, + } + + if isinstance(self.affiliation, Institution): + if self.affiliation.ror: + return_dict["affiliation"] = self.affiliation.ror + elif self.affiliation.display_name: + return_dict["affiliation"] = self.affiliation.display_name + else: + return_dict["affiliation"] = "" + else: + return_dict["affiliation"] = self.affiliation if self.affiliation else "" + + return return_dict + + def format_name(self) -> str: + """ + Format the name in 'Family, Given' order. + + Returns: + str: Formatted name. + """ + return f"{self.family_name}, {self.given_name}" + + def author_fields( + self, + ) -> list["PrimitiveMetadataField | ControlledVocabularyMetadataField"]: + """ + Build metadata fields for the author. + + The method handles both Institution objects and string values for affiliations. + Different fields are generated depending on whether ORCID is available. + + Returns: + list: List of metadata fields representing the author, including name, + affiliation, and optionally ORCID identifier information. + """ + from .metadata_fields import ( + ControlledVocabularyMetadataField, + PrimitiveMetadataField, + ) + + affiliation_field = None + if isinstance(self.affiliation, Institution): + affiliation_field = self.affiliation.affiliation_field() + else: + affiliation_field = PrimitiveMetadataField( + "authorAffiliation", False, self.affiliation + ) + + if self.orcid: + return [ + PrimitiveMetadataField("authorName", False, self.format_name()), + affiliation_field, + ControlledVocabularyMetadataField( + "authorIdentifierScheme", False, "ORCID" + ), + PrimitiveMetadataField("authorIdentifier", False, self.orcid), + ] + else: + return [ + PrimitiveMetadataField("authorName", False, self.format_name()), + affiliation_field, + ] + + def dataset_contact_fields(self) -> list["PrimitiveMetadataField"]: + """ + Generate metadata fields for dataset contact. + + The method handles both Institution objects and string values for affiliations. + Creates fields for the contact name, affiliation, and email address. + + Returns: + list: List of metadata fields for the dataset contact including name, + affiliation, and email address. + """ + from .metadata_fields import PrimitiveMetadataField + + affiliation_field = None + if isinstance(self.affiliation, Institution): + affiliation_field = self.affiliation.affiliation_field() + else: + affiliation_field = PrimitiveMetadataField( + "datasetContactAffiliation", False, self.affiliation + ) + + return [ + PrimitiveMetadataField("datasetContactName", False, self.format_name()), + affiliation_field, + PrimitiveMetadataField("datasetContactEmail", False, self.email), + ] + + +@dataclass +class License: + """ + Represents a license with name, URI, and short identifier. + + Attributes: + name (str): The full name of the license. + uri (str): The license URI. + short (str): The short identifier of the license. + """ + + name: str + uri: str + short: str + + +@dataclass +class Abstract: + """ + Represents an abstract with its text and source. + + Attributes: + text (str): The abstract text. + source (str): The source of the abstract ('crossref', 'openalex', or 'none'). + """ + + text: str + source: str + + def __post_init__(self): + """ + Validate that the abstract source is one of the allowed values. + + Raises: + ValueError: If source is not one of the allowed values. 
+ """ + allowed_sources = ["crossref", "openalex", "none"] + if self.source not in allowed_sources: + raise ValueError( + f"{self.source} is not valid! Needs to be one of {str(allowed_sources)}." + ) diff --git a/doi2dataset/processing/__init__.py b/doi2dataset/processing/__init__.py new file mode 100644 index 0000000..fcdb515 --- /dev/null +++ b/doi2dataset/processing/__init__.py @@ -0,0 +1,18 @@ +""" +Processing components for doi2dataset. + +This package contains the business logic components for processing DOIs, +building citations, processing metadata, and handling various data transformations. +""" + +from .citation import CitationBuilder +from .metadata import MetadataProcessor +from .utils import NameProcessor, PIFinder, SubjectMapper + +__all__ = [ + "NameProcessor", + "PIFinder", + "SubjectMapper", + "CitationBuilder", + "MetadataProcessor", +] diff --git a/doi2dataset/processing/citation.py b/doi2dataset/processing/citation.py new file mode 100644 index 0000000..9e66732 --- /dev/null +++ b/doi2dataset/processing/citation.py @@ -0,0 +1,292 @@ +""" +Citation processing for doi2dataset. + +This module contains the CitationBuilder class which handles building various +citation-related metadata fields from API data. +""" + +# Suppress the warning from idutils about pkg_resources +import warnings +from typing import Any + +from ..core.config import Config +from ..core.metadata_fields import PrimitiveMetadataField +from ..core.models import Institution, Person +from ..processing.utils import NameProcessor, PIFinder + +warnings.filterwarnings( + "ignore", message=".*pkg_resources.*", category=DeprecationWarning +) +with warnings.catch_warnings(): + warnings.simplefilter("ignore") + from idutils.normalizers import normalize_orcid, normalize_pmid + + +class CitationBuilder: + """ + Builds various citation-related metadata fields. + """ + + def __init__( + self, data: dict[str, Any], doi: str, pi_finder: PIFinder, ror: bool = False + ) -> None: + """ + Initialize the CitationBuilder with data, DOI, and a PIFinder. + + Args: + data (dict[str, Any]): Metadata from an external source. + doi (str): The DOI. + pi_finder (PIFinder): Instance to find PI information. + ror (bool): Whether to use ROR identifiers for institutions. + """ + self.data = data + self.doi = doi + self.ror = ror + self.pi_finder = pi_finder + + def build_other_ids(self) -> list[list[PrimitiveMetadataField]]: + """ + Build metadata fields for other identifiers (e.g., DOI, PMID). + + Returns: + list[list[PrimitiveMetadataField]]: Nested list of identifier metadata fields. + """ + other_ids = [ + [ + PrimitiveMetadataField("otherIdAgency", False, "doi"), + PrimitiveMetadataField("otherIdValue", False, self.doi), + ] + ] + + if pmid := self.data.get("ids", {}).get("pmid"): + try: + normalized_pmid = normalize_pmid(pmid) + other_ids.append( + [ + PrimitiveMetadataField("otherIdAgency", False, "pmid"), + PrimitiveMetadataField("otherIdValue", False, normalized_pmid), + ] + ) + except ValueError: + pass + + return other_ids + + def build_grants(self) -> list[list[PrimitiveMetadataField]]: + """ + Build metadata fields for grants. + + Returns: + list[list[PrimitiveMetadataField]]: Nested list of grant metadata fields. 
+ """ + config = Config() + default_grants = config.DEFAULT_GRANTS + + grants: list[list[PrimitiveMetadataField]] = [] + + for grant in default_grants: + grants.append( + [ + PrimitiveMetadataField("grantNumberAgency", False, grant["funder"]), + PrimitiveMetadataField("grantNumberValue", False, grant["id"]), + ] + ) + + for grant in self.data.get("grants", []): + grant_funder = grant.get("funder_display_name", {}) + grant_id = grant.get("award_id", {}) + if not grant_funder or not grant_id: + continue + + grants.append( + [ + PrimitiveMetadataField("grantNumberAgency", False, grant_funder), + PrimitiveMetadataField("grantNumberValue", False, grant_id), + ] + ) + + return grants + + def build_authors(self) -> tuple[list[Person], list[Person]]: + """ + Build lists of authors and corresponding authors from the metadata. + + Returns: + tuple: (authors, corresponding_authors) + """ + authors: list[Person] = [] + corresponding_authors: list[Person] = [] + for authorship in self.data.get("authorships", []): + author = authorship.get("author", {}) + if not author: + continue + + author_person = self._process_author(author, authorship) + authors.append(author_person) + + if authorship.get("is_corresponding"): + corresponding_entry = self._process_corresponding_author( + author_person, authorship + ) + if corresponding_entry: + corresponding_authors.append(corresponding_entry) + + return authors, corresponding_authors + + def _process_author( + self, author: dict[str, Any], authorship: dict[str, Any] + ) -> Person: + """ + Process author data and return a Person instance. + + Args: + author (dict[str, Any]): Author data. + authorship (dict[str, Any]): Authorship metadata. + + Returns: + Person: Processed author + """ + display_name = author.get("display_name", "") + given_name, family_name = NameProcessor.split_name(display_name) + + person = Person(family_name, given_name) + + if affiliations := authorship.get("affiliations"): + affiliation = Institution( + affiliations[0].get("raw_affiliation_string", "").strip() + ) + + person.affiliation = affiliation + + if self.ror: + if institutions := authorship.get("institutions"): + institution = institutions[0] + if institution.get("ror"): + affiliation = Institution( + institution.get("display_name"), institution.get("ror") + ) + + person.affiliation = affiliation + + if orcid := author.get("orcid"): + person.orcid = normalize_orcid(orcid) + + return person + + def _process_corresponding_author( + self, author: Person, authorship: dict[str, Any] + ) -> Person | None: + """ + Identify the corresponding author based on provided PI information. + + Args: + author (Person): The author. + authorship (dict[str, Any]): Authorship metadata. + + Returns: + Person | None: The corresponding author, or None if not found. + """ + pi_matches = self.pi_finder.find_by_orcid([author]) + return pi_matches[0] if pi_matches else None + + def build_topics(self) -> list[list[PrimitiveMetadataField]]: + """ + Build metadata fields for topics based on a threshold score. + + Returns: + list[list[PrimitiveMetadataField]]: Nested list of topic metadata fields. 
+ """ + topics: list[list[PrimitiveMetadataField]] = [] + + for topic in self.data.get("topics", []): + if topic.get("score", 0) >= 0.8: + topic_class_value_field = PrimitiveMetadataField( + "topicClassValue", False, topic.get("display_name") + ) + topic_class_vocab_field = PrimitiveMetadataField( + "topicClassVocab", False, "OpenAlex" + ) + topic_class_vocab_uri_field = PrimitiveMetadataField( + "topicClassVocabURI", False, topic.get("id") + ) + + topics.append( + [ + topic_class_value_field, + topic_class_vocab_field, + topic_class_vocab_uri_field, + ] + ) + + return topics + + def build_keywords(self) -> list[list[PrimitiveMetadataField]]: + """ + Build metadata fields for keywords from both regular keywords and MeSH terms. + + Returns: + list[list[PrimitiveMetadataField]]: Nested list of keyword metadata fields. + """ + keywords: list[list[PrimitiveMetadataField]] = [] + + for keyword in self.data.get("keywords", []): + # Filter out possibly unrelated keywords (low score) + if keyword.get("score", 0) >= 0.5: + keyword_value_field = PrimitiveMetadataField( + "keywordValue", False, keyword["display_name"] + ) + keywords.append([keyword_value_field]) + + mesh_base_url = "http://id.nlm.nih.gov/mesh" + for mesh in self.data.get("mesh", []): + url = f"{mesh_base_url}/{mesh['descriptor_ui']}" + if mesh.get("qualifier_ui"): + url = f"{url}{mesh['qualifier_ui']}" + + keyword_value_field = PrimitiveMetadataField( + "keywordValue", False, mesh["descriptor_name"] + ) + keyword_term_uri_field = PrimitiveMetadataField( + "keywordTermURI", False, url + ) + keyword_vocabulary_field = PrimitiveMetadataField( + "keywordVocabulary", False, "MeSH" + ) + keyword_vocabulary_uri_field = PrimitiveMetadataField( + "keywordVocabularyURI", False, mesh_base_url + ) + + keywords.append( + [ + keyword_value_field, + keyword_term_uri_field, + keyword_vocabulary_field, + keyword_vocabulary_uri_field, + ] + ) + + return keywords + + def _get_publication_year(self, data: dict[str, Any]) -> str: + """ + Extract publication year from data, with fallbacks. + + Args: + data (dict[str, Any]): Publication data. + + Returns: + str: Publication year as string. + """ + # Try publication_year first + if pub_year := data.get("publication_year"): + return str(pub_year) + + # Fallback to publication_date + if pub_date := data.get("publication_date"): + try: + return pub_date.split("-")[0] + except (AttributeError, IndexError): + pass + + # Final fallback + return "Unknown" diff --git a/doi2dataset/processing/metadata.py b/doi2dataset/processing/metadata.py new file mode 100644 index 0000000..ce122d9 --- /dev/null +++ b/doi2dataset/processing/metadata.py @@ -0,0 +1,474 @@ +""" +Metadata processing for doi2dataset. + +This module contains the MetadataProcessor class which handles the complete workflow +of processing DOIs: fetching data, building metadata, and optionally uploading to Dataverse. 
+""" + +import json +import warnings +from pathlib import Path +from typing import Any + +from rich.console import Console +from rich.progress import Progress, TaskID + +from ..api.client import APIClient +from ..api.processors import AbstractProcessor, LicenseProcessor +from ..core.config import Config +from ..core.metadata_fields import ( + CompoundMetadataField, + ControlledVocabularyMetadataField, + PrimitiveMetadataField, +) +from ..core.models import Person +from ..processing.citation import CitationBuilder +from ..processing.utils import NameProcessor, PIFinder, SubjectMapper + +# Suppress warnings from idutils +warnings.filterwarnings( + "ignore", message=".*pkg_resources.*", category=DeprecationWarning +) +with warnings.catch_warnings(): + warnings.simplefilter("ignore") + from idutils.normalizers import normalize_doi + from idutils.validators import is_doi + + +class MetadataProcessor: + """ + Processes metadata for a given DOI by fetching data from OpenAlex, + building metadata blocks, and optionally uploading the dataset. + """ + + # Icons for console output - TODO: should be moved to a constants module + ICONS = { + "processing": "⚙️", + "success": "✅", + "error": "❌", + "warning": "⚠️", + "info": "ℹ️", + "upload": "📤", + "save": "💾", + } + + def __init__( + self, + doi: str, + depositor: str | None = None, + output_path: Path | None = None, + default_subject: str = "Other", + contact_mail: str | None = None, + upload: bool = False, + ror: bool = False, + console: Console | None = None, + progress: Progress | None = None, + task_id: TaskID | None = None, + ) -> None: + """ + Initialize the MetadataProcessor with configuration and processing options. + + Args: + doi (str): The DOI to process. + depositor (str | None): Depositor name. + output_path (Path | None): Path where metadata will be saved. + default_subject (str): Default subject. + contact_mail (str | None): Contact email address. + ror (bool): Whether to use ROR id for affiliation + upload (bool): Whether to upload metadata. + console (Console | None): Rich console instance. + progress (Progress | None): Progress bar instance. + task_id (TaskID | None): Task ID for progress updates. + """ + self.console = console or Console() + try: + self.doi = self._validate_doi(doi) + except ValueError as e: + self.console.print(f"Error: {str(e)}", style="error") + raise + self.depositor = depositor + self.output_path = output_path + self.default_subject = default_subject + self.api_client = APIClient(contact_mail) + config = Config() + pi_objects = [Person(**pi) for pi in config.PIS] + self.pi_finder = PIFinder(pi_objects) + self.upload = upload + self.ror = ror + self.progress = progress + self.task_id = task_id + + @staticmethod + def _validate_doi(doi: str) -> str: + """ + Validate and normalize a DOI. + + Args: + doi (str): The DOI to validate. + + Returns: + str: Normalized DOI. + + Raises: + ValueError: If the DOI is invalid. + """ + if not is_doi(doi): + raise ValueError(f"Invalid DOI: {doi}") + return normalize_doi(doi) + + def _update_progress(self) -> None: + """ + Advance the progress bar if enabled. + """ + if self.progress and self.task_id is not None: + self.progress.advance(self.task_id) + + def process(self) -> dict[str, Any]: + """ + Process the DOI: fetch data, build metadata, optionally upload, and save output. + + Returns: + dict[str, Any]: The constructed metadata dictionary. 
+ """ + self.console.print( + f"{self.ICONS['processing']} Processing DOI: {self.doi}", style="info" + ) + + data = self._fetch_data() + self._update_progress() + + metadata = self._build_metadata(data) + self._update_progress() + + if self.upload: + self._upload_data(metadata) + self._update_progress() + + self._save_output(metadata) + self._update_progress() + + self.console.print( + f"\n{self.ICONS['success']} Successfully processed: {self.doi}\n", + style="success", + ) + return metadata + + def _upload_data(self, metadata: dict[str, Any]) -> dict[str, Any]: + """ + Upload the metadata to Dataverse. + + Args: + metadata (dict[str, Any]): The metadata to upload. + + Returns: + dict[str, Any]: The response from the Dataverse API. + + Raises: + ValueError: If the upload fails. + """ + config = Config() + + token = config.DATAVERSE["api_token"] + client = APIClient(token=token) + url = f"{config.DATAVERSE['url']}/api/dataverses/{config.DATAVERSE['dataverse']}/datasets?doNotValidate=true" + auth = (config.DATAVERSE["auth_user"], config.DATAVERSE["auth_password"]) + + response = client.make_request(url, method="POST", auth=auth, json=metadata) + + if response is None or response.status_code != 201: + self.console.print( + f"\n{self.ICONS['error']} Failed to upload to Dataverse: {url}", + style="error", + ) + raise ValueError(f"Failed to upload to Dataverse: {url}") + else: + perma = response.json().get("data", {}).get("persistentId", "") + self.console.print( + f"{self.ICONS['upload']} Dataset uploaded to: {config.DATAVERSE['dataverse']} with ID {perma}", + style="info", + ) + + return response.json() + + def _fetch_data(self) -> dict[str, Any]: + """ + Fetch metadata from OpenAlex for the given DOI. + + Returns: + dict[str, Any]: The fetched data. + + Raises: + ValueError: If data fetching fails. + """ + url = f"https://api.openalex.org/works/https://doi.org/{self.doi}" + response = self.api_client.make_request(url) + + if response is None or response.status_code != 200: + self.console.print( + f"\n{self.ICONS['error']} Failed to fetch data for DOI: {self.doi}", + style="error", + ) + raise ValueError(f"Failed to fetch data for DOI: {self.doi}") + + return response.json() + + def _build_metadata(self, data: dict[str, Any]) -> dict[str, Any]: + """ + Construct the complete metadata dictionary from fetched data. + + Args: + data (dict[str, Any]): The data retrieved from OpenAlex. + + Returns: + dict[str, Any]: The complete metadata dictionary. 
+ """ + license_info = LicenseProcessor.process_license(data) + abstract_processor = AbstractProcessor(self.api_client, self.console) + abstract = abstract_processor.get_abstract(self.doi, data, license_info) + citation_builder = CitationBuilder(data, self.doi, self.pi_finder, self.ror) + + authors, corresponding_authors = citation_builder.build_authors() + + author_fields: list[ + list[PrimitiveMetadataField | ControlledVocabularyMetadataField] + ] = [] + corresponding_author_fields: list[list[PrimitiveMetadataField]] = [] + for author in authors: + author_fields.append(author.author_fields()) + + if not corresponding_authors: + self.console.print( + f"{self.ICONS['warning']} No corresponding authors explicitly declared; PIs are used as a fallback!", + style="warning", + ) + pis = self._get_involved_pis(data) + corresponding_authors: list[Person] = [] + for pi in pis: + corresponding_authors.append(pi) + + for corresponding_author in corresponding_authors: + corresponding_author_fields.append( + corresponding_author.dataset_contact_fields() + ) + + description = self._build_description(data, abstract) + + grants = citation_builder.build_grants() + + return_dict: dict[str, Any] = { + "datasetVersion": { + "metadataBlocks": { + "citation": { + "fields": [ + PrimitiveMetadataField( + "title", False, data.get("title", "") + ).to_dict(), + PrimitiveMetadataField( + "distributionDate", + False, + data.get("publication_date", ""), + ).to_dict(), + CompoundMetadataField( + "otherId", True, citation_builder.build_other_ids() + ).to_dict(), + CompoundMetadataField( + "dsDescription", + True, + [ + [ + PrimitiveMetadataField( + "dsDescriptionValue", False, description + ) + ] + ], + ).to_dict(), + ControlledVocabularyMetadataField( + "subject", + True, + SubjectMapper.map_subjects([self.default_subject]), + ).to_dict(), + CompoundMetadataField( + "topicClassification", + True, + citation_builder.build_topics(), + ).to_dict(), + CompoundMetadataField( + "keyword", True, citation_builder.build_keywords() + ).to_dict(), + PrimitiveMetadataField( + "depositor", + False, + self.depositor + or data.get("primary_location", {}) + .get("source", {}) + .get("display_name", ""), + ).to_dict(), + PrimitiveMetadataField( + "alternativeURL", False, f"https://doi.org/{self.doi}" + ).to_dict(), + CompoundMetadataField( + "author", True, author_fields + ).to_dict(), + CompoundMetadataField( + "datasetContact", True, corresponding_author_fields + ).to_dict(), + CompoundMetadataField( + "grantNumber", True, grants + ).to_dict(), + ], + "displayName": "Citation Metadata", + } + }, + "files": [], + } + } + + if license_info.name: + return_dict["datasetVersion"]["license"] = { + "name": license_info.name, + "uri": license_info.uri, + } + else: + return_dict["datasetVersion"]["termsOfUse"] = ( + f"All rights reserved. Copyright © {self._get_publication_year(data)}, [TODO: Insert copyright holder here!]" + ) + + return return_dict + + def _build_description(self, data: dict[str, Any], abstract) -> str: + """ + Build the description field by combining a header and the abstract. + + Args: + data (dict[str, Any]): The metadata. + abstract: The abstract object. + + Returns: + str: The full description. + """ + head = self._build_description_head(data) + return f"{head}{abstract.text}" + + def _build_description_head(self, data: dict[str, Any]) -> str: + """ + Build the header for the description based on publication details. + + Args: + data (dict[str, Any]): The metadata. + + Returns: + str: The HTML header string. 
+ """ + journal = data.get("primary_location", {}).get("source", {}).get("display_name") + publication_date = data.get("publication_date") + volume = data.get("biblio", {}).get("volume") + issue = data.get("biblio", {}).get("issue") + doc_type = data.get("type") + + if all([journal, publication_date, volume, issue, doc_type]): + return f"
<p>This {doc_type} was published on {publication_date} in {journal} {volume}({issue})</p>"
+        elif all([journal, publication_date, doc_type]):
+            return f"<p>This {doc_type} was published on {publication_date} in {journal}</p>
      " + + self.console.print( + f"{self.ICONS['warning']} No abstract header added, missing information (journal, publication date and/or document type)", + style="warning", + ) + return "" + + def _get_publication_year(self, data: dict[str, Any]) -> str | int: + """ + Extract the publication year from the metadata. + + Args: + data (dict[str, Any]): The metadata. + + Returns: + str | int: The publication year or empty string. + """ + return data.get("publication_year", "") + + def _get_involved_pis(self, data: dict[str, Any]) -> list[Person]: + """ + Identify involved principal investigators from the metadata for use as fallback + corresponding authors. + + This method matches authors in the publication metadata against the configured + PIs and returns matching PIs. It is used as a fallback when no corresponding + authors are explicitly declared in the publication metadata. + + Args: + data (dict[str, Any]): The metadata from OpenAlex. + + Returns: + list[Person]: List of matching PIs for use as corresponding authors. + """ + involved_pis: list[Person] = [] + authors_in_publication = [] + + # Build list of authors from publication + for authorship in data.get("authorships", []): + author = authorship.get("author", {}) + if not author: + continue + + display_name = author.get("display_name", "") + given_name, family_name = NameProcessor.split_name(display_name) + + person = Person(family_name, given_name) + if orcid := author.get("orcid"): + person.orcid = orcid + + authors_in_publication.append(person) + + # Find PIs that match authors in the publication + involved_pis = self.pi_finder.find_by_orcid(authors_in_publication) + + return involved_pis + + def _save_output(self, metadata: dict[str, Any]) -> None: + """ + Save the generated metadata to a file or print it to the console. + + Args: + metadata (dict[str, Any]): The metadata to save. + """ + if self.output_path: + try: + # Custom JSON encoder to handle custom objects + class CustomEncoder(json.JSONEncoder): + """ + Custom JSON encoder that handles objects with to_dict method. + + This allows for proper serialization of custom classes like + Institution and Person by calling their to_dict method when + available. + + Args: + o: The object to serialize. + + Returns: + A JSON-serializable representation of the object. + """ + + def default(self, o: Any) -> Any: + if hasattr(o, "to_dict"): + return o.to_dict() + return super().default(o) + + with open(self.output_path, "w", encoding="utf-8") as f: + json.dump( + metadata, f, indent=4, ensure_ascii=False, cls=CustomEncoder + ) + self.console.print( + f"{self.ICONS['save']} Metadata saved in: {self.output_path}", + style="info", + ) + except Exception as e: + self.console.print( + f"{self.ICONS['error']} Error saving metadata: {str(e)}\n", + style="error", + ) + raise + else: + self.console.print(metadata) diff --git a/doi2dataset/processing/utils.py b/doi2dataset/processing/utils.py new file mode 100644 index 0000000..3f1dd2b --- /dev/null +++ b/doi2dataset/processing/utils.py @@ -0,0 +1,289 @@ +""" +Processing utilities for doi2dataset. + +This module contains utility classes and functions used for processing +names, finding PIs, mapping subjects, and other business logic operations. 
+""" + +import unicodedata +import warnings +from typing import Any + +from ..core.models import Person + +# Suppress warnings from idutils +warnings.filterwarnings( + "ignore", message=".*pkg_resources.*", category=DeprecationWarning +) +with warnings.catch_warnings(): + warnings.simplefilter("ignore") + from idutils.normalizers import normalize_orcid + + +class NameProcessor: + """ + Provides utility methods for processing names. + """ + + @staticmethod + def normalize_string(s: str) -> str: + """ + Normalize a string using Unicode NFKD normalization and convert to ASCII. + + Args: + s (str): The string to normalize. + + Returns: + str: The normalized string. + """ + return ( + unicodedata.normalize("NFKD", s.lower()) + .encode("ASCII", "ignore") + .decode("ASCII") + ) + + @staticmethod + def split_name(full_name: str) -> tuple[str, str]: + """ + Split a full name into given and family names. + + Args: + full_name (str): The full name (e.g., "Doe, John" or "John Doe"). + + Returns: + tuple[str, str]: A tuple (given_name, family_name). + """ + if "," in full_name: + surname, given_name = full_name.split(",", 1) + return given_name.strip(), surname.strip() + + parts = full_name.strip().split() + if len(parts) == 1: + return "", parts[0] + + return " ".join(parts[:-1]), parts[-1] + + +class PIFinder: + """ + Finds principal investigators (PIs) among a list of Person objects. + """ + + def __init__(self, pis: list[Person]) -> None: + """ + Initialize with a list of Person objects representing potential PIs. + + Args: + pis (list[Person]): List of Person objects. + """ + self.pis = pis + + def find_by_orcid(self, authors: list[Person]) -> list[Person]: + """ + Find PIs by ORCID identifier among the authors. + + Args: + authors (list[Person]): List of author Person objects. + + Returns: + list[Person]: List of Person objects that are PIs based on ORCID matching. + """ + if not self.pis or not authors: + return [] + + pi_orcids = {pi.orcid for pi in self.pis if pi.orcid} + if not pi_orcids: + return [] + + return [author for author in authors if author.orcid in pi_orcids] + + def find_corresponding_authors(self, authors: list[Person]) -> list[Person]: + """ + Find corresponding authors by checking for email addresses and PI matching. + + Args: + authors (list[Person]): List of author Person objects. + + Returns: + list[Person]: List of corresponding authors. + """ + # First, try to find authors with email addresses + authors_with_email = [author for author in authors if author.email] + + if authors_with_email: + # If we have PIs configured, prefer PI matches + pi_matches = self.find_by_orcid(authors_with_email) + if pi_matches: + return pi_matches + + # Otherwise return all authors with email addresses + return authors_with_email + + # Fallback: look for PI matches even without email + pi_matches = self.find_by_orcid(authors) + if pi_matches: + return pi_matches + + # Last resort: return first author if no other criteria match + return authors[:1] if authors else [] + + def find_pi( + self, + family_name: str | None = None, + given_name: str | None = None, + orcid: str | None = None, + ) -> Person | None: + """ + Find a PI by name and/or ORCID. + + Args: + family_name (str | None): Family name to match. + given_name (str | None): Given name to match. + orcid (str | None): ORCID to match. + + Returns: + Person | None: The matched PI or None. 
+        """
+        if orcid:
+            return self._find_by_orcid(orcid)
+
+        # Fallback to name matching if no ORCID
+        for person in self.pis:
+            name_match = True
+            if family_name and person.family_name.lower() != family_name.lower():
+                name_match = False
+            if given_name and person.given_name.lower() != given_name.lower():
+                name_match = False
+            if name_match:
+                return person
+
+        return None
+
+    def _find_by_orcid(self, orcid: str) -> Person | None:
+        """
+        Find a PI by ORCID.
+
+        Args:
+            orcid (str): ORCID identifier to match (normalized internally).
+
+        Returns:
+            Person | None: The matched PI or None.
+        """
+        try:
+            normalized_orcid = normalize_orcid(orcid)
+            for person in self.pis:
+                if person.orcid and normalize_orcid(person.orcid) == normalized_orcid:
+                    return person
+        except Exception:
+            # If ORCID normalization fails, fall back to direct string comparison
+            for person in self.pis:
+                if person.orcid == orcid:
+                    return person
+        return None
+
+
+class SubjectMapper:
+    """
+    Maps subject names from input data to controlled vocabulary.
+    """
+
+    CONTROLLED_VOCAB = {
+        "Agricultural Sciences": "Agricultural Sciences",
+        "Arts and Humanities": "Arts and Humanities",
+        "Astronomy": "Astronomy and Astrophysics",
+        "Astrophysics": "Astronomy and Astrophysics",
+        "Business": "Business and Management",
+        "Business and Management": "Business and Management",
+        "Chemistry": "Chemistry",
+        "Computer Science": "Computer and Information Science",
+        "Computer and Information Science": "Computer and Information Science",
+        "Earth Sciences": "Earth and Environmental Sciences",
+        "Earth and Environmental Sciences": "Earth and Environmental Sciences",
+        "Engineering": "Engineering",
+        "Law": "Law",
+        "Life Sciences": "Medicine, Health and Life Sciences",
+        "Mathematical Sciences": "Mathematical Sciences",
+        "Mathematics": "Mathematical Sciences",
+        "Medicine": "Medicine, Health and Life Sciences",
+        "Medicine, Health and Life Sciences": "Medicine, Health and Life Sciences",
+        "Physics": "Physics",
+        "Psychology": "Psychology",
+        "Social Sciences": "Social Sciences",
+        "Other": "Other",
+    }
+
+    @classmethod
+    def map_subjects(cls, subjects: list[str]) -> list[str]:
+        """
+        Map a list of subject strings to controlled vocabulary terms.
+
+        Args:
+            subjects (list[str]): List of subject strings to map.
+
+        Returns:
+            list[str]: List of mapped controlled vocabulary terms without duplicates.
+        """
+        mapped = []
+        for subject in subjects:
+            # Try exact match first
+            if subject in cls.CONTROLLED_VOCAB:
+                mapped_subject = cls.CONTROLLED_VOCAB[subject]
+                if mapped_subject not in mapped:
+                    mapped.append(mapped_subject)
+            else:
+                # Try partial matching
+                subject_lower = subject.lower()
+                for key, value in cls.CONTROLLED_VOCAB.items():
+                    if subject_lower in key.lower() or key.lower() in subject_lower:
+                        # Record the first partial match, avoiding duplicates
+                        if value not in mapped:
+                            mapped.append(value)
+                        break
+                else:
+                    # No match found, add "Other" if not already present
+                    if "Other" not in mapped:
+                        mapped.append("Other")
+
+        return mapped if mapped else ["Other"]
+
+    @classmethod
+    def map_single_subject(cls, subject: str) -> str:
+        """
+        Map a single subject string to a controlled vocabulary term.
+
+        Args:
+            subject (str): Subject string to map.
+
+        Returns:
+            str: Mapped controlled vocabulary term.
+        """
+        mapped_subjects = cls.map_subjects([subject])
+        return mapped_subjects[0] if mapped_subjects else "Other"
+
+    @classmethod
+    def get_subjects(
+        cls, data: dict[str, Any], fallback_subject: str = "Other"
+    ) -> list[str]:
+        """
+        Extract and map subjects from input data.
+
+        Args:
+            data (dict[str, Any]): The input metadata.
+ fallback_subject (str): Fallback subject if none found. + + Returns: + list[str]: List of mapped subject names. + """ + + topics = data.get("topics", []) + subject_collection: list[str] = [] + + for topic in topics: + for field_type in ["subfield", "field", "domain"]: + if field_name := topic.get(field_type, {}).get("display_name"): + subject_collection.append(field_name) + + mapped_subjects = cls.map_subjects(subject_collection) + return mapped_subjects if mapped_subjects else [fallback_subject] diff --git a/doi2dataset/utils/__init__.py b/doi2dataset/utils/__init__.py new file mode 100644 index 0000000..e9fa8c7 --- /dev/null +++ b/doi2dataset/utils/__init__.py @@ -0,0 +1,22 @@ +""" +Utility functions and helpers for doi2dataset. + +This package contains validation functions, string processing utilities, +and other helper functions used throughout the application. +""" + +from .validation import ( + normalize_string, + sanitize_filename, + split_name, + validate_doi, + validate_email_address, +) + +__all__ = [ + "validate_doi", + "validate_email_address", + "sanitize_filename", + "split_name", + "normalize_string", +] diff --git a/doi2dataset/utils/validation.py b/doi2dataset/utils/validation.py new file mode 100644 index 0000000..f336351 --- /dev/null +++ b/doi2dataset/utils/validation.py @@ -0,0 +1,127 @@ +""" +Validation utilities for doi2dataset. + +This module provides validation functions for DOIs, email addresses, +and other data validation needs. +""" + +import warnings + +import dns.resolver +from email_validator import EmailNotValidError, validate_email + +# Suppress the warning from idutils about pkg_resources +warnings.filterwarnings( + "ignore", message=".*pkg_resources.*", category=DeprecationWarning +) +with warnings.catch_warnings(): + warnings.simplefilter("ignore") + from idutils.validators import is_doi + + +def validate_doi(doi: str) -> bool: + """ + Validate a DOI using the idutils library. + + Args: + doi (str): The DOI to validate. + + Returns: + bool: True if the DOI is valid, False otherwise. + """ + return is_doi(doi) + + +def validate_email_address(email: str) -> bool: + """ + Validate an email address and ensure its domain has an MX record. + + Args: + email (str): The email address to validate. + + Returns: + bool: True if the email address is valid and its domain resolves, otherwise False. + """ + try: + # Basic validation + valid = validate_email(email) + email = valid.normalized + + # Check domain has MX record + domain = email.split("@")[1] + dns.resolver.resolve(domain, "MX") + + return True + except (EmailNotValidError, dns.resolver.NoAnswer, dns.resolver.NXDOMAIN): + return False + + +def sanitize_filename(doi: str) -> str: + """ + Convert DOI to a valid filename using only alphanumeric characters and underscores. + + Args: + doi (str): The DOI to sanitize. + + Returns: + str: Sanitized filename string. + """ + # Replace non-alphanumeric characters with underscores + sanitized = "".join(c if c.isalnum() else "_" for c in doi) + # Remove consecutive underscores + while "__" in sanitized: + sanitized = sanitized.replace("__", "_") + # Remove leading/trailing underscores + return sanitized.strip("_") + + +def split_name(full_name: str) -> tuple[str, str]: + """ + Split a full name into given and family names. + + Args: + full_name (str): The full name (e.g., "Doe, John" or "John Doe"). + + Returns: + tuple[str, str]: A tuple (given_name, family_name). 
+ """ + normalized = normalize_string(full_name) + + if "," in normalized: + # Format: "Doe, John" + parts = normalized.split(",", 1) + family_name = parts[0].strip() + given_name = parts[1].strip() + else: + # Format: "John Doe" - assume last word is family name + parts = normalized.split() + if len(parts) == 1: + # Only one name provided + given_name = parts[0] + family_name = "" + else: + given_name = " ".join(parts[:-1]) + family_name = parts[-1] + + return given_name, family_name + + +def normalize_string(s: str) -> str: + """ + Normalize a string using Unicode NFKD normalization and convert to ASCII. + + Args: + s (str): The string to normalize. + + Returns: + str: Normalized string. + """ + import unicodedata + + # Normalize Unicode characters to decomposed form + normalized = unicodedata.normalize("NFKD", s) + + # Convert to ASCII, ignoring non-ASCII characters + ascii_str = normalized.encode("ascii", "ignore").decode("ascii") + + return ascii_str.strip()
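
Usage sketch (illustrative, not part of the diff): a minimal example of the
extracted processing helpers, assuming the package is installed under the new
layout; the names passed in are made-up sample values.

    from doi2dataset.processing.utils import NameProcessor, SubjectMapper

    # Both "Doe, John" and "John Doe" are accepted; returns (given, family)
    given_name, family_name = NameProcessor.split_name("Doe, John")
    # -> ("John", "Doe")

    # Exact vocabulary matches are kept; unknown terms fall back to "Other"
    print(SubjectMapper.map_subjects(["Physics", "Astronomy"]))
    # -> ["Physics", "Astronomy and Astrophysics"]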
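
Likewise, a short sketch for the relocated validation helpers, assuming the
idutils, email-validator and dnspython dependencies are available; the DOI
below is an arbitrary example value.

    from doi2dataset.utils import sanitize_filename, validate_doi

    doi = "10.5281/zenodo.1234567"  # hypothetical DOI, for illustration only
    if validate_doi(doi):
        # Non-alphanumeric characters are collapsed to single underscores
        print(sanitize_filename(doi))
    # -> 10_5281_zenodo_1234567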