#!/usr/bin/env python
"""
doi2dataset.py

This script processes DOIs to generate metadata for Dataverse datasets.
It supports fetching data from OpenAlex and CrossRef, mapping metadata fields,
processing author and grant information, and optionally uploading the metadata
to a Dataverse instance.

Usage:
    python doi2dataset.py [options] doi1 doi2 ...

Options:
    -f, --file          File containing DOIs (one per line)
    -o, --output-dir    Output directory for metadata files (default: current directory)
    -d, --depositor     Name of the depositor
    -s, --subject       Default subject (default: "Medicine, Health and Life Sciences")
    -m, --contact-mail  Contact email address
    -u, --upload        Upload metadata to Dataverse
"""

import argparse
import json
import sys
import unicodedata
import warnings  # TODO: Remove once the warning is stripped from idutils
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from functools import reduce
from pathlib import Path
from typing import Any, Sequence

import dns.resolver
import requests
import yaml
from email_validator import EmailNotValidError, validate_email
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TaskID,
    TextColumn,
    TimeElapsedColumn,
)
from rich.table import Table
from rich.theme import Theme

# Idutils throws an unconditional warning about deprecation of relative imports.
# Since we are not using them, suppress the warning so it does not confuse users.
# TODO: Remove once the warning is stripped from idutils
warnings.filterwarnings("ignore", category=DeprecationWarning)

from idutils.normalizers import normalize_doi, normalize_orcid, normalize_pmid
from idutils.validators import is_doi

# Script version
VERSION = "1.0"

# Icon definitions for console output
ICONS = {
    'success': "✓",     # Simple checkmark
    'error': "✗",       # Simple X
    'warning': "!",     # Simple exclamation
    'info': "ℹ",        # Info symbol
    'processing': "⋯",  # Three dots
    'done': "∎",        # Filled square
    'file': "⨳",        # Document symbol
    'folder': "⊞",      # Folder symbol
    'clock': "◷",       # Clock symbol
    'search': "⌕",      # Search symbol
    'data': "≡",        # Three lines
    'doi': "∾",         # Link symbol
    'total': "∑",       # Sum symbol
    'save': "⤓",        # Save/download arrow
    'upload': "⤒"       # Upload arrow
}

# Theme configuration for Rich console output
THEME = Theme({
    "info": "cyan",
    "warning": "yellow",
    "error": "red bold",
    "success": "green",
})

# Available sources for metadata abstracts
SOURCES = ["openalex", "crossref", "none"]

def format_status(icon: str, message: str, style: str = "default") -> str:
    """
    Format a status message with an icon and a given style.

    Args:
        icon (str): Key for the icon character from the ICONS dictionary.
        message (str): The status message.
        style (str): The style to apply (e.g., 'default', 'info', 'warning', 'error', 'success').

    Returns:
        str: The formatted status message.
    """
    return f"[{style}]{ICONS[icon]} {message}[/{style}]"

class FieldType(Enum):
    """Enum representing different Dataverse field types."""
    PRIMITIVE = "primitive"
    COMPOUND = "compound"
    VOCABULARY = "controlledVocabulary"


@dataclass
class Phase:
    """
    Represents a project phase with a defined time span.

    Attributes:
        name (str): The name of the project phase.
        start (int): The start year of the project phase.
        end (int): The end year of the project phase.
    """

    name: str
    start: int
    end: int

    def check_year(self, year: int) -> bool:
        """
        Checks whether a given year falls within the project's phase boundaries.

        Args:
            year (int): The year to check.

        Returns:
            bool: True if the year is within the phase boundaries, otherwise False.
        """
        if self.start <= year <= self.end:
            return True
        return False

@dataclass
class BaseMetadataField[T]:
    """
    Base class for Dataverse metadata fields.

    This class defines a metadata field with a name, a value of type T, and
    a flag indicating whether multiple values are allowed. It serves as
    a template for specific metadata field implementations.

    Attributes:
        name (str): The name of the metadata field.
        multiple (bool): Indicates whether multiple values are allowed.
        value (T): The value stored in the field.
        type (FieldType): The type of the field, set automatically by the subclass.
    """
    name: str
    multiple: bool
    value: T
    type: FieldType = field(init=False)

    def __post_init__(self) -> None:
        """
        After initialization, determine the field type by calling the _set_type method.
        """
        self._set_type()

    def _set_type(self) -> None:
        """
        Set the `type` attribute based on the field's value.

        This method must be implemented by subclasses.

        Raises:
            NotImplementedError: If not implemented by a subclass.
        """
        raise NotImplementedError("Subclasses must implement the _set_type method.")

    def to_dict(self) -> dict[str, Any]:
        """
        Convert the metadata field to a dictionary representation.

        Returns:
            dict[str, Any]: Dictionary representation of the metadata field.

        Raises:
            NotImplementedError: If not implemented by a subclass.
        """
        raise NotImplementedError("Subclasses must implement the to_dict method.")

@dataclass
class PrimitiveMetadataField(BaseMetadataField[str]):
    """
    Metadata field representing a primitive type (e.g., string) for Dataverse.
    """
    def _set_type(self) -> None:
        self.type = FieldType.PRIMITIVE

    def to_dict(self) -> dict[str, str | bool]:
        """
        Convert the primitive metadata field to a dictionary representation.

        Returns:
            dict[str, str | bool]: Dictionary with field properties.
        """
        return {
            "typeName": self.name,
            "typeClass": self.type.value,
            "multiple": self.multiple,
            "value": self.value,
        }


@dataclass
class ControlledVocabularyMetadataField(BaseMetadataField[str | list[str]]):
    """
    Metadata field for controlled vocabulary values.
    """
    def _set_type(self) -> None:
        self.type = FieldType.VOCABULARY

    def to_dict(self) -> dict[str, Any]:
        """
        Convert the controlled vocabulary metadata field to a dictionary.

        Returns:
            dict[str, Any]: Dictionary representation.
        """
        return {
            "typeName": self.name,
            "typeClass": self.type.value,
            "multiple": self.multiple,
            "value": self.value,
        }


@dataclass
class CompoundMetadataField(
    BaseMetadataField[Sequence[Sequence[PrimitiveMetadataField | ControlledVocabularyMetadataField]]]
):
    """
    Metadata field representing compound types, composed of multiple subfields.
    """
    def _set_type(self) -> None:
        self.type = FieldType.COMPOUND

    def to_dict(self) -> dict[str, Any]:
        """
        Convert the compound metadata field to a dictionary representation.

        Returns:
            dict[str, Any]: Dictionary representation of the compound field.
        """
        value_list: list[dict[str, Any]] = []
        for outer_list in self.value:
            field_dicts: list[dict[str, Any]] = []
            for field_item in outer_list:
                field_dicts.append({field_item.name: field_item.to_dict()})
            value_list.append(reduce(lambda d1, d2: d1 | d2, field_dicts))

        return {
            "typeName": self.name,
            "typeClass": self.type.value,
            "multiple": self.multiple,
            "value": value_list
        }

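# Illustrative sketch (not part of the processing flow): a compound field built from
# one row of primitive subfields serializes to the nested structure Dataverse expects.
# The DOI value below is a made-up example.
#
#   CompoundMetadataField("otherId", True, [[
#       PrimitiveMetadataField("otherIdAgency", False, "doi"),
#       PrimitiveMetadataField("otherIdValue", False, "10.1234/example"),
#   ]]).to_dict()
#
#   -> {"typeName": "otherId", "typeClass": "compound", "multiple": True,
#       "value": [{"otherIdAgency": {...}, "otherIdValue": {...}}]}
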
@dataclass
class Person:
    """
    Represents a person (e.g., an author or a PI).

    Attributes:
        family_name (str): Family name of the person.
        given_name (str): Given name of the person.
        orcid (str): ORCID identifier (optional).
        email (str): Email address (optional).
        affiliation (str): Affiliation of the person (optional).
        project (list[str]): List of associated projects.
    """
    family_name: str
    given_name: str
    orcid: str = ""
    email: str = ""
    affiliation: str = ""
    project: list[str] = field(default_factory=list)

    def format_name(self) -> str:
        """
        Format the name in 'Family, Given' order.

        Returns:
            str: Formatted name.
        """
        return f"{self.family_name}, {self.given_name}"

    def author_fields(self) -> list[PrimitiveMetadataField | ControlledVocabularyMetadataField]:
        """
        Build metadata fields for an author.

        Returns:
            list: List of metadata fields representing the author.
        """
        if self.orcid:
            return [
                PrimitiveMetadataField("authorName", False, self.format_name()),
                PrimitiveMetadataField("authorAffiliation", False, self.affiliation),
                ControlledVocabularyMetadataField("authorIdentifierScheme", False, "ORCID"),
                PrimitiveMetadataField("authorIdentifier", False, self.orcid)
            ]
        else:
            return [
                PrimitiveMetadataField("authorName", False, self.format_name()),
                PrimitiveMetadataField("authorAffiliation", False, self.affiliation)
            ]

    def dataset_contact_fields(self) -> list[PrimitiveMetadataField]:
        """
        Build metadata fields for dataset contact information.

        Returns:
            list: List of metadata fields for the dataset contact.
        """
        return [
            PrimitiveMetadataField("datasetContactName", False, self.format_name()),
            PrimitiveMetadataField("datasetContactAffiliation", False, self.affiliation),
            PrimitiveMetadataField("datasetContactEmail", False, self.email)
        ]

@dataclass
class License:
    """
    Represents a license with name, URI, and short identifier.

    Attributes:
        name (str): The full name of the license.
        uri (str): The license URI.
        short (str): The short identifier of the license.
    """
    name: str
    uri: str
    short: str


@dataclass
class Abstract:
    """
    Represents an abstract with its text and source.

    Attributes:
        text (str): The abstract text.
        source (str): The source of the abstract ('crossref', 'openalex', or 'none').
    """
    text: str
    source: str

    def __post_init__(self):
        allowed_sources = ["crossref", "openalex", "none"]
        if self.source not in allowed_sources:
            raise ValueError(f"{self.source} is not valid! Needs to be one of {str(allowed_sources)}.")

@dataclass
class ConfigData:
    """
    Represents configuration data loaded from a YAML file.

    Attributes:
        dataverse (dict[str, str]): Dataverse-related configuration.
        phase (dict[str, dict[str, int]]): Mapping of project phases.
        pis (list[dict[str, Any]]): List of principal investigator configurations.
        default_grants (list[dict[str, str]]): Default grant configurations.
    """
    dataverse: dict[str, str]
    phase: dict[str, dict[str, int]]
    pis: list[dict[str, Any]]
    default_grants: list[dict[str, str]]

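# Illustrative sketch of the config.yaml layout that ConfigData/Config expect. The key
# names mirror ConfigData and the lookups used elsewhere in this module (e.g. the
# dataverse keys read by _upload_data); every value below is a placeholder, and the
# phase name "Phase 1" is an assumption for illustration only.
#
#   dataverse:
#     url: "https://dataverse.example.org"
#     api_token: "xxxxxxxx-xxxx-xxxx"
#     dataverse: "my_dataverse_alias"
#     auth_user: "user"
#     auth_password: "secret"
#   phase:
#     "Phase 1":
#       start: 2020
#       end: 2023
#   pis:
#     - family_name: "Doe"
#       given_name: "Jane"
#       orcid: "0000-0000-0000-0000"
#       email: "jane.doe@example.org"
#       affiliation: "Example University"
#       project: ["A01"]
#   default_grants:
#     - funder: "Example Funding Agency"
#       id: "123456"
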
class Config:
    """
    Singleton class to handle configuration loading and retrieval.
    """
    _instance: 'Config | None' = None
    _config_data: ConfigData | None = None

    def __new__(cls) -> 'Config':
        """
        Create and return the singleton instance of Config.

        Returns:
            Config: The singleton instance.
        """
        if cls._instance is None:
            cls._instance = super(Config, cls).__new__(cls)
        return cls._instance

    @classmethod
    def load_config(cls, config_path: str | Path | None = None) -> None:
        """
        Load configuration from a YAML file.

        Args:
            config_path (str | Path | None): Path to the configuration file.
                If None, the default config.yaml in the script directory is used.

        Raises:
            FileNotFoundError: If the configuration file does not exist.
            ValueError: If any PI email address is invalid.
        """
        if config_path is None:
            config_path = Path(__file__).parent / "config.yaml"

        config_path = Path(config_path)
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found: {config_path}")

        with open(config_path, 'r', encoding='utf-8') as f:
            config_data = yaml.safe_load(f)

        # Validate PI email addresses
        pis = config_data.get('pis', [])
        for pi in pis:
            if email := pi.get('email'):
                if not validate_email_address(email):
                    raise ValueError(f"Configuration Error: Invalid email address for PI {pi.get('given_name', '')} {pi.get('family_name', '')}: {email}")

        cls._config_data = ConfigData(
            dataverse=config_data.get('dataverse', {}),
            phase=config_data.get('phase', {}),
            pis=config_data.get('pis', []),
            default_grants=config_data.get('default_grants', [])
        )

    @classmethod
    def get_config(cls) -> ConfigData:
        """
        Retrieve the loaded configuration data.

        Returns:
            ConfigData: The configuration data.

        Raises:
            RuntimeError: If the configuration could not be loaded.
        """
        if cls._config_data is None:
            cls.load_config()
        if cls._config_data is None:
            raise RuntimeError("Failed to load configuration")
        return cls._config_data

    @property
    def PHASE(self) -> dict[str, dict[str, int]]:
        """
        Get phase configuration.

        Returns:
            dict[str, dict[str, int]]: Mapping of phases.
        """
        return self.get_config().phase

    @property
    def PIS(self) -> list[dict[str, Any]]:
        """
        Get PI configurations.

        Returns:
            list[dict[str, Any]]: List of PI configurations.
        """
        return self.get_config().pis

    @property
    def DEFAULT_GRANTS(self) -> list[dict[str, str]]:
        """
        Get default grant configurations.

        Returns:
            list[dict[str, str]]: List of default grants.
        """
        return self.get_config().default_grants

    @property
    def DATAVERSE(self) -> dict[str, str]:
        """
        Get Dataverse configurations.

        Returns:
            dict[str, str]: Dataverse configuration.
        """
        return self.get_config().dataverse

class APIClient:
    """
    Client for making HTTP requests to external APIs.

    Attributes:
        session (requests.Session): The underlying requests session.
    """
    def __init__(self, contact_mail: str | None = None, user_agent: str = f"UDE-Doi2Dataset/{VERSION}", token: str | None = None) -> None:
        """
        Initialize the API client with optional contact mail, user agent, and token.

        Args:
            contact_mail (str | None): Contact email address.
            user_agent (str): User agent string.
            token (str | None): Optional API token.
        """
        self.session = requests.Session()
        self._set_headers(contact_mail, user_agent, token)

    def _set_headers(self, contact_mail: str | None, user_agent: str, token: str | None) -> None:
        """
        Set HTTP headers for the session based on contact email and token.

        Args:
            contact_mail (str | None): Contact email address.
            user_agent (str): User agent string.
            token (str | None): Optional API token.
        """
        if contact_mail:
            header = {"User-Agent": f"{user_agent} (mailto:{contact_mail})"}
        else:
            header = {"User-Agent": user_agent}

        if token:
            header["X-Dataverse-key"] = token

        self.session.headers.update(header)

    def make_request(self, url: str, method: str = "GET", **kwargs: Any) -> requests.Response | None:
        """
        Make an HTTP request and return the response.

        Args:
            url (str): The URL to request.
            method (str): HTTP method to use (default: GET).
            **kwargs: Additional arguments for requests.request.

        Returns:
            requests.Response | None: The HTTP response, or None if the request failed.
        """
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            print(f"\n{ICONS['error']} Request failed: {str(e)}")
            return None

class NameProcessor:
    """
    Provides utility methods for processing names.
    """
    @staticmethod
    def normalize_string(s: str) -> str:
        """
        Normalize a string using Unicode NFKD normalization and convert to ASCII.

        Args:
            s (str): The string to normalize.

        Returns:
            str: The normalized string.
        """
        return unicodedata.normalize("NFKD", s.lower()).encode("ASCII", "ignore").decode("ASCII")

    @staticmethod
    def split_name(full_name: str) -> tuple[str, str]:
        """
        Split a full name into given and family names.

        Args:
            full_name (str): The full name (e.g., "Doe, John" or "John Doe").

        Returns:
            tuple[str, str]: A tuple (given_name, family_name).
        """
        if "," in full_name:
            surname, given_name = full_name.split(",", 1)
            return given_name.strip(), surname.strip()

        parts = full_name.strip().split()
        if len(parts) == 1:
            return "", parts[0]

        return " ".join(parts[:-1]), parts[-1]

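# Illustrative behaviour of NameProcessor.split_name (example names only):
#
#   NameProcessor.split_name("Doe, John")        -> ("John", "Doe")
#   NameProcessor.split_name("John Ronald Doe")  -> ("John Ronald", "Doe")
#   NameProcessor.split_name("Cher")             -> ("", "Cher")
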
class PIFinder:
    """
    Finds principal investigators (PIs) among a list of Person objects.
    """
    def __init__(self, pis: list[Person]) -> None:
        """
        Initialize with a list of Person objects representing potential PIs.

        Args:
            pis (list[Person]): List of Person objects.
        """
        self.pis = pis

    def find_pi(self, family_name: str | None = None, orcid: str | None = None, given_name: str | None = None) -> Person | None:
        """
        Find a PI by ORCID or name.

        Args:
            family_name (str | None): Family name.
            orcid (str | None): ORCID identifier.
            given_name (str | None): Given name.

        Returns:
            Person | None: The matched PI or None if not found.
        """
        if orcid:
            return self._find_by_orcid(normalize_orcid(orcid))

        if family_name:
            return self._find_by_name(family_name, given_name)

        return None

    def _find_by_orcid(self, orcid: str) -> Person | None:
        """
        Find a PI by ORCID.

        Args:
            orcid (str): Normalized ORCID.

        Returns:
            Person | None: The matched PI or None.
        """
        for person in self.pis:
            if normalize_orcid(person.orcid) == orcid:
                return person
        return None

    def _find_by_name(self, family_name: str, given_name: str | None) -> Person | None:
        """
        Find a PI by family name (and optionally given name).

        Args:
            family_name (str): Family name.
            given_name (str | None): Given name (optional).

        Returns:
            Person | None: The matched PI or None.
        """
        matches: list[Person] = []
        normalized_family_name = NameProcessor.normalize_string(family_name)

        for person in self.pis:
            if NameProcessor.normalize_string(person.family_name) == normalized_family_name:
                matches.append(person)

        if not matches:
            return None

        if given_name:
            normalized_given_name = NameProcessor.normalize_string(given_name)
            for match in matches:
                if NameProcessor.normalize_string(match.given_name) == normalized_given_name:
                    return match
            return None

        if len(matches) == 1:
            return matches[0]

        raise ValueError("Multiple matches found for family name")

class LicenseProcessor:
    """
    Processes license information from metadata.
    """
    LICENSE_MAP = {
        "cc-by": ("https://creativecommons.org/licenses/by/4.0/", "CC BY 4.0"),
        "cc-by-sa": ("https://creativecommons.org/licenses/by-sa/4.0/", "CC BY-SA 4.0"),
        "cc-by-nc": ("https://creativecommons.org/licenses/by-nc/4.0/", "CC BY-NC 4.0"),
        "cc-by-nc-sa": ("https://creativecommons.org/licenses/by-nc-sa/4.0/", "CC BY-NC-SA 4.0"),
        "cc-by-nc-nd": ("https://creativecommons.org/licenses/by-nc-nd/4.0/", "CC BY-NC-ND 4.0"),
        "cc-by-nd": ("https://creativecommons.org/licenses/by-nd/4.0/", "CC BY-ND 4.0"),
        "cc0": ("https://creativecommons.org/publicdomain/zero/1.0/", "CC0 1.0"),
        "pd": ("https://creativecommons.org/publicdomain/mark/1.0/", "Public Domain Mark 1.0"),
    }

    @classmethod
    def process_license(cls, data: dict[str, Any]) -> License:
        """
        Process and return license information based on input data.

        Args:
            data (dict[str, Any]): Input data containing license info.

        Returns:
            License: Processed license information.
        """
        location = data.get("primary_location", {})
        license_short = location.get("license", "")

        if not license_short:
            return License(name="", uri="", short="unknown")

        base_license = license_short.split("/")[0].lower()
        uri, name = cls.LICENSE_MAP.get(base_license, ("", license_short))
        return License(name=name, uri=uri, short=license_short)

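# Illustrative behaviour of LicenseProcessor.process_license on OpenAlex-style input
# (the values are made up):
#
#   LicenseProcessor.process_license({"primary_location": {"license": "cc-by"}})
#       -> License(name="CC BY 4.0", uri="https://creativecommons.org/licenses/by/4.0/", short="cc-by")
#   LicenseProcessor.process_license({})
#       -> License(name="", uri="", short="unknown")
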
class AbstractProcessor:
    """
    Retrieves and processes abstracts from CrossRef and OpenAlex.
    """
    def __init__(self, api_client: APIClient):
        """
        Initialize with an APIClient instance.

        Args:
            api_client (APIClient): The API client to use for requests.
        """
        self.api_client = api_client

    def get_abstract(self, doi: str, data: dict[str, Any], license: License) -> Abstract:
        """
        Get an abstract based on DOI and license permissions.

        Args:
            doi (str): The DOI.
            data (dict[str, Any]): Data retrieved from an external source.
            license (License): License information.

        Returns:
            Abstract: The abstract with its source.
        """
        license_ok = {"cc-by", "cc-by-sa", "cc-by-nc", "cc-by-nc-sa", "cc0", "pd"}

        if license.short in license_ok:
            console.print(f"\n{ICONS['info']} License {license.name} allows derivative works. Pulling abstract from CrossRef.", style="info")
            crossref_abstract = self._get_crossref_abstract(doi)
            if crossref_abstract:
                return Abstract(text=crossref_abstract, source="crossref")
            else:
                console.print(f"\n{ICONS['warning']} No abstract found in CrossRef!", style="warning")
        else:
            console.print(f"\n{ICONS['info']} License {license.name} does not allow derivative works. Reconstructing abstract from OpenAlex!", style="info")

        openalex_abstract = self._get_openalex_abstract(data)
        if openalex_abstract:
            return Abstract(text=openalex_abstract, source="openalex")
        else:
            console.print(f"\n{ICONS['warning']} No abstract found in OpenAlex!", style="warning")

        console.print(f"\n{ICONS['warning']} No abstract found in either CrossRef or OpenAlex!", style="warning")
        return Abstract(text="", source="none")

    def _get_crossref_abstract(self, doi: str) -> str | None:
        """
        Retrieve abstract from CrossRef API.

        Args:
            doi (str): The DOI.

        Returns:
            str | None: The abstract if found, otherwise None.
        """
        url = f"https://api.crossref.org/works/{doi}"
        response = self.api_client.make_request(url)

        if response and response.status_code == 200:
            abstract_raw = response.json().get("message", {}).get("abstract")
            return self._clean_jats(abstract_raw)
        return None

    def _get_openalex_abstract(self, data: dict[str, Any]) -> str | None:
        """
        Retrieve abstract from OpenAlex data.

        Args:
            data (dict[str, Any]): Data from OpenAlex.

        Returns:
            str | None: The reconstructed abstract, or None if not available.
        """
        inv_index = data.get("abstract_inverted_index")
        if not inv_index:
            return None

        word_positions = [(word, pos) for word, positions in inv_index.items() for pos in positions]
        sorted_words = sorted(word_positions, key=lambda x: x[1])
        return " ".join(word for word, _ in sorted_words)

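    # Illustrative example (not executed here): OpenAlex stores abstracts as an
    # inverted index of {word: [positions]}. An input such as
    #
    #   {"Hello": [0], "brave": [1, 3], "world": [2]}
    #
    # is flattened to position-sorted pairs and rejoined as "Hello brave world brave".
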
    def _clean_jats(self, text: str | None) -> str:
        """
        Clean JATS XML tags in the abstract and convert them to HTML tags.

        Args:
            text (str | None): The raw abstract text containing JATS tags.

        Returns:
            str: The cleaned abstract text.
        """
        if not text:
            return ""

        replacements = {
            "<jats:italic>": "<i>",
            "</jats:italic>": "</i>",
            "<jats:bold>": "<b>",
            "</jats:bold>": "</b>",
            "<jats:sup>": "<sup>",
            "</jats:sup>": "</sup>",
            "<jats:sub>": "<sub>",
            "</jats:sub>": "</sub>",
            "<jats:underline>": "<u>",
            "</jats:underline>": "</u>",
            "<jats:monospace>": "<code>",
            "</jats:monospace>": "</code>",
            "<jats:sc>": "<small>",
            "</jats:sc>": "</small>",
            "<jats:p>": "<p>",
            "</jats:p>": "</p>",
            "<jats:title>": "<h2>",
            "</jats:title>": "</h2>",
            '<jats:list list-type="bullet">': "<ul>",
            "</jats:list>": "</ul>",
            '<jats:list list-type="order">': "<ol>",
            # Note: "</jats:list>" repeats the key above, so the later value wins and every
            # closing list tag is rendered as "</ol>"; the closing JATS tag alone does not
            # distinguish bullet from ordered lists.
            "</jats:list>": "</ol>",
            "<jats:list-item>": "<li>",
            "</jats:list-item>": "</li>",
            "<jats:blockquote>": "<blockquote>",
            "</jats:blockquote>": "</blockquote>",
        }

        for jats_tag, html_tag in replacements.items():
            text = text.replace(jats_tag, html_tag)
        return text

class SubjectMapper:
    """
    Maps subject names from input data to controlled vocabulary.
    """
    CONTROLLED_VOCAB = {
        "Agricultural Sciences": "Agricultural Sciences",
        "Arts and Humanities": "Arts and Humanities",
        "Astronomy": "Astronomy and Astrophysics",
        "Astrophysics": "Astronomy and Astrophysics",
        "Business": "Business and Management",
        "Management": "Business and Management",
        "Chemistry": "Chemistry",
        "Computer Science": "Computer and Information Science",
        "Information Science": "Computer and Information Science",
        "Earth Sciences": "Earth and Environmental Sciences",
        "Environmental Sciences": "Earth and Environmental Sciences",
        "Engineering": "Engineering",
        "Law": "Law",
        "Mathematics": "Mathematical Sciences",
        "Medicine": "Medicine, Health and Life Sciences",
        "Health Sciences": "Medicine, Health and Life Sciences",
        "Life Sciences": "Medicine, Health and Life Sciences",
        "Physics": "Physics",
        "Social Sciences": "Social Sciences",
    }

    @classmethod
    def get_subjects(cls, data: dict[str, Any], fallback_subject: str = "Other") -> list[str]:
        """
        Extract and map subjects from input data.

        Args:
            data (dict[str, Any]): The input metadata.
            fallback_subject (str): Fallback subject if none found.

        Returns:
            list[str]: List of mapped subject names.
        """
        topics = data.get("topics", [])
        subject_collection: list[Any] = []

        for topic in topics:
            for field_type in ["subfield", "field", "domain"]:
                if field_name := topic.get(field_type, {}).get("display_name"):
                    subject_collection.append(field_name)

        mapped_subjects = cls.map_subjects(subject_collection)
        return mapped_subjects if mapped_subjects else [fallback_subject]

    @classmethod
    def map_subjects(cls, subjects: list[str]) -> list[str]:
        """
        Map given subjects to valid controlled vocabulary terms.

        Args:
            subjects (list[str]): List of subjects.

        Returns:
            list[str]: List of valid subjects.
        """
        valid_subjects: set[str] = set()
        for subject in subjects:
            if mapped_subject := cls.CONTROLLED_VOCAB.get(subject):
                valid_subjects.add(mapped_subject)
        return list(valid_subjects)

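# Illustrative example: only terms present in CONTROLLED_VOCAB survive the mapping,
# duplicates collapse, and unknown terms are dropped (order is not guaranteed, since
# the mapping is set-based), e.g.
#
#   SubjectMapper.map_subjects(["Chemistry", "Astrophysics", "Knitting"])
#       -> ["Chemistry", "Astronomy and Astrophysics"]
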
class CitationBuilder:
    """
    Builds various citation-related metadata fields.
    """
    def __init__(self, data: dict[str, Any], doi: str, pi_finder: PIFinder) -> None:
        """
        Initialize the CitationBuilder with data, DOI, and a PIFinder.

        Args:
            data (dict[str, Any]): Metadata from an external source.
            doi (str): The DOI.
            pi_finder (PIFinder): Instance to find PI information.
        """
        self.data = data
        self.doi = doi
        self.pi_finder = pi_finder

    def build_other_ids(self) -> list[list[PrimitiveMetadataField]]:
        """
        Build metadata fields for other identifiers (e.g., DOI, PMID).

        Returns:
            list[list[PrimitiveMetadataField]]: Nested list of identifier metadata fields.
        """
        other_ids = [[
            PrimitiveMetadataField("otherIdAgency", False, "doi"),
            PrimitiveMetadataField("otherIdValue", False, self.doi)
        ]]

        if pmid := self.data.get("ids", {}).get("pmid"):
            try:
                normalized_pmid = normalize_pmid(pmid)
                other_ids.append([
                    PrimitiveMetadataField("otherIdAgency", False, "pmid"),
                    PrimitiveMetadataField("otherIdValue", False, normalized_pmid)
                ])
            except ValueError:
                pass

        return other_ids

    def build_grants(self) -> list[list[PrimitiveMetadataField]]:
        """
        Build metadata fields for grants.

        Returns:
            list[list[PrimitiveMetadataField]]: Nested list of grant metadata fields.
        """
        config = Config()
        default_grants = config.DEFAULT_GRANTS

        grants: list[list[PrimitiveMetadataField]] = []

        for grant in default_grants:
            grants.append([PrimitiveMetadataField("grantNumberAgency", False, grant["funder"]), PrimitiveMetadataField("grantNumberValue", False, grant["id"])])

        for grant in self.data.get("grants", []):
            grant_funder = grant.get("funder_display_name", {})
            grant_id = grant.get("award_id", {})
            if not grant_funder or not grant_id:
                continue

            grants.append([PrimitiveMetadataField("grantNumberAgency", False, grant_funder), PrimitiveMetadataField("grantNumberValue", False, grant_id)])

        return grants

    def build_authors(self) -> tuple[list[Person], list[Person]]:
        """
        Build lists of authors and corresponding authors from the metadata.

        Returns:
            tuple: (authors, corresponding_authors)
        """
        authors: list[Person] = []
        corresponding_authors: list[Person] = []
        for authorship in self.data.get("authorships", []):
            author = authorship.get("author", {})
            if not author:
                continue

            author_person = self._process_author(author, authorship)
            authors.append(author_person)

            if authorship.get("is_corresponding"):
                corresponding_entry = self._process_corresponding_author(author_person, authorship)
                if corresponding_entry:
                    corresponding_authors.append(corresponding_entry)

        return authors, corresponding_authors

    def _process_author(self, author: dict[str, Any], authorship: dict[str, Any]) -> Person:
        """
        Process author data and return a Person instance.

        Args:
            author (dict[str, Any]): Author data.
            authorship (dict[str, Any]): Authorship metadata.

        Returns:
            Person: Processed author.
        """
        display_name = author.get("display_name", "")
        given_name, family_name = NameProcessor.split_name(display_name)

        person = Person(family_name, given_name)

        if affiliations := authorship.get("affiliations"):
            affiliation = affiliations[0].get("raw_affiliation_string", "").strip()
            person.affiliation = affiliation

        if orcid := author.get("orcid"):
            person.orcid = normalize_orcid(orcid)

        return person

    def _process_corresponding_author(self, author: Person, authorship: dict[str, Any]) -> Person | None:
        """
        Identify the corresponding author based on provided PI information.

        Args:
            author (Person): The author.
            authorship (dict[str, Any]): Authorship metadata.

        Returns:
            Person | None: The corresponding author, or None if not found.
        """
        pi = self.pi_finder.find_pi(
            family_name=author.family_name,
            given_name=author.given_name,
            orcid=author.orcid
        )

        if not pi:
            return None

        return pi

    def build_topics(self) -> list[list[PrimitiveMetadataField]]:
        """
        Build metadata fields for topics based on a threshold score.

        Returns:
            list[list[PrimitiveMetadataField]]: Nested list of topic metadata fields.
        """
        topics: list[list[PrimitiveMetadataField]] = []

        for topic in self.data.get("topics", []):
            if topic.get("score") >= 0.8:
                if name := topic.get("display_name"):
                    topics.append([PrimitiveMetadataField("topicClassValue", False, name)])

        return topics

    def build_keywords(self) -> list[list[PrimitiveMetadataField]]:
        """
        Build metadata fields for keywords from both regular keywords and MeSH terms.

        Returns:
            list[list[PrimitiveMetadataField]]: Nested list of keyword metadata fields.
        """
        keywords: list[list[PrimitiveMetadataField]] = []

        for keyword in self.data.get("keywords", []):
            # Filter out possibly unrelated keywords (low score)
            if keyword["score"] >= 0.5:
                keyword_value_field = PrimitiveMetadataField("keywordValue", False, keyword["display_name"])
                keywords.append([keyword_value_field])

        mesh_base_url = "http://id.nlm.nih.gov/mesh"
        for mesh in self.data.get("mesh", []):
            url = f"{mesh_base_url}/{mesh['descriptor_ui']}"
            if mesh["qualifier_ui"]:
                url = f"{url}{mesh['qualifier_ui']}"

            keyword_value_field = PrimitiveMetadataField("keywordValue", False, mesh["descriptor_name"])
            keyword_term_uri_field = PrimitiveMetadataField("keywordTermURI", False, url)
            keyword_vocabulary_field = PrimitiveMetadataField("keywordVocabulary", False, "MeSH")
            keyword_vocabulary_uri_field = PrimitiveMetadataField("keywordVocabularyURI", False, mesh_base_url)

            keywords.append([keyword_value_field, keyword_term_uri_field, keyword_vocabulary_field, keyword_vocabulary_uri_field])

        return keywords

class MetadataProcessor:
    """
    Processes metadata for a given DOI by fetching data from OpenAlex,
    building metadata blocks, and optionally uploading the dataset.
    """
    def __init__(
        self,
        doi: str,
        depositor: str | None = None,
        output_path: Path | None = None,
        default_subject: str = "Other",
        contact_mail: str | None = None,
        upload: bool = False,
        console: Console | None = None,
        progress: Progress | None = None,
        task_id: TaskID | None = None
    ) -> None:
        """
        Initialize the MetadataProcessor with configuration and processing options.

        Args:
            doi (str): The DOI to process.
            depositor (str | None): Depositor name.
            output_path (Path | None): Path where metadata will be saved.
            default_subject (str): Default subject.
            contact_mail (str | None): Contact email address.
            upload (bool): Whether to upload metadata.
            console (Console | None): Rich console instance.
            progress (Progress | None): Progress bar instance.
            task_id (TaskID | None): Task ID for progress updates.
        """
        self.console = console or Console()
        try:
            self.doi = self._validate_doi(doi)
        except ValueError as e:
            print(f"Error: {str(e)}")
            raise
        self.depositor = depositor
        self.output_path = output_path
        self.default_subject = default_subject
        self.api_client = APIClient(contact_mail)
        config = Config()
        pi_objects = [Person(**pi) for pi in config.PIS]
        self.pi_finder = PIFinder(pi_objects)
        self.upload = upload
        self.progress = progress
        self.task_id = task_id

    @staticmethod
    def _validate_doi(doi: str) -> str:
        """
        Validate and normalize a DOI.

        Args:
            doi (str): The DOI to validate.

        Returns:
            str: Normalized DOI.

        Raises:
            ValueError: If the DOI is invalid.
        """
        if not is_doi(doi):
            raise ValueError(f"Invalid DOI: {doi}")
        return normalize_doi(doi)

    def _update_progress(self) -> None:
        """
        Advance the progress bar if enabled.
        """
        if self.progress and self.task_id is not None:
            self.progress.advance(self.task_id)

    def process(self) -> dict[str, Any]:
        """
        Process the DOI: fetch data, build metadata, optionally upload, and save output.

        Returns:
            dict[str, Any]: The constructed metadata dictionary.
        """
        self.console.print(f"{ICONS['processing']} Processing DOI: {self.doi}", style="info")

        data = self._fetch_data()
        self._update_progress()

        metadata = self._build_metadata(data)
        self._update_progress()

        if self.upload:
            self._upload_data(metadata)
            self._update_progress()

        self._save_output(metadata)
        self._update_progress()

        self.console.print(f"\n{ICONS['success']} Successfully processed: {self.doi}\n", style="success")
        return metadata

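    # Illustrative programmatic use (a sketch, not executed here): processing a single
    # DOI without the CLI. The DOI, path, and email below are placeholders.
    #
    #   processor = MetadataProcessor(
    #       doi="10.1234/example-doi",
    #       output_path=Path("example_metadata.json"),
    #       contact_mail="contact@example.org",
    #   )
    #   metadata = processor.process()  # fetch, build, (optionally upload), save
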
    def _upload_data(self, metadata: dict[str, Any]) -> dict[str, Any]:
        """
        Upload the metadata to Dataverse.

        Args:
            metadata (dict[str, Any]): The metadata to upload.

        Returns:
            dict[str, Any]: The response from the Dataverse API.

        Raises:
            ValueError: If the upload fails.
        """
        config = Config()

        token = config.DATAVERSE['api_token']
        client = APIClient(token=token)
        url = f"{config.DATAVERSE['url']}/api/dataverses/{config.DATAVERSE['dataverse']}/datasets?doNotValidate=true"
        auth = (config.DATAVERSE['auth_user'], config.DATAVERSE['auth_password'])

        response = client.make_request(url, method="POST", auth=auth, json=metadata)

        if response is None or response.status_code != 201:
            self.console.print(f"\n{ICONS['error']} Failed to upload to Dataverse: {url}", style="error")
            raise ValueError(f"Failed to upload to Dataverse: {url}")
        else:
            perma = response.json().get("data", {}).get("persistentId", "")
            self.console.print(f"{ICONS['upload']} Dataset uploaded to: {config.DATAVERSE['dataverse']} with ID {perma}", style="info")

        return response.json()

    def _fetch_data(self) -> dict[str, Any]:
        """
        Fetch metadata from OpenAlex for the given DOI.

        Returns:
            dict[str, Any]: The fetched data.

        Raises:
            ValueError: If data fetching fails.
        """
        url = f"https://api.openalex.org/works/https://doi.org/{self.doi}"
        response = self.api_client.make_request(url)

        if response is None or response.status_code != 200:
            self.console.print(f"\n{ICONS['error']} Failed to fetch data for DOI: {self.doi}", style="error")
            raise ValueError(f"Failed to fetch data for DOI: {self.doi}")

        return response.json()

    def _build_metadata(self, data: dict[str, Any]) -> dict[str, Any]:
        """
        Construct the complete metadata dictionary from fetched data.

        Args:
            data (dict[str, Any]): The data retrieved from OpenAlex.

        Returns:
            dict[str, Any]: The complete metadata dictionary.
        """
        license_info = LicenseProcessor.process_license(data)
        abstract_processor = AbstractProcessor(self.api_client)
        abstract = abstract_processor.get_abstract(self.doi, data, license_info)
        citation_builder = CitationBuilder(data, self.doi, self.pi_finder)

        authors, corresponding_authors = citation_builder.build_authors()
        author_fields: list[list[PrimitiveMetadataField | ControlledVocabularyMetadataField]] = []
        corresponding_author_fields: list[list[PrimitiveMetadataField]] = []
        for author in authors:
            author_fields.append(author.author_fields())

        if not corresponding_authors:
            self.console.print(f"{ICONS['warning']} No corresponding authors explicitly declared; PIs are used as a fallback!", style="warning")
            pis = self._get_involved_pis(data)
            for pi in pis:
                corresponding_authors.append(pi)

        for corresponding_author in corresponding_authors:
            corresponding_author_fields.append(corresponding_author.dataset_contact_fields())

        description = self._build_description(data, abstract)

        grants = citation_builder.build_grants()

        return_dict: dict[str, Any] = {
            "datasetVersion": {
                "metadataBlocks": {
                    "citation": {
                        "fields": [
                            PrimitiveMetadataField("title", False, data.get("title", "")).to_dict(),
                            PrimitiveMetadataField("distributionDate", False, data.get("publication_date", "")).to_dict(),
                            CompoundMetadataField("otherId", True, citation_builder.build_other_ids()).to_dict(),
                            CompoundMetadataField("dsDescription", True, [[PrimitiveMetadataField("dsDescriptionValue", False, description)]]).to_dict(),
                            ControlledVocabularyMetadataField("subject", True, SubjectMapper.get_subjects(data, self.default_subject)).to_dict(),
                            CompoundMetadataField("topicClassification", True, citation_builder.build_topics()).to_dict(),
                            CompoundMetadataField("keyword", True, citation_builder.build_keywords()).to_dict(),
                            PrimitiveMetadataField("depositor", False, self.depositor or data["primary_location"]["source"].get("display_name", "")).to_dict(),
                            PrimitiveMetadataField("alternativeURL", False, f"https://doi.org/{self.doi}").to_dict(),
                            CompoundMetadataField("author", True, author_fields).to_dict(),
                            CompoundMetadataField("datasetContact", True, corresponding_author_fields).to_dict(),
                            CompoundMetadataField("grantNumber", True, grants).to_dict()
                        ],
                        "displayName": "Citation Metadata"
                    },
                    "crc1430_org_v1": self._build_organization_metadata(data)
                },
                "files": []
            }
        }

        if license_info.name:
            return_dict["datasetVersion"]["license"] = {
                "name": license_info.name,
                "uri": license_info.uri
            }
        else:
            return_dict["datasetVersion"]["termsOfUse"] = f"All rights reserved. Copyright © {self._get_publication_year(data)}, [TODO: Insert copyright holder here!]"

        return return_dict

    def _build_description(self, data: dict[str, Any], abstract: Abstract) -> str:
        """
        Build the description field by combining a header and the abstract.

        Args:
            data (dict[str, Any]): The metadata.
            abstract (Abstract): The abstract object.

        Returns:
            str: The full description.
        """
        head = self._build_description_head(data)
        return f"{head}{abstract.text}"

    def _build_description_head(self, data: dict[str, Any]) -> str:
        """
        Build the header for the description based on publication details.

        Args:
            data (dict[str, Any]): The metadata.

        Returns:
            str: The HTML header string.
        """
        journal = data.get("primary_location", {}).get("source", {}).get("display_name")
        publication_date = data.get("publication_date")
        volume = data.get("biblio", {}).get("volume")
        issue = data.get("biblio", {}).get("issue")
        type = data.get("type")

        if all([journal, publication_date, volume, issue, type]):
            return f"<p>This {type} was published on {publication_date} in <i>{journal}</i> {volume}({issue})</p>"
        elif all([journal, publication_date, type]):
            return f"<p>This {type} was published on {publication_date} in <i>{journal}</i></p>"

        self.console.print(f"{ICONS['warning']} No abstract header added, missing information (journal, publication date and/or document type)", style="warning")
        return ""

    def _get_publication_year(self, data: dict[str, Any]) -> str:
        """
        Extract the publication year from the metadata.

        Args:
            data (dict[str, Any]): The metadata.

        Returns:
            str: The publication year.
        """
        return data.get("publication_year", "")

    def _build_organization_metadata(self, data: dict[str, Any]) -> dict[str, Any]:
        """
        Build organization metadata fields (phase, project, PI names).

        Args:
            data (dict[str, Any]): The metadata.

        Returns:
            dict[str, Any]: Organization metadata.
        """
        publication_year = self._get_publication_year(data)
        if publication_year:
            phases = self._get_phases(int(publication_year))
        else:
            phases = []

        pis = self._get_involved_pis(data)
        projects: list[str] = []
        for pi in pis:
            for project in pi.project:
                projects.append(project)

        pi_names: list[str] = []
        for pi in pis:
            pi_names.append(pi.format_name())

        # Deduplicate projects and PI names
        unique_projects = list(set(projects))
        unique_pi_names = list(set(pi_names))

        return {
            "fields": [
                ControlledVocabularyMetadataField("crc1430OrgV1Phase", True, phases).to_dict(),
                ControlledVocabularyMetadataField("crc1430OrgV1Project", True, unique_projects).to_dict(),
                ControlledVocabularyMetadataField("crc1430OrgV1PI", True, unique_pi_names).to_dict()
            ]
        }

    def _get_phases(self, year: int) -> list[str]:
        """
        Determine the project phases matching a given publication year.

        Args:
            year (int): The publication year.

        Returns:
            list[str]: List of matching phase names.
        """
        config = Config()
        matching_phases: list[str] = []
        for phase_name, phase_info in config.PHASE.items():
            phase = Phase(phase_name, phase_info["start"], phase_info["end"])
            if phase.check_year(year):
                matching_phases.append(phase.name)
        return matching_phases

    def _get_involved_pis(self, data: dict[str, Any]) -> list[Person]:
        """
        Identify involved principal investigators from the metadata.

        Args:
            data (dict[str, Any]): The metadata.

        Returns:
            list[Person]: List of PIs.
        """
        involved_pis: list[Person] = []
        for authorship in data.get("authorships", []):
            author = authorship.get("author", {})
            if not author:
                continue

            display_name = author.get("display_name", "")
            given_name, family_name = NameProcessor.split_name(display_name)

            if pi := self.pi_finder.find_pi(
                family_name=family_name,
                given_name=given_name,
                orcid=author.get("orcid")
            ):
                involved_pis.append(pi)

        return involved_pis

    def _save_output(self, metadata: dict[str, Any]) -> None:
        """
        Save the generated metadata to a file or print it to the console.

        Args:
            metadata (dict[str, Any]): The metadata to save.
        """
        if self.output_path:
            try:
                with open(self.output_path, "w", encoding="utf-8") as f:
                    json.dump(metadata, f, indent=4, ensure_ascii=False)
                self.console.print(f"{ICONS['save']} Metadata saved in: {self.output_path}", style="info")
            except Exception as e:
                self.console.print(f"{ICONS['error']} Error saving metadata: {str(e)}\n", style="error")
                raise
        else:
            self.console.print(metadata)

def sanitize_filename(doi: str) -> str:
    """
    Convert DOI to a valid filename using only alphanumeric characters and underscores.

    Args:
        doi (str): The DOI to sanitize.

    Returns:
        str: Sanitized filename string.
    """
    # Replace non-alphanumeric characters with underscores
    sanitized = ''.join(c if c.isalnum() else '_' for c in doi)
    # Remove consecutive underscores
    while '__' in sanitized:
        sanitized = sanitized.replace('__', '_')
    # Remove leading/trailing underscores
    return sanitized.strip('_')

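# Illustrative behaviour of sanitize_filename (the DOI is a made-up example):
#
#   sanitize_filename("10.1234/abc.def-5")  ->  "10_1234_abc_def_5"
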
def print_summary(results: dict[str, list[Any]], console: Console) -> None:
    """
    Print a summary table of processing results to the console.

    Args:
        results (dict[str, list[Any]]): Dictionary containing success and failed DOIs.
        console (Console): Rich console object for output.
    """
    table = Table(title="Processing Results")

    table.add_column("Status", style="bold")
    table.add_column("Count", justify="right")
    table.add_column("DOIs", style="dim")

    table.add_row(
        f"{ICONS['success']} Success",
        str(len(results["success"])),
        ", ".join(results["success"][:3]) + ("..." if len(results["success"]) > 3 else "")
    )

    if results["failed"]:
        table.add_row(
            f"{ICONS['error']} Failed",
            str(len(results["failed"])),
            ", ".join(doi for doi, _ in results["failed"][:3]) +
            ("..." if len(results["failed"]) > 3 else "")
        )

    console.print(Panel(table, title="Summary", border_style="blue"))


def validate_email_address(email: str) -> bool:
    """
    Validate an email address and ensure its domain has an MX record.

    Args:
        email (str): The email address to validate.

    Returns:
        bool: True if the email address is valid and its domain resolves, otherwise False.
    """
    try:
        # Basic validation
        valid = validate_email(email)
        email = valid.normalized

        # Check domain has MX record
        domain = email.split('@')[1]
        dns.resolver.resolve(domain, 'MX')

        return True
    except (EmailNotValidError, dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
        return False

def process_doi_batch(
    dois: set[str],
    output_dir: Path,
    depositor: str | None = None,
    default_subject: str = "Medicine, Health and Life Sciences",
    contact_mail: str | None = None,
    upload: bool = False
) -> dict[str, list[Any]]:
    """
    Process a batch of DOIs and return a summary of results.

    Args:
        dois (set[str]): Set of DOIs to process.
        output_dir (Path): Directory where metadata files will be saved.
        depositor (str | None): Depositor name.
        default_subject (str): Default subject for metadata.
        contact_mail (str | None): Contact email address.
        upload (bool): Flag indicating whether to upload metadata to Dataverse.

    Returns:
        dict[str, list[Any]]: Dictionary with keys 'success' and 'failed'.
    """
    results: dict[str, list[Any]] = {"success": [], "failed": []}

    progress_columns = [
        SpinnerColumn(),
        TextColumn("[bold blue]{task.description:<50}"),
        BarColumn(bar_width=None),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TextColumn("•"),  # Separator
        TimeElapsedColumn(),
        TextColumn("•"),  # Separator
        TextColumn("[bold]{task.completed}/{task.total}"),
    ]

    # Define the number of processing steps per DOI
    if upload:
        doi_total_steps = 4  # Fetch, Build, Upload, Save
    else:
        doi_total_steps = 3  # Fetch, Build, Save

    with Progress(
        *progress_columns,
        console=console,
        transient=True  # This makes the progress bar disappear after completion
    ) as progress:
        # Add main task
        main_task = progress.add_task(
            "[bold blue]Processing DOIs...",
            total=len(dois)
        )

        # Add status task for the DOI currently being processed
        status_task = progress.add_task(
            "[cyan]Current:",
            total=doi_total_steps,
            visible=False  # Hidden initially
        )

        for doi in dois:
            try:
                # Update status display
                progress.update(
                    status_task,
                    description=f"[cyan]Current: [white]{doi[:50]}...",
                    visible=True,
                    completed=0  # Reset progress for new DOI
                )

                # Process the DOI
                sanitized_filename = sanitize_filename(normalize_doi(doi))
                output_path = output_dir / f"{sanitized_filename}_metadata.json"

                processor = MetadataProcessor(
                    doi=doi,
                    depositor=depositor,
                    output_path=output_path,
                    default_subject=default_subject,
                    contact_mail=contact_mail,
                    upload=upload,
                    console=console,
                    progress=progress,
                    task_id=status_task
                )

                # Process and capture result
                processor.process()
                results["success"].append(doi)

                # Update progress
                progress.advance(main_task)

            except Exception as e:
                # Handle errors
                results["failed"].append((doi, str(e)))

                # Show error but keep progress bar
                progress.console.print(
                    f"{ICONS['error']} Error processing {doi}: {str(e)}",
                    style="error"
                )
            finally:
                # Clear current status
                progress.update(status_task, visible=False)

    # Print final summary
    print_summary(results, console)

    return results

if __name__ == "__main__":

    console = Console(theme=THEME)

    try:

        parser = argparse.ArgumentParser(description="Process DOIs to generate metadata")
        parser.add_argument(
            "dois",
            nargs="*",
            help="One or more DOIs to process"
        )
        parser.add_argument(
            "-f", "--file",
            help="File containing DOIs (one per line)",
            type=argparse.FileType('r')
        )
        parser.add_argument(
            "-o", "--output-dir",
            help="Output directory for metadata files",
            default="."
        )
        parser.add_argument(
            "-d", "--depositor",
            help="Name of the depositor",
            default=None
        )
        parser.add_argument(
            "-s", "--subject",
            help="Default subject",
            default="Medicine, Health and Life Sciences"
        )
        parser.add_argument(
            "-m", "--contact-mail",
            help="Contact email address",
            default=None
        )
        parser.add_argument(
            "-u", "--upload",
            help="Upload to Dataverse",
            action='store_true'
        )

        args = parser.parse_args()

        # Ensure we have either DOIs as arguments or a file
        if not args.dois and not args.file:
            console.print(f"{ICONS['error']} Error: No DOIs provided. Use either command line arguments or -f/--file option.", style="error")
            parser.print_help()
            sys.exit(1)

        # Get DOIs from both direct arguments and file if provided
        dois = set(args.dois)  # Start with directly provided DOIs
        if args.file:
            console.print(f"{ICONS['file']} Reading DOIs from file: {args.file.name}", style="info")
            dois.update(line.strip() for line in args.file if line.strip())

        # Create output directory if it doesn't exist
        output_dir = Path(args.output_dir)
        try:
            output_dir.mkdir(parents=True, exist_ok=True)
            console.print(f"{ICONS['folder']} Output directory: {output_dir}\n", style="info")
        except Exception as e:
            console.print(f"Failed to create output directory: {str(e)}\n", style="error")
            sys.exit(1)

        if args.contact_mail:
            if not validate_email_address(args.contact_mail):
                raise ValueError(f"Not a valid email address: {args.contact_mail}")
            console.print(f"{ICONS['info']} Exposing contact email <{args.contact_mail}> to API services.\n", style="info")

        # Process DOIs and track time
        start_time = datetime.now()
        results = process_doi_batch(
            dois=dois,
            output_dir=output_dir,
            depositor=args.depositor,
            default_subject=args.subject,
            contact_mail=args.contact_mail,
            upload=args.upload
        )

    except KeyboardInterrupt:
        console.print(f"\n{ICONS['warning']} Processing interrupted by user", style="warning")
        sys.exit(1)
    except Exception as e:
        console.print(f"\n{ICONS['error']} An unexpected error occurred: {str(e)}", style="error")
        sys.exit(1)