refactor: transform monolith into modular package

- Extract 2,100+ line monolithic file into focused modules
- Create proper package structure with core, api, processing, utils
- Maintain 100% backward compatibility for all imports
- All 38 tests passing with improved coverage (67.19%)

Package structure:
- core/: Configuration, models, and metadata field definitions
- api/: HTTP client and external API processors
- processing/: Business logic for citations and metadata processing
- utils/: Validation and utility functions

Extracted classes:
- Config, ConfigData → core/config.py
- Person, Institution, License, Abstract → core/models.py
- MetadataField classes → core/metadata_fields.py
- APIClient → api/client.py
- AbstractProcessor, LicenseProcessor → api/processors.py
- CitationBuilder → processing/citation.py
- MetadataProcessor → processing/metadata.py
- NameProcessor, PIFinder, SubjectMapper → processing/utils.py
- Validation functions → utils/validation.py

Benefits achieved:
- Improved maintainability with clear separation of concerns
- Better testing capabilities with isolated components
- Enhanced development experience with modular imports
- Foundation for future scalability and plugin architecture
This commit is contained in:
Alexander Minges 2025-07-22 11:03:31 +02:00
parent da3a256848
commit b6209691c3
Signed by: Athemis
SSH key fingerprint: SHA256:TUXshgulbwL+FRYvBNo54pCsI0auROsSEgSvueKbkZ4
14 changed files with 2253 additions and 0 deletions

98
doi2dataset/__init__.py Normal file
View file

@ -0,0 +1,98 @@
"""
doi2dataset: A tool to process DOIs and generate metadata for Dataverse datasets.
This package provides functionality to:
- Validate and process DOIs
- Fetch metadata from external APIs (OpenAlex, CrossRef)
- Generate Dataverse-compatible metadata
- Upload datasets to Dataverse instances
The package is organized into several modules:
- core: Configuration, models, and metadata field definitions
- api: API clients and processors
- processing: Business logic for citation building and metadata processing
- utils: Validation and utility functions
"""
# Version information
try:
# Try to get version from setuptools_scm first (modern approach)
from importlib.metadata import version
__version__ = version("doi2dataset")
except ImportError:
# Fallback for older Python versions
try:
import pkg_resources
__version__ = pkg_resources.get_distribution("doi2dataset").version
except Exception:
__version__ = "1.0.0" # Fallback version
# Import main functionality for convenience
from .api import (
AbstractProcessor,
APIClient,
LicenseProcessor,
)
from .core import (
Abstract,
BaseMetadataField,
CompoundMetadataField,
Config,
ConfigData,
ControlledVocabularyMetadataField,
FieldType,
Institution,
License,
Person,
PrimitiveMetadataField,
)
from .processing import (
CitationBuilder,
MetadataProcessor,
NameProcessor,
PIFinder,
SubjectMapper,
)
from .utils import (
normalize_string,
sanitize_filename,
split_name,
validate_doi,
validate_email_address,
)
__all__ = [
# Version
"__version__",
# API components
"APIClient",
"AbstractProcessor",
"LicenseProcessor",
# Core classes
"Config",
"ConfigData",
"Person",
"Institution",
"License",
"Abstract",
# Metadata fields
"BaseMetadataField",
"PrimitiveMetadataField",
"ControlledVocabularyMetadataField",
"CompoundMetadataField",
"FieldType",
# Processing components
"CitationBuilder",
"MetadataProcessor",
"NameProcessor",
"PIFinder",
"SubjectMapper",
# Utilities
"validate_doi",
"validate_email_address",
"sanitize_filename",
"split_name",
"normalize_string",
]

View file

@ -0,0 +1,15 @@
"""
API components for doi2dataset.
This package contains HTTP client functionality and processors for interacting
with external APIs such as OpenAlex, CrossRef, and Dataverse.
"""
from .client import APIClient
from .processors import AbstractProcessor, LicenseProcessor
__all__ = [
"APIClient",
"AbstractProcessor",
"LicenseProcessor",
]

92
doi2dataset/api/client.py Normal file
View file

@ -0,0 +1,92 @@
"""
API client for external service interactions.
This module provides a generic HTTP client for making requests to external APIs
like OpenAlex, CrossRef, and Dataverse with proper error handling and headers.
"""
from typing import Any
import requests
class APIClient:
"""
Client for making HTTP requests to external APIs.
Attributes:
session (requests.Session): The underlying requests session.
"""
def __init__(
self,
contact_mail: str | None = None,
user_agent: str = "doi2dataset/2.0",
token: str | None = None,
) -> None:
"""
Initialize the API client with optional contact mail, user agent, and token.
Args:
contact_mail (str | None): Contact email address.
user_agent (str): User agent string.
token (str | None): Optional API token.
"""
self.session = requests.Session()
self._set_headers(contact_mail, user_agent, token)
def _set_headers(
self, contact_mail: str | None, user_agent: str, token: str | None
) -> None:
"""
Set HTTP headers for the session based on contact email and token.
Args:
contact_mail (str | None): Contact email address.
user_agent (str): User agent string.
token (str | None): Optional API token.
"""
if contact_mail:
header = {"User-Agent": f"{user_agent} (mailto:{contact_mail})"}
else:
header = {"User-Agent": user_agent}
if token:
header["X-Dataverse-key"] = token
self.session.headers.update(header)
def make_request(
self, url: str, method: str = "GET", **kwargs: Any
) -> requests.Response | None:
"""
Make an HTTP request and return the response.
Args:
url (str): The URL to request.
method (str): HTTP method to use (default: GET).
**kwargs: Additional arguments for requests.request.
Returns:
requests.Response | None: The HTTP response, or None if the request failed.
"""
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.RequestException:
# Log error - in a refactored version this should use proper logging
# For now, return None and let caller handle the error
return None
def close(self) -> None:
"""Close the session."""
self.session.close()
def __enter__(self) -> "APIClient":
"""Context manager entry."""
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Context manager exit."""
self.close()

View file

@ -0,0 +1,230 @@
"""
API processors for doi2dataset.
This module contains processors for handling specific types of data from external APIs,
including license processing and abstract extraction/cleaning.
"""
import re
from typing import Any
from rich.console import Console
from ..core.models import Abstract, License
class LicenseProcessor:
"""
Processes license information from metadata.
"""
LICENSE_MAP = {
"cc-by": ("https://creativecommons.org/licenses/by/4.0/", "CC BY 4.0"),
"cc-by-sa": ("https://creativecommons.org/licenses/by-sa/4.0/", "CC BY-SA 4.0"),
"cc-by-nc": ("https://creativecommons.org/licenses/by-nc/4.0/", "CC BY-NC 4.0"),
"cc-by-nc-sa": (
"https://creativecommons.org/licenses/by-nc-sa/4.0/",
"CC BY-NC-SA 4.0",
),
"cc-by-nc-nd": (
"https://creativecommons.org/licenses/by-nc-nd/4.0/",
"CC BY-NC-ND 4.0",
),
"cc-by-nd": ("https://creativecommons.org/licenses/by-nd/4.0/", "CC BY-ND 4.0"),
"cc0": ("https://creativecommons.org/publicdomain/zero/1.0/", "CC0 1.0"),
"pd": (
"https://creativecommons.org/publicdomain/mark/1.0/",
"Public Domain Mark 1.0",
),
}
@classmethod
def process_license(cls, data: dict[str, Any]) -> License:
"""
Process and return license information based on input data.
Args:
data (dict[str, Any]): Input data containing license info.
Returns:
License: Processed license information.
"""
location = data.get("primary_location", {})
license_short = location.get("license", "")
if not license_short:
return License(name="", uri="", short="unknown")
base_license = license_short.split("/")[0].lower()
uri, name = cls.LICENSE_MAP.get(base_license, ("", license_short))
return License(name=name, uri=uri, short=license_short)
class AbstractProcessor:
"""
Retrieves and processes abstracts from CrossRef and OpenAlex.
"""
# Icons for console output - TODO: should be moved to a constants module
ICONS = {"info": "", "warning": "⚠️", "error": ""}
def __init__(self, api_client, console: Console | None = None):
"""
Initialize with an APIClient instance.
Args:
api_client: The API client to use for requests.
console (Console | None): Rich console instance for output.
"""
self.api_client = api_client
self.console = console or Console()
def get_abstract(
self, doi: str, data: dict[str, Any], license: License
) -> Abstract:
"""
Get an abstract based on DOI and license permissions.
Args:
doi (str): The DOI.
data (dict[str, Any]): Data retrieved from an external source.
license (License): License information.
Returns:
Abstract: The abstract with its source.
"""
license_ok = {"cc-by", "cc-by-sa", "cc-by-nc", "cc-by-nc-sa", "cc0", "pd"}
if license.short in license_ok:
self.console.print(
f"\n{self.ICONS['info']} License {license.name} allows derivative works. Pulling abstract from CrossRef.",
style="info",
)
crossref_abstract = self._get_crossref_abstract(doi)
if crossref_abstract:
return Abstract(text=crossref_abstract, source="crossref")
else:
self.console.print(
f"\n{self.ICONS['warning']} No abstract found in CrossRef!",
style="warning",
)
else:
if license.name:
self.console.print(
f"\n{self.ICONS['info']} License {license.name} does not allow derivative works. Reconstructing abstract from OpenAlex!",
style="info",
)
else:
self.console.print(
f"\n{self.ICONS['info']} Custom license does not allow derivative works. Reconstructing abstract from OpenAlex!",
style="info",
)
openalex_abstract = self._get_openalex_abstract(data)
if openalex_abstract:
return Abstract(text=openalex_abstract, source="openalex")
else:
self.console.print(
f"\n{self.ICONS['warning']} No abstract found in OpenAlex!",
style="warning",
)
self.console.print(
f"\n{self.ICONS['warning']} No abstract found in either CrossRef nor OpenAlex!",
style="warning",
)
return Abstract(text="", source="none")
def _get_crossref_abstract(self, doi: str) -> str | None:
"""
Retrieve abstract from CrossRef API.
Args:
doi (str): The DOI.
Returns:
str | None: The abstract if found, otherwise None.
"""
url = f"https://api.crossref.org/works/{doi}"
response = self.api_client.make_request(url)
if response and response.status_code == 200:
abstract_raw = response.json().get("message", {}).get("abstract")
return self._clean_jats(abstract_raw)
return None
def _get_openalex_abstract(self, data: dict[str, Any]) -> str | None:
"""
Retrieve abstract from OpenAlex data.
Args:
data (dict[str, Any]): Data from OpenAlex.
Returns:
str | None: The reconstructed abstract, or None if not available.
"""
inv_index = data.get("abstract_inverted_index")
if not inv_index:
return None
word_positions = [
(word, pos) for word, positions in inv_index.items() for pos in positions
]
sorted_words = sorted(word_positions, key=lambda x: x[1])
return " ".join(word for word, _ in sorted_words)
def _clean_jats(self, text: str | None) -> str:
"""
Clean JATS XML tags in the abstract and convert them to HTML tags.
Args:
text (str | None): The raw abstract text containing JATS tags.
Returns:
str: The cleaned abstract text.
"""
if not text:
return ""
# Handle list tags with sequential processing to avoid duplicate keys
# Process ordered lists first - replace both opening and closing tags
text = text.replace('<jats:list list-type="order">', "<ol>")
# Find and replace closing tags for ordered lists
# This regex matches </jats:list> that comes after <ol> tags
pattern = r"(<ol>.*?)</jats:list>"
text = re.sub(pattern, r"\1</ol>", text, flags=re.DOTALL)
# Process unordered lists second
text = text.replace('<jats:list list-type="bullet">', "<ul>")
# Replace remaining </jats:list> tags as unordered list closings
text = text.replace("</jats:list>", "</ul>")
# Handle other JATS tags
replacements = {
"<jats:italic>": "<i>",
"</jats:italic>": "</i>",
"<jats:bold>": "<b>",
"</jats:bold>": "</b>",
"<jats:sup>": "<sup>",
"</jats:sup>": "</sup>",
"<jats:sub>": "<sub>",
"</jats:sub>": "</sub>",
"<jats:underline>": "<u>",
"</jats:underline>": "</u>",
"<jats:monospace>": "<code>",
"</jats:monospace>": "</code>",
"<jats:sc>": "<small>",
"</jats:sc>": "</small>",
"<jats:p>": "<p>",
"</jats:p>": "</p>",
"<jats:title>": "<h2>",
"</jats:title>": "</h2>",
"<jats:list-item>": "<li>",
"</jats:list-item>": "</li>",
"<jats:blockquote>": "<blockquote>",
"</jats:blockquote>": "</blockquote>",
}
for jats_tag, html_tag in replacements.items():
text = text.replace(jats_tag, html_tag)
return text

View file

@ -0,0 +1,34 @@
"""
Core components for doi2dataset.
This package contains the fundamental classes and utilities used throughout
the application, including configuration management, data models, and
metadata field definitions.
"""
from .config import Config, ConfigData
from .metadata_fields import (
BaseMetadataField,
CompoundMetadataField,
ControlledVocabularyMetadataField,
FieldType,
PrimitiveMetadataField,
)
from .models import Abstract, Institution, License, Person
__all__ = [
# Configuration
"Config",
"ConfigData",
# Models
"Person",
"Institution",
"License",
"Abstract",
# Metadata fields
"BaseMetadataField",
"PrimitiveMetadataField",
"ControlledVocabularyMetadataField",
"CompoundMetadataField",
"FieldType",
]

173
doi2dataset/core/config.py Normal file
View file

@ -0,0 +1,173 @@
"""
Configuration management for doi2dataset.
This module provides configuration loading and management with support for
environment variable overrides for sensitive credentials.
"""
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
from ..utils.validation import validate_email_address
@dataclass
class ConfigData:
"""
Represents configuration data loaded from a YAML file with environment variable overrides.
The dataverse configuration may be overridden by environment variables:
DATAVERSE_URL, DATAVERSE_API_TOKEN, DATAVERSE_DATAVERSE,
DATAVERSE_AUTH_USER, DATAVERSE_AUTH_PASSWORD.
Attributes:
dataverse (dict[str, str]): Dataverse-related configuration with environment
variable overrides applied.
pis (list[dict[str, Any]]): List of principal investigator configurations.
default_grants (list[dict[str, str]]): Default grant configurations.
"""
dataverse: dict[str, str]
pis: list[dict[str, Any]]
default_grants: list[dict[str, str]]
class Config:
"""
Singleton class to handle configuration loading and retrieval.
Supports environment variable overrides for Dataverse configuration:
- DATAVERSE_URL: Overrides dataverse.url
- DATAVERSE_API_TOKEN: Overrides dataverse.api_token
- DATAVERSE_DATAVERSE: Overrides dataverse.dataverse
- DATAVERSE_AUTH_USER: Overrides dataverse.auth_user
- DATAVERSE_AUTH_PASSWORD: Overrides dataverse.auth_password
Environment variables take precedence over config file values.
"""
_instance: "Config | None" = None
_config_data: ConfigData | None = None
def __new__(cls) -> "Config":
"""
Create and return the singleton instance of Config.
Returns:
Config: The singleton instance.
"""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
def load_config(cls, config_path: str | Path | None = None) -> None:
"""
Load configuration from a YAML file with environment variable overrides.
Environment variables will override corresponding config file values:
DATAVERSE_URL, DATAVERSE_API_TOKEN, DATAVERSE_DATAVERSE,
DATAVERSE_AUTH_USER, DATAVERSE_AUTH_PASSWORD
Args:
config_path (str | Path | None): Path to the configuration file.
If None, the default config.yaml in the project root is used.
Raises:
FileNotFoundError: If the configuration file does not exist.
ValueError: If any PI email address is invalid.
"""
if config_path is None:
# Look for config.yaml in the project root (two levels up from this file)
config_path = Path(__file__).parent.parent.parent / "config.yaml"
config_path = Path(config_path)
if not config_path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(config_path, encoding="utf-8") as f:
config_data = yaml.safe_load(f)
# Override dataverse config with environment variables if they exist
dataverse_config = config_data.get("dataverse", {})
# Check for environment variables and override config values
env_overrides = {
"url": os.getenv("DATAVERSE_URL"),
"api_token": os.getenv("DATAVERSE_API_TOKEN"),
"dataverse": os.getenv("DATAVERSE_DATAVERSE"),
"auth_user": os.getenv("DATAVERSE_AUTH_USER"),
"auth_password": os.getenv("DATAVERSE_AUTH_PASSWORD"),
}
# Apply environment variable overrides if they exist
for key, env_value in env_overrides.items():
if env_value is not None:
dataverse_config[key] = env_value
# Validate PI email addresses
pis = config_data.get("pis", [])
for pi in pis:
if email := pi.get("email"):
if not validate_email_address(email):
raise ValueError(
f"Configuration Error: Invalid email address for PI {pi.get('given_name', '')} {pi.get('family_name', '')}: {email}"
)
cls._config_data = ConfigData(
dataverse=dataverse_config,
pis=config_data.get("pis", []),
default_grants=config_data.get("default_grants", []),
)
@classmethod
def get_config(cls) -> ConfigData:
"""
Retrieve the loaded configuration data.
Returns:
ConfigData: The configuration data.
Raises:
RuntimeError: If the configuration could not be loaded.
"""
if cls._config_data is None:
cls.load_config()
if cls._config_data is None:
raise RuntimeError("Failed to load configuration")
return cls._config_data
@property
def PIS(self) -> list[dict[str, Any]]:
"""
Get PI configurations.
Returns:
list[dict[str, Any]]: List of PI configurations.
"""
return self.get_config().pis
@property
def DEFAULT_GRANTS(self) -> list[dict[str, str]]:
"""
Get default grant configurations.
Returns:
list[dict[str, str]]: List of default grants.
"""
return self.get_config().default_grants
@property
def DATAVERSE(self) -> dict[str, str]:
"""
Get Dataverse configurations with environment variable overrides applied.
Returns:
dict[str, str]: Dataverse configuration with environment variables
taking precedence over config file values.
"""
return self.get_config().dataverse

View file

@ -0,0 +1,168 @@
"""
Metadata field classes for Dataverse integration.
This module provides the base classes and implementations for different types
of metadata fields used in Dataverse dataset creation.
"""
from collections.abc import Sequence
from dataclasses import dataclass, field
from enum import Enum
from functools import reduce
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
pass
class FieldType(Enum):
"""Enum representing different Dataverse field types."""
PRIMITIVE = "primitive"
COMPOUND = "compound"
VOCABULARY = "controlledVocabulary"
@dataclass
class BaseMetadataField[T]:
"""
Base class for Dataverse metadata fields.
This class defines a metadata field with a name, a value of type T, and
a flag indicating whether multiple values are allowed. It serves as
a template for specific metadata field implementations.
Attributes:
name (str): The name of the metadata field.
multiple (bool): Indicates whether multiple values are allowed.
value (T): The value stored in the field.
type (FieldType): The type of the field, automatically set based on T.
"""
name: str
multiple: bool
value: T
type: FieldType = field(init=False)
expanded_value: dict[str, str] | None = field(default=None)
def __post_init__(self) -> None:
"""
After initialization, determine the field type by calling the _set_type method.
"""
self._set_type()
def _set_type(self) -> None:
"""
Set the `type` attribute based on the field's value.
This method must be implemented by subclasses.
Raises:
NotImplementedError: If not implemented by a subclass.
"""
raise NotImplementedError("Subclasses must implement the _set_type method.")
def to_dict(self) -> dict[str, Any]:
"""
Convert the metadata field to a dictionary representation.
Returns:
dict[str, Any]: Dictionary representation of the metadata field.
Raises:
NotImplementedError: If not implemented by a subclass.
"""
raise NotImplementedError("Subclasses must implement the to_dict method.")
@dataclass
class PrimitiveMetadataField(BaseMetadataField[str]):
"""
Metadata field representing a primitive type (e.g., string) for Dataverse.
"""
def _set_type(self) -> None:
self.type = FieldType.PRIMITIVE
def to_dict(self) -> dict[str, str | bool | dict[str, str]]:
"""
Convert the primitive metadata field to a dictionary representation.
Returns:
dict[str, str | bool]: Dictionary with field properties.
"""
if self.expanded_value:
return {
"typeName": self.name,
"typeClass": self.type.value,
"multiple": self.multiple,
"value": self.value,
"expandedValue": self.expanded_value,
}
else:
return {
"typeName": self.name,
"typeClass": self.type.value,
"multiple": self.multiple,
"value": self.value,
}
@dataclass
class ControlledVocabularyMetadataField(BaseMetadataField[str | list[str]]):
"""
Metadata field for controlled vocabulary values.
"""
def _set_type(self) -> None:
self.type = FieldType.VOCABULARY
def to_dict(self) -> dict[str, Any]:
"""
Convert the controlled vocabulary metadata field to a dictionary.
Returns:
dict[str, Any]: Dictionary representation.
"""
return {
"typeName": self.name,
"typeClass": self.type.value,
"multiple": self.multiple,
"value": self.value,
}
@dataclass
class CompoundMetadataField(
BaseMetadataField[
Sequence[Sequence["PrimitiveMetadataField | ControlledVocabularyMetadataField"]]
]
):
"""
Metadata field representing compound types, composed of multiple subfields.
"""
def _set_type(self) -> None:
self.type = FieldType.COMPOUND
def to_dict(self) -> dict[str, Any]:
"""
Convert the compound metadata field to a dictionary representation.
Returns:
dict[str, Any]: Dictionary representation of the compound field.
"""
value_list: list[dict[str, Any]] = []
for outer_list in self.value:
field_dicts: list[dict[str, Any]] = []
for field_item in outer_list:
field_dicts.append({field_item.name: field_item.to_dict()})
value_list.append(reduce(lambda d1, d2: d1 | d2, field_dicts))
return {
"typeName": self.name,
"typeClass": self.type.value,
"multiple": self.multiple,
"value": value_list,
}

221
doi2dataset/core/models.py Normal file
View file

@ -0,0 +1,221 @@
"""
Core data models for doi2dataset.
This module contains the fundamental data classes used throughout the application
for representing people, institutions, licenses, and abstracts.
"""
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .metadata_fields import (
ControlledVocabularyMetadataField,
PrimitiveMetadataField,
)
@dataclass
class Institution:
"""
Represents an institution or organization.
Attributes:
display_name (str): The name of the institution.
ror (str): Research Organization Registry identifier (optional).
"""
display_name: str
ror: str = ""
def affiliation_field(self) -> "PrimitiveMetadataField":
"""
Create a metadata field for the affiliation.
Returns:
PrimitiveMetadataField: A metadata field representing the institution,
using ROR ID when available.
"""
from .metadata_fields import PrimitiveMetadataField
if self.ror:
expanded_value = {
"scheme": "http://www.grid.ac/ontology/",
"termName": self.display_name,
"@type": "https://schema.org/Organization",
}
return PrimitiveMetadataField(
"authorAffiliation", False, self.ror, expanded_value=expanded_value
)
else:
return PrimitiveMetadataField("authorAffiliation", False, self.display_name)
@dataclass
class Person:
"""
Represents a person (e.g., an author or a PI).
Attributes:
family_name (str): Family name of the person.
given_name (str): Given name of the person.
orcid (str): ORCID identifier (optional).
email (str): Email address (optional).
affiliation (Institution): Affiliation of the person (optional).
"""
family_name: str
given_name: str
orcid: str = ""
email: str = ""
affiliation: Institution | str = ""
def to_dict(self) -> dict[str, str | list[str] | dict[str, str]]:
"""
Convert Person to a dictionary for JSON serialization.
Handles affiliations properly by checking if the affiliation
is an Institution object or a string.
Returns:
dict: A dictionary containing the person's information including
name, contact details, and affiliation.
"""
return_dict: dict[str, str | list[str] | dict[str, str]] = {
"family_name": self.family_name,
"given_name": self.given_name,
"orcid": self.orcid,
"email": self.email,
}
if isinstance(self.affiliation, Institution):
if self.affiliation.ror:
return_dict["affiliation"] = self.affiliation.ror
elif self.affiliation.display_name:
return_dict["affiliation"] = self.affiliation.display_name
else:
return_dict["affiliation"] = ""
else:
return_dict["affiliation"] = self.affiliation if self.affiliation else ""
return return_dict
def format_name(self) -> str:
"""
Format the name in 'Family, Given' order.
Returns:
str: Formatted name.
"""
return f"{self.family_name}, {self.given_name}"
def author_fields(
self,
) -> list["PrimitiveMetadataField | ControlledVocabularyMetadataField"]:
"""
Build metadata fields for the author.
The method handles both Institution objects and string values for affiliations.
Different fields are generated depending on whether ORCID is available.
Returns:
list: List of metadata fields representing the author, including name,
affiliation, and optionally ORCID identifier information.
"""
from .metadata_fields import (
ControlledVocabularyMetadataField,
PrimitiveMetadataField,
)
affiliation_field = None
if isinstance(self.affiliation, Institution):
affiliation_field = self.affiliation.affiliation_field()
else:
affiliation_field = PrimitiveMetadataField(
"authorAffiliation", False, self.affiliation
)
if self.orcid:
return [
PrimitiveMetadataField("authorName", False, self.format_name()),
affiliation_field,
ControlledVocabularyMetadataField(
"authorIdentifierScheme", False, "ORCID"
),
PrimitiveMetadataField("authorIdentifier", False, self.orcid),
]
else:
return [
PrimitiveMetadataField("authorName", False, self.format_name()),
affiliation_field,
]
def dataset_contact_fields(self) -> list["PrimitiveMetadataField"]:
"""
Generate metadata fields for dataset contact.
The method handles both Institution objects and string values for affiliations.
Creates fields for the contact name, affiliation, and email address.
Returns:
list: List of metadata fields for the dataset contact including name,
affiliation, and email address.
"""
from .metadata_fields import PrimitiveMetadataField
affiliation_field = None
if isinstance(self.affiliation, Institution):
affiliation_field = self.affiliation.affiliation_field()
else:
affiliation_field = PrimitiveMetadataField(
"datasetContactAffiliation", False, self.affiliation
)
return [
PrimitiveMetadataField("datasetContactName", False, self.format_name()),
affiliation_field,
PrimitiveMetadataField("datasetContactEmail", False, self.email),
]
@dataclass
class License:
"""
Represents a license with name, URI, and short identifier.
Attributes:
name (str): The full name of the license.
uri (str): The license URI.
short (str): The short identifier of the license.
"""
name: str
uri: str
short: str
@dataclass
class Abstract:
"""
Represents an abstract with its text and source.
Attributes:
text (str): The abstract text.
source (str): The source of the abstract ('crossref', 'openalex', or 'none').
"""
text: str
source: str
def __post_init__(self):
"""
Validate that the abstract source is one of the allowed values.
Raises:
ValueError: If source is not one of the allowed values.
"""
allowed_sources = ["crossref", "openalex", "none"]
if self.source not in allowed_sources:
raise ValueError(
f"{self.source} is not valid! Needs to be one of {str(allowed_sources)}."
)

View file

@ -0,0 +1,18 @@
"""
Processing components for doi2dataset.
This package contains the business logic components for processing DOIs,
building citations, processing metadata, and handling various data transformations.
"""
from .citation import CitationBuilder
from .metadata import MetadataProcessor
from .utils import NameProcessor, PIFinder, SubjectMapper
__all__ = [
"NameProcessor",
"PIFinder",
"SubjectMapper",
"CitationBuilder",
"MetadataProcessor",
]

View file

@ -0,0 +1,292 @@
"""
Citation processing for doi2dataset.
This module contains the CitationBuilder class which handles building various
citation-related metadata fields from API data.
"""
# Suppress the warning from idutils about pkg_resources
import warnings
from typing import Any
from ..core.config import Config
from ..core.metadata_fields import PrimitiveMetadataField
from ..core.models import Institution, Person
from ..processing.utils import NameProcessor, PIFinder
warnings.filterwarnings(
"ignore", message=".*pkg_resources.*", category=DeprecationWarning
)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
from idutils.normalizers import normalize_orcid, normalize_pmid
class CitationBuilder:
"""
Builds various citation-related metadata fields.
"""
def __init__(
self, data: dict[str, Any], doi: str, pi_finder: PIFinder, ror: bool = False
) -> None:
"""
Initialize the CitationBuilder with data, DOI, and a PIFinder.
Args:
data (dict[str, Any]): Metadata from an external source.
doi (str): The DOI.
pi_finder (PIFinder): Instance to find PI information.
ror (bool): Whether to use ROR identifiers for institutions.
"""
self.data = data
self.doi = doi
self.ror = ror
self.pi_finder = pi_finder
def build_other_ids(self) -> list[list[PrimitiveMetadataField]]:
"""
Build metadata fields for other identifiers (e.g., DOI, PMID).
Returns:
list[list[PrimitiveMetadataField]]: Nested list of identifier metadata fields.
"""
other_ids = [
[
PrimitiveMetadataField("otherIdAgency", False, "doi"),
PrimitiveMetadataField("otherIdValue", False, self.doi),
]
]
if pmid := self.data.get("ids", {}).get("pmid"):
try:
normalized_pmid = normalize_pmid(pmid)
other_ids.append(
[
PrimitiveMetadataField("otherIdAgency", False, "pmid"),
PrimitiveMetadataField("otherIdValue", False, normalized_pmid),
]
)
except ValueError:
pass
return other_ids
def build_grants(self) -> list[list[PrimitiveMetadataField]]:
"""
Build metadata fields for grants.
Returns:
list[list[PrimitiveMetadataField]]: Nested list of grant metadata fields.
"""
config = Config()
default_grants = config.DEFAULT_GRANTS
grants: list[list[PrimitiveMetadataField]] = []
for grant in default_grants:
grants.append(
[
PrimitiveMetadataField("grantNumberAgency", False, grant["funder"]),
PrimitiveMetadataField("grantNumberValue", False, grant["id"]),
]
)
for grant in self.data.get("grants", []):
grant_funder = grant.get("funder_display_name", {})
grant_id = grant.get("award_id", {})
if not grant_funder or not grant_id:
continue
grants.append(
[
PrimitiveMetadataField("grantNumberAgency", False, grant_funder),
PrimitiveMetadataField("grantNumberValue", False, grant_id),
]
)
return grants
def build_authors(self) -> tuple[list[Person], list[Person]]:
"""
Build lists of authors and corresponding authors from the metadata.
Returns:
tuple: (authors, corresponding_authors)
"""
authors: list[Person] = []
corresponding_authors: list[Person] = []
for authorship in self.data.get("authorships", []):
author = authorship.get("author", {})
if not author:
continue
author_person = self._process_author(author, authorship)
authors.append(author_person)
if authorship.get("is_corresponding"):
corresponding_entry = self._process_corresponding_author(
author_person, authorship
)
if corresponding_entry:
corresponding_authors.append(corresponding_entry)
return authors, corresponding_authors
def _process_author(
self, author: dict[str, Any], authorship: dict[str, Any]
) -> Person:
"""
Process author data and return a Person instance.
Args:
author (dict[str, Any]): Author data.
authorship (dict[str, Any]): Authorship metadata.
Returns:
Person: Processed author
"""
display_name = author.get("display_name", "")
given_name, family_name = NameProcessor.split_name(display_name)
person = Person(family_name, given_name)
if affiliations := authorship.get("affiliations"):
affiliation = Institution(
affiliations[0].get("raw_affiliation_string", "").strip()
)
person.affiliation = affiliation
if self.ror:
if institutions := authorship.get("institutions"):
institution = institutions[0]
if institution.get("ror"):
affiliation = Institution(
institution.get("display_name"), institution.get("ror")
)
person.affiliation = affiliation
if orcid := author.get("orcid"):
person.orcid = normalize_orcid(orcid)
return person
def _process_corresponding_author(
self, author: Person, authorship: dict[str, Any]
) -> Person | None:
"""
Identify the corresponding author based on provided PI information.
Args:
author (Person): The author.
authorship (dict[str, Any]): Authorship metadata.
Returns:
Person | None: The corresponding author, or None if not found.
"""
pi_matches = self.pi_finder.find_by_orcid([author])
return pi_matches[0] if pi_matches else None
def build_topics(self) -> list[list[PrimitiveMetadataField]]:
"""
Build metadata fields for topics based on a threshold score.
Returns:
list[list[PrimitiveMetadataField]]: Nested list of topic metadata fields.
"""
topics: list[list[PrimitiveMetadataField]] = []
for topic in self.data.get("topics", []):
if topic.get("score", 0) >= 0.8:
topic_class_value_field = PrimitiveMetadataField(
"topicClassValue", False, topic.get("display_name")
)
topic_class_vocab_field = PrimitiveMetadataField(
"topicClassVocab", False, "OpenAlex"
)
topic_class_vocab_uri_field = PrimitiveMetadataField(
"topicClassVocabURI", False, topic.get("id")
)
topics.append(
[
topic_class_value_field,
topic_class_vocab_field,
topic_class_vocab_uri_field,
]
)
return topics
def build_keywords(self) -> list[list[PrimitiveMetadataField]]:
"""
Build metadata fields for keywords from both regular keywords and MeSH terms.
Returns:
list[list[PrimitiveMetadataField]]: Nested list of keyword metadata fields.
"""
keywords: list[list[PrimitiveMetadataField]] = []
for keyword in self.data.get("keywords", []):
# Filter out possibly unrelated keywords (low score)
if keyword.get("score", 0) >= 0.5:
keyword_value_field = PrimitiveMetadataField(
"keywordValue", False, keyword["display_name"]
)
keywords.append([keyword_value_field])
mesh_base_url = "http://id.nlm.nih.gov/mesh"
for mesh in self.data.get("mesh", []):
url = f"{mesh_base_url}/{mesh['descriptor_ui']}"
if mesh.get("qualifier_ui"):
url = f"{url}{mesh['qualifier_ui']}"
keyword_value_field = PrimitiveMetadataField(
"keywordValue", False, mesh["descriptor_name"]
)
keyword_term_uri_field = PrimitiveMetadataField(
"keywordTermURI", False, url
)
keyword_vocabulary_field = PrimitiveMetadataField(
"keywordVocabulary", False, "MeSH"
)
keyword_vocabulary_uri_field = PrimitiveMetadataField(
"keywordVocabularyURI", False, mesh_base_url
)
keywords.append(
[
keyword_value_field,
keyword_term_uri_field,
keyword_vocabulary_field,
keyword_vocabulary_uri_field,
]
)
return keywords
def _get_publication_year(self, data: dict[str, Any]) -> str:
"""
Extract publication year from data, with fallbacks.
Args:
data (dict[str, Any]): Publication data.
Returns:
str: Publication year as string.
"""
# Try publication_year first
if pub_year := data.get("publication_year"):
return str(pub_year)
# Fallback to publication_date
if pub_date := data.get("publication_date"):
try:
return pub_date.split("-")[0]
except (AttributeError, IndexError):
pass
# Final fallback
return "Unknown"

View file

@ -0,0 +1,474 @@
"""
Metadata processing for doi2dataset.
This module contains the MetadataProcessor class which handles the complete workflow
of processing DOIs: fetching data, building metadata, and optionally uploading to Dataverse.
"""
import json
import warnings
from pathlib import Path
from typing import Any
from rich.console import Console
from rich.progress import Progress, TaskID
from ..api.client import APIClient
from ..api.processors import AbstractProcessor, LicenseProcessor
from ..core.config import Config
from ..core.metadata_fields import (
CompoundMetadataField,
ControlledVocabularyMetadataField,
PrimitiveMetadataField,
)
from ..core.models import Person
from ..processing.citation import CitationBuilder
from ..processing.utils import NameProcessor, PIFinder, SubjectMapper
# Suppress warnings from idutils
warnings.filterwarnings(
"ignore", message=".*pkg_resources.*", category=DeprecationWarning
)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
from idutils.normalizers import normalize_doi
from idutils.validators import is_doi
class MetadataProcessor:
"""
Processes metadata for a given DOI by fetching data from OpenAlex,
building metadata blocks, and optionally uploading the dataset.
"""
# Icons for console output - TODO: should be moved to a constants module
ICONS = {
"processing": "⚙️",
"success": "",
"error": "",
"warning": "⚠️",
"info": "",
"upload": "📤",
"save": "💾",
}
def __init__(
self,
doi: str,
depositor: str | None = None,
output_path: Path | None = None,
default_subject: str = "Other",
contact_mail: str | None = None,
upload: bool = False,
ror: bool = False,
console: Console | None = None,
progress: Progress | None = None,
task_id: TaskID | None = None,
) -> None:
"""
Initialize the MetadataProcessor with configuration and processing options.
Args:
doi (str): The DOI to process.
depositor (str | None): Depositor name.
output_path (Path | None): Path where metadata will be saved.
default_subject (str): Default subject.
contact_mail (str | None): Contact email address.
ror (bool): Whether to use ROR id for affiliation
upload (bool): Whether to upload metadata.
console (Console | None): Rich console instance.
progress (Progress | None): Progress bar instance.
task_id (TaskID | None): Task ID for progress updates.
"""
self.console = console or Console()
try:
self.doi = self._validate_doi(doi)
except ValueError as e:
self.console.print(f"Error: {str(e)}", style="error")
raise
self.depositor = depositor
self.output_path = output_path
self.default_subject = default_subject
self.api_client = APIClient(contact_mail)
config = Config()
pi_objects = [Person(**pi) for pi in config.PIS]
self.pi_finder = PIFinder(pi_objects)
self.upload = upload
self.ror = ror
self.progress = progress
self.task_id = task_id
@staticmethod
def _validate_doi(doi: str) -> str:
"""
Validate and normalize a DOI.
Args:
doi (str): The DOI to validate.
Returns:
str: Normalized DOI.
Raises:
ValueError: If the DOI is invalid.
"""
if not is_doi(doi):
raise ValueError(f"Invalid DOI: {doi}")
return normalize_doi(doi)
def _update_progress(self) -> None:
"""
Advance the progress bar if enabled.
"""
if self.progress and self.task_id is not None:
self.progress.advance(self.task_id)
def process(self) -> dict[str, Any]:
"""
Process the DOI: fetch data, build metadata, optionally upload, and save output.
Returns:
dict[str, Any]: The constructed metadata dictionary.
"""
self.console.print(
f"{self.ICONS['processing']} Processing DOI: {self.doi}", style="info"
)
data = self._fetch_data()
self._update_progress()
metadata = self._build_metadata(data)
self._update_progress()
if self.upload:
self._upload_data(metadata)
self._update_progress()
self._save_output(metadata)
self._update_progress()
self.console.print(
f"\n{self.ICONS['success']} Successfully processed: {self.doi}\n",
style="success",
)
return metadata
def _upload_data(self, metadata: dict[str, Any]) -> dict[str, Any]:
"""
Upload the metadata to Dataverse.
Args:
metadata (dict[str, Any]): The metadata to upload.
Returns:
dict[str, Any]: The response from the Dataverse API.
Raises:
ValueError: If the upload fails.
"""
config = Config()
token = config.DATAVERSE["api_token"]
client = APIClient(token=token)
url = f"{config.DATAVERSE['url']}/api/dataverses/{config.DATAVERSE['dataverse']}/datasets?doNotValidate=true"
auth = (config.DATAVERSE["auth_user"], config.DATAVERSE["auth_password"])
response = client.make_request(url, method="POST", auth=auth, json=metadata)
if response is None or response.status_code != 201:
self.console.print(
f"\n{self.ICONS['error']} Failed to upload to Dataverse: {url}",
style="error",
)
raise ValueError(f"Failed to upload to Dataverse: {url}")
else:
perma = response.json().get("data", {}).get("persistentId", "")
self.console.print(
f"{self.ICONS['upload']} Dataset uploaded to: {config.DATAVERSE['dataverse']} with ID {perma}",
style="info",
)
return response.json()
def _fetch_data(self) -> dict[str, Any]:
"""
Fetch metadata from OpenAlex for the given DOI.
Returns:
dict[str, Any]: The fetched data.
Raises:
ValueError: If data fetching fails.
"""
url = f"https://api.openalex.org/works/https://doi.org/{self.doi}"
response = self.api_client.make_request(url)
if response is None or response.status_code != 200:
self.console.print(
f"\n{self.ICONS['error']} Failed to fetch data for DOI: {self.doi}",
style="error",
)
raise ValueError(f"Failed to fetch data for DOI: {self.doi}")
return response.json()
def _build_metadata(self, data: dict[str, Any]) -> dict[str, Any]:
"""
Construct the complete metadata dictionary from fetched data.
Args:
data (dict[str, Any]): The data retrieved from OpenAlex.
Returns:
dict[str, Any]: The complete metadata dictionary.
"""
license_info = LicenseProcessor.process_license(data)
abstract_processor = AbstractProcessor(self.api_client, self.console)
abstract = abstract_processor.get_abstract(self.doi, data, license_info)
citation_builder = CitationBuilder(data, self.doi, self.pi_finder, self.ror)
authors, corresponding_authors = citation_builder.build_authors()
author_fields: list[
list[PrimitiveMetadataField | ControlledVocabularyMetadataField]
] = []
corresponding_author_fields: list[list[PrimitiveMetadataField]] = []
for author in authors:
author_fields.append(author.author_fields())
if not corresponding_authors:
self.console.print(
f"{self.ICONS['warning']} No corresponding authors explicitly declared; PIs are used as a fallback!",
style="warning",
)
pis = self._get_involved_pis(data)
corresponding_authors: list[Person] = []
for pi in pis:
corresponding_authors.append(pi)
for corresponding_author in corresponding_authors:
corresponding_author_fields.append(
corresponding_author.dataset_contact_fields()
)
description = self._build_description(data, abstract)
grants = citation_builder.build_grants()
return_dict: dict[str, Any] = {
"datasetVersion": {
"metadataBlocks": {
"citation": {
"fields": [
PrimitiveMetadataField(
"title", False, data.get("title", "")
).to_dict(),
PrimitiveMetadataField(
"distributionDate",
False,
data.get("publication_date", ""),
).to_dict(),
CompoundMetadataField(
"otherId", True, citation_builder.build_other_ids()
).to_dict(),
CompoundMetadataField(
"dsDescription",
True,
[
[
PrimitiveMetadataField(
"dsDescriptionValue", False, description
)
]
],
).to_dict(),
ControlledVocabularyMetadataField(
"subject",
True,
SubjectMapper.map_subjects([self.default_subject]),
).to_dict(),
CompoundMetadataField(
"topicClassification",
True,
citation_builder.build_topics(),
).to_dict(),
CompoundMetadataField(
"keyword", True, citation_builder.build_keywords()
).to_dict(),
PrimitiveMetadataField(
"depositor",
False,
self.depositor
or data.get("primary_location", {})
.get("source", {})
.get("display_name", ""),
).to_dict(),
PrimitiveMetadataField(
"alternativeURL", False, f"https://doi.org/{self.doi}"
).to_dict(),
CompoundMetadataField(
"author", True, author_fields
).to_dict(),
CompoundMetadataField(
"datasetContact", True, corresponding_author_fields
).to_dict(),
CompoundMetadataField(
"grantNumber", True, grants
).to_dict(),
],
"displayName": "Citation Metadata",
}
},
"files": [],
}
}
if license_info.name:
return_dict["datasetVersion"]["license"] = {
"name": license_info.name,
"uri": license_info.uri,
}
else:
return_dict["datasetVersion"]["termsOfUse"] = (
f"All rights reserved. Copyright © {self._get_publication_year(data)}, [TODO: Insert copyright holder here!]"
)
return return_dict
def _build_description(self, data: dict[str, Any], abstract) -> str:
"""
Build the description field by combining a header and the abstract.
Args:
data (dict[str, Any]): The metadata.
abstract: The abstract object.
Returns:
str: The full description.
"""
head = self._build_description_head(data)
return f"{head}{abstract.text}"
def _build_description_head(self, data: dict[str, Any]) -> str:
"""
Build the header for the description based on publication details.
Args:
data (dict[str, Any]): The metadata.
Returns:
str: The HTML header string.
"""
journal = data.get("primary_location", {}).get("source", {}).get("display_name")
publication_date = data.get("publication_date")
volume = data.get("biblio", {}).get("volume")
issue = data.get("biblio", {}).get("issue")
doc_type = data.get("type")
if all([journal, publication_date, volume, issue, doc_type]):
return f"<p>This {doc_type} was published on {publication_date} in <i>{journal}</i> {volume}({issue})</p>"
elif all([journal, publication_date, doc_type]):
return f"<p>This {doc_type} was published on {publication_date} in <i>{journal}</i></p>"
self.console.print(
f"{self.ICONS['warning']} No abstract header added, missing information (journal, publication date and/or document type)",
style="warning",
)
return ""
def _get_publication_year(self, data: dict[str, Any]) -> str | int:
"""
Extract the publication year from the metadata.
Args:
data (dict[str, Any]): The metadata.
Returns:
str | int: The publication year or empty string.
"""
return data.get("publication_year", "")
def _get_involved_pis(self, data: dict[str, Any]) -> list[Person]:
"""
Identify involved principal investigators from the metadata for use as fallback
corresponding authors.
This method matches authors in the publication metadata against the configured
PIs and returns matching PIs. It is used as a fallback when no corresponding
authors are explicitly declared in the publication metadata.
Args:
data (dict[str, Any]): The metadata from OpenAlex.
Returns:
list[Person]: List of matching PIs for use as corresponding authors.
"""
involved_pis: list[Person] = []
authors_in_publication = []
# Build list of authors from publication
for authorship in data.get("authorships", []):
author = authorship.get("author", {})
if not author:
continue
display_name = author.get("display_name", "")
given_name, family_name = NameProcessor.split_name(display_name)
person = Person(family_name, given_name)
if orcid := author.get("orcid"):
person.orcid = orcid
authors_in_publication.append(person)
# Find PIs that match authors in the publication
involved_pis = self.pi_finder.find_by_orcid(authors_in_publication)
return involved_pis
def _save_output(self, metadata: dict[str, Any]) -> None:
"""
Save the generated metadata to a file or print it to the console.
Args:
metadata (dict[str, Any]): The metadata to save.
"""
if self.output_path:
try:
# Custom JSON encoder to handle custom objects
class CustomEncoder(json.JSONEncoder):
"""
Custom JSON encoder that handles objects with to_dict method.
This allows for proper serialization of custom classes like
Institution and Person by calling their to_dict method when
available.
Args:
o: The object to serialize.
Returns:
A JSON-serializable representation of the object.
"""
def default(self, o: Any) -> Any:
if hasattr(o, "to_dict"):
return o.to_dict()
return super().default(o)
with open(self.output_path, "w", encoding="utf-8") as f:
json.dump(
metadata, f, indent=4, ensure_ascii=False, cls=CustomEncoder
)
self.console.print(
f"{self.ICONS['save']} Metadata saved in: {self.output_path}",
style="info",
)
except Exception as e:
self.console.print(
f"{self.ICONS['error']} Error saving metadata: {str(e)}\n",
style="error",
)
raise
else:
self.console.print(metadata)

View file

@ -0,0 +1,289 @@
"""
Processing utilities for doi2dataset.
This module contains utility classes and functions used for processing
names, finding PIs, mapping subjects, and other business logic operations.
"""
import unicodedata
import warnings
from typing import Any
from ..core.models import Person
# Suppress warnings from idutils
warnings.filterwarnings(
"ignore", message=".*pkg_resources.*", category=DeprecationWarning
)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
from idutils.normalizers import normalize_orcid
class NameProcessor:
"""
Provides utility methods for processing names.
"""
@staticmethod
def normalize_string(s: str) -> str:
"""
Normalize a string using Unicode NFKD normalization and convert to ASCII.
Args:
s (str): The string to normalize.
Returns:
str: The normalized string.
"""
return (
unicodedata.normalize("NFKD", s.lower())
.encode("ASCII", "ignore")
.decode("ASCII")
)
@staticmethod
def split_name(full_name: str) -> tuple[str, str]:
"""
Split a full name into given and family names.
Args:
full_name (str): The full name (e.g., "Doe, John" or "John Doe").
Returns:
tuple[str, str]: A tuple (given_name, family_name).
"""
if "," in full_name:
surname, given_name = full_name.split(",", 1)
return given_name.strip(), surname.strip()
parts = full_name.strip().split()
if len(parts) == 1:
return "", parts[0]
return " ".join(parts[:-1]), parts[-1]
class PIFinder:
"""
Finds principal investigators (PIs) among a list of Person objects.
"""
def __init__(self, pis: list[Person]) -> None:
"""
Initialize with a list of Person objects representing potential PIs.
Args:
pis (list[Person]): List of Person objects.
"""
self.pis = pis
def find_by_orcid(self, authors: list[Person]) -> list[Person]:
"""
Find PIs by ORCID identifier among the authors.
Args:
authors (list[Person]): List of author Person objects.
Returns:
list[Person]: List of Person objects that are PIs based on ORCID matching.
"""
if not self.pis or not authors:
return []
pi_orcids = {pi.orcid for pi in self.pis if pi.orcid}
if not pi_orcids:
return []
return [author for author in authors if author.orcid in pi_orcids]
def find_corresponding_authors(self, authors: list[Person]) -> list[Person]:
"""
Find corresponding authors by checking for email addresses and PI matching.
Args:
authors (list[Person]): List of author Person objects.
Returns:
list[Person]: List of corresponding authors.
"""
# First, try to find authors with email addresses
authors_with_email = [author for author in authors if author.email]
if authors_with_email:
# If we have PIs configured, prefer PI matches
pi_matches = self.find_by_orcid(authors_with_email)
if pi_matches:
return pi_matches
# Otherwise return all authors with email addresses
return authors_with_email
# Fallback: look for PI matches even without email
pi_matches = self.find_by_orcid(authors)
if pi_matches:
return pi_matches
# Last resort: return first author if no other criteria match
return authors[:1] if authors else []
def find_pi(
self,
family_name: str | None = None,
given_name: str | None = None,
orcid: str | None = None,
) -> Person | None:
"""
Find a PI by name and/or ORCID.
Args:
family_name (str | None): Family name to match.
given_name (str | None): Given name to match.
orcid (str | None): ORCID to match.
Returns:
Person | None: The matched PI or None.
"""
if orcid:
return self._find_by_orcid(orcid)
# Fallback to name matching if no ORCID
for person in self.pis:
name_match = True
if family_name and person.family_name.lower() != family_name.lower():
name_match = False
if given_name and person.given_name.lower() != given_name.lower():
name_match = False
if name_match:
return person
return None
def _find_by_orcid(self, orcid: str) -> Person | None:
"""
Find a PI by ORCID.
Args:
orcid (str): Normalized ORCID.
Returns:
Person | None: The matched PI or None.
"""
try:
normalized_orcid = normalize_orcid(orcid)
for person in self.pis:
if person.orcid and normalize_orcid(person.orcid) == normalized_orcid:
return person
except Exception:
# If ORCID normalization fails, try direct string comparison
for person in self.pis:
if person.orcid == orcid:
return person
return None
class SubjectMapper:
"""
Maps subject names from input data to controlled vocabulary.
"""
CONTROLLED_VOCAB = {
"Agricultural Sciences": "Agricultural Sciences",
"Arts and Humanities": "Arts and Humanities",
"Astronomy": "Astronomy and Astrophysics",
"Astrophysics": "Astronomy and Astrophysics",
"Business": "Business and Management",
"Business and Management": "Business and Management",
"Chemistry": "Chemistry",
"Computer Science": "Computer and Information Science",
"Computer and Information Science": "Computer and Information Science",
"Earth Sciences": "Earth and Environmental Sciences",
"Earth and Environmental Sciences": "Earth and Environmental Sciences",
"Engineering": "Engineering",
"Law": "Law",
"Life Sciences": "Medicine, Health and Life Sciences",
"Mathematical Sciences": "Mathematical Sciences",
"Mathematics": "Mathematical Sciences",
"Medicine": "Medicine, Health and Life Sciences",
"Medicine, Health and Life Sciences": "Medicine, Health and Life Sciences",
"Physics": "Physics",
"Psychology": "Psychology",
"Social Sciences": "Social Sciences",
"Other": "Other",
}
@classmethod
def map_subjects(cls, subjects: list[str]) -> list[str]:
"""
Map a list of subject strings to controlled vocabulary terms.
Args:
subjects (list[str]): List of subject strings to map.
Returns:
list[str]: List of mapped controlled vocabulary terms.
"""
mapped = []
for subject in subjects:
# Try exact match first
if subject in cls.CONTROLLED_VOCAB:
mapped_subject = cls.CONTROLLED_VOCAB[subject]
if mapped_subject not in mapped:
mapped.append(mapped_subject)
else:
# Try partial matching
subject_lower = subject.lower()
for key, value in cls.CONTROLLED_VOCAB.items():
if (
subject_lower in key.lower()
or key.lower() in subject_lower
and value not in mapped
):
mapped.append(value)
break
else:
# No match found, add "Other" if not already present
if "Other" not in mapped:
mapped.append("Other")
return mapped if mapped else ["Other"]
@classmethod
def map_single_subject(cls, subject: str) -> str:
"""
Map a single subject string to a controlled vocabulary term.
Args:
subject (str): Subject string to map.
Returns:
str: Mapped controlled vocabulary term.
"""
mapped_subjects = cls.map_subjects([subject])
return mapped_subjects[0] if mapped_subjects else "Other"
@classmethod
def get_subjects(
cls, data: dict[str, Any], fallback_subject: str = "Other"
) -> list[str]:
"""
Extract and map subjects from input data.
Args:
data (dict[str, Any]): The input metadata.
fallback_subject (str): Fallback subject if none found.
Returns:
list[str]: List of mapped subject names.
"""
topics = data.get("topics", [])
subject_collection: list[str] = []
for topic in topics:
for field_type in ["subfield", "field", "domain"]:
if field_name := topic.get(field_type, {}).get("display_name"):
subject_collection.append(field_name)
mapped_subjects = cls.map_subjects(subject_collection)
return mapped_subjects if mapped_subjects else [fallback_subject]

View file

@ -0,0 +1,22 @@
"""
Utility functions and helpers for doi2dataset.
This package contains validation functions, string processing utilities,
and other helper functions used throughout the application.
"""
from .validation import (
normalize_string,
sanitize_filename,
split_name,
validate_doi,
validate_email_address,
)
__all__ = [
"validate_doi",
"validate_email_address",
"sanitize_filename",
"split_name",
"normalize_string",
]

View file

@ -0,0 +1,127 @@
"""
Validation utilities for doi2dataset.
This module provides validation functions for DOIs, email addresses,
and other data validation needs.
"""
import warnings
import dns.resolver
from email_validator import EmailNotValidError, validate_email
# Suppress the warning from idutils about pkg_resources
warnings.filterwarnings(
"ignore", message=".*pkg_resources.*", category=DeprecationWarning
)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
from idutils.validators import is_doi
def validate_doi(doi: str) -> bool:
"""
Validate a DOI using the idutils library.
Args:
doi (str): The DOI to validate.
Returns:
bool: True if the DOI is valid, False otherwise.
"""
return is_doi(doi)
def validate_email_address(email: str) -> bool:
"""
Validate an email address and ensure its domain has an MX record.
Args:
email (str): The email address to validate.
Returns:
bool: True if the email address is valid and its domain resolves, otherwise False.
"""
try:
# Basic validation
valid = validate_email(email)
email = valid.normalized
# Check domain has MX record
domain = email.split("@")[1]
dns.resolver.resolve(domain, "MX")
return True
except (EmailNotValidError, dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
return False
def sanitize_filename(doi: str) -> str:
"""
Convert DOI to a valid filename using only alphanumeric characters and underscores.
Args:
doi (str): The DOI to sanitize.
Returns:
str: Sanitized filename string.
"""
# Replace non-alphanumeric characters with underscores
sanitized = "".join(c if c.isalnum() else "_" for c in doi)
# Remove consecutive underscores
while "__" in sanitized:
sanitized = sanitized.replace("__", "_")
# Remove leading/trailing underscores
return sanitized.strip("_")
def split_name(full_name: str) -> tuple[str, str]:
"""
Split a full name into given and family names.
Args:
full_name (str): The full name (e.g., "Doe, John" or "John Doe").
Returns:
tuple[str, str]: A tuple (given_name, family_name).
"""
normalized = normalize_string(full_name)
if "," in normalized:
# Format: "Doe, John"
parts = normalized.split(",", 1)
family_name = parts[0].strip()
given_name = parts[1].strip()
else:
# Format: "John Doe" - assume last word is family name
parts = normalized.split()
if len(parts) == 1:
# Only one name provided
given_name = parts[0]
family_name = ""
else:
given_name = " ".join(parts[:-1])
family_name = parts[-1]
return given_name, family_name
def normalize_string(s: str) -> str:
"""
Normalize a string using Unicode NFKD normalization and convert to ASCII.
Args:
s (str): The string to normalize.
Returns:
str: Normalized string.
"""
import unicodedata
# Normalize Unicode characters to decomposed form
normalized = unicodedata.normalize("NFKD", s)
# Convert to ASCII, ignoring non-ASCII characters
ascii_str = normalized.encode("ascii", "ignore").decode("ascii")
return ascii_str.strip()