#!/usr/bin/env python
"""
doi2dataset.py
This script processes DOIs to generate metadata for Dataverse datasets.
It supports fetching data from OpenAlex and CrossRef, mapping metadata fields,
processing author and grant information, and optionally uploading the metadata
to a Dataverse instance.
Usage:
python doi2dataset.py [options] doi1 doi2 ...
Options:
-f, --file File containing DOIs (one per line)
-o, --output-dir Output directory for metadata files (default: current directory)
-d, --depositor Name of the depositor
-s, --subject Default subject (default: "Medicine, Health and Life Sciences")
-m, --contact-mail Contact email address
-u, --upload Upload metadata to Dataverse
"""
import argparse
import json
import sys
import unicodedata
import warnings # TODO: Remove once the warning is stripped from idutils
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from functools import reduce
from pathlib import Path
from typing import Any, Sequence
import dns.resolver
import requests
import yaml
from email_validator import EmailNotValidError, validate_email
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
BarColumn,
Progress,
SpinnerColumn,
TaskID,
TextColumn,
TimeElapsedColumn,
)
from rich.table import Table
from rich.theme import Theme
# Idutils throws an unconditional warning about deprecation of relative imports.
# Since we are not using them, suppress the warning so it does not confuse users.
# TODO: Remove once the warning is stripped from idutils
warnings.filterwarnings("ignore", category=DeprecationWarning)
from idutils.normalizers import normalize_doi, normalize_orcid, normalize_pmid
from idutils.validators import is_doi
# Script version
VERSION = "1.0"
# Icon definitions for console output
ICONS = {
    'success': "✓",     # Simple checkmark
    'error': "✗",       # Simple X
    'warning': "!",     # Simple exclamation
    'info': "ℹ",        # Info symbol
    'processing': "…",  # Three dots
    'done': "■",        # Filled square
    'file': "🗎",        # Document symbol
    'folder': "🗀",      # Folder symbol
    'clock': "🕓",       # Clock symbol
    'search': "🔍",      # Search symbol
    'data': "☰",        # Three lines
    'doi': "🔗",         # Link symbol
    'total': "∑",       # Sum symbol
    'save': "↓",        # Save/download arrow
    'upload': "↑"       # Upload arrow
}
# Theme configuration for Rich console output
THEME = Theme({
"info": "cyan",
"warning": "yellow",
"error": "red bold",
"success": "green",
})
# Available sources for metadata abstracts
SOURCES = ["openalex", "crossref", "none"]
def format_status(icon: str, message: str, style: str = "default") -> str:
"""
Format a status message with an icon and a given style.
Args:
icon (str): Key for the icon character from the ICONS dictionary.
message (str): The status message.
style (str): The style to apply (e.g., 'default', 'info', 'warning', 'error', 'success').
Returns:
str: The formatted status message.
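
    Example (illustrative; assumes ICONS['success'] is the checkmark glyph):
        >>> format_status("success", "Done", "success")  # doctest: +SKIP
        '[success]✓ Done[/success]'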
"""
return f"[{style}]{ICONS[icon]} {message}[/{style}]"
class FieldType(Enum):
"""Enum representing different Dataverse field types."""
PRIMITIVE = "primitive"
COMPOUND = "compound"
VOCABULARY = "controlledVocabulary"
@dataclass
class Phase:
"""
Represents a project phase with a defined time span.
Attributes:
name (str): The name of the project phase.
start (int): The start year of the project phase.
end (int): The end year of the project phase.
"""
name: str
start: int
end: int
def check_year(self, year: int) -> bool:
"""
Checks whether a given year falls within the project's phase boundaries.
Args:
year (int): The year to check.
Returns:
bool: True if the year is within the phase boundaries, otherwise False.
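
        Example (illustrative phase values):
            >>> Phase("Phase 1", 2018, 2021).check_year(2020)
            True
            >>> Phase("Phase 1", 2018, 2021).check_year(2022)
            False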
"""
        return self.start <= year <= self.end
@dataclass
class BaseMetadataField[T]:
"""
Base class for Dataverse metadata fields.
This class defines a metadata field with a name, a value of type T, and
a flag indicating whether multiple values are allowed. It serves as
a template for specific metadata field implementations.
Attributes:
name (str): The name of the metadata field.
multiple (bool): Indicates whether multiple values are allowed.
value (T): The value stored in the field.
type (FieldType): The type of the field, automatically set based on T.
"""
name: str
multiple: bool
value: T
type: FieldType = field(init=False)
def __post_init__(self) -> None:
"""
After initialization, determine the field type by calling the _set_type method.
"""
self._set_type()
def _set_type(self) -> None:
"""
Set the `type` attribute based on the field's value.
This method must be implemented by subclasses.
Raises:
NotImplementedError: If not implemented by a subclass.
"""
raise NotImplementedError("Subclasses must implement the _set_type method.")
def to_dict(self) -> dict[str, Any]:
"""
Convert the metadata field to a dictionary representation.
Returns:
dict[str, Any]: Dictionary representation of the metadata field.
Raises:
NotImplementedError: If not implemented by a subclass.
"""
raise NotImplementedError("Subclasses must implement the to_dict method.")
@dataclass
class PrimitiveMetadataField(BaseMetadataField[str]):
"""
Metadata field representing a primitive type (e.g., string) for Dataverse.
"""
def _set_type(self) -> None:
self.type = FieldType.PRIMITIVE
def to_dict(self) -> dict[str, str | bool]:
"""
Convert the primitive metadata field to a dictionary representation.
Returns:
dict[str, str | bool]: Dictionary with field properties.
"""
return {
"typeName": self.name,
"typeClass": self.type.value,
"multiple": self.multiple,
"value": self.value,
}
@dataclass
class ControlledVocabularyMetadataField(BaseMetadataField[str | list[str]]):
"""
Metadata field for controlled vocabulary values.
"""
def _set_type(self) -> None:
self.type = FieldType.VOCABULARY
def to_dict(self) -> dict[str, Any]:
"""
Convert the controlled vocabulary metadata field to a dictionary.
Returns:
dict[str, Any]: Dictionary representation.
"""
return {
"typeName": self.name,
"typeClass": self.type.value,
"multiple": self.multiple,
"value": self.value,
}
@dataclass
class CompoundMetadataField(
BaseMetadataField[Sequence[Sequence[PrimitiveMetadataField | ControlledVocabularyMetadataField]]]
):
"""
Metadata field representing compound types, composed of multiple subfields.
"""
def _set_type(self) -> None:
self.type = FieldType.COMPOUND
def to_dict(self) -> dict[str, Any]:
"""
Convert the compound metadata field to a dictionary representation.
Returns:
dict[str, Any]: Dictionary representation of the compound field.
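
        Example (illustrative identifier values):
            >>> CompoundMetadataField("otherId", True, [[
            ...     PrimitiveMetadataField("otherIdAgency", False, "doi"),
            ...     PrimitiveMetadataField("otherIdValue", False, "10.1234/example")
            ... ]]).to_dict()["value"][0]["otherIdValue"]["value"]
            '10.1234/example'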
"""
value_list: list[dict[str, Any]] = []
for outer_list in self.value:
field_dicts: list[dict[str, Any]] = []
for field_item in outer_list:
field_dicts.append({field_item.name: field_item.to_dict()})
value_list.append(reduce(lambda d1, d2: d1 | d2, field_dicts))
return {
"typeName": self.name,
"typeClass": self.type.value,
"multiple": self.multiple,
"value": value_list
}
@dataclass
class Person:
"""
Represents a person (e.g., an author or a PI).
Attributes:
family_name (str): Family name of the person.
given_name (str): Given name of the person.
orcid (str): ORCID identifier (optional).
email (str): Email address (optional).
affiliation (str): Affiliation of the person (optional).
project (list[str]): List of associated projects.
"""
family_name: str
given_name: str
orcid: str = ""
email: str = ""
affiliation: str = ""
project: list[str] = field(default_factory=list)
def format_name(self) -> str:
"""
Format the name in 'Family, Given' order.
Returns:
str: Formatted name.
"""
return f"{self.family_name}, {self.given_name}"
def author_fields(self) -> list[PrimitiveMetadataField | ControlledVocabularyMetadataField]:
"""
Build metadata fields for an author.
Returns:
list: List of metadata fields representing the author.
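
        Example (illustrative person; without an ORCID only two fields are built):
            >>> fields = Person("Doe", "Jane", affiliation="Example University").author_fields()
            >>> [f.to_dict()["typeName"] for f in fields]
            ['authorName', 'authorAffiliation']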
"""
if self.orcid:
return [
PrimitiveMetadataField("authorName", False, self.format_name()),
PrimitiveMetadataField("authorAffiliation", False, self.affiliation),
ControlledVocabularyMetadataField("authorIdentifierScheme", False, "ORCID"),
PrimitiveMetadataField("authorIdentifier", False, self.orcid)
]
else:
return [
PrimitiveMetadataField("authorName", False, self.format_name()),
PrimitiveMetadataField("authorAffiliation", False, self.affiliation)
]
def dataset_contact_fields(self) -> list[PrimitiveMetadataField]:
"""
Build metadata fields for dataset contact information.
Returns:
list: List of metadata fields for the dataset contact.
"""
return [
PrimitiveMetadataField("datasetContactName", False, self.format_name()),
PrimitiveMetadataField("datasetContactAffiliation", False, self.affiliation),
PrimitiveMetadataField("datasetContactEmail", False, self.email)
]
@dataclass
class License:
"""
Represents a license with name, URI, and short identifier.
Attributes:
name (str): The full name of the license.
uri (str): The license URI.
short (str): The short identifier of the license.
"""
name: str
uri: str
short: str
@dataclass
class Abstract:
"""
Represents an abstract with its text and source.
Attributes:
text (str): The abstract text.
source (str): The source of the abstract ('crossref', 'openalex', or 'none').
"""
text: str
source: str
    def __post_init__(self):
        if self.source not in SOURCES:
            raise ValueError(f"{self.source} is not valid! Needs to be one of {SOURCES}.")
@dataclass
class ConfigData:
"""
Represents configuration data loaded from a YAML file.
Attributes:
dataverse (dict[str, str]): Dataverse-related configuration.
phase (dict[str, dict[str, int]]): Mapping of project phases.
pis (list[dict[str, Any]]): List of principal investigator configurations.
default_grants (list[dict[str, str]]): Default grant configurations.
"""
dataverse: dict[str, str]
phase: dict[str, dict[str, int]]
pis: list[dict[str, Any]]
default_grants: list[dict[str, str]]
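
# A config.yaml consumed by Config.load_config might look like the sketch below.
# This layout is an assumption pieced together from how the values are accessed
# in this script (Dataverse keys in MetadataProcessor._upload_data, phase bounds
# in _get_phases, PI fields in Person, grant keys in CitationBuilder.build_grants):
#
#   dataverse:
#     url: "https://dataverse.example.org"
#     api_token: "xxxxxxxx-xxxx"
#     dataverse: "my_collection"
#     auth_user: "user"
#     auth_password: "secret"
#   phase:
#     "Phase 1":
#       start: 2018
#       end: 2021
#   pis:
#     - family_name: "Doe"
#       given_name: "Jane"
#       orcid: "0000-0001-2345-6789"
#       email: "jane.doe@example.org"
#       affiliation: "Example University"
#       project: ["A01"]
#   default_grants:
#     - funder: "Example Funder"
#       id: "123456"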
class Config:
"""
Singleton class to handle configuration loading and retrieval.
"""
_instance: 'Config | None' = None
_config_data: ConfigData | None = None
def __new__(cls) -> 'Config':
"""
Create and return the singleton instance of Config.
Returns:
Config: The singleton instance.
"""
if cls._instance is None:
cls._instance = super(Config, cls).__new__(cls)
return cls._instance
@classmethod
def load_config(cls, config_path: str | Path | None = None) -> None:
"""
Load configuration from a YAML file.
Args:
config_path (str | Path | None): Path to the configuration file.
If None, the default config.yaml in the script directory is used.
Raises:
FileNotFoundError: If the configuration file does not exist.
ValueError: If any PI email address is invalid.
"""
if config_path is None:
config_path = Path(__file__).parent / "config.yaml"
config_path = Path(config_path)
if not config_path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(config_path, 'r', encoding='utf-8') as f:
config_data = yaml.safe_load(f)
# Validate PI email addresses
pis = config_data.get('pis', [])
for pi in pis:
if email := pi.get('email'):
if not validate_email_address(email):
raise ValueError(f"Configuration Error: Invalid email address for PI {pi.get('given_name', '')} {pi.get('family_name', '')}: {email}")
cls._config_data = ConfigData(
dataverse=config_data.get('dataverse', {}),
phase=config_data.get('phase', {}),
pis=config_data.get('pis', []),
default_grants=config_data.get('default_grants', [])
)
@classmethod
def get_config(cls) -> ConfigData:
"""
Retrieve the loaded configuration data.
Returns:
ConfigData: The configuration data.
Raises:
RuntimeError: If the configuration could not be loaded.
"""
if cls._config_data is None:
cls.load_config()
if cls._config_data is None:
raise RuntimeError("Failed to load configuration")
return cls._config_data
@property
def PHASE(self) -> dict[str, dict[str, int]]:
"""
Get phase configuration.
Returns:
dict[str, dict[str, int]]: Mapping of phases.
"""
return self.get_config().phase
@property
def PIS(self) -> list[dict[str, Any]]:
"""
Get PI configurations.
Returns:
list[dict[str, Any]]: List of PI configurations.
"""
return self.get_config().pis
@property
def DEFAULT_GRANTS(self) -> list[dict[str, str]]:
"""
Get default grant configurations.
Returns:
list[dict[str, str]]: List of default grants.
"""
return self.get_config().default_grants
@property
def DATAVERSE(self) -> dict[str, str]:
"""
Get Dataverse configurations.
Returns:
dict[str, str]: Dataverse configuration.
"""
return self.get_config().dataverse
class APIClient:
"""
Client for making HTTP requests to external APIs.
Attributes:
session (requests.Session): The underlying requests session.
"""
def __init__(self, contact_mail: str | None = None, user_agent: str = f"UDE-Doi2Dataset/{VERSION}", token: str | None = None) -> None:
"""
Initialize the API client with optional contact mail, user agent, and token.
Args:
contact_mail (str | None): Contact email address.
user_agent (str): User agent string.
token (str | None): Optional API token.
"""
self.session = requests.Session()
self._set_headers(contact_mail, user_agent, token)
def _set_headers(self, contact_mail: str | None, user_agent: str, token: str | None) -> None:
"""
Set HTTP headers for the session based on contact email and token.
Args:
contact_mail (str | None): Contact email address.
user_agent (str): User agent string.
token (str | None): Optional API token.
"""
if contact_mail:
header = {"User-Agent": f"{user_agent} (mailto:{contact_mail})"}
else:
header = {"User-Agent": user_agent}
if token:
header["X-Dataverse-key"] = token
self.session.headers.update(header)
def make_request(self, url: str, method: str = "GET", **kwargs: Any) -> requests.Response | None:
"""
Make an HTTP request and return the response.
Args:
url (str): The URL to request.
method (str): HTTP method to use (default: GET).
**kwargs: Additional arguments for requests.request.
Returns:
requests.Response | None: The HTTP response, or None if the request failed.
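
        Example (illustrative; performs a live request, hence skipped in doctests):
            >>> client = APIClient(contact_mail="me@example.org")
            >>> response = client.make_request("https://api.openalex.org/works/W2741809807")  # doctest: +SKIP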
"""
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
print(f"\n{ICONS['error']} Request failed: {str(e)}")
return None
class NameProcessor:
"""
Provides utility methods for processing names.
"""
@staticmethod
def normalize_string(s: str) -> str:
"""
Normalize a string using Unicode NFKD normalization and convert to ASCII.
Args:
s (str): The string to normalize.
Returns:
str: The normalized string.
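
        Example:
            >>> NameProcessor.normalize_string("Müller")
            'muller'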
"""
return unicodedata.normalize("NFKD", s.lower()).encode("ASCII", "ignore").decode("ASCII")
@staticmethod
def split_name(full_name: str) -> tuple[str, str]:
"""
Split a full name into given and family names.
Args:
full_name (str): The full name (e.g., "Doe, John" or "John Doe").
Returns:
tuple[str, str]: A tuple (given_name, family_name).
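
        Example:
            >>> NameProcessor.split_name("Doe, John")
            ('John', 'Doe')
            >>> NameProcessor.split_name("John Ronald Doe")
            ('John Ronald', 'Doe')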
"""
if "," in full_name:
surname, given_name = full_name.split(",", 1)
return given_name.strip(), surname.strip()
parts = full_name.strip().split()
if len(parts) == 1:
return "", parts[0]
return " ".join(parts[:-1]), parts[-1]
class PIFinder:
"""
Finds principal investigators (PIs) among a list of Person objects.
"""
def __init__(self, pis: list[Person]) -> None:
"""
Initialize with a list of Person objects representing potential PIs.
Args:
pis (list[Person]): List of Person objects.
"""
self.pis = pis
def find_pi(self, family_name: str | None = None, orcid: str | None = None, given_name: str | None = None) -> Person | None:
"""
Find a PI by ORCID or name.
Args:
family_name (str | None): Family name.
orcid (str | None): ORCID identifier.
given_name (str | None): Given name.
Returns:
Person | None: The matched PI or None if not found.
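
        Example (illustrative PI entry):
            >>> finder = PIFinder([Person("Doe", "Jane", orcid="0000-0001-2345-6789")])
            >>> finder.find_pi(family_name="doe", given_name="Jane").format_name()
            'Doe, Jane'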
"""
if orcid:
return self._find_by_orcid(normalize_orcid(orcid))
if family_name:
return self._find_by_name(family_name, given_name)
return None
def _find_by_orcid(self, orcid: str) -> Person | None:
"""
Find a PI by ORCID.
Args:
orcid (str): Normalized ORCID.
Returns:
Person | None: The matched PI or None.
"""
        for person in self.pis:
            if person.orcid and normalize_orcid(person.orcid) == orcid:
                return person
return None
def _find_by_name(self, family_name: str, given_name: str | None) -> Person | None:
"""
Find a PI by family name (and optionally given name).
Args:
family_name (str): Family name.
given_name (str | None): Given name (optional).
Returns:
Person | None: The matched PI or None.
"""
matches: list[Person] = []
normalized_family_name = NameProcessor.normalize_string(family_name)
for person in self.pis:
if NameProcessor.normalize_string(person.family_name) == normalized_family_name:
matches.append(person)
if not matches:
return None
if given_name:
normalized_given_name = NameProcessor.normalize_string(given_name)
for match in matches:
if NameProcessor.normalize_string(match.given_name) == normalized_given_name:
return match
return None
if len(matches) == 1:
return matches[0]
raise ValueError("Multiple matches found for family name")
class LicenseProcessor:
"""
Processes license information from metadata.
"""
LICENSE_MAP = {
"cc-by": ("https://creativecommons.org/licenses/by/4.0/", "CC BY 4.0"),
"cc-by-sa": ("https://creativecommons.org/licenses/by-sa/4.0/", "CC BY-SA 4.0"),
"cc-by-nc": ("https://creativecommons.org/licenses/by-nc/4.0/", "CC BY-NC 4.0"),
"cc-by-nc-sa": ("https://creativecommons.org/licenses/by-nc-sa/4.0/", "CC BY-NC-SA 4.0"),
"cc-by-nc-nd": ("https://creativecommons.org/licenses/by-nc-nd/4.0/", "CC BY-NC-ND 4.0"),
"cc-by-nd": ("https://creativecommons.org/licenses/by-nd/4.0/", "CC BY-ND 4.0"),
"cc0": ("https://creativecommons.org/publicdomain/zero/1.0/", "CC0 1.0"),
"pd": ("https://creativecommons.org/publicdomain/mark/1.0/", "Public Domain Mark 1.0"),
}
@classmethod
def process_license(cls, data: dict[str, Any]) -> License:
"""
Process and return license information based on input data.
Args:
data (dict[str, Any]): Input data containing license info.
Returns:
License: Processed license information.
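
        Example (payload shape follows the OpenAlex 'primary_location' field):
            >>> LicenseProcessor.process_license({"primary_location": {"license": "cc-by"}})
            License(name='CC BY 4.0', uri='https://creativecommons.org/licenses/by/4.0/', short='cc-by')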
"""
        location = data.get("primary_location") or {}
        license_short = location.get("license") or ""
if not license_short:
return License(name="", uri="", short="unknown")
base_license = license_short.split("/")[0].lower()
uri, name = cls.LICENSE_MAP.get(base_license, ("", license_short))
return License(name=name, uri=uri, short=license_short)
class AbstractProcessor:
"""
Retrieves and processes abstracts from CrossRef and OpenAlex.
"""
def __init__(self, api_client: APIClient):
"""
Initialize with an APIClient instance.
Args:
api_client (APIClient): The API client to use for requests.
"""
self.api_client = api_client
def get_abstract(self, doi: str, data: dict[str, Any], license: License) -> Abstract:
"""
Get an abstract based on DOI and license permissions.
Args:
doi (str): The DOI.
data (dict[str, Any]): Data retrieved from an external source.
license (License): License information.
Returns:
Abstract: The abstract with its source.
"""
        license_ok = {"cc-by", "cc-by-sa", "cc-by-nc", "cc-by-nc-sa", "cc0", "pd"}
        # Compare the base license id so version-suffixed values (e.g. "cc-by/4.0") also match
        if license.short.split("/")[0].lower() in license_ok:
console.print(f"\n{ICONS['info']} License {license.name} allows derivative works. Pulling abstract from CrossRef.", style="info")
crossref_abstract = self._get_crossref_abstract(doi)
if crossref_abstract:
return Abstract(text=crossref_abstract, source="crossref")
else:
console.print(f"\n{ICONS['warning']} No abstract found in CrossRef!", style="warning")
else:
console.print(f"\n{ICONS['info']} License {license.name} does not allow derivative works. Reconstructing abstract from OpenAlex!", style="info")
openalex_abstract = self._get_openalex_abstract(data)
if openalex_abstract:
return Abstract(text=openalex_abstract, source="openalex")
else:
console.print(f"\n{ICONS['warning']} No abstract found in OpenAlex!", style="warning")
        console.print(f"\n{ICONS['warning']} No abstract found in either CrossRef or OpenAlex!", style="warning")
return Abstract(text="", source="none")
def _get_crossref_abstract(self, doi: str) -> str | None:
"""
Retrieve abstract from CrossRef API.
Args:
doi (str): The DOI.
Returns:
str | None: The abstract if found, otherwise None.
"""
url = f"https://api.crossref.org/works/{doi}"
response = self.api_client.make_request(url)
if response and response.status_code == 200:
abstract_raw = response.json().get("message", {}).get("abstract")
return self._clean_jats(abstract_raw)
return None
def _get_openalex_abstract(self, data: dict[str, Any]) -> str | None:
"""
Retrieve abstract from OpenAlex data.
Args:
data (dict[str, Any]): Data from OpenAlex.
Returns:
str | None: The reconstructed abstract, or None if not available.
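
        Example (abstract_inverted_index shape as published by OpenAlex):
            >>> processor = AbstractProcessor(APIClient())
            >>> processor._get_openalex_abstract({"abstract_inverted_index": {"Hello": [0], "world": [1]}})
            'Hello world'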
"""
inv_index = data.get("abstract_inverted_index")
if not inv_index:
return None
word_positions = [(word, pos) for word, positions in inv_index.items() for pos in positions]
sorted_words = sorted(word_positions, key=lambda x: x[1])
return " ".join(word for word, _ in sorted_words)
def _clean_jats(self, text: str | None) -> str:
"""
Clean JATS XML tags in the abstract and convert them to HTML tags.
Args:
text (str | None): The raw abstract text containing JATS tags.
Returns:
str: The cleaned abstract text.
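
        Example:
            >>> AbstractProcessor(APIClient())._clean_jats("<jats:p>An <jats:italic>example</jats:italic></jats:p>")
            '<p>An <i>example</i></p>'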
"""
if not text:
return ""
replacements = {
"<jats:italic>": "<i>",
"</jats:italic>": "</i>",
"<jats:bold>": "<b>",
"</jats:bold>": "</b>",
"<jats:sup>": "<sup>",
"</jats:sup>": "</sup>",
"<jats:sub>": "<sub>",
"</jats:sub>": "</sub>",
"<jats:underline>": "<u>",
"</jats:underline>": "</u>",
"<jats:monospace>": "<code>",
"</jats:monospace>": "</code>",
"<jats:sc>": "<small>",
"</jats:sc>": "</small>",
"<jats:p>": "<p>",
"</jats:p>": "</p>",
"<jats:title>": "<h2>",
"</jats:title>": "</h2>",
'<jats:list list-type="bullet">': "<ul>",
"</jats:list>": "</ul>",
'<jats:list list-type="order">': "<ol>",
"</jats:list>": "</ol>",
"<jats:list-item>": "<li>",
"</jats:list-item>": "</li>",
"<jats:blockquote>": "<blockquote>",
"</jats:blockquote>": "</blockquote>",
}
for jats_tag, html_tag in replacements.items():
text = text.replace(jats_tag, html_tag)
return text
class SubjectMapper:
"""
Maps subject names from input data to controlled vocabulary.
"""
CONTROLLED_VOCAB = {
"Agricultural Sciences": "Agricultural Sciences",
"Arts and Humanities": "Arts and Humanities",
"Astronomy": "Astronomy and Astrophysics",
"Astrophysics": "Astronomy and Astrophysics",
"Business": "Business and Management",
"Management": "Business and Management",
"Chemistry": "Chemistry",
"Computer Science": "Computer and Information Science",
"Information Science": "Computer and Information Science",
"Earth Sciences": "Earth and Environmental Sciences",
"Environmental Sciences": "Earth and Environmental Sciences",
"Engineering": "Engineering",
"Law": "Law",
"Mathematics": "Mathematical Sciences",
"Medicine": "Medicine, Health and Life Sciences",
"Health Sciences": "Medicine, Health and Life Sciences",
"Life Sciences": "Medicine, Health and Life Sciences",
"Physics": "Physics",
"Social Sciences": "Social Sciences",
}
@classmethod
def get_subjects(cls, data: dict[str, Any], fallback_subject: str = "Other") -> list[str]:
"""
Extract and map subjects from input data.
Args:
data (dict[str, Any]): The input metadata.
fallback_subject (str): Fallback subject if none found.
Returns:
list[str]: List of mapped subject names.
"""
topics = data.get("topics", [])
subject_collection: list[Any] = []
for topic in topics:
for field_type in ["subfield", "field", "domain"]:
if field_name := topic.get(field_type, {}).get("display_name"):
subject_collection.append(field_name)
mapped_subjects = cls.map_subjects(subject_collection)
return mapped_subjects if mapped_subjects else [fallback_subject]
@classmethod
def map_subjects(cls, subjects: list[str]) -> list[str]:
"""
Map given subjects to valid controlled vocabulary terms.
Args:
subjects (list[str]): List of subjects.
Returns:
list[str]: List of valid subjects.
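
        Example (order may vary because a set is used internally):
            >>> SubjectMapper.map_subjects(["Astrophysics", "Unknown Field"])
            ['Astronomy and Astrophysics']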
"""
valid_subjects: set[str] = set()
for subject in subjects:
if mapped_subject := cls.CONTROLLED_VOCAB.get(subject):
valid_subjects.add(mapped_subject)
return list(valid_subjects)
class CitationBuilder:
"""
Builds various citation-related metadata fields.
"""
def __init__(self, data: dict[str, Any], doi: str, pi_finder: PIFinder) -> None:
"""
Initialize the CitationBuilder with data, DOI, and a PIFinder.
Args:
data (dict[str, Any]): Metadata from an external source.
doi (str): The DOI.
pi_finder (PIFinder): Instance to find PI information.
"""
self.data = data
self.doi = doi
self.pi_finder = pi_finder
def build_other_ids(self) -> list[list[PrimitiveMetadataField]]:
"""
Build metadata fields for other identifiers (e.g., DOI, PMID).
Returns:
list[list[PrimitiveMetadataField]]: Nested list of identifier metadata fields.
"""
other_ids = [[
PrimitiveMetadataField("otherIdAgency", False, "doi"),
PrimitiveMetadataField("otherIdValue", False, self.doi)
]]
if pmid := self.data.get("ids", {}).get("pmid"):
try:
normalized_pmid = normalize_pmid(pmid)
other_ids.append([
PrimitiveMetadataField("otherIdAgency", False, "pmid"),
PrimitiveMetadataField("otherIdValue", False, normalized_pmid)
])
except ValueError:
pass
return other_ids
def build_grants(self) -> list[list[PrimitiveMetadataField]]:
"""
Build metadata fields for grants.
Returns:
list[list[PrimitiveMetadataField]]: Nested list of grant metadata fields.
"""
config = Config()
default_grants = config.DEFAULT_GRANTS
grants: list[list[PrimitiveMetadataField]] = []
        for grant in default_grants:
            grants.append([
                PrimitiveMetadataField("grantNumberAgency", False, grant["funder"]),
                PrimitiveMetadataField("grantNumberValue", False, grant["id"])
            ])
        for grant in self.data.get("grants", []):
            grant_funder = grant.get("funder_display_name")
            grant_id = grant.get("award_id")
            if not grant_funder or not grant_id:
                continue
            grants.append([
                PrimitiveMetadataField("grantNumberAgency", False, grant_funder),
                PrimitiveMetadataField("grantNumberValue", False, grant_id)
            ])
return grants
def build_authors(self) -> tuple[list[Person], list[Person]]:
"""
Build lists of authors and corresponding authors from the metadata.
Returns:
tuple: (authors, corresponding_authors)
"""
authors: list[Person] = []
corresponding_authors: list[Person] = []
for authorship in self.data.get("authorships", []):
author = authorship.get("author", {})
if not author:
continue
author_person = self._process_author(author, authorship)
authors.append(author_person)
if authorship.get("is_corresponding"):
corresponding_entry = self._process_corresponding_author(author_person, authorship)
if corresponding_entry:
corresponding_authors.append(corresponding_entry)
return authors, corresponding_authors
def _process_author(self, author: dict[str, Any], authorship: dict[str, Any]) -> Person:
"""
Process author data and return a Person instance.
Args:
author (dict[str, Any]): Author data.
authorship (dict[str, Any]): Authorship metadata.
Returns:
Person: Processed author.
"""
display_name = author.get("display_name", "")
given_name, family_name = NameProcessor.split_name(display_name)
person = Person(family_name, given_name)
if affiliations := authorship.get("affiliations"):
affiliation = affiliations[0].get("raw_affiliation_string", "").strip()
person.affiliation = affiliation
if orcid := author.get("orcid"):
person.orcid = normalize_orcid(orcid)
return person
def _process_corresponding_author(self, author: Person, authorship: dict[str, Any]) -> Person | None:
"""
Identify the corresponding author based on provided PI information.
Args:
author (Person): The author.
authorship (dict[str, Any]): Authorship metadata.
Returns:
Person | None: The corresponding author, or None if not found.
"""
pi = self.pi_finder.find_pi(
family_name=author.family_name,
given_name=author.given_name,
orcid=author.orcid
)
if not pi:
return None
return pi
def build_topics(self) -> list[list[PrimitiveMetadataField]]:
"""
Build metadata fields for topics based on a threshold score.
Returns:
list[list[PrimitiveMetadataField]]: Nested list of topic metadata fields.
"""
topics: list[list[PrimitiveMetadataField]] = []
for topic in self.data.get("topics", []):
            if topic.get("score", 0) >= 0.8:
if name := topic.get("display_name"):
topics.append([PrimitiveMetadataField("topicClassValue", False, name)])
return topics
def build_keywords(self) -> list[list[PrimitiveMetadataField]]:
"""
Build metadata fields for keywords from both regular keywords and MeSH terms.
Returns:
list[list[PrimitiveMetadataField]]: Nested list of keyword metadata fields.
"""
keywords: list[list[PrimitiveMetadataField]] = []
for keyword in self.data.get("keywords", []):
# Filter out possibly unrelated keywords (low score)
if keyword["score"] >= 0.5:
keyword_value_field = PrimitiveMetadataField("keywordValue", False, keyword["display_name"])
keywords.append([keyword_value_field])
mesh_base_url = "http://id.nlm.nih.gov/mesh"
for mesh in self.data.get("mesh", []):
url = f"{mesh_base_url}/{mesh['descriptor_ui']}"
if mesh["qualifier_ui"]:
url = f"{url}{mesh['qualifier_ui']}"
keyword_value_field = PrimitiveMetadataField("keywordValue", False, mesh["descriptor_name"])
keyword_term_uri_field = PrimitiveMetadataField("keywordTermURI", False, url)
keyword_vocabulary_field = PrimitiveMetadataField("keywordVocabulary", False, "MeSH")
keyword_vocabulary_uri_field = PrimitiveMetadataField("keywordVocabularyURI", False, mesh_base_url)
keywords.append([keyword_value_field, keyword_term_uri_field, keyword_vocabulary_field, keyword_vocabulary_uri_field])
return keywords
class MetadataProcessor:
"""
Processes metadata for a given DOI by fetching data from OpenAlex,
building metadata blocks, and optionally uploading the dataset.
"""
def __init__(
self,
doi: str,
depositor: str | None = None,
output_path: Path | None = None,
default_subject: str = "Other",
contact_mail: str | None = None,
upload: bool = False,
console: Console | None = None,
progress: Progress | None = None,
task_id: TaskID | None = None
) -> None:
"""
Initialize the MetadataProcessor with configuration and processing options.
Args:
doi (str): The DOI to process.
depositor (str | None): Depositor name.
output_path (Path | None): Path where metadata will be saved.
default_subject (str): Default subject.
contact_mail (str | None): Contact email address.
upload (bool): Whether to upload metadata.
console (Console | None): Rich console instance.
progress (Progress | None): Progress bar instance.
task_id (TaskID | None): Task ID for progress updates.
"""
self.console = console or Console()
try:
self.doi = self._validate_doi(doi)
        except ValueError as e:
            self.console.print(f"{ICONS['error']} Error: {str(e)}", style="error")
            raise
self.depositor = depositor
self.output_path = output_path
self.default_subject = default_subject
self.api_client = APIClient(contact_mail)
config = Config()
pi_objects = [Person(**pi) for pi in config.PIS]
self.pi_finder = PIFinder(pi_objects)
self.upload = upload
self.progress = progress
self.task_id = task_id
@staticmethod
def _validate_doi(doi: str) -> str:
"""
Validate and normalize a DOI.
Args:
doi (str): The DOI to validate.
Returns:
str: Normalized DOI.
Raises:
ValueError: If the DOI is invalid.
"""
if not is_doi(doi):
raise ValueError(f"Invalid DOI: {doi}")
return normalize_doi(doi)
def _update_progress(self) -> None:
"""
Advance the progress bar if enabled.
"""
if self.progress and self.task_id is not None:
self.progress.advance(self.task_id)
def process(self) -> dict[str, Any]:
"""
Process the DOI: fetch data, build metadata, optionally upload, and save output.
Returns:
dict[str, Any]: The constructed metadata dictionary.
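
        Example (illustrative DOI; requires network access and a loaded config):
            >>> processor = MetadataProcessor("10.1038/s41586-020-2649-2")  # doctest: +SKIP
            >>> metadata = processor.process()  # doctest: +SKIP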
"""
self.console.print(f"{ICONS['processing']} Processing DOI: {self.doi}", style="info")
data = self._fetch_data()
self._update_progress()
metadata = self._build_metadata(data)
self._update_progress()
if self.upload:
self._upload_data(metadata)
self._update_progress()
self._save_output(metadata)
self._update_progress()
self.console.print(f"\n{ICONS['success']} Successfully processed: {self.doi}\n", style="success")
return metadata
def _upload_data(self, metadata: dict[str, Any]) -> dict[str, Any]:
"""
Upload the metadata to Dataverse.
Args:
metadata (dict[str, Any]): The metadata to upload.
Returns:
dict[str, Any]: The response from the Dataverse API.
Raises:
ValueError: If the upload fails.
"""
config = Config()
token = config.DATAVERSE['api_token']
client = APIClient(token=token)
url = f"{config.DATAVERSE['url']}/api/dataverses/{config.DATAVERSE['dataverse']}/datasets?doNotValidate=true"
auth = (config.DATAVERSE['auth_user'], config.DATAVERSE['auth_password'])
response = client.make_request(url, method="POST", auth=auth, json=metadata)
if response is None or response.status_code != 201:
self.console.print(f"\n{ICONS['error']} Failed to upload to Dataverse: {url}", style="error")
raise ValueError(f"Failed to upload to Dataverse: {url}")
else:
perma = response.json().get("data", {}).get("persistentId", "")
self.console.print(f"{ICONS['upload']} Dataset uploaded to: {config.DATAVERSE['dataverse']} with ID {perma}", style="info")
return response.json()
def _fetch_data(self) -> dict[str, Any]:
"""
Fetch metadata from OpenAlex for the given DOI.
Returns:
dict[str, Any]: The fetched data.
Raises:
ValueError: If data fetching fails.
"""
url = f"https://api.openalex.org/works/https://doi.org/{self.doi}"
response = self.api_client.make_request(url)
if response is None or response.status_code != 200:
self.console.print(f"\n{ICONS['error']} Failed to fetch data for DOI: {self.doi}", style="error")
raise ValueError(f"Failed to fetch data for DOI: {self.doi}")
return response.json()
def _build_metadata(self, data: dict[str, Any]) -> dict[str, Any]:
"""
Construct the complete metadata dictionary from fetched data.
Args:
data (dict[str, Any]): The data retrieved from OpenAlex.
Returns:
dict[str, Any]: The complete metadata dictionary.
"""
license_info = LicenseProcessor.process_license(data)
abstract_processor = AbstractProcessor(self.api_client)
abstract = abstract_processor.get_abstract(self.doi, data, license_info)
citation_builder = CitationBuilder(data, self.doi, self.pi_finder)
authors, corresponding_authors = citation_builder.build_authors()
author_fields: list[list[PrimitiveMetadataField | ControlledVocabularyMetadataField]] = []
corresponding_author_fields: list[list[PrimitiveMetadataField]] = []
for author in authors:
author_fields.append(author.author_fields())
        if not corresponding_authors:
            self.console.print(f"{ICONS['warning']} No corresponding authors explicitly declared; PIs are used as a fallback!", style="warning")
            pis = self._get_involved_pis(data)
            corresponding_authors.extend(pis)
for corresponding_author in corresponding_authors:
corresponding_author_fields.append(corresponding_author.dataset_contact_fields())
description = self._build_description(data, abstract)
grants = citation_builder.build_grants()
return_dict: dict[str, Any] = {
"datasetVersion": {
"metadataBlocks": {
"citation": {
"fields": [
PrimitiveMetadataField("title", False, data.get("title", "")).to_dict(),
PrimitiveMetadataField("distributionDate", False, data.get("publication_date", "")).to_dict(),
CompoundMetadataField("otherId", True, citation_builder.build_other_ids()).to_dict(),
CompoundMetadataField("dsDescription", True, [[PrimitiveMetadataField("dsDescriptionValue", False, description)]]).to_dict(),
ControlledVocabularyMetadataField("subject", True, SubjectMapper.get_subjects(data, self.default_subject)).to_dict(),
CompoundMetadataField("topicClassification", True, citation_builder.build_topics()).to_dict(),
CompoundMetadataField("keyword", True, citation_builder.build_keywords()).to_dict(),
PrimitiveMetadataField("depositor", False, self.depositor or data["primary_location"]["source"].get("display_name", "")).to_dict(),
PrimitiveMetadataField("alternativeURL", False, f"https://doi.org/{self.doi}").to_dict(),
CompoundMetadataField("author", True, author_fields).to_dict(),
CompoundMetadataField("datasetContact", True, corresponding_author_fields).to_dict(),
CompoundMetadataField("grantNumber", True, grants).to_dict()
],
"displayName": "Citation Metadata"
},
"crc1430_org_v1": self._build_organization_metadata(data)
},
"files": []
}
}
        if license_info.name:
            return_dict["datasetVersion"]["license"] = {
                "name": license_info.name,
                "uri": license_info.uri
            }
else:
return_dict["datasetVersion"]["termsOfUse"] = f"All rights reserved. Copyright © {self._get_publication_year(data)}, [TODO: Insert copyright holder here!]"
return return_dict
def _build_description(self, data: dict[str, Any], abstract: Abstract) -> str:
"""
Build the description field by combining a header and the abstract.
Args:
data (dict[str, Any]): The metadata.
abstract (Abstract): The abstract object.
Returns:
str: The full description.
"""
head = self._build_description_head(data)
return f"{head}{abstract.text}"
def _build_description_head(self, data: dict[str, Any]) -> str:
"""
Build the header for the description based on publication details.
Args:
data (dict[str, Any]): The metadata.
Returns:
str: The HTML header string.
"""
journal = data.get("primary_location", {}).get("source", {}).get("display_name")
publication_date = data.get("publication_date")
volume = data.get("biblio", {}).get("volume")
issue = data.get("biblio", {}).get("issue")
        work_type = data.get("type")
        if all([journal, publication_date, volume, issue, work_type]):
            return f"<p>This {work_type} was published on {publication_date} in <i>{journal}</i> {volume}({issue})</p>"
        elif all([journal, publication_date, work_type]):
            return f"<p>This {work_type} was published on {publication_date} in <i>{journal}</i></p>"
self.console.print(f"{ICONS['warning']} No abstract header added, missing information (journal, publication date and/or document type)", style="warning")
return ""
    def _get_publication_year(self, data: dict[str, Any]) -> int | str:
"""
Extract the publication year from the metadata.
Args:
data (dict[str, Any]): The metadata.
Returns:
        int | str: The publication year, or an empty string if unavailable.
"""
return data.get("publication_year", "")
def _build_organization_metadata(self, data: dict[str, Any]) -> dict[str, Any]:
"""
Build organization metadata fields (phase, project, PI names).
Args:
data (dict[str, Any]): The metadata.
Returns:
dict[str, Any]: Organization metadata.
"""
publication_year = self._get_publication_year(data)
if publication_year:
phases = self._get_phases(int(publication_year))
else:
phases = []
pis = self._get_involved_pis(data)
projects: list[str] = []
for pi in pis:
for project in pi.project:
projects.append(project)
pi_names: list[str] = []
for pi in pis:
pi_names.append(pi.format_name())
# Deduplicate projects and PI names
unique_projects = list(set(projects))
unique_pi_names = list(set(pi_names))
return {
"fields": [
ControlledVocabularyMetadataField("crc1430OrgV1Phase", True, phases).to_dict(),
ControlledVocabularyMetadataField("crc1430OrgV1Project", True, unique_projects).to_dict(),
ControlledVocabularyMetadataField("crc1430OrgV1PI", True, unique_pi_names).to_dict()
]
}
def _get_phases(self, year: int) -> list[str]:
"""
Determine the project phases matching a given publication year.
Args:
year (int): The publication year.
Returns:
list[str]: List of matching phase names.
"""
config = Config()
matching_phases: list[str] = []
for phase_name, phase_info in config.PHASE.items():
phase = Phase(phase_name, phase_info["start"], phase_info["end"])
if phase.check_year(year):
matching_phases.append(phase.name)
return matching_phases
def _get_involved_pis(self, data: dict[str, Any]) -> list[Person]:
"""
Identify involved principal investigators from the metadata.
Args:
data (dict[str, Any]): The metadata.
Returns:
list[Person]: List of PIs.
"""
involved_pis: list[Person] = []
for authorship in data.get("authorships", []):
author = authorship.get("author", {})
if not author:
continue
display_name = author.get("display_name", "")
given_name, family_name = NameProcessor.split_name(display_name)
if pi := self.pi_finder.find_pi(
family_name=family_name,
given_name=given_name,
orcid=author.get("orcid")
):
involved_pis.append(pi)
return involved_pis
def _save_output(self, metadata: dict[str, Any]) -> None:
"""
Save the generated metadata to a file or print it to the console.
Args:
metadata (dict[str, Any]): The metadata to save.
"""
if self.output_path:
try:
with open(self.output_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=4, ensure_ascii=False)
self.console.print(f"{ICONS['save']} Metadata saved in: {self.output_path}", style="info")
except Exception as e:
self.console.print(f"{ICONS['error']} Error saving metadata: {str(e)}\n", style="error")
raise
else:
self.console.print(metadata)
def sanitize_filename(doi: str) -> str:
"""
Convert DOI to a valid filename using only alphanumeric characters and underscores.
Args:
doi (str): The DOI to sanitize.
Returns:
str: Sanitized filename string.
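
    Example:
        >>> sanitize_filename("10.1234/abc-def.ghi")
        '10_1234_abc_def_ghi'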
"""
# Replace non-alphanumeric characters with underscores
sanitized = ''.join(c if c.isalnum() else '_' for c in doi)
# Remove consecutive underscores
while '__' in sanitized:
sanitized = sanitized.replace('__', '_')
# Remove leading/trailing underscores
return sanitized.strip('_')
def print_summary(results: dict[str, list[Any]], console: Console) -> None:
"""
Print a summary table of processing results to the console.
Args:
results (dict[str, list[Any]]): Dictionary containing success and failed DOIs.
console (Console): Rich console object for output.
"""
table = Table(title="Processing Results")
table.add_column("Status", style="bold")
table.add_column("Count", justify="right")
table.add_column("DOIs", style="dim")
table.add_row(
f"{ICONS['success']} Success",
str(len(results["success"])),
", ".join(results["success"][:3]) + ("..." if len(results["success"]) > 3 else "")
)
if results["failed"]:
table.add_row(
f"{ICONS['error']} Failed",
str(len(results["failed"])),
", ".join(doi for doi, _ in results["failed"][:3]) +
("..." if len(results["failed"]) > 3 else "")
)
console.print(Panel(table, title="Summary", border_style="blue"))
def validate_email_address(email: str) -> bool:
"""
Validate an email address and ensure its domain has an MX record.
Args:
email (str): The email address to validate.
Returns:
bool: True if the email address is valid and its domain resolves, otherwise False.
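
    Example (illustrative address; result depends on live DNS, hence skipped):
        >>> validate_email_address("user@example.org")  # doctest: +SKIP
        True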
"""
try:
# Basic validation
valid = validate_email(email)
email = valid.normalized
# Check domain has MX record
domain = email.split('@')[1]
dns.resolver.resolve(domain, 'MX')
return True
except (EmailNotValidError, dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
return False
def process_doi_batch(
dois: set[str],
output_dir: Path,
depositor: str | None = None,
default_subject: str = "Medicine, Health and Life Sciences",
contact_mail: str | None = None,
upload: bool = False
) -> dict[str, list[Any]]:
"""
Process a batch of DOIs and return a summary of results.
Args:
dois (set[str]): Set of DOIs to process.
output_dir (Path): Directory where metadata files will be saved.
depositor (str | None): Depositor name.
default_subject (str): Default subject for metadata.
contact_mail (str | None): Contact email address.
upload (bool): Flag indicating whether to upload metadata to Dataverse.
Returns:
dict[str, list[Any]]: Dictionary with keys 'success' and 'failed'.
"""
results: dict[str, list[Any]] = {"success": [], "failed": []}
progress_columns = [
SpinnerColumn(),
TextColumn("[bold blue]{task.description:<50}"),
BarColumn(bar_width=None),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn(""), # Separator
TimeElapsedColumn(),
TextColumn(""), # Separator
TextColumn("[bold]{task.completed}/{task.total}"),
]
# Define steps for each DOI processing
if upload:
doi_total_steps = 4 # Fetch, Build, Upload, Save
else:
doi_total_steps = 3 # Fetch, Build, Save
with Progress(
*progress_columns,
console=console,
transient=True # This makes the progress bar disappear after completion
) as progress:
# Add main task
main_task = progress.add_task(
"[bold blue]Processing DOIs...",
total=len(dois)
)
        # Add status task for the current DOI (hidden until processing starts)
        status_task = progress.add_task(
            "[cyan]Current:",
            total=doi_total_steps,
            visible=False
        )
for doi in dois:
try:
# Update status display
progress.update(
status_task,
description=f"[cyan]Current: [white]{doi[:50]}...",
visible=True,
completed=0 # Reset progress for new DOI
)
# Process the DOI
sanitized_filename = sanitize_filename(normalize_doi(doi))
output_path = output_dir / f"{sanitized_filename}_metadata.json"
processor = MetadataProcessor(
doi=doi,
depositor=depositor,
output_path=output_path,
default_subject=default_subject,
contact_mail=contact_mail,
upload=upload,
console=console,
progress=progress,
task_id=status_task
)
# Process and capture result
processor.process()
results["success"].append(doi)
# Update progress
progress.advance(main_task)
except Exception as e:
# Handle errors
results["failed"].append((doi, str(e)))
# Show error but keep progress bar
progress.console.print(
f"{ICONS['error']} Error processing {doi}: {str(e)}",
style="error"
)
finally:
# Clear current status
progress.update(status_task, visible=False)
# Print final summary
print_summary(results, console)
return results
if __name__ == "__main__":
console = Console(theme=THEME)
try:
parser = argparse.ArgumentParser(description="Process DOIs to generate metadata")
parser.add_argument(
"dois",
nargs="*",
help="One or more DOIs to process"
)
parser.add_argument(
"-f", "--file",
help="File containing DOIs (one per line)",
type=argparse.FileType('r')
)
parser.add_argument(
"-o", "--output-dir",
help="Output directory for metadata files",
default="."
)
parser.add_argument(
"-d", "--depositor",
help="Name of the depositor",
default=None
)
parser.add_argument(
"-s", "--subject",
help="Default subject",
default="Medicine, Health and Life Sciences"
)
        parser.add_argument(
            "-m", "--contact-mail",
            help="Contact email address",
            default=None
        )
parser.add_argument(
"-u", "--upload",
help="Upload to Dataverse",
action='store_true'
)
args = parser.parse_args()
# Ensure we have either DOIs as arguments or a file
if not args.dois and not args.file:
console.print(f"{ICONS['error']} Error: No DOIs provided. Use either command line arguments or -f/--file option.", style="error")
parser.print_help()
sys.exit(1)
# Get DOIs from both direct arguments and file if provided
dois = set(args.dois) # Start with directly provided DOIs
if args.file:
console.print(f"{ICONS['file']} Reading DOIs from file: {args.file.name}", style="info")
dois.update(line.strip() for line in args.file if line.strip())
# Create output directory if it doesn't exist
output_dir = Path(args.output_dir)
try:
output_dir.mkdir(parents=True, exist_ok=True)
console.print(f"{ICONS['folder']} Output directory: {output_dir}\n", style="info")
except Exception as e:
console.print(f"Failed to create output directory: {str(e)}\n", style="error")
sys.exit(1)
if args.contact_mail:
if not validate_email_address(args.contact_mail):
raise ValueError(f"Not a valid email address: {args.contact_mail}")
console.print(f"{ICONS['info']} Exposing contact email <{args.contact_mail}> to API services.\n", style="info")
# Process DOIs and track time
start_time = datetime.now()
results = process_doi_batch(
dois=dois,
output_dir=output_dir,
depositor=args.depositor,
default_subject=args.subject,
contact_mail=args.contact_mail,
upload=args.upload
)
except KeyboardInterrupt:
console.print(f"\n{ICONS['warning']} Processing interrupted by user", style="warning")
sys.exit(1)
except Exception as e:
console.print(f"\n{ICONS['error']} An unexpected error occurred: {str(e)}", style="error")
sys.exit(1)