diff --git a/doi2dataset/__init__.py b/doi2dataset/__init__.py
new file mode 100644
index 0000000..c1681f4
--- /dev/null
+++ b/doi2dataset/__init__.py
@@ -0,0 +1,98 @@
+"""
+doi2dataset: A tool to process DOIs and generate metadata for Dataverse datasets.
+
+This package provides functionality to:
+- Validate and process DOIs
+- Fetch metadata from external APIs (OpenAlex, CrossRef)
+- Generate Dataverse-compatible metadata
+- Upload datasets to Dataverse instances
+
+The package is organized into several modules:
+- core: Configuration, models, and metadata field definitions
+- api: API clients and processors
+- processing: Business logic for citation building and metadata processing
+- utils: Validation and utility functions
+"""
+
+# Version information
+try:
+    # Read the version of the installed distribution (modern approach)
+    from importlib.metadata import version
+
+    __version__ = version("doi2dataset")
+except Exception:
+    # Fallback for older Python versions or source trees without installed metadata
+    try:
+        import pkg_resources
+
+        __version__ = pkg_resources.get_distribution("doi2dataset").version
+    except Exception:
+        __version__ = "1.0.0"  # Fallback version
+
+# Import main functionality for convenience
+from .api import (
+ AbstractProcessor,
+ APIClient,
+ LicenseProcessor,
+)
+from .core import (
+ Abstract,
+ BaseMetadataField,
+ CompoundMetadataField,
+ Config,
+ ConfigData,
+ ControlledVocabularyMetadataField,
+ FieldType,
+ Institution,
+ License,
+ Person,
+ PrimitiveMetadataField,
+)
+from .processing import (
+ CitationBuilder,
+ MetadataProcessor,
+ NameProcessor,
+ PIFinder,
+ SubjectMapper,
+)
+from .utils import (
+ normalize_string,
+ sanitize_filename,
+ split_name,
+ validate_doi,
+ validate_email_address,
+)
+
+__all__ = [
+ # Version
+ "__version__",
+ # API components
+ "APIClient",
+ "AbstractProcessor",
+ "LicenseProcessor",
+ # Core classes
+ "Config",
+ "ConfigData",
+ "Person",
+ "Institution",
+ "License",
+ "Abstract",
+ # Metadata fields
+ "BaseMetadataField",
+ "PrimitiveMetadataField",
+ "ControlledVocabularyMetadataField",
+ "CompoundMetadataField",
+ "FieldType",
+ # Processing components
+ "CitationBuilder",
+ "MetadataProcessor",
+ "NameProcessor",
+ "PIFinder",
+ "SubjectMapper",
+ # Utilities
+ "validate_doi",
+ "validate_email_address",
+ "sanitize_filename",
+ "split_name",
+ "normalize_string",
+]
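Usage sketch (not part of the patch): the package root re-exports the main entry
points, so a typical caller only needs top-level imports. The DOI below is a
made-up placeholder.

    import doi2dataset
    from doi2dataset import validate_doi

    print(doi2dataset.__version__)
    print(validate_doi("10.1000/xyz"))  # True for a well-formed DOI string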
diff --git a/doi2dataset/api/__init__.py b/doi2dataset/api/__init__.py
new file mode 100644
index 0000000..e534acf
--- /dev/null
+++ b/doi2dataset/api/__init__.py
@@ -0,0 +1,15 @@
+"""
+API components for doi2dataset.
+
+This package contains HTTP client functionality and processors for interacting
+with external APIs such as OpenAlex, CrossRef, and Dataverse.
+"""
+
+from .client import APIClient
+from .processors import AbstractProcessor, LicenseProcessor
+
+__all__ = [
+ "APIClient",
+ "AbstractProcessor",
+ "LicenseProcessor",
+]
diff --git a/doi2dataset/api/client.py b/doi2dataset/api/client.py
new file mode 100644
index 0000000..80eac1e
--- /dev/null
+++ b/doi2dataset/api/client.py
@@ -0,0 +1,92 @@
+"""
+API client for external service interactions.
+
+This module provides a generic HTTP client for making requests to external APIs
+like OpenAlex, CrossRef, and Dataverse with proper error handling and headers.
+"""
+
+from typing import Any
+
+import requests
+
+
+class APIClient:
+ """
+ Client for making HTTP requests to external APIs.
+
+ Attributes:
+ session (requests.Session): The underlying requests session.
+ """
+
+ def __init__(
+ self,
+ contact_mail: str | None = None,
+ user_agent: str = "doi2dataset/2.0",
+ token: str | None = None,
+ ) -> None:
+ """
+ Initialize the API client with optional contact mail, user agent, and token.
+
+ Args:
+ contact_mail (str | None): Contact email address.
+ user_agent (str): User agent string.
+ token (str | None): Optional API token.
+ """
+ self.session = requests.Session()
+ self._set_headers(contact_mail, user_agent, token)
+
+ def _set_headers(
+ self, contact_mail: str | None, user_agent: str, token: str | None
+ ) -> None:
+ """
+ Set HTTP headers for the session based on contact email and token.
+
+ Args:
+ contact_mail (str | None): Contact email address.
+ user_agent (str): User agent string.
+ token (str | None): Optional API token.
+ """
+ if contact_mail:
+ header = {"User-Agent": f"{user_agent} (mailto:{contact_mail})"}
+ else:
+ header = {"User-Agent": user_agent}
+
+ if token:
+ header["X-Dataverse-key"] = token
+
+ self.session.headers.update(header)
+
+ def make_request(
+ self, url: str, method: str = "GET", **kwargs: Any
+ ) -> requests.Response | None:
+ """
+ Make an HTTP request and return the response.
+
+ Args:
+ url (str): The URL to request.
+ method (str): HTTP method to use (default: GET).
+ **kwargs: Additional arguments for requests.request.
+
+ Returns:
+ requests.Response | None: The HTTP response, or None if the request failed.
+ """
+ try:
+ response = self.session.request(method, url, **kwargs)
+ response.raise_for_status()
+ return response
+ except requests.exceptions.RequestException:
+ # Log error - in a refactored version this should use proper logging
+ # For now, return None and let caller handle the error
+ return None
+
+ def close(self) -> None:
+ """Close the session."""
+ self.session.close()
+
+ def __enter__(self) -> "APIClient":
+ """Context manager entry."""
+ return self
+
+ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+ """Context manager exit."""
+ self.close()
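Usage sketch (not part of the patch): the client is a thin wrapper around a
requests.Session and works as a context manager; make_request returns None on
any request error. The DOI in the URL is a placeholder.

    from doi2dataset.api import APIClient

    with APIClient(contact_mail="user@example.org") as client:
        url = "https://api.openalex.org/works/https://doi.org/10.1000/xyz"
        response = client.make_request(url)
        if response is not None:
            print(response.json().get("title"))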
diff --git a/doi2dataset/api/processors.py b/doi2dataset/api/processors.py
new file mode 100644
index 0000000..3f8fdfc
--- /dev/null
+++ b/doi2dataset/api/processors.py
@@ -0,0 +1,230 @@
+"""
+API processors for doi2dataset.
+
+This module contains processors for handling specific types of data from external APIs,
+including license processing and abstract extraction/cleaning.
+"""
+
+import re
+from typing import Any
+
+from rich.console import Console
+
+from ..core.models import Abstract, License
+
+
+class LicenseProcessor:
+ """
+ Processes license information from metadata.
+ """
+
+ LICENSE_MAP = {
+ "cc-by": ("https://creativecommons.org/licenses/by/4.0/", "CC BY 4.0"),
+ "cc-by-sa": ("https://creativecommons.org/licenses/by-sa/4.0/", "CC BY-SA 4.0"),
+ "cc-by-nc": ("https://creativecommons.org/licenses/by-nc/4.0/", "CC BY-NC 4.0"),
+ "cc-by-nc-sa": (
+ "https://creativecommons.org/licenses/by-nc-sa/4.0/",
+ "CC BY-NC-SA 4.0",
+ ),
+ "cc-by-nc-nd": (
+ "https://creativecommons.org/licenses/by-nc-nd/4.0/",
+ "CC BY-NC-ND 4.0",
+ ),
+ "cc-by-nd": ("https://creativecommons.org/licenses/by-nd/4.0/", "CC BY-ND 4.0"),
+ "cc0": ("https://creativecommons.org/publicdomain/zero/1.0/", "CC0 1.0"),
+ "pd": (
+ "https://creativecommons.org/publicdomain/mark/1.0/",
+ "Public Domain Mark 1.0",
+ ),
+ }
+
+ @classmethod
+ def process_license(cls, data: dict[str, Any]) -> License:
+ """
+ Process and return license information based on input data.
+
+ Args:
+ data (dict[str, Any]): Input data containing license info.
+
+ Returns:
+ License: Processed license information.
+ """
+ location = data.get("primary_location", {})
+ license_short = location.get("license", "")
+
+ if not license_short:
+ return License(name="", uri="", short="unknown")
+
+ base_license = license_short.split("/")[0].lower()
+ uri, name = cls.LICENSE_MAP.get(base_license, ("", license_short))
+ return License(name=name, uri=uri, short=license_short)
+
+
+class AbstractProcessor:
+ """
+ Retrieves and processes abstracts from CrossRef and OpenAlex.
+ """
+
+ # Icons for console output - TODO: should be moved to a constants module
+ ICONS = {"info": "ℹ️", "warning": "⚠️", "error": "❌"}
+
+ def __init__(self, api_client, console: Console | None = None):
+ """
+ Initialize with an APIClient instance.
+
+ Args:
+ api_client: The API client to use for requests.
+ console (Console | None): Rich console instance for output.
+ """
+ self.api_client = api_client
+ self.console = console or Console()
+
+ def get_abstract(
+ self, doi: str, data: dict[str, Any], license: License
+ ) -> Abstract:
+ """
+ Get an abstract based on DOI and license permissions.
+
+ Args:
+ doi (str): The DOI.
+ data (dict[str, Any]): Data retrieved from an external source.
+ license (License): License information.
+
+ Returns:
+ Abstract: The abstract with its source.
+ """
+ license_ok = {"cc-by", "cc-by-sa", "cc-by-nc", "cc-by-nc-sa", "cc0", "pd"}
+
+ if license.short in license_ok:
+ self.console.print(
+ f"\n{self.ICONS['info']} License {license.name} allows derivative works. Pulling abstract from CrossRef.",
+ style="info",
+ )
+ crossref_abstract = self._get_crossref_abstract(doi)
+ if crossref_abstract:
+ return Abstract(text=crossref_abstract, source="crossref")
+ else:
+ self.console.print(
+ f"\n{self.ICONS['warning']} No abstract found in CrossRef!",
+ style="warning",
+ )
+ else:
+ if license.name:
+ self.console.print(
+ f"\n{self.ICONS['info']} License {license.name} does not allow derivative works. Reconstructing abstract from OpenAlex!",
+ style="info",
+ )
+ else:
+ self.console.print(
+ f"\n{self.ICONS['info']} Custom license does not allow derivative works. Reconstructing abstract from OpenAlex!",
+ style="info",
+ )
+
+ openalex_abstract = self._get_openalex_abstract(data)
+ if openalex_abstract:
+ return Abstract(text=openalex_abstract, source="openalex")
+ else:
+ self.console.print(
+ f"\n{self.ICONS['warning']} No abstract found in OpenAlex!",
+ style="warning",
+ )
+
+ self.console.print(
+            f"\n{self.ICONS['warning']} No abstract found in either CrossRef or OpenAlex!",
+ style="warning",
+ )
+ return Abstract(text="", source="none")
+
+ def _get_crossref_abstract(self, doi: str) -> str | None:
+ """
+ Retrieve abstract from CrossRef API.
+
+ Args:
+ doi (str): The DOI.
+
+ Returns:
+ str | None: The abstract if found, otherwise None.
+ """
+ url = f"https://api.crossref.org/works/{doi}"
+ response = self.api_client.make_request(url)
+
+ if response and response.status_code == 200:
+ abstract_raw = response.json().get("message", {}).get("abstract")
+ return self._clean_jats(abstract_raw)
+ return None
+
+ def _get_openalex_abstract(self, data: dict[str, Any]) -> str | None:
+ """
+ Retrieve abstract from OpenAlex data.
+
+ Args:
+ data (dict[str, Any]): Data from OpenAlex.
+
+ Returns:
+ str | None: The reconstructed abstract, or None if not available.
+ """
+ inv_index = data.get("abstract_inverted_index")
+ if not inv_index:
+ return None
+
+ word_positions = [
+ (word, pos) for word, positions in inv_index.items() for pos in positions
+ ]
+ sorted_words = sorted(word_positions, key=lambda x: x[1])
+ return " ".join(word for word, _ in sorted_words)
+
+ def _clean_jats(self, text: str | None) -> str:
+ """
+ Clean JATS XML tags in the abstract and convert them to HTML tags.
+
+ Args:
+ text (str | None): The raw abstract text containing JATS tags.
+
+ Returns:
+ str: The cleaned abstract text.
+ """
+ if not text:
+ return ""
+
+ # Handle list tags with sequential processing to avoid duplicate keys
+ # Process ordered lists first - replace both opening and closing tags
+        text = text.replace('<jats:list list-type="order">', "<ol>")
+        # Find and replace closing tags for ordered lists
+        # This regex matches </jats:list> that comes after <ol> tags
+        pattern = r"(<ol>.*?)</jats:list>"
+        text = re.sub(pattern, r"\1</ol>", text, flags=re.DOTALL)
+
+        # Process unordered lists second
+        text = text.replace('<jats:list list-type="bullet">', "<ul>")
+        # Replace remaining </jats:list> tags as unordered list closings
+        text = text.replace("</jats:list>", "</ul>")
+
+        # Handle other JATS tags
+        replacements = {
+            "<jats:italic>": "<i>",
+            "</jats:italic>": "</i>",
+            "<jats:bold>": "<b>",
+            "</jats:bold>": "</b>",
+            "<jats:sup>": "<sup>",
+            "</jats:sup>": "</sup>",
+            "<jats:sub>": "<sub>",
+            "</jats:sub>": "</sub>",
+            "<jats:underline>": "<u>",
+            "</jats:underline>": "</u>",
+            "<jats:monospace>": "<code>",
+            "</jats:monospace>": "</code>",
+            "<jats:title>": "<h3>",
+            "</jats:title>": "</h3>",
+            "<jats:p>": "<p>",
+            "</jats:p>": "</p>",
+            "<jats:list-item>": "<li>",
+            "</jats:list-item>": "</li>",
+            "<jats:blockquote>": "<blockquote>",
+            "</jats:blockquote>": "</blockquote>",
+ }
+
+ for jats_tag, html_tag in replacements.items():
+ text = text.replace(jats_tag, html_tag)
+ return text
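Usage sketch (not part of the patch): LicenseProcessor maps an OpenAlex-style
record to a License, and the OpenAlex abstract is rebuilt by sorting the
inverted index by word position. The input dicts below are hand-made examples.

    from doi2dataset.api import LicenseProcessor

    work = {"primary_location": {"license": "cc-by"}}
    license_info = LicenseProcessor.process_license(work)
    print(license_info.short, license_info.name)  # cc-by CC BY 4.0

    # Same reconstruction step as AbstractProcessor._get_openalex_abstract
    inv_index = {"Abstracts": [0], "are": [1], "reusable": [2]}
    words = [(w, p) for w, positions in inv_index.items() for p in positions]
    print(" ".join(w for w, _ in sorted(words, key=lambda x: x[1])))  # Abstracts are reusable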
diff --git a/doi2dataset/core/__init__.py b/doi2dataset/core/__init__.py
new file mode 100644
index 0000000..3ba39d9
--- /dev/null
+++ b/doi2dataset/core/__init__.py
@@ -0,0 +1,34 @@
+"""
+Core components for doi2dataset.
+
+This package contains the fundamental classes and utilities used throughout
+the application, including configuration management, data models, and
+metadata field definitions.
+"""
+
+from .config import Config, ConfigData
+from .metadata_fields import (
+ BaseMetadataField,
+ CompoundMetadataField,
+ ControlledVocabularyMetadataField,
+ FieldType,
+ PrimitiveMetadataField,
+)
+from .models import Abstract, Institution, License, Person
+
+__all__ = [
+ # Configuration
+ "Config",
+ "ConfigData",
+ # Models
+ "Person",
+ "Institution",
+ "License",
+ "Abstract",
+ # Metadata fields
+ "BaseMetadataField",
+ "PrimitiveMetadataField",
+ "ControlledVocabularyMetadataField",
+ "CompoundMetadataField",
+ "FieldType",
+]
diff --git a/doi2dataset/core/config.py b/doi2dataset/core/config.py
new file mode 100644
index 0000000..78fdc9d
--- /dev/null
+++ b/doi2dataset/core/config.py
@@ -0,0 +1,173 @@
+"""
+Configuration management for doi2dataset.
+
+This module provides configuration loading and management with support for
+environment variable overrides for sensitive credentials.
+"""
+
+import os
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+from ..utils.validation import validate_email_address
+
+
+@dataclass
+class ConfigData:
+ """
+ Represents configuration data loaded from a YAML file with environment variable overrides.
+
+ The dataverse configuration may be overridden by environment variables:
+ DATAVERSE_URL, DATAVERSE_API_TOKEN, DATAVERSE_DATAVERSE,
+ DATAVERSE_AUTH_USER, DATAVERSE_AUTH_PASSWORD.
+
+ Attributes:
+ dataverse (dict[str, str]): Dataverse-related configuration with environment
+ variable overrides applied.
+ pis (list[dict[str, Any]]): List of principal investigator configurations.
+ default_grants (list[dict[str, str]]): Default grant configurations.
+ """
+
+ dataverse: dict[str, str]
+ pis: list[dict[str, Any]]
+ default_grants: list[dict[str, str]]
+
+
+class Config:
+ """
+ Singleton class to handle configuration loading and retrieval.
+
+ Supports environment variable overrides for Dataverse configuration:
+ - DATAVERSE_URL: Overrides dataverse.url
+ - DATAVERSE_API_TOKEN: Overrides dataverse.api_token
+ - DATAVERSE_DATAVERSE: Overrides dataverse.dataverse
+ - DATAVERSE_AUTH_USER: Overrides dataverse.auth_user
+ - DATAVERSE_AUTH_PASSWORD: Overrides dataverse.auth_password
+
+ Environment variables take precedence over config file values.
+ """
+
+ _instance: "Config | None" = None
+ _config_data: ConfigData | None = None
+
+ def __new__(cls) -> "Config":
+ """
+ Create and return the singleton instance of Config.
+
+ Returns:
+ Config: The singleton instance.
+ """
+ if cls._instance is None:
+ cls._instance = super().__new__(cls)
+ return cls._instance
+
+ @classmethod
+ def load_config(cls, config_path: str | Path | None = None) -> None:
+ """
+ Load configuration from a YAML file with environment variable overrides.
+
+ Environment variables will override corresponding config file values:
+ DATAVERSE_URL, DATAVERSE_API_TOKEN, DATAVERSE_DATAVERSE,
+ DATAVERSE_AUTH_USER, DATAVERSE_AUTH_PASSWORD
+
+ Args:
+ config_path (str | Path | None): Path to the configuration file.
+ If None, the default config.yaml in the project root is used.
+
+ Raises:
+ FileNotFoundError: If the configuration file does not exist.
+ ValueError: If any PI email address is invalid.
+ """
+ if config_path is None:
+ # Look for config.yaml in the project root (two levels up from this file)
+ config_path = Path(__file__).parent.parent.parent / "config.yaml"
+
+ config_path = Path(config_path)
+ if not config_path.exists():
+ raise FileNotFoundError(f"Config file not found: {config_path}")
+
+ with open(config_path, encoding="utf-8") as f:
+ config_data = yaml.safe_load(f)
+
+ # Override dataverse config with environment variables if they exist
+ dataverse_config = config_data.get("dataverse", {})
+
+ # Check for environment variables and override config values
+ env_overrides = {
+ "url": os.getenv("DATAVERSE_URL"),
+ "api_token": os.getenv("DATAVERSE_API_TOKEN"),
+ "dataverse": os.getenv("DATAVERSE_DATAVERSE"),
+ "auth_user": os.getenv("DATAVERSE_AUTH_USER"),
+ "auth_password": os.getenv("DATAVERSE_AUTH_PASSWORD"),
+ }
+
+ # Apply environment variable overrides if they exist
+ for key, env_value in env_overrides.items():
+ if env_value is not None:
+ dataverse_config[key] = env_value
+
+ # Validate PI email addresses
+ pis = config_data.get("pis", [])
+ for pi in pis:
+ if email := pi.get("email"):
+ if not validate_email_address(email):
+ raise ValueError(
+ f"Configuration Error: Invalid email address for PI {pi.get('given_name', '')} {pi.get('family_name', '')}: {email}"
+ )
+
+ cls._config_data = ConfigData(
+ dataverse=dataverse_config,
+ pis=config_data.get("pis", []),
+ default_grants=config_data.get("default_grants", []),
+ )
+
+ @classmethod
+ def get_config(cls) -> ConfigData:
+ """
+ Retrieve the loaded configuration data.
+
+ Returns:
+ ConfigData: The configuration data.
+
+ Raises:
+ RuntimeError: If the configuration could not be loaded.
+ """
+ if cls._config_data is None:
+ cls.load_config()
+ if cls._config_data is None:
+ raise RuntimeError("Failed to load configuration")
+ return cls._config_data
+
+ @property
+ def PIS(self) -> list[dict[str, Any]]:
+ """
+ Get PI configurations.
+
+ Returns:
+ list[dict[str, Any]]: List of PI configurations.
+ """
+ return self.get_config().pis
+
+ @property
+ def DEFAULT_GRANTS(self) -> list[dict[str, str]]:
+ """
+ Get default grant configurations.
+
+ Returns:
+ list[dict[str, str]]: List of default grants.
+ """
+ return self.get_config().default_grants
+
+ @property
+ def DATAVERSE(self) -> dict[str, str]:
+ """
+ Get Dataverse configurations with environment variable overrides applied.
+
+ Returns:
+ dict[str, str]: Dataverse configuration with environment variables
+ taking precedence over config file values.
+ """
+ return self.get_config().dataverse
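Usage sketch (not part of the patch): environment variables win over values in
config.yaml. Assumes a config.yaml with dataverse/pis/default_grants keys exists
at the given path; the token is a placeholder.

    import os
    from doi2dataset.core import Config

    os.environ["DATAVERSE_API_TOKEN"] = "secret-token"
    Config.load_config("config.yaml")
    print(Config().DATAVERSE["api_token"])  # -> "secret-token"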
diff --git a/doi2dataset/core/metadata_fields.py b/doi2dataset/core/metadata_fields.py
new file mode 100644
index 0000000..8c66d86
--- /dev/null
+++ b/doi2dataset/core/metadata_fields.py
@@ -0,0 +1,168 @@
+"""
+Metadata field classes for Dataverse integration.
+
+This module provides the base classes and implementations for different types
+of metadata fields used in Dataverse dataset creation.
+"""
+
+from collections.abc import Sequence
+from dataclasses import dataclass, field
+from enum import Enum
+from functools import reduce
+from typing import Any
+
+
+class FieldType(Enum):
+ """Enum representing different Dataverse field types."""
+
+ PRIMITIVE = "primitive"
+ COMPOUND = "compound"
+ VOCABULARY = "controlledVocabulary"
+
+
+@dataclass
+class BaseMetadataField[T]:
+ """
+ Base class for Dataverse metadata fields.
+
+ This class defines a metadata field with a name, a value of type T, and
+ a flag indicating whether multiple values are allowed. It serves as
+ a template for specific metadata field implementations.
+
+ Attributes:
+ name (str): The name of the metadata field.
+ multiple (bool): Indicates whether multiple values are allowed.
+ value (T): The value stored in the field.
+ type (FieldType): The type of the field, automatically set based on T.
+ """
+
+ name: str
+ multiple: bool
+ value: T
+ type: FieldType = field(init=False)
+ expanded_value: dict[str, str] | None = field(default=None)
+
+ def __post_init__(self) -> None:
+ """
+ After initialization, determine the field type by calling the _set_type method.
+ """
+ self._set_type()
+
+ def _set_type(self) -> None:
+ """
+ Set the `type` attribute based on the field's value.
+
+ This method must be implemented by subclasses.
+
+ Raises:
+ NotImplementedError: If not implemented by a subclass.
+ """
+ raise NotImplementedError("Subclasses must implement the _set_type method.")
+
+ def to_dict(self) -> dict[str, Any]:
+ """
+ Convert the metadata field to a dictionary representation.
+
+ Returns:
+ dict[str, Any]: Dictionary representation of the metadata field.
+
+ Raises:
+ NotImplementedError: If not implemented by a subclass.
+ """
+ raise NotImplementedError("Subclasses must implement the to_dict method.")
+
+
+@dataclass
+class PrimitiveMetadataField(BaseMetadataField[str]):
+ """
+ Metadata field representing a primitive type (e.g., string) for Dataverse.
+ """
+
+ def _set_type(self) -> None:
+ self.type = FieldType.PRIMITIVE
+
+ def to_dict(self) -> dict[str, str | bool | dict[str, str]]:
+ """
+ Convert the primitive metadata field to a dictionary representation.
+
+ Returns:
+ dict[str, str | bool]: Dictionary with field properties.
+ """
+
+ if self.expanded_value:
+ return {
+ "typeName": self.name,
+ "typeClass": self.type.value,
+ "multiple": self.multiple,
+ "value": self.value,
+ "expandedValue": self.expanded_value,
+ }
+ else:
+ return {
+ "typeName": self.name,
+ "typeClass": self.type.value,
+ "multiple": self.multiple,
+ "value": self.value,
+ }
+
+
+@dataclass
+class ControlledVocabularyMetadataField(BaseMetadataField[str | list[str]]):
+ """
+ Metadata field for controlled vocabulary values.
+ """
+
+ def _set_type(self) -> None:
+ self.type = FieldType.VOCABULARY
+
+ def to_dict(self) -> dict[str, Any]:
+ """
+ Convert the controlled vocabulary metadata field to a dictionary.
+
+ Returns:
+ dict[str, Any]: Dictionary representation.
+ """
+ return {
+ "typeName": self.name,
+ "typeClass": self.type.value,
+ "multiple": self.multiple,
+ "value": self.value,
+ }
+
+
+@dataclass
+class CompoundMetadataField(
+ BaseMetadataField[
+ Sequence[Sequence["PrimitiveMetadataField | ControlledVocabularyMetadataField"]]
+ ]
+):
+ """
+ Metadata field representing compound types, composed of multiple subfields.
+ """
+
+ def _set_type(self) -> None:
+ self.type = FieldType.COMPOUND
+
+ def to_dict(self) -> dict[str, Any]:
+ """
+ Convert the compound metadata field to a dictionary representation.
+
+ Returns:
+ dict[str, Any]: Dictionary representation of the compound field.
+ """
+ value_list: list[dict[str, Any]] = []
+ for outer_list in self.value:
+ field_dicts: list[dict[str, Any]] = []
+ for field_item in outer_list:
+ field_dicts.append({field_item.name: field_item.to_dict()})
+ value_list.append(reduce(lambda d1, d2: d1 | d2, field_dicts))
+
+ return {
+ "typeName": self.name,
+ "typeClass": self.type.value,
+ "multiple": self.multiple,
+ "value": value_list,
+ }
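Usage sketch (not part of the patch): how the field classes serialize to the
Dataverse JSON shape, using made-up values.

    from doi2dataset.core import CompoundMetadataField, PrimitiveMetadataField

    title = PrimitiveMetadataField("title", False, "Example dataset")
    print(title.to_dict())
    # {'typeName': 'title', 'typeClass': 'primitive', 'multiple': False, 'value': 'Example dataset'}

    other_id = CompoundMetadataField(
        "otherId",
        True,
        [[PrimitiveMetadataField("otherIdAgency", False, "doi"),
          PrimitiveMetadataField("otherIdValue", False, "10.1000/xyz")]],
    )
    print(other_id.to_dict()["value"])
    # [{'otherIdAgency': {...}, 'otherIdValue': {...}}]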
diff --git a/doi2dataset/core/models.py b/doi2dataset/core/models.py
new file mode 100644
index 0000000..f4dd95b
--- /dev/null
+++ b/doi2dataset/core/models.py
@@ -0,0 +1,221 @@
+"""
+Core data models for doi2dataset.
+
+This module contains the fundamental data classes used throughout the application
+for representing people, institutions, licenses, and abstracts.
+"""
+
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from .metadata_fields import (
+ ControlledVocabularyMetadataField,
+ PrimitiveMetadataField,
+ )
+
+
+@dataclass
+class Institution:
+ """
+ Represents an institution or organization.
+
+ Attributes:
+ display_name (str): The name of the institution.
+ ror (str): Research Organization Registry identifier (optional).
+ """
+
+ display_name: str
+ ror: str = ""
+
+ def affiliation_field(self) -> "PrimitiveMetadataField":
+ """
+ Create a metadata field for the affiliation.
+
+ Returns:
+ PrimitiveMetadataField: A metadata field representing the institution,
+ using ROR ID when available.
+ """
+ from .metadata_fields import PrimitiveMetadataField
+
+ if self.ror:
+ expanded_value = {
+ "scheme": "http://www.grid.ac/ontology/",
+ "termName": self.display_name,
+ "@type": "https://schema.org/Organization",
+ }
+ return PrimitiveMetadataField(
+ "authorAffiliation", False, self.ror, expanded_value=expanded_value
+ )
+ else:
+ return PrimitiveMetadataField("authorAffiliation", False, self.display_name)
+
+
+@dataclass
+class Person:
+ """
+ Represents a person (e.g., an author or a PI).
+
+ Attributes:
+ family_name (str): Family name of the person.
+ given_name (str): Given name of the person.
+ orcid (str): ORCID identifier (optional).
+ email (str): Email address (optional).
+ affiliation (Institution): Affiliation of the person (optional).
+ """
+
+ family_name: str
+ given_name: str
+ orcid: str = ""
+ email: str = ""
+ affiliation: Institution | str = ""
+
+ def to_dict(self) -> dict[str, str | list[str] | dict[str, str]]:
+ """
+ Convert Person to a dictionary for JSON serialization.
+
+ Handles affiliations properly by checking if the affiliation
+ is an Institution object or a string.
+
+ Returns:
+ dict: A dictionary containing the person's information including
+ name, contact details, and affiliation.
+ """
+ return_dict: dict[str, str | list[str] | dict[str, str]] = {
+ "family_name": self.family_name,
+ "given_name": self.given_name,
+ "orcid": self.orcid,
+ "email": self.email,
+ }
+
+ if isinstance(self.affiliation, Institution):
+ if self.affiliation.ror:
+ return_dict["affiliation"] = self.affiliation.ror
+ elif self.affiliation.display_name:
+ return_dict["affiliation"] = self.affiliation.display_name
+ else:
+ return_dict["affiliation"] = ""
+ else:
+ return_dict["affiliation"] = self.affiliation if self.affiliation else ""
+
+ return return_dict
+
+ def format_name(self) -> str:
+ """
+ Format the name in 'Family, Given' order.
+
+ Returns:
+ str: Formatted name.
+ """
+ return f"{self.family_name}, {self.given_name}"
+
+ def author_fields(
+ self,
+ ) -> list["PrimitiveMetadataField | ControlledVocabularyMetadataField"]:
+ """
+ Build metadata fields for the author.
+
+ The method handles both Institution objects and string values for affiliations.
+ Different fields are generated depending on whether ORCID is available.
+
+ Returns:
+ list: List of metadata fields representing the author, including name,
+ affiliation, and optionally ORCID identifier information.
+ """
+ from .metadata_fields import (
+ ControlledVocabularyMetadataField,
+ PrimitiveMetadataField,
+ )
+
+ affiliation_field = None
+ if isinstance(self.affiliation, Institution):
+ affiliation_field = self.affiliation.affiliation_field()
+ else:
+ affiliation_field = PrimitiveMetadataField(
+ "authorAffiliation", False, self.affiliation
+ )
+
+ if self.orcid:
+ return [
+ PrimitiveMetadataField("authorName", False, self.format_name()),
+ affiliation_field,
+ ControlledVocabularyMetadataField(
+ "authorIdentifierScheme", False, "ORCID"
+ ),
+ PrimitiveMetadataField("authorIdentifier", False, self.orcid),
+ ]
+ else:
+ return [
+ PrimitiveMetadataField("authorName", False, self.format_name()),
+ affiliation_field,
+ ]
+
+ def dataset_contact_fields(self) -> list["PrimitiveMetadataField"]:
+ """
+ Generate metadata fields for dataset contact.
+
+ The method handles both Institution objects and string values for affiliations.
+ Creates fields for the contact name, affiliation, and email address.
+
+ Returns:
+ list: List of metadata fields for the dataset contact including name,
+ affiliation, and email address.
+ """
+ from .metadata_fields import PrimitiveMetadataField
+
+ affiliation_field = None
+ if isinstance(self.affiliation, Institution):
+ affiliation_field = self.affiliation.affiliation_field()
+ else:
+ affiliation_field = PrimitiveMetadataField(
+ "datasetContactAffiliation", False, self.affiliation
+ )
+
+ return [
+ PrimitiveMetadataField("datasetContactName", False, self.format_name()),
+ affiliation_field,
+ PrimitiveMetadataField("datasetContactEmail", False, self.email),
+ ]
+
+
+@dataclass
+class License:
+ """
+ Represents a license with name, URI, and short identifier.
+
+ Attributes:
+ name (str): The full name of the license.
+ uri (str): The license URI.
+ short (str): The short identifier of the license.
+ """
+
+ name: str
+ uri: str
+ short: str
+
+
+@dataclass
+class Abstract:
+ """
+ Represents an abstract with its text and source.
+
+ Attributes:
+ text (str): The abstract text.
+ source (str): The source of the abstract ('crossref', 'openalex', or 'none').
+ """
+
+ text: str
+ source: str
+
+ def __post_init__(self):
+ """
+ Validate that the abstract source is one of the allowed values.
+
+ Raises:
+ ValueError: If source is not one of the allowed values.
+ """
+ allowed_sources = ["crossref", "openalex", "none"]
+ if self.source not in allowed_sources:
+ raise ValueError(
+ f"{self.source} is not valid! Needs to be one of {str(allowed_sources)}."
+ )
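Usage sketch (not part of the patch): a Person with a ROR-backed Institution
yields an author block whose affiliation field carries the ROR ID plus an
expanded value. The ORCID and ROR values below are placeholders.

    from doi2dataset.core import Institution, Person

    author = Person(
        family_name="Doe",
        given_name="Jane",
        orcid="0000-0002-1825-0097",
        affiliation=Institution("Example University", "https://ror.org/00example"),
    )
    print([f.name for f in author.author_fields()])
    # ['authorName', 'authorAffiliation', 'authorIdentifierScheme', 'authorIdentifier']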
diff --git a/doi2dataset/processing/__init__.py b/doi2dataset/processing/__init__.py
new file mode 100644
index 0000000..fcdb515
--- /dev/null
+++ b/doi2dataset/processing/__init__.py
@@ -0,0 +1,18 @@
+"""
+Processing components for doi2dataset.
+
+This package contains the business logic components for processing DOIs,
+building citations, processing metadata, and handling various data transformations.
+"""
+
+from .citation import CitationBuilder
+from .metadata import MetadataProcessor
+from .utils import NameProcessor, PIFinder, SubjectMapper
+
+__all__ = [
+ "NameProcessor",
+ "PIFinder",
+ "SubjectMapper",
+ "CitationBuilder",
+ "MetadataProcessor",
+]
diff --git a/doi2dataset/processing/citation.py b/doi2dataset/processing/citation.py
new file mode 100644
index 0000000..9e66732
--- /dev/null
+++ b/doi2dataset/processing/citation.py
@@ -0,0 +1,292 @@
+"""
+Citation processing for doi2dataset.
+
+This module contains the CitationBuilder class which handles building various
+citation-related metadata fields from API data.
+"""
+
+# Suppress the warning from idutils about pkg_resources
+import warnings
+from typing import Any
+
+from ..core.config import Config
+from ..core.metadata_fields import PrimitiveMetadataField
+from ..core.models import Institution, Person
+from ..processing.utils import NameProcessor, PIFinder
+
+warnings.filterwarnings(
+ "ignore", message=".*pkg_resources.*", category=DeprecationWarning
+)
+with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ from idutils.normalizers import normalize_orcid, normalize_pmid
+
+
+class CitationBuilder:
+ """
+ Builds various citation-related metadata fields.
+ """
+
+ def __init__(
+ self, data: dict[str, Any], doi: str, pi_finder: PIFinder, ror: bool = False
+ ) -> None:
+ """
+ Initialize the CitationBuilder with data, DOI, and a PIFinder.
+
+ Args:
+ data (dict[str, Any]): Metadata from an external source.
+ doi (str): The DOI.
+ pi_finder (PIFinder): Instance to find PI information.
+ ror (bool): Whether to use ROR identifiers for institutions.
+ """
+ self.data = data
+ self.doi = doi
+ self.ror = ror
+ self.pi_finder = pi_finder
+
+ def build_other_ids(self) -> list[list[PrimitiveMetadataField]]:
+ """
+ Build metadata fields for other identifiers (e.g., DOI, PMID).
+
+ Returns:
+ list[list[PrimitiveMetadataField]]: Nested list of identifier metadata fields.
+ """
+ other_ids = [
+ [
+ PrimitiveMetadataField("otherIdAgency", False, "doi"),
+ PrimitiveMetadataField("otherIdValue", False, self.doi),
+ ]
+ ]
+
+ if pmid := self.data.get("ids", {}).get("pmid"):
+ try:
+ normalized_pmid = normalize_pmid(pmid)
+ other_ids.append(
+ [
+ PrimitiveMetadataField("otherIdAgency", False, "pmid"),
+ PrimitiveMetadataField("otherIdValue", False, normalized_pmid),
+ ]
+ )
+ except ValueError:
+ pass
+
+ return other_ids
+
+ def build_grants(self) -> list[list[PrimitiveMetadataField]]:
+ """
+ Build metadata fields for grants.
+
+ Returns:
+ list[list[PrimitiveMetadataField]]: Nested list of grant metadata fields.
+ """
+ config = Config()
+ default_grants = config.DEFAULT_GRANTS
+
+ grants: list[list[PrimitiveMetadataField]] = []
+
+ for grant in default_grants:
+ grants.append(
+ [
+ PrimitiveMetadataField("grantNumberAgency", False, grant["funder"]),
+ PrimitiveMetadataField("grantNumberValue", False, grant["id"]),
+ ]
+ )
+
+ for grant in self.data.get("grants", []):
+            grant_funder = grant.get("funder_display_name", "")
+            grant_id = grant.get("award_id", "")
+ if not grant_funder or not grant_id:
+ continue
+
+ grants.append(
+ [
+ PrimitiveMetadataField("grantNumberAgency", False, grant_funder),
+ PrimitiveMetadataField("grantNumberValue", False, grant_id),
+ ]
+ )
+
+ return grants
+
+ def build_authors(self) -> tuple[list[Person], list[Person]]:
+ """
+ Build lists of authors and corresponding authors from the metadata.
+
+ Returns:
+ tuple: (authors, corresponding_authors)
+ """
+ authors: list[Person] = []
+ corresponding_authors: list[Person] = []
+ for authorship in self.data.get("authorships", []):
+ author = authorship.get("author", {})
+ if not author:
+ continue
+
+ author_person = self._process_author(author, authorship)
+ authors.append(author_person)
+
+ if authorship.get("is_corresponding"):
+ corresponding_entry = self._process_corresponding_author(
+ author_person, authorship
+ )
+ if corresponding_entry:
+ corresponding_authors.append(corresponding_entry)
+
+ return authors, corresponding_authors
+
+ def _process_author(
+ self, author: dict[str, Any], authorship: dict[str, Any]
+ ) -> Person:
+ """
+ Process author data and return a Person instance.
+
+ Args:
+ author (dict[str, Any]): Author data.
+ authorship (dict[str, Any]): Authorship metadata.
+
+ Returns:
+ Person: Processed author
+ """
+ display_name = author.get("display_name", "")
+ given_name, family_name = NameProcessor.split_name(display_name)
+
+ person = Person(family_name, given_name)
+
+ if affiliations := authorship.get("affiliations"):
+ affiliation = Institution(
+ affiliations[0].get("raw_affiliation_string", "").strip()
+ )
+
+ person.affiliation = affiliation
+
+ if self.ror:
+ if institutions := authorship.get("institutions"):
+ institution = institutions[0]
+ if institution.get("ror"):
+ affiliation = Institution(
+ institution.get("display_name"), institution.get("ror")
+ )
+
+ person.affiliation = affiliation
+
+ if orcid := author.get("orcid"):
+ person.orcid = normalize_orcid(orcid)
+
+ return person
+
+ def _process_corresponding_author(
+ self, author: Person, authorship: dict[str, Any]
+ ) -> Person | None:
+ """
+ Identify the corresponding author based on provided PI information.
+
+ Args:
+ author (Person): The author.
+ authorship (dict[str, Any]): Authorship metadata.
+
+ Returns:
+ Person | None: The corresponding author, or None if not found.
+ """
+ pi_matches = self.pi_finder.find_by_orcid([author])
+ return pi_matches[0] if pi_matches else None
+
+ def build_topics(self) -> list[list[PrimitiveMetadataField]]:
+ """
+ Build metadata fields for topics based on a threshold score.
+
+ Returns:
+ list[list[PrimitiveMetadataField]]: Nested list of topic metadata fields.
+ """
+ topics: list[list[PrimitiveMetadataField]] = []
+
+ for topic in self.data.get("topics", []):
+ if topic.get("score", 0) >= 0.8:
+ topic_class_value_field = PrimitiveMetadataField(
+ "topicClassValue", False, topic.get("display_name")
+ )
+ topic_class_vocab_field = PrimitiveMetadataField(
+ "topicClassVocab", False, "OpenAlex"
+ )
+ topic_class_vocab_uri_field = PrimitiveMetadataField(
+ "topicClassVocabURI", False, topic.get("id")
+ )
+
+ topics.append(
+ [
+ topic_class_value_field,
+ topic_class_vocab_field,
+ topic_class_vocab_uri_field,
+ ]
+ )
+
+ return topics
+
+ def build_keywords(self) -> list[list[PrimitiveMetadataField]]:
+ """
+ Build metadata fields for keywords from both regular keywords and MeSH terms.
+
+ Returns:
+ list[list[PrimitiveMetadataField]]: Nested list of keyword metadata fields.
+ """
+ keywords: list[list[PrimitiveMetadataField]] = []
+
+ for keyword in self.data.get("keywords", []):
+ # Filter out possibly unrelated keywords (low score)
+ if keyword.get("score", 0) >= 0.5:
+ keyword_value_field = PrimitiveMetadataField(
+ "keywordValue", False, keyword["display_name"]
+ )
+ keywords.append([keyword_value_field])
+
+ mesh_base_url = "http://id.nlm.nih.gov/mesh"
+ for mesh in self.data.get("mesh", []):
+ url = f"{mesh_base_url}/{mesh['descriptor_ui']}"
+ if mesh.get("qualifier_ui"):
+ url = f"{url}{mesh['qualifier_ui']}"
+
+ keyword_value_field = PrimitiveMetadataField(
+ "keywordValue", False, mesh["descriptor_name"]
+ )
+ keyword_term_uri_field = PrimitiveMetadataField(
+ "keywordTermURI", False, url
+ )
+ keyword_vocabulary_field = PrimitiveMetadataField(
+ "keywordVocabulary", False, "MeSH"
+ )
+ keyword_vocabulary_uri_field = PrimitiveMetadataField(
+ "keywordVocabularyURI", False, mesh_base_url
+ )
+
+ keywords.append(
+ [
+ keyword_value_field,
+ keyword_term_uri_field,
+ keyword_vocabulary_field,
+ keyword_vocabulary_uri_field,
+ ]
+ )
+
+ return keywords
+
+ def _get_publication_year(self, data: dict[str, Any]) -> str:
+ """
+ Extract publication year from data, with fallbacks.
+
+ Args:
+ data (dict[str, Any]): Publication data.
+
+ Returns:
+ str: Publication year as string.
+ """
+ # Try publication_year first
+ if pub_year := data.get("publication_year"):
+ return str(pub_year)
+
+ # Fallback to publication_date
+ if pub_date := data.get("publication_date"):
+ try:
+ return pub_date.split("-")[0]
+ except (AttributeError, IndexError):
+ pass
+
+ # Final fallback
+ return "Unknown"
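Usage sketch (not part of the patch): building topic and keyword fields from a
hand-made OpenAlex-style record; PIFinder gets an empty PI list here just to
satisfy the constructor, and the DOI is a placeholder.

    from doi2dataset.processing import CitationBuilder, PIFinder

    data = {
        "keywords": [{"display_name": "machine learning", "score": 0.9}],
        "topics": [
            {"display_name": "Artificial Intelligence",
             "id": "https://openalex.org/T10001", "score": 0.95}
        ],
    }
    builder = CitationBuilder(data, "10.1000/xyz", PIFinder([]))
    print(len(builder.build_topics()), len(builder.build_keywords()))  # 1 1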
diff --git a/doi2dataset/processing/metadata.py b/doi2dataset/processing/metadata.py
new file mode 100644
index 0000000..ce122d9
--- /dev/null
+++ b/doi2dataset/processing/metadata.py
@@ -0,0 +1,474 @@
+"""
+Metadata processing for doi2dataset.
+
+This module contains the MetadataProcessor class which handles the complete workflow
+of processing DOIs: fetching data, building metadata, and optionally uploading to Dataverse.
+"""
+
+import json
+import warnings
+from pathlib import Path
+from typing import Any
+
+from rich.console import Console
+from rich.progress import Progress, TaskID
+
+from ..api.client import APIClient
+from ..api.processors import AbstractProcessor, LicenseProcessor
+from ..core.config import Config
+from ..core.metadata_fields import (
+ CompoundMetadataField,
+ ControlledVocabularyMetadataField,
+ PrimitiveMetadataField,
+)
+from ..core.models import Person
+from ..processing.citation import CitationBuilder
+from ..processing.utils import NameProcessor, PIFinder, SubjectMapper
+
+# Suppress warnings from idutils
+warnings.filterwarnings(
+ "ignore", message=".*pkg_resources.*", category=DeprecationWarning
+)
+with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ from idutils.normalizers import normalize_doi
+ from idutils.validators import is_doi
+
+
+class MetadataProcessor:
+ """
+ Processes metadata for a given DOI by fetching data from OpenAlex,
+ building metadata blocks, and optionally uploading the dataset.
+ """
+
+ # Icons for console output - TODO: should be moved to a constants module
+ ICONS = {
+ "processing": "⚙️",
+ "success": "✅",
+ "error": "❌",
+ "warning": "⚠️",
+ "info": "ℹ️",
+ "upload": "📤",
+ "save": "💾",
+ }
+
+ def __init__(
+ self,
+ doi: str,
+ depositor: str | None = None,
+ output_path: Path | None = None,
+ default_subject: str = "Other",
+ contact_mail: str | None = None,
+ upload: bool = False,
+ ror: bool = False,
+ console: Console | None = None,
+ progress: Progress | None = None,
+ task_id: TaskID | None = None,
+ ) -> None:
+ """
+ Initialize the MetadataProcessor with configuration and processing options.
+
+ Args:
+ doi (str): The DOI to process.
+ depositor (str | None): Depositor name.
+ output_path (Path | None): Path where metadata will be saved.
+ default_subject (str): Default subject.
+ contact_mail (str | None): Contact email address.
+            upload (bool): Whether to upload metadata to Dataverse.
+            ror (bool): Whether to use ROR IDs for affiliations.
+ console (Console | None): Rich console instance.
+ progress (Progress | None): Progress bar instance.
+ task_id (TaskID | None): Task ID for progress updates.
+ """
+ self.console = console or Console()
+ try:
+ self.doi = self._validate_doi(doi)
+ except ValueError as e:
+ self.console.print(f"Error: {str(e)}", style="error")
+ raise
+ self.depositor = depositor
+ self.output_path = output_path
+ self.default_subject = default_subject
+ self.api_client = APIClient(contact_mail)
+ config = Config()
+ pi_objects = [Person(**pi) for pi in config.PIS]
+ self.pi_finder = PIFinder(pi_objects)
+ self.upload = upload
+ self.ror = ror
+ self.progress = progress
+ self.task_id = task_id
+
+ @staticmethod
+ def _validate_doi(doi: str) -> str:
+ """
+ Validate and normalize a DOI.
+
+ Args:
+ doi (str): The DOI to validate.
+
+ Returns:
+ str: Normalized DOI.
+
+ Raises:
+ ValueError: If the DOI is invalid.
+ """
+ if not is_doi(doi):
+ raise ValueError(f"Invalid DOI: {doi}")
+ return normalize_doi(doi)
+
+ def _update_progress(self) -> None:
+ """
+ Advance the progress bar if enabled.
+ """
+ if self.progress and self.task_id is not None:
+ self.progress.advance(self.task_id)
+
+ def process(self) -> dict[str, Any]:
+ """
+ Process the DOI: fetch data, build metadata, optionally upload, and save output.
+
+ Returns:
+ dict[str, Any]: The constructed metadata dictionary.
+ """
+ self.console.print(
+ f"{self.ICONS['processing']} Processing DOI: {self.doi}", style="info"
+ )
+
+ data = self._fetch_data()
+ self._update_progress()
+
+ metadata = self._build_metadata(data)
+ self._update_progress()
+
+ if self.upload:
+ self._upload_data(metadata)
+ self._update_progress()
+
+ self._save_output(metadata)
+ self._update_progress()
+
+ self.console.print(
+ f"\n{self.ICONS['success']} Successfully processed: {self.doi}\n",
+ style="success",
+ )
+ return metadata
+
+ def _upload_data(self, metadata: dict[str, Any]) -> dict[str, Any]:
+ """
+ Upload the metadata to Dataverse.
+
+ Args:
+ metadata (dict[str, Any]): The metadata to upload.
+
+ Returns:
+ dict[str, Any]: The response from the Dataverse API.
+
+ Raises:
+ ValueError: If the upload fails.
+ """
+ config = Config()
+
+ token = config.DATAVERSE["api_token"]
+ client = APIClient(token=token)
+ url = f"{config.DATAVERSE['url']}/api/dataverses/{config.DATAVERSE['dataverse']}/datasets?doNotValidate=true"
+ auth = (config.DATAVERSE["auth_user"], config.DATAVERSE["auth_password"])
+
+ response = client.make_request(url, method="POST", auth=auth, json=metadata)
+
+ if response is None or response.status_code != 201:
+ self.console.print(
+ f"\n{self.ICONS['error']} Failed to upload to Dataverse: {url}",
+ style="error",
+ )
+ raise ValueError(f"Failed to upload to Dataverse: {url}")
+ else:
+ perma = response.json().get("data", {}).get("persistentId", "")
+ self.console.print(
+ f"{self.ICONS['upload']} Dataset uploaded to: {config.DATAVERSE['dataverse']} with ID {perma}",
+ style="info",
+ )
+
+ return response.json()
+
+ def _fetch_data(self) -> dict[str, Any]:
+ """
+ Fetch metadata from OpenAlex for the given DOI.
+
+ Returns:
+ dict[str, Any]: The fetched data.
+
+ Raises:
+ ValueError: If data fetching fails.
+ """
+ url = f"https://api.openalex.org/works/https://doi.org/{self.doi}"
+ response = self.api_client.make_request(url)
+
+ if response is None or response.status_code != 200:
+ self.console.print(
+ f"\n{self.ICONS['error']} Failed to fetch data for DOI: {self.doi}",
+ style="error",
+ )
+ raise ValueError(f"Failed to fetch data for DOI: {self.doi}")
+
+ return response.json()
+
+ def _build_metadata(self, data: dict[str, Any]) -> dict[str, Any]:
+ """
+ Construct the complete metadata dictionary from fetched data.
+
+ Args:
+ data (dict[str, Any]): The data retrieved from OpenAlex.
+
+ Returns:
+ dict[str, Any]: The complete metadata dictionary.
+ """
+ license_info = LicenseProcessor.process_license(data)
+ abstract_processor = AbstractProcessor(self.api_client, self.console)
+ abstract = abstract_processor.get_abstract(self.doi, data, license_info)
+ citation_builder = CitationBuilder(data, self.doi, self.pi_finder, self.ror)
+
+ authors, corresponding_authors = citation_builder.build_authors()
+
+ author_fields: list[
+ list[PrimitiveMetadataField | ControlledVocabularyMetadataField]
+ ] = []
+ corresponding_author_fields: list[list[PrimitiveMetadataField]] = []
+ for author in authors:
+ author_fields.append(author.author_fields())
+
+ if not corresponding_authors:
+ self.console.print(
+ f"{self.ICONS['warning']} No corresponding authors explicitly declared; PIs are used as a fallback!",
+ style="warning",
+ )
+ pis = self._get_involved_pis(data)
+ corresponding_authors: list[Person] = []
+ for pi in pis:
+ corresponding_authors.append(pi)
+
+ for corresponding_author in corresponding_authors:
+ corresponding_author_fields.append(
+ corresponding_author.dataset_contact_fields()
+ )
+
+ description = self._build_description(data, abstract)
+
+ grants = citation_builder.build_grants()
+
+ return_dict: dict[str, Any] = {
+ "datasetVersion": {
+ "metadataBlocks": {
+ "citation": {
+ "fields": [
+ PrimitiveMetadataField(
+ "title", False, data.get("title", "")
+ ).to_dict(),
+ PrimitiveMetadataField(
+ "distributionDate",
+ False,
+ data.get("publication_date", ""),
+ ).to_dict(),
+ CompoundMetadataField(
+ "otherId", True, citation_builder.build_other_ids()
+ ).to_dict(),
+ CompoundMetadataField(
+ "dsDescription",
+ True,
+ [
+ [
+ PrimitiveMetadataField(
+ "dsDescriptionValue", False, description
+ )
+ ]
+ ],
+ ).to_dict(),
+ ControlledVocabularyMetadataField(
+ "subject",
+ True,
+ SubjectMapper.map_subjects([self.default_subject]),
+ ).to_dict(),
+ CompoundMetadataField(
+ "topicClassification",
+ True,
+ citation_builder.build_topics(),
+ ).to_dict(),
+ CompoundMetadataField(
+ "keyword", True, citation_builder.build_keywords()
+ ).to_dict(),
+ PrimitiveMetadataField(
+ "depositor",
+ False,
+ self.depositor
+ or data.get("primary_location", {})
+ .get("source", {})
+ .get("display_name", ""),
+ ).to_dict(),
+ PrimitiveMetadataField(
+ "alternativeURL", False, f"https://doi.org/{self.doi}"
+ ).to_dict(),
+ CompoundMetadataField(
+ "author", True, author_fields
+ ).to_dict(),
+ CompoundMetadataField(
+ "datasetContact", True, corresponding_author_fields
+ ).to_dict(),
+ CompoundMetadataField(
+ "grantNumber", True, grants
+ ).to_dict(),
+ ],
+ "displayName": "Citation Metadata",
+ }
+ },
+ "files": [],
+ }
+ }
+
+ if license_info.name:
+ return_dict["datasetVersion"]["license"] = {
+ "name": license_info.name,
+ "uri": license_info.uri,
+ }
+ else:
+ return_dict["datasetVersion"]["termsOfUse"] = (
+ f"All rights reserved. Copyright © {self._get_publication_year(data)}, [TODO: Insert copyright holder here!]"
+ )
+
+ return return_dict
+
+ def _build_description(self, data: dict[str, Any], abstract) -> str:
+ """
+ Build the description field by combining a header and the abstract.
+
+ Args:
+ data (dict[str, Any]): The metadata.
+ abstract: The abstract object.
+
+ Returns:
+ str: The full description.
+ """
+ head = self._build_description_head(data)
+ return f"{head}{abstract.text}"
+
+ def _build_description_head(self, data: dict[str, Any]) -> str:
+ """
+ Build the header for the description based on publication details.
+
+ Args:
+ data (dict[str, Any]): The metadata.
+
+ Returns:
+ str: The HTML header string.
+ """
+ journal = data.get("primary_location", {}).get("source", {}).get("display_name")
+ publication_date = data.get("publication_date")
+ volume = data.get("biblio", {}).get("volume")
+ issue = data.get("biblio", {}).get("issue")
+ doc_type = data.get("type")
+
+ if all([journal, publication_date, volume, issue, doc_type]):
+            return f"<p>This {doc_type} was published on {publication_date} in {journal} {volume}({issue})</p>"
+        elif all([journal, publication_date, doc_type]):
+            return f"<p>This {doc_type} was published on {publication_date} in {journal}</p>"
+
+ self.console.print(
+ f"{self.ICONS['warning']} No abstract header added, missing information (journal, publication date and/or document type)",
+ style="warning",
+ )
+ return ""
+
+ def _get_publication_year(self, data: dict[str, Any]) -> str | int:
+ """
+ Extract the publication year from the metadata.
+
+ Args:
+ data (dict[str, Any]): The metadata.
+
+ Returns:
+ str | int: The publication year or empty string.
+ """
+ return data.get("publication_year", "")
+
+ def _get_involved_pis(self, data: dict[str, Any]) -> list[Person]:
+ """
+ Identify involved principal investigators from the metadata for use as fallback
+ corresponding authors.
+
+ This method matches authors in the publication metadata against the configured
+ PIs and returns matching PIs. It is used as a fallback when no corresponding
+ authors are explicitly declared in the publication metadata.
+
+ Args:
+ data (dict[str, Any]): The metadata from OpenAlex.
+
+ Returns:
+ list[Person]: List of matching PIs for use as corresponding authors.
+ """
+ involved_pis: list[Person] = []
+ authors_in_publication = []
+
+ # Build list of authors from publication
+ for authorship in data.get("authorships", []):
+ author = authorship.get("author", {})
+ if not author:
+ continue
+
+ display_name = author.get("display_name", "")
+ given_name, family_name = NameProcessor.split_name(display_name)
+
+ person = Person(family_name, given_name)
+ if orcid := author.get("orcid"):
+ person.orcid = orcid
+
+ authors_in_publication.append(person)
+
+ # Find PIs that match authors in the publication
+ involved_pis = self.pi_finder.find_by_orcid(authors_in_publication)
+
+ return involved_pis
+
+ def _save_output(self, metadata: dict[str, Any]) -> None:
+ """
+ Save the generated metadata to a file or print it to the console.
+
+ Args:
+ metadata (dict[str, Any]): The metadata to save.
+ """
+ if self.output_path:
+ try:
+ # Custom JSON encoder to handle custom objects
+ class CustomEncoder(json.JSONEncoder):
+                class CustomEncoder(json.JSONEncoder):
+                    """
+                    Custom JSON encoder that handles objects with a to_dict method.
+
+                    This allows proper serialization of custom classes like
+                    Institution and Person by calling their to_dict method when
+                    available.
+                    """
+
+                    def default(self, o: Any) -> Any:
+                        """Return a JSON-serializable representation of o."""
+                        if hasattr(o, "to_dict"):
+                            return o.to_dict()
+                        return super().default(o)
+
+ with open(self.output_path, "w", encoding="utf-8") as f:
+ json.dump(
+ metadata, f, indent=4, ensure_ascii=False, cls=CustomEncoder
+ )
+ self.console.print(
+ f"{self.ICONS['save']} Metadata saved in: {self.output_path}",
+ style="info",
+ )
+ except Exception as e:
+ self.console.print(
+ f"{self.ICONS['error']} Error saving metadata: {str(e)}\n",
+ style="error",
+ )
+ raise
+ else:
+ self.console.print(metadata)
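Usage sketch (not part of the patch): the end-to-end flow fetches OpenAlex
metadata for the DOI, builds the citation block, and writes it to disk. It
requires a valid config.yaml (PIs, default grants); the DOI is a placeholder.

    from pathlib import Path
    from doi2dataset.processing import MetadataProcessor

    processor = MetadataProcessor(
        doi="10.1000/xyz",
        depositor="Doe, Jane",
        output_path=Path("10_1000_xyz_metadata.json"),
        contact_mail="user@example.org",
        upload=False,
    )
    metadata = processor.process()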
diff --git a/doi2dataset/processing/utils.py b/doi2dataset/processing/utils.py
new file mode 100644
index 0000000..3f1dd2b
--- /dev/null
+++ b/doi2dataset/processing/utils.py
@@ -0,0 +1,289 @@
+"""
+Processing utilities for doi2dataset.
+
+This module contains utility classes and functions used for processing
+names, finding PIs, mapping subjects, and other business logic operations.
+"""
+
+import unicodedata
+import warnings
+from typing import Any
+
+from ..core.models import Person
+
+# Suppress warnings from idutils
+warnings.filterwarnings(
+ "ignore", message=".*pkg_resources.*", category=DeprecationWarning
+)
+with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ from idutils.normalizers import normalize_orcid
+
+
+class NameProcessor:
+ """
+ Provides utility methods for processing names.
+ """
+
+ @staticmethod
+ def normalize_string(s: str) -> str:
+ """
+ Normalize a string using Unicode NFKD normalization and convert to ASCII.
+
+ Args:
+ s (str): The string to normalize.
+
+ Returns:
+ str: The normalized string.
+ """
+ return (
+ unicodedata.normalize("NFKD", s.lower())
+ .encode("ASCII", "ignore")
+ .decode("ASCII")
+ )
+
+ @staticmethod
+ def split_name(full_name: str) -> tuple[str, str]:
+ """
+ Split a full name into given and family names.
+
+ Args:
+ full_name (str): The full name (e.g., "Doe, John" or "John Doe").
+
+ Returns:
+ tuple[str, str]: A tuple (given_name, family_name).
+ """
+ if "," in full_name:
+ surname, given_name = full_name.split(",", 1)
+ return given_name.strip(), surname.strip()
+
+ parts = full_name.strip().split()
+ if len(parts) == 1:
+ return "", parts[0]
+
+ return " ".join(parts[:-1]), parts[-1]
+
+
+class PIFinder:
+ """
+ Finds principal investigators (PIs) among a list of Person objects.
+ """
+
+ def __init__(self, pis: list[Person]) -> None:
+ """
+ Initialize with a list of Person objects representing potential PIs.
+
+ Args:
+ pis (list[Person]): List of Person objects.
+ """
+ self.pis = pis
+
+ def find_by_orcid(self, authors: list[Person]) -> list[Person]:
+ """
+ Find PIs by ORCID identifier among the authors.
+
+ Args:
+ authors (list[Person]): List of author Person objects.
+
+ Returns:
+ list[Person]: List of Person objects that are PIs based on ORCID matching.
+ """
+ if not self.pis or not authors:
+ return []
+
+ pi_orcids = {pi.orcid for pi in self.pis if pi.orcid}
+ if not pi_orcids:
+ return []
+
+ return [author for author in authors if author.orcid in pi_orcids]
+
+ def find_corresponding_authors(self, authors: list[Person]) -> list[Person]:
+ """
+ Find corresponding authors by checking for email addresses and PI matching.
+
+ Args:
+ authors (list[Person]): List of author Person objects.
+
+ Returns:
+ list[Person]: List of corresponding authors.
+ """
+ # First, try to find authors with email addresses
+ authors_with_email = [author for author in authors if author.email]
+
+ if authors_with_email:
+ # If we have PIs configured, prefer PI matches
+ pi_matches = self.find_by_orcid(authors_with_email)
+ if pi_matches:
+ return pi_matches
+
+ # Otherwise return all authors with email addresses
+ return authors_with_email
+
+ # Fallback: look for PI matches even without email
+ pi_matches = self.find_by_orcid(authors)
+ if pi_matches:
+ return pi_matches
+
+ # Last resort: return first author if no other criteria match
+ return authors[:1] if authors else []
+
+ def find_pi(
+ self,
+ family_name: str | None = None,
+ given_name: str | None = None,
+ orcid: str | None = None,
+ ) -> Person | None:
+ """
+ Find a PI by name and/or ORCID.
+
+ Args:
+ family_name (str | None): Family name to match.
+ given_name (str | None): Given name to match.
+ orcid (str | None): ORCID to match.
+
+ Returns:
+ Person | None: The matched PI or None.
+ """
+ if orcid:
+ return self._find_by_orcid(orcid)
+
+ # Fallback to name matching if no ORCID
+ for person in self.pis:
+ name_match = True
+ if family_name and person.family_name.lower() != family_name.lower():
+ name_match = False
+ if given_name and person.given_name.lower() != given_name.lower():
+ name_match = False
+ if name_match:
+ return person
+
+ return None
+
+ def _find_by_orcid(self, orcid: str) -> Person | None:
+ """
+ Find a PI by ORCID.
+
+ Args:
+ orcid (str): Normalized ORCID.
+
+ Returns:
+ Person | None: The matched PI or None.
+ """
+ try:
+ normalized_orcid = normalize_orcid(orcid)
+ for person in self.pis:
+ if person.orcid and normalize_orcid(person.orcid) == normalized_orcid:
+ return person
+ except Exception:
+ # If ORCID normalization fails, try direct string comparison
+ for person in self.pis:
+ if person.orcid == orcid:
+ return person
+ return None
+
+
+class SubjectMapper:
+ """
+ Maps subject names from input data to controlled vocabulary.
+ """
+
+ CONTROLLED_VOCAB = {
+ "Agricultural Sciences": "Agricultural Sciences",
+ "Arts and Humanities": "Arts and Humanities",
+ "Astronomy": "Astronomy and Astrophysics",
+ "Astrophysics": "Astronomy and Astrophysics",
+ "Business": "Business and Management",
+ "Business and Management": "Business and Management",
+ "Chemistry": "Chemistry",
+ "Computer Science": "Computer and Information Science",
+ "Computer and Information Science": "Computer and Information Science",
+ "Earth Sciences": "Earth and Environmental Sciences",
+ "Earth and Environmental Sciences": "Earth and Environmental Sciences",
+ "Engineering": "Engineering",
+ "Law": "Law",
+ "Life Sciences": "Medicine, Health and Life Sciences",
+ "Mathematical Sciences": "Mathematical Sciences",
+ "Mathematics": "Mathematical Sciences",
+ "Medicine": "Medicine, Health and Life Sciences",
+ "Medicine, Health and Life Sciences": "Medicine, Health and Life Sciences",
+ "Physics": "Physics",
+ "Psychology": "Psychology",
+ "Social Sciences": "Social Sciences",
+ "Other": "Other",
+ }
+
+ @classmethod
+ def map_subjects(cls, subjects: list[str]) -> list[str]:
+ """
+ Map a list of subject strings to controlled vocabulary terms.
+
+ Args:
+ subjects (list[str]): List of subject strings to map.
+
+ Returns:
+ list[str]: List of mapped controlled vocabulary terms.
+ """
+ mapped = []
+ for subject in subjects:
+ # Try exact match first
+ if subject in cls.CONTROLLED_VOCAB:
+ mapped_subject = cls.CONTROLLED_VOCAB[subject]
+ if mapped_subject not in mapped:
+ mapped.append(mapped_subject)
+ else:
+ # Try partial matching
+ subject_lower = subject.lower()
+ for key, value in cls.CONTROLLED_VOCAB.items():
+                    if subject_lower in key.lower() or key.lower() in subject_lower:
+                        if value not in mapped:
+                            mapped.append(value)
+                        break
+ else:
+ # No match found, add "Other" if not already present
+ if "Other" not in mapped:
+ mapped.append("Other")
+
+ return mapped if mapped else ["Other"]
+
+ @classmethod
+ def map_single_subject(cls, subject: str) -> str:
+ """
+ Map a single subject string to a controlled vocabulary term.
+
+ Args:
+ subject (str): Subject string to map.
+
+ Returns:
+ str: Mapped controlled vocabulary term.
+ """
+ mapped_subjects = cls.map_subjects([subject])
+ return mapped_subjects[0] if mapped_subjects else "Other"
+
+ @classmethod
+ def get_subjects(
+ cls, data: dict[str, Any], fallback_subject: str = "Other"
+ ) -> list[str]:
+ """
+ Extract and map subjects from input data.
+
+ Args:
+ data (dict[str, Any]): The input metadata.
+ fallback_subject (str): Fallback subject if none found.
+
+ Returns:
+ list[str]: List of mapped subject names.
+ """
+
+ topics = data.get("topics", [])
+ subject_collection: list[str] = []
+
+ for topic in topics:
+ for field_type in ["subfield", "field", "domain"]:
+ if field_name := topic.get(field_type, {}).get("display_name"):
+ subject_collection.append(field_name)
+
+ mapped_subjects = cls.map_subjects(subject_collection)
+ return mapped_subjects if mapped_subjects else [fallback_subject]
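Usage sketch (not part of the patch): name splitting and subject mapping on
small inputs.

    from doi2dataset.processing import NameProcessor, SubjectMapper

    print(NameProcessor.split_name("Doe, Jane"))    # ('Jane', 'Doe')
    print(NameProcessor.split_name("Jane A. Doe"))  # ('Jane A.', 'Doe')
    print(SubjectMapper.map_subjects(["Computer Science", "Basket weaving"]))
    # ['Computer and Information Science', 'Other']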
diff --git a/doi2dataset/utils/__init__.py b/doi2dataset/utils/__init__.py
new file mode 100644
index 0000000..e9fa8c7
--- /dev/null
+++ b/doi2dataset/utils/__init__.py
@@ -0,0 +1,22 @@
+"""
+Utility functions and helpers for doi2dataset.
+
+This package contains validation functions, string processing utilities,
+and other helper functions used throughout the application.
+"""
+
+from .validation import (
+ normalize_string,
+ sanitize_filename,
+ split_name,
+ validate_doi,
+ validate_email_address,
+)
+
+__all__ = [
+ "validate_doi",
+ "validate_email_address",
+ "sanitize_filename",
+ "split_name",
+ "normalize_string",
+]
diff --git a/doi2dataset/utils/validation.py b/doi2dataset/utils/validation.py
new file mode 100644
index 0000000..f336351
--- /dev/null
+++ b/doi2dataset/utils/validation.py
@@ -0,0 +1,127 @@
+"""
+Validation utilities for doi2dataset.
+
+This module provides validation functions for DOIs, email addresses,
+and other data validation needs.
+"""
+
+import warnings
+
+import dns.resolver
+from email_validator import EmailNotValidError, validate_email
+
+# Suppress the warning from idutils about pkg_resources
+warnings.filterwarnings(
+ "ignore", message=".*pkg_resources.*", category=DeprecationWarning
+)
+with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ from idutils.validators import is_doi
+
+
+def validate_doi(doi: str) -> bool:
+ """
+ Validate a DOI using the idutils library.
+
+ Args:
+ doi (str): The DOI to validate.
+
+ Returns:
+ bool: True if the DOI is valid, False otherwise.
+ """
+ return is_doi(doi)
+
+
+def validate_email_address(email: str) -> bool:
+ """
+ Validate an email address and ensure its domain has an MX record.
+
+ Args:
+ email (str): The email address to validate.
+
+ Returns:
+ bool: True if the email address is valid and its domain resolves, otherwise False.
+ """
+ try:
+ # Basic validation
+ valid = validate_email(email)
+ email = valid.normalized
+
+ # Check domain has MX record
+ domain = email.split("@")[1]
+ dns.resolver.resolve(domain, "MX")
+
+ return True
+ except (EmailNotValidError, dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
+ return False
+
+
+def sanitize_filename(doi: str) -> str:
+ """
+ Convert DOI to a valid filename using only alphanumeric characters and underscores.
+
+ Args:
+ doi (str): The DOI to sanitize.
+
+ Returns:
+ str: Sanitized filename string.
+ """
+ # Replace non-alphanumeric characters with underscores
+ sanitized = "".join(c if c.isalnum() else "_" for c in doi)
+ # Remove consecutive underscores
+ while "__" in sanitized:
+ sanitized = sanitized.replace("__", "_")
+ # Remove leading/trailing underscores
+ return sanitized.strip("_")
+
+
+def split_name(full_name: str) -> tuple[str, str]:
+ """
+ Split a full name into given and family names.
+
+ Args:
+ full_name (str): The full name (e.g., "Doe, John" or "John Doe").
+
+ Returns:
+ tuple[str, str]: A tuple (given_name, family_name).
+ """
+ normalized = normalize_string(full_name)
+
+ if "," in normalized:
+ # Format: "Doe, John"
+ parts = normalized.split(",", 1)
+ family_name = parts[0].strip()
+ given_name = parts[1].strip()
+ else:
+ # Format: "John Doe" - assume last word is family name
+ parts = normalized.split()
+ if len(parts) == 1:
+ # Only one name provided
+ given_name = parts[0]
+ family_name = ""
+ else:
+ given_name = " ".join(parts[:-1])
+ family_name = parts[-1]
+
+ return given_name, family_name
+
+
+def normalize_string(s: str) -> str:
+ """
+ Normalize a string using Unicode NFKD normalization and convert to ASCII.
+
+ Args:
+ s (str): The string to normalize.
+
+ Returns:
+ str: Normalized string.
+ """
+ import unicodedata
+
+ # Normalize Unicode characters to decomposed form
+ normalized = unicodedata.normalize("NFKD", s)
+
+ # Convert to ASCII, ignoring non-ASCII characters
+ ascii_str = normalized.encode("ascii", "ignore").decode("ascii")
+
+ return ascii_str.strip()
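Usage sketch (not part of the patch): the standalone validation helpers. Note
that validate_email_address performs a live DNS MX lookup, so it needs network
access.

    from doi2dataset.utils import sanitize_filename, validate_doi

    print(validate_doi("10.5281/zenodo.1234567"))       # True (well-formed DOI)
    print(sanitize_filename("10.5281/zenodo.1234567"))  # 10_5281_zenodo_1234567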