diff --git a/doi2dataset.py b/doi2dataset.py
deleted file mode 100755
index aeaaeaf..0000000
--- a/doi2dataset.py
+++ /dev/null
@@ -1,2103 +0,0 @@
-#!/bin/env python
-"""
-doi2dataset.py
-
-This script processes DOIs to generate metadata for Dataverse datasets.
-It supports fetching data from OpenAlex and CrossRef, mapping metadata fields,
-processing author and grant information, and optionally uploading the metadata
-to a Dataverse instance.
-
-Usage:
- python doi2dataset.py [options] doi1 doi2 ...
-
-Options:
- -f, --file File containing DOIs (one per line)
- -o, --output-dir Output directory for metadata files (default: current directory)
- -d, --depositor Name of the depositor
- -s, --subject Default subject (default: "Medicine, Health and Life Sciences")
- -m, --contact-mail Contact email address
- -u, --upload Upload metadata to Dataverse
-"""
-
-import argparse
-import json
-import os
-import sys
-import unicodedata
-import warnings # TODO: Remove once the warning is stripped from idutils
-from collections.abc import Sequence
-from dataclasses import dataclass, field
-from enum import Enum
-from functools import reduce
-from pathlib import Path
-from typing import Any
-
-import dns.resolver
-import requests
-import yaml
-from email_validator import EmailNotValidError, validate_email
-from rich.console import Console
-from rich.panel import Panel
-from rich.progress import (
- BarColumn,
- Progress,
- SpinnerColumn,
- TaskID,
- TextColumn,
- TimeElapsedColumn,
-)
-from rich.table import Table
-from rich.theme import Theme
-
-# Get version from setuptools_scm
-try:
- from importlib.metadata import version
-
- __version__ = version("doi2dataset")
-except ImportError:
- # Fallback for older Python versions
- try:
- import pkg_resources
-
- __version__ = pkg_resources.get_distribution("doi2dataset").version
- except Exception:
- __version__ = "1.0.0" # Fallback version
-
-# Idutils throws an unconditional warning about deprecation of relative imports.
-# Since we are not using them, suppress the warning to not confuse users
-# TODO: Remove once the warning is stripped from idutils
-warnings.filterwarnings("ignore", category=DeprecationWarning)
-
-from idutils.normalizers import ( # noqa: E402
- normalize_doi,
- normalize_orcid,
- normalize_pmid,
-)
-from idutils.validators import is_doi # noqa: E402
-
-# Icon definitions for console output
-ICONS = {
- "success": "✓", # Simple checkmark
- "error": "✗", # Simple X
- "warning": "!", # Simple exclamation
- "info": "ℹ", # Info symbol
- "processing": "⋯", # Three dots
- "done": "∎", # Filled square
- "file": "⨳", # Document symbol
- "folder": "⊞", # Folder symbol
- "clock": "◷", # Clock symbol
- "search": "⌕", # Search symbol
- "data": "≡", # Three lines
- "doi": "∾", # Link symbol
- "total": "∑", # Sum symbol
- "save": "⤓", # Save/download arrow
- "upload": "⤒", # Upload arrow
-}
-
-# Theme configuration for Rich console output
-THEME = Theme(
- {
- "info": "cyan",
- "warning": "yellow",
- "error": "red bold",
- "success": "green",
- }
-)
-
-# Available sources for metadata abstracts
-SOURCES = ["openalex", "crossref", "none"]
-
-
-def format_status(icon: str, message: str, style: str = "default") -> str:
- """
- Format a status message with an icon and a given style.
-
- Args:
- icon (str): Key for the icon character from the ICONS dictionary.
- message (str): The status message.
- style (str): The style to apply (e.g., 'default', 'info', 'warning', 'error', 'success').
-
- Returns:
- str: The formatted status message.
- """
- return f"[{style}]{ICONS[icon]} {message}[/{style}]"
-
-
-class FieldType(Enum):
- """Enum representing different Dataverse field types."""
-
- PRIMITIVE = "primitive"
- COMPOUND = "compound"
- VOCABULARY = "controlledVocabulary"
-
-
-@dataclass
-class BaseMetadataField[T]:
- """
- Base class for Dataverse metadata fields.
-
- This class defines a metadata field with a name, a value of type T, and
- a flag indicating whether multiple values are allowed. It serves as
- a template for specific metadata field implementations.
-
- Attributes:
- name (str): The name of the metadata field.
- multiple (bool): Indicates whether multiple values are allowed.
- value (T): The value stored in the field.
- type (FieldType): The type of the field, automatically set based on T.
- """
-
- name: str
- multiple: bool
- value: T
- type: FieldType = field(init=False)
- expanded_value: dict[str, str] | None = field(default=None)
-
- def __post_init__(self) -> None:
- """
- After initialization, determine the field type by calling the _set_type method.
- """
- self._set_type()
-
- def _set_type(self) -> None:
- """
- Set the `type` attribute based on the field's value.
-
- This method must be implemented by subclasses.
-
- Raises:
- NotImplementedError: If not implemented by a subclass.
- """
- raise NotImplementedError("Subclasses must implement the _set_type method.")
-
- def to_dict(self) -> dict[str, Any]:
- """
- Convert the metadata field to a dictionary representation.
-
- Returns:
- dict[str, Any]: Dictionary representation of the metadata field.
-
- Raises:
- NotImplementedError: If not implemented by a subclass.
- """
- raise NotImplementedError("Subclasses must implement the to_dict method.")
-
-
-@dataclass
-class PrimitiveMetadataField(BaseMetadataField[str]):
- """
- Metadata field representing a primitive type (e.g., string) for Dataverse.
- """
-
- def _set_type(self) -> None:
- self.type = FieldType.PRIMITIVE
-
- def to_dict(self) -> dict[str, str | bool | dict[str, str]]:
- """
- Convert the primitive metadata field to a dictionary representation.
-
- Returns:
- dict[str, str | bool]: Dictionary with field properties.
- """
-
- if self.expanded_value:
- return {
- "typeName": self.name,
- "typeClass": self.type.value,
- "multiple": self.multiple,
- "value": self.value,
- "expandedValue": self.expanded_value,
- }
- else:
- return {
- "typeName": self.name,
- "typeClass": self.type.value,
- "multiple": self.multiple,
- "value": self.value,
- }
-
-
-@dataclass
-class ControlledVocabularyMetadataField(BaseMetadataField[str | list[str]]):
- """
- Metadata field for controlled vocabulary values.
- """
-
- def _set_type(self) -> None:
- self.type = FieldType.VOCABULARY
-
- def to_dict(self) -> dict[str, Any]:
- """
- Convert the controlled vocabulary metadata field to a dictionary.
-
- Returns:
- dict[str, Any]: Dictionary representation.
- """
- return {
- "typeName": self.name,
- "typeClass": self.type.value,
- "multiple": self.multiple,
- "value": self.value,
- }
-
-
-@dataclass
-class CompoundMetadataField(
- BaseMetadataField[
- Sequence[Sequence[PrimitiveMetadataField | ControlledVocabularyMetadataField]]
- ]
-):
- """
- Metadata field representing compound types, composed of multiple subfields.
- """
-
- def _set_type(self) -> None:
- self.type = FieldType.COMPOUND
-
- def to_dict(self) -> dict[str, Any]:
- """
- Convert the compound metadata field to a dictionary representation.
-
- Returns:
- dict[str, Any]: Dictionary representation of the compound field.
- """
- value_list: list[dict[str, Any]] = []
- for outer_list in self.value:
- field_dicts: list[dict[str, Any]] = []
- for field_item in outer_list:
- field_dicts.append({field_item.name: field_item.to_dict()})
- value_list.append(reduce(lambda d1, d2: d1 | d2, field_dicts))
-
- return {
- "typeName": self.name,
- "typeClass": self.type.value,
- "multiple": self.multiple,
- "value": value_list,
- }
-
-
-@dataclass
-class Institution:
- """
- Represents an institution or organization.
-
- Attributes:
- display_name (str): The name of the institution.
- ror (str): Research Organization Registry identifier (optional).
- """
-
- display_name: str
- ror: str = ""
-
- def affiliation_field(self) -> PrimitiveMetadataField:
- """
- Create a metadata field for the affiliation.
-
- Returns:
- PrimitiveMetadataField: A metadata field representing the institution,
- using ROR ID when available.
- """
- if self.ror:
- expanded_value = {
- "scheme": "http://www.grid.ac/ontology/",
- "termName": self.display_name,
- "@type": "https://schema.org/Organization",
- }
- return PrimitiveMetadataField(
- "authorAffiliation", False, self.ror, expanded_value=expanded_value
- )
- else:
- return PrimitiveMetadataField("authorAffiliation", False, self.display_name)
-
-
-@dataclass
-class Person:
- """
- Represents a person (e.g., an author or a PI).
-
- Attributes:
- family_name (str): Family name of the person.
- given_name (str): Given name of the person.
- orcid (str): ORCID identifier (optional).
- email (str): Email address (optional).
- affiliation (Institution): Affiliation of the person (optional).
- """
-
- family_name: str
- given_name: str
- orcid: str = ""
- email: str = ""
- affiliation: Institution | str = ""
-
- def to_dict(self) -> dict[str, str | list[str] | dict[str, str]]:
- """
- Convert Person to a dictionary for JSON serialization.
-
- Handles affiliations properly by checking if the affiliation
- is an Institution object or a string.
-
- Returns:
- dict: A dictionary containing the person's information including
- name, contact details, and affiliation.
- """
- return_dict: dict[str, str | list[str] | dict[str, str]] = {
- "family_name": self.family_name,
- "given_name": self.given_name,
- "orcid": self.orcid,
- "email": self.email,
- }
-
- if isinstance(self.affiliation, Institution):
- if self.affiliation.ror:
- return_dict["affiliation"] = self.affiliation.ror
- elif self.affiliation.display_name:
- return_dict["affiliation"] = self.affiliation.display_name
- else:
- return_dict["affiliation"] = ""
- else:
- return_dict["affiliation"] = self.affiliation if self.affiliation else ""
-
- return return_dict
-
- def format_name(self) -> str:
- """
- Format the name in 'Family, Given' order.
-
- Returns:
- str: Formatted name.
- """
- return f"{self.family_name}, {self.given_name}"
-
- def author_fields(
- self,
- ) -> list[PrimitiveMetadataField | ControlledVocabularyMetadataField]:
- """
- Build metadata fields for the author.
-
- The method handles both Institution objects and string values for affiliations.
- Different fields are generated depending on whether ORCID is available.
-
- Returns:
- list: List of metadata fields representing the author, including name,
- affiliation, and optionally ORCID identifier information.
- """
- affiliation_field = None
- if isinstance(self.affiliation, Institution):
- affiliation_field = self.affiliation.affiliation_field()
- else:
- affiliation_field = PrimitiveMetadataField(
- "authorAffiliation", False, self.affiliation
- )
-
- if self.orcid:
- return [
- PrimitiveMetadataField("authorName", False, self.format_name()),
- affiliation_field,
- ControlledVocabularyMetadataField(
- "authorIdentifierScheme", False, "ORCID"
- ),
- PrimitiveMetadataField("authorIdentifier", False, self.orcid),
- ]
- else:
- return [
- PrimitiveMetadataField("authorName", False, self.format_name()),
- affiliation_field,
- ]
-
- def dataset_contact_fields(self) -> list[PrimitiveMetadataField]:
- """
- Generate metadata fields for dataset contact.
-
- The method handles both Institution objects and string values for affiliations.
- Creates fields for the contact name, affiliation, and email address.
-
- Returns:
- list: List of metadata fields for the dataset contact including name,
- affiliation, and email address.
- """
-
- affiliation_field = None
- if isinstance(self.affiliation, Institution):
- affiliation_field = self.affiliation.affiliation_field()
- else:
- affiliation_field = PrimitiveMetadataField(
- "datasetContactAffiliation", False, self.affiliation
- )
-
- return [
- PrimitiveMetadataField("datasetContactName", False, self.format_name()),
- affiliation_field,
- PrimitiveMetadataField("datasetContactEmail", False, self.email),
- ]
-
-
-@dataclass
-class License:
- """
- Represents a license with name, URI, and short identifier.
-
- Attributes:
- name (str): The full name of the license.
- uri (str): The license URI.
- short (str): The short identifier of the license.
- """
-
- name: str
- uri: str
- short: str
-
-
-@dataclass
-class Abstract:
- """
- Represents an abstract with its text and source.
-
- Attributes:
- text (str): The abstract text.
- source (str): The source of the abstract ('crossref', 'openalex', or 'none').
- """
-
- text: str
- source: str
-
- def __post_init__(self):
- """
- Validate that the abstract source is one of the allowed values.
-
- Raises:
- ValueError: If source is not one of the allowed values.
- """
- allowed_sources = ["crossref", "openalex", "none"]
- if self.source not in allowed_sources:
- raise ValueError(
- f"{self.source} is not valid! Needs to be one of {str(allowed_sources)}."
- )
-
-
-@dataclass
-class ConfigData:
- """
- Represents configuration data loaded from a YAML file with environment variable overrides.
-
- The dataverse configuration may be overridden by environment variables:
- DATAVERSE_URL, DATAVERSE_API_TOKEN, DATAVERSE_DATAVERSE,
- DATAVERSE_AUTH_USER, DATAVERSE_AUTH_PASSWORD.
-
- Attributes:
- dataverse (dict[str, str]): Dataverse-related configuration with environment
- variable overrides applied.
- pis (list[dict[str, Any]]): List of principal investigator configurations.
- default_grants (list[dict[str, str]]): Default grant configurations.
- """
-
- dataverse: dict[str, str]
- pis: list[dict[str, Any]]
- default_grants: list[dict[str, str]]
-
-
-class Config:
- """
- Singleton class to handle configuration loading and retrieval.
-
- Supports environment variable overrides for Dataverse configuration:
- - DATAVERSE_URL: Overrides dataverse.url
- - DATAVERSE_API_TOKEN: Overrides dataverse.api_token
- - DATAVERSE_DATAVERSE: Overrides dataverse.dataverse
- - DATAVERSE_AUTH_USER: Overrides dataverse.auth_user
- - DATAVERSE_AUTH_PASSWORD: Overrides dataverse.auth_password
-
- Environment variables take precedence over config file values.
- """
-
- _instance: "Config | None" = None
- _config_data: ConfigData | None = None
-
- def __new__(cls) -> "Config":
- """
- Create and return the singleton instance of Config.
-
- Returns:
- Config: The singleton instance.
- """
- if cls._instance is None:
- cls._instance = super().__new__(cls)
- return cls._instance
-
- @classmethod
- def load_config(cls, config_path: str | Path | None = None) -> None:
- """
- Load configuration from a YAML file with environment variable overrides.
-
- Environment variables will override corresponding config file values:
- DATAVERSE_URL, DATAVERSE_API_TOKEN, DATAVERSE_DATAVERSE,
- DATAVERSE_AUTH_USER, DATAVERSE_AUTH_PASSWORD
-
- Args:
- config_path (str | Path | None): Path to the configuration file.
- If None, the default config.yaml in the script directory is used.
-
- Raises:
- FileNotFoundError: If the configuration file does not exist.
- ValueError: If any PI email address is invalid.
- """
- if config_path is None:
- config_path = Path(__file__).parent / "config.yaml"
-
- config_path = Path(config_path)
- if not config_path.exists():
- raise FileNotFoundError(f"Config file not found: {config_path}")
-
- with open(config_path, encoding="utf-8") as f:
- config_data = yaml.safe_load(f)
-
- # Override dataverse config with environment variables if they exist
- dataverse_config = config_data.get("dataverse", {})
-
- # Check for environment variables and override config values
- env_overrides = {
- "url": os.getenv("DATAVERSE_URL"),
- "api_token": os.getenv("DATAVERSE_API_TOKEN"),
- "dataverse": os.getenv("DATAVERSE_DATAVERSE"),
- "auth_user": os.getenv("DATAVERSE_AUTH_USER"),
- "auth_password": os.getenv("DATAVERSE_AUTH_PASSWORD"),
- }
-
- # Apply environment variable overrides if they exist
- for key, env_value in env_overrides.items():
- if env_value is not None:
- dataverse_config[key] = env_value
-
- # Validate PI email addresses
- pis = config_data.get("pis", [])
- for pi in pis:
- if email := pi.get("email"):
- if not validate_email_address(email):
- raise ValueError(
- f"Configuration Error: Invalid email address for PI {pi.get('given_name', '')} {pi.get('family_name', '')}: {email}"
- )
-
- cls._config_data = ConfigData(
- dataverse=dataverse_config,
- pis=config_data.get("pis", []),
- default_grants=config_data.get("default_grants", []),
- )
-
- @classmethod
- def get_config(cls) -> ConfigData:
- """
- Retrieve the loaded configuration data.
-
- Returns:
- ConfigData: The configuration data.
-
- Raises:
- RuntimeError: If the configuration could not be loaded.
- """
- if cls._config_data is None:
- cls.load_config()
- if cls._config_data is None:
- raise RuntimeError("Failed to load configuration")
- return cls._config_data
-
- @property
- def PIS(self) -> list[dict[str, Any]]:
- """
- Get PI configurations.
-
- Returns:
- list[dict[str, Any]]: List of PI configurations.
- """
- return self.get_config().pis
-
- @property
- def DEFAULT_GRANTS(self) -> list[dict[str, str]]:
- """
- Get default grant configurations.
-
- Returns:
- list[dict[str, str]]: List of default grants.
- """
- return self.get_config().default_grants
-
- @property
- def DATAVERSE(self) -> dict[str, str]:
- """
- Get Dataverse configurations with environment variable overrides applied.
-
- Returns:
- dict[str, str]: Dataverse configuration with environment variables
- taking precedence over config file values.
- """
- return self.get_config().dataverse
-
-
-class APIClient:
- """
- Client for making HTTP requests to external APIs.
-
- Attributes:
- session (requests.Session): The underlying requests session.
- """
-
- def __init__(
- self,
- contact_mail: str | None = None,
- user_agent: str = f"UDE-Doi2Dataset/{__version__}",
- token: str | None = None,
- ) -> None:
- """
- Initialize the API client with optional contact mail, user agent, and token.
-
- Args:
- contact_mail (str | None): Contact email address.
- user_agent (str): User agent string.
- token (str | None): Optional API token.
- """
- self.session = requests.Session()
- self._set_headers(contact_mail, user_agent, token)
-
- def _set_headers(
- self, contact_mail: str | None, user_agent: str, token: str | None
- ) -> None:
- """
- Set HTTP headers for the session based on contact email and token.
-
- Args:
- contact_mail (str | None): Contact email address.
- user_agent (str): User agent string.
- token (str | None): Optional API token.
- """
- if contact_mail:
- header = {"User-Agent": f"{user_agent} (mailto:{contact_mail})"}
- else:
- header = {"User-Agent": user_agent}
-
- if token:
- header["X-Dataverse-key"] = token
-
- self.session.headers.update(header)
-
- def make_request(
- self, url: str, method: str = "GET", **kwargs: Any
- ) -> requests.Response | None:
- """
- Make an HTTP request and return the response.
-
- Args:
- url (str): The URL to request.
- method (str): HTTP method to use (default: GET).
- **kwargs: Additional arguments for requests.request.
-
- Returns:
- requests.Response | None: The HTTP response, or None if the request failed.
- """
- try:
- response = self.session.request(method, url, **kwargs)
- response.raise_for_status()
- return response
- except requests.exceptions.RequestException as e:
- print(f"\n{ICONS['error']} Request failed: {str(e)}")
- return None
-
-
-class NameProcessor:
- """
- Provides utility methods for processing names.
- """
-
- @staticmethod
- def normalize_string(s: str) -> str:
- """
- Normalize a string using Unicode NFKD normalization and convert to ASCII.
-
- Args:
- s (str): The string to normalize.
-
- Returns:
- str: The normalized string.
- """
- return (
- unicodedata.normalize("NFKD", s.lower())
- .encode("ASCII", "ignore")
- .decode("ASCII")
- )
-
- @staticmethod
- def split_name(full_name: str) -> tuple[str, str]:
- """
- Split a full name into given and family names.
-
- Args:
- full_name (str): The full name (e.g., "Doe, John" or "John Doe").
-
- Returns:
- tuple[str, str]: A tuple (given_name, family_name).
- """
- if "," in full_name:
- surname, given_name = full_name.split(",", 1)
- return given_name.strip(), surname.strip()
-
- parts = full_name.strip().split()
- if len(parts) == 1:
- return "", parts[0]
-
- return " ".join(parts[:-1]), parts[-1]
-
-
-class PIFinder:
- """
- Finds principal investigators (PIs) among a list of Person objects.
- """
-
- def __init__(self, pis: list[Person]) -> None:
- """
- Initialize with a list of Person objects representing potential PIs.
-
- Args:
- pis (list[Person]): List of Person objects.
- """
- self.pis = pis
-
- def find_pi(
- self,
- family_name: str | None = None,
- orcid: str | None = None,
- given_name: str | None = None,
- ) -> Person | None:
- """
- Find a PI by ORCID or name.
-
- Args:
- family_name (str | None): Family name.
- orcid (str | None): ORCID identifier.
- given_name (str | None): Given name.
-
- Returns:
- Person | None: The matched PI or None if not found.
- """
- if orcid:
- return self._find_by_orcid(normalize_orcid(orcid))
-
- if family_name:
- return self._find_by_name(family_name, given_name)
-
- return None
-
- def _find_by_orcid(self, orcid: str) -> Person | None:
- """
- Find a PI by ORCID.
-
- Args:
- orcid (str): Normalized ORCID.
-
- Returns:
- Person | None: The matched PI or None.
- """
- for person in self.pis:
- if normalize_orcid(person.orcid) == orcid:
- return person
- return None
-
- def _find_by_name(self, family_name: str, given_name: str | None) -> Person | None:
- """
- Find a PI by family name (and optionally given name).
-
- Args:
- family_name (str): Family name.
- given_name (str | None): Given name (optional).
-
- Returns:
- Person | None: The matched PI or None.
- """
- matches: list[Person] = []
- normalized_family_name = NameProcessor.normalize_string(family_name)
-
- for person in self.pis:
- if (
- NameProcessor.normalize_string(person.family_name)
- == normalized_family_name
- ):
- matches.append(person)
-
- if not matches:
- return None
-
- if given_name:
- normalized_given_name = NameProcessor.normalize_string(given_name)
- for match in matches:
- if (
- NameProcessor.normalize_string(match.given_name)
- == normalized_given_name
- ):
- return match
- return None
-
- if len(matches) == 1:
- return matches[0]
-
- raise ValueError("Multiple matches found for family name")
-
-
-class LicenseProcessor:
- """
- Processes license information from metadata.
- """
-
- LICENSE_MAP = {
- "cc-by": ("https://creativecommons.org/licenses/by/4.0/", "CC BY 4.0"),
- "cc-by-sa": ("https://creativecommons.org/licenses/by-sa/4.0/", "CC BY-SA 4.0"),
- "cc-by-nc": ("https://creativecommons.org/licenses/by-nc/4.0/", "CC BY-NC 4.0"),
- "cc-by-nc-sa": (
- "https://creativecommons.org/licenses/by-nc-sa/4.0/",
- "CC BY-NC-SA 4.0",
- ),
- "cc-by-nc-nd": (
- "https://creativecommons.org/licenses/by-nc-nd/4.0/",
- "CC BY-NC-ND 4.0",
- ),
- "cc-by-nd": ("https://creativecommons.org/licenses/by-nd/4.0/", "CC BY-ND 4.0"),
- "cc0": ("https://creativecommons.org/publicdomain/zero/1.0/", "CC0 1.0"),
- "pd": (
- "https://creativecommons.org/publicdomain/mark/1.0/",
- "Public Domain Mark 1.0",
- ),
- }
-
- @classmethod
- def process_license(cls, data: dict[str, Any]) -> License:
- """
- Process and return license information based on input data.
-
- Args:
- data (dict[str, Any]): Input data containing license info.
-
- Returns:
- License: Processed license information.
- """
- location = data.get("primary_location", {})
- license_short = location.get("license", "")
-
- if not license_short:
- return License(name="", uri="", short="unknown")
-
- base_license = license_short.split("/")[0].lower()
- uri, name = cls.LICENSE_MAP.get(base_license, ("", license_short))
- return License(name=name, uri=uri, short=license_short)
-
-
-class AbstractProcessor:
- """
- Retrieves and processes abstracts from CrossRef and OpenAlex.
- """
-
- def __init__(self, api_client: APIClient, console: Console | None = None):
- """
- Initialize with an APIClient instance.
-
- Args:
- api_client (APIClient): The API client to use for requests.
- console (Console | None): Rich console instance for output.
- """
- self.api_client = api_client
- self.console = console or Console()
-
- def get_abstract(
- self, doi: str, data: dict[str, Any], license: License
- ) -> Abstract:
- """
- Get an abstract based on DOI and license permissions.
-
- Args:
- doi (str): The DOI.
- data (dict[str, Any]): Data retrieved from an external source.
- license (License): License information.
-
- Returns:
- Abstract: The abstract with its source.
- """
- license_ok = {"cc-by", "cc-by-sa", "cc-by-nc", "cc-by-nc-sa", "cc0", "pd"}
-
- if license.short in license_ok:
- self.console.print(
- f"\n{ICONS['info']} License {license.name} allows derivative works. Pulling abstract from CrossRef.",
- style="info",
- )
- crossref_abstract = self._get_crossref_abstract(doi)
- if crossref_abstract:
- return Abstract(text=crossref_abstract, source="crossref")
- else:
- self.console.print(
- f"\n{ICONS['warning']} No abstract found in CrossRef!",
- style="warning",
- )
- else:
- if license.name:
- self.console.print(
- f"\n{ICONS['info']} License {license.name} does not allow derivative works. Reconstructing abstract from OpenAlex!",
- style="info",
- )
- else:
- self.console.print(
- f"\n{ICONS['info']} Custom license does not allow derivative works. Reconstructing abstract from OpenAlex!",
- style="info",
- )
-
- openalex_abstract = self._get_openalex_abstract(data)
- if openalex_abstract:
- return Abstract(text=openalex_abstract, source="openalex")
- else:
- self.console.print(
- f"\n{ICONS['warning']} No abstract found in OpenAlex!", style="warning"
- )
-
- self.console.print(
- f"\n{ICONS['warning']} No abstract found in either CrossRef nor OpenAlex!",
- style="warning",
- )
- return Abstract(text="", source="none")
-
- def _get_crossref_abstract(self, doi: str) -> str | None:
- """
- Retrieve abstract from CrossRef API.
-
- Args:
- doi (str): The DOI.
-
- Returns:
- str | None: The abstract if found, otherwise None.
- """
- url = f"https://api.crossref.org/works/{doi}"
- response = self.api_client.make_request(url)
-
- if response and response.status_code == 200:
- abstract_raw = response.json().get("message", {}).get("abstract")
- return self._clean_jats(abstract_raw)
- return None
-
- def _get_openalex_abstract(self, data: dict[str, Any]) -> str | None:
- """
- Retrieve abstract from OpenAlex data.
-
- Args:
- data (dict[str, Any]): Data from OpenAlex.
-
- Returns:
- str | None: The reconstructed abstract, or None if not available.
- """
- inv_index = data.get("abstract_inverted_index")
- if not inv_index:
- return None
-
- word_positions = [
- (word, pos) for word, positions in inv_index.items() for pos in positions
- ]
- sorted_words = sorted(word_positions, key=lambda x: x[1])
- return " ".join(word for word, _ in sorted_words)
-
- def _clean_jats(self, text: str | None) -> str:
- """
- Clean JATS XML tags in the abstract and convert them to HTML tags.
-
- Args:
- text (str | None): The raw abstract text containing JATS tags.
-
- Returns:
- str: The cleaned abstract text.
- """
- if not text:
- return ""
-
- # Handle list tags with sequential processing to avoid duplicate keys
- # Process ordered lists first - replace both opening and closing tags
- text = text.replace('")
- # Find and replace closing tags for ordered lists
- import re
-
- # Replace closing tags that follow ordered list openings
- # This regex matches
",
- "
", - "
", - "
This {type} was published on {publication_date} in {journal} {volume}({issue})
" - elif all([journal, publication_date, type]): - return f"This {type} was published on {publication_date} in {journal}
" - - self.console.print( - f"{ICONS['warning']} No abstract header added, missing information (journal, publication date and/or document type)", - style="warning", - ) - return "" - - def _get_publication_year(self, data: dict[str, Any]) -> str: - """ - Extract the publication year from the metadata. - - Args: - data (dict[str, Any]): The metadata. - - Returns: - str: The publication year. - """ - return data.get("publication_year", "") - - def _get_involved_pis(self, data: dict[str, Any]) -> list[Person]: - """ - Identify involved principal investigators from the metadata for use as fallback - corresponding authors. - - This method matches authors in the publication metadata against the configured - PIs and returns matching PIs. It is used as a fallback when no corresponding - authors are explicitly declared in the publication metadata. - - Args: - data (dict[str, Any]): The metadata from OpenAlex. - - Returns: - list[Person]: List of matching PIs for use as corresponding authors. - """ - involved_pis: list[Person] = [] - for authorship in data.get("authorships", []): - author = authorship.get("author", {}) - if not author: - continue - - display_name = author.get("display_name", "") - given_name, family_name = NameProcessor.split_name(display_name) - - if pi := self.pi_finder.find_pi( - family_name=family_name, - given_name=given_name, - orcid=author.get("orcid"), - ): - involved_pis.append(pi) - - return involved_pis - - def _save_output(self, metadata: dict[str, Any]) -> None: - """ - Save the generated metadata to a file or print it to the console. - - Args: - metadata (dict[str, Any]): The metadata to save. - """ - if self.output_path: - try: - # Custom JSON encoder to handle custom objects - class CustomEncoder(json.JSONEncoder): - """ - Custom JSON encoder that handles objects with to_dict method. - - This allows for proper serialization of custom classes like - Institution and Person by calling their to_dict method when - available. 
- - Args: - o: The object to serialize. - - Returns: - A JSON-serializable representation of the object. - """ - - def default(self, o: Any) -> Any: - if hasattr(o, "to_dict"): - return o.to_dict() - return super().default(o) - - with open(self.output_path, "w", encoding="utf-8") as f: - json.dump( - metadata, f, indent=4, ensure_ascii=False, cls=CustomEncoder - ) - self.console.print( - f"{ICONS['save']} Metadata saved in: {self.output_path}", - style="info", - ) - except Exception as e: - self.console.print( - f"{ICONS['error']} Error saving metadata: {str(e)}\n", style="error" - ) - raise - else: - self.console.print(metadata) - - -def sanitize_filename(doi: str) -> str: - """ - Convert DOI to a valid filename using only alphanumeric characters and underscores. - - Args: - doi (str): The DOI to sanitize. - - Returns: - str: Sanitized filename string. - """ - # Replace non-alphanumeric characters with underscores - sanitized = "".join(c if c.isalnum() else "_" for c in doi) - # Remove consecutive underscores - while "__" in sanitized: - sanitized = sanitized.replace("__", "_") - # Remove leading/trailing underscores - return sanitized.strip("_") - - -def print_summary(results: dict[str, list[Any]], console: Console) -> None: - """ - Print a summary table of processing results to the console. - - Args: - results (dict[str, list[Any]]): Dictionary containing success and failed DOIs. - console (Console): Rich console object for output. - """ - table = Table(title="Processing Results") - - table.add_column("Status", style="bold") - table.add_column("Count", justify="right") - table.add_column("DOIs", style="dim") - - table.add_row( - f"{ICONS['success']} Success", - str(len(results["success"])), - ", ".join(results["success"][:3]) - + ("..." if len(results["success"]) > 3 else ""), - ) - - if results["failed"]: - table.add_row( - f"{ICONS['error']} Failed", - str(len(results["failed"])), - ", ".join(doi for doi, _ in results["failed"][:3]) - + ("..." 
if len(results["failed"]) > 3 else ""), - ) - - console.print(Panel(table, title="Summary", border_style="blue")) - - -def validate_email_address(email: str): - """ - Validate an email address and ensure its domain has an MX record. - - Args: - email (str): The email address to validate. - - Returns: - bool: True if the email address is valid and its domain resolves, otherwise False. - """ - try: - # Basic validation - valid = validate_email(email) - email = valid.normalized - - # Check domain has MX record - domain = email.split("@")[1] - dns.resolver.resolve(domain, "MX") - - return True - except (EmailNotValidError, dns.resolver.NoAnswer, dns.resolver.NXDOMAIN): - return False - - -def process_doi_batch( - dois: set[str], - output_dir: Path, - depositor: str | None = None, - default_subject: str = "Medicine, Health and Life Sciences", - contact_mail: str | None = None, - upload: bool = False, - ror: bool = False, - console: Console | None = None, -) -> dict[str, list[Any]]: - """ - Process a batch of DOIs and return a summary of results. - - Args: - dois (set[str]): Set of DOIs to process. - output_dir (Path): Directory where metadata files will be saved. - depositor (str | None): Depositor name. - default_subject (str): Default subject for metadata. - contact_mail (str | None): Contact email address. - upload (bool): Flag indicating whether to upload metadata to Dataverse. - ror (bool): Flag indication whether to use ROR id for affiliation. - console (Console | None): Rich console instance for output. - - Returns: - dict[str, list[Any]]: Dictionary with keys 'success' and 'failed'. 
- """ - results: dict[str, list[Any]] = {"success": [], "failed": []} - - # Use provided console or create a new one - if console is None: - console = Console() - - progress_columns = [ - SpinnerColumn(), - TextColumn("[bold blue]{task.description:<50}"), - BarColumn(bar_width=None), - TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), - TextColumn("•"), # Separator - TimeElapsedColumn(), - TextColumn("•"), # Separator - TextColumn("[bold]{task.completed}/{task.total}"), - ] - - # Define steps for each DOI processing - if upload: - doi_total_steps = 4 # Fetch, Build, Upload, Save - else: - doi_total_steps = 3 # Fetch, Build, Save - - with Progress( - *progress_columns, - console=console, - transient=True, # This makes the progress bar disappear after completion - ) as progress: - # Add main task - main_task = progress.add_task("[bold blue]Processing DOIs...", total=len(dois)) - - # Add status task for current DOI - status_task = progress.add_task( - "[cyan]Current:", - total=None, # Indeterminate progress - visible=False, # Hidden initially - ) - - status_task = progress.add_task( - "[cyan]Current:", total=doi_total_steps, visible=False - ) - - for doi in dois: - try: - # Update status display - progress.update( - status_task, - description=f"[cyan]Current: [white]{doi[:50]}...", - visible=True, - completed=0, # Reset progress for new DOI - ) - - # Process the DOI - sanitized_filename = sanitize_filename(normalize_doi(doi)) - output_path = output_dir / f"{sanitized_filename}_metadata.json" - - processor = MetadataProcessor( - doi=doi, - depositor=depositor, - output_path=output_path, - default_subject=default_subject, - contact_mail=contact_mail, - upload=upload, - ror=ror, - console=console, - progress=progress, - task_id=status_task, - ) - - # Process and capture result - processor.process() - results["success"].append(doi) - - # Update progress - progress.advance(main_task) - - except Exception as e: - # Handle errors - results["failed"].append((doi, 
str(e))) - - # Show error but keep progress bar - progress.console.print( - f"{ICONS['error']} Error processing {doi}: {str(e)}", style="error" - ) - finally: - # Clear current status - progress.update(status_task, visible=False) - - # Print final summary - print_summary(results, console) - - return results - - -def main(): - """Main entry point for the console script.""" - console = Console(theme=THEME) - - try: - parser = argparse.ArgumentParser( - description="Process DOIs to generate metadata" - ) - parser.add_argument("dois", nargs="*", help="One or more DOIs to process") - parser.add_argument( - "-f", - "--file", - help="File containing DOIs (one per line)", - type=argparse.FileType("r"), - ) - parser.add_argument( - "-o", - "--output-dir", - help="Output directory for metadata files", - default=".", - ) - parser.add_argument( - "-d", "--depositor", help="Name of the depositor", default=None - ) - parser.add_argument( - "-s", - "--subject", - help="Default subject", - default="Medicine, Health and Life Sciences", - ) - parser.add_argument( - "-m", "--contact-mail", help="Contact email address", default=False - ) - parser.add_argument( - "-u", "--upload", help="Upload to Dataverse", action="store_true" - ) - parser.add_argument( - "-r", "--use-ror", help="Use ROR ID if available", action="store_true" - ) - - args = parser.parse_args() - - # Ensure we have either DOIs as arguments or a file - if not args.dois and not args.file: - console.print( - f"{ICONS['error']} Error: No DOIs provided. 
def main() -> None:
    """Main entry point for the console script.

    Parses command-line arguments, collects DOIs from arguments and/or an
    input file, validates the optional contact address, and delegates the
    work to process_doi_batch(). Exits with status 1 on any fatal error.
    """
    console = Console(theme=THEME)

    try:
        parser = argparse.ArgumentParser(
            description="Process DOIs to generate metadata"
        )
        parser.add_argument("dois", nargs="*", help="One or more DOIs to process")
        parser.add_argument(
            "-f",
            "--file",
            help="File containing DOIs (one per line)",
            type=argparse.FileType("r"),
        )
        parser.add_argument(
            "-o",
            "--output-dir",
            help="Output directory for metadata files",
            default=".",
        )
        parser.add_argument(
            "-d", "--depositor", help="Name of the depositor", default=None
        )
        parser.add_argument(
            "-s",
            "--subject",
            help="Default subject",
            default="Medicine, Health and Life Sciences",
        )
        # Fix: default was False (a bool) for this string-valued option.
        # None is the conventional "not provided" value and matches the
        # `contact_mail: str | None` parameter of process_doi_batch().
        parser.add_argument(
            "-m", "--contact-mail", help="Contact email address", default=None
        )
        parser.add_argument(
            "-u", "--upload", help="Upload to Dataverse", action="store_true"
        )
        parser.add_argument(
            "-r", "--use-ror", help="Use ROR ID if available", action="store_true"
        )

        args = parser.parse_args()

        # Ensure we have either DOIs as arguments or a file
        if not args.dois and not args.file:
            console.print(
                f"{ICONS['error']} Error: No DOIs provided. Use either command line arguments or -f/--file option.",
                style="error",
            )
            parser.print_help()
            sys.exit(1)

        # Get DOIs from both direct arguments and file if provided
        dois = set(args.dois)  # Start with directly provided DOIs
        if args.file:
            console.print(
                f"{ICONS['file']} Reading DOIs from file: {args.file.name}",
                style="info",
            )
            dois.update(line.strip() for line in args.file if line.strip())

        # Create output directory if it doesn't exist
        output_dir = Path(args.output_dir)
        try:
            output_dir.mkdir(parents=True, exist_ok=True)
            console.print(
                f"{ICONS['folder']} Output directory: {output_dir}\n", style="info"
            )
        except Exception as e:
            console.print(
                f"Failed to create output directory: {str(e)}\n", style="error"
            )
            sys.exit(1)

        # Validate the contact address before exposing it to external APIs.
        if args.contact_mail:
            if not validate_email_address(args.contact_mail):
                raise ValueError(f"Not a valid email address: {args.contact_mail}")
            console.print(
                f"{ICONS['info']} Exposing contact email <{args.contact_mail}> to API services.\n",
                style="info",
            )

        # Process DOIs and track time
        process_doi_batch(
            dois=dois,
            output_dir=output_dir,
            depositor=args.depositor,
            default_subject=args.subject,
            contact_mail=args.contact_mail,
            upload=args.upload,
            ror=args.use_ror,
            console=console,
        )

    except KeyboardInterrupt:
        console.print(
            f"\n{ICONS['warning']} Processing interrupted by user", style="warning"
        )
        sys.exit(1)
    except Exception as e:
        console.print(
            f"\n{ICONS['error']} An unexpected error occurred: {str(e)}", style="error"
        )
        sys.exit(1)


if __name__ == "__main__":
    main()