#!/usr/bin/env python
"""
doi2dataset.py
This script processes DOIs to generate metadata for Dataverse datasets.
It supports fetching data from OpenAlex and CrossRef, mapping metadata fields,
processing author and grant information, and optionally uploading the metadata
to a Dataverse instance.
Usage:
python doi2dataset.py [options] doi1 doi2 ...
Options:
-f, --file File containing DOIs (one per line)
-o, --output-dir Output directory for metadata files (default: current directory)
-d, --depositor Name of the depositor
-s, --subject Default subject (default: "Medicine, Health and Life Sciences")
-m, --contact-mail Contact email address
-u, --upload Upload metadata to Dataverse
"""
import argparse
import json
import sys
import unicodedata
import warnings # TODO: Remove once the warning is stripped from idutils
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from functools import reduce
from pathlib import Path
from typing import Any, Sequence
import dns.resolver
import requests
import yaml
from email_validator import EmailNotValidError, validate_email
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
BarColumn,
Progress,
SpinnerColumn,
TaskID,
TextColumn,
TimeElapsedColumn,
)
from rich.table import Table
from rich.theme import Theme
# Idutils throws an unconditional warning about deprecation of relative imports.
# Since we are not using them, suppress the warning to not confuse users
# TODO: Remove once the warning is stripped from idutils
warnings.filterwarnings("ignore", category=DeprecationWarning)
from idutils.normalizers import normalize_doi, normalize_orcid, normalize_pmid
from idutils.validators import is_doi
# Script version
VERSION = "1.0"
# Icon definitions for console output. Plain Unicode glyphs are used so no
# emoji-capable terminal font is required.
ICONS = {
    'success': "✓",  # Simple checkmark
    'error': "✗",  # Simple X
    'warning': "!",  # Simple exclamation
    'info': "ℹ",  # Info symbol
    'processing': "⋯",  # Three dots
    'done': "∎",  # Filled square
    'file': "⨳",  # Document symbol
    'folder': "⊞",  # Folder symbol
    'clock': "◷",  # Clock symbol
    'search': "⌕",  # Search symbol
    'data': "≡",  # Three lines
    'doi': "∾",  # Link symbol
    'total': "∑",  # Sum symbol
    'save': "⤓",  # Save/download arrow
    'upload': "⤒"  # Upload arrow
}
# Theme configuration for Rich console output; keys are used as style names
# in console.print(..., style=...) calls throughout this script.
THEME = Theme({
    "info": "cyan",
    "warning": "yellow",
    "error": "red bold",
    "success": "green",
})
# Available sources for metadata abstracts (must match Abstract.__post_init__)
SOURCES = ["openalex", "crossref", "none"]
def format_status(icon: str, message: str, style: str = "default") -> str:
    """
    Build a Rich-markup status string from an icon key and a message.

    Args:
        icon (str): Key into the module-level ICONS dictionary.
        message (str): The status message to display after the icon.
        style (str): Rich style name wrapped around the whole string
            (e.g. 'default', 'info', 'warning', 'error', 'success').

    Returns:
        str: The formatted status message with Rich markup tags.
    """
    symbol = ICONS[icon]
    return "[{0}]{1} {2}[/{0}]".format(style, symbol, message)
class FieldType(Enum):
    """Enum representing different Dataverse field types.

    The values match the ``typeClass`` strings expected by the Dataverse
    native API's dataset metadata JSON.
    """
    PRIMITIVE = "primitive"  # single literal value
    COMPOUND = "compound"  # nested group of subfields
    VOCABULARY = "controlledVocabulary"  # value restricted to a controlled vocabulary
@dataclass
class Phase:
    """
    Represents a project phase with a defined time span.

    Attributes:
        name (str): The name of the project phase.
        start (int): The first year of the phase (inclusive).
        end (int): The last year of the phase (inclusive).
    """
    name: str
    start: int
    end: int
    def check_year(self, year: int) -> bool:
        """
        Check whether a given year falls within the phase boundaries.

        Args:
            year (int): The year to check.

        Returns:
            bool: True if start <= year <= end (inclusive), otherwise False.
        """
        # Chained comparison replaces the verbose if/return-True/return-False.
        return self.start <= year <= self.end
@dataclass
class BaseMetadataField[T]:
    """
    Base class for Dataverse metadata fields.

    This class defines a metadata field with a name, a value of type T, and
    a flag indicating whether multiple values are allowed. It serves as
    a template for specific metadata field implementations; subclasses must
    implement `_set_type` and `to_dict`.

    Attributes:
        name (str): The Dataverse typeName of the metadata field.
        multiple (bool): Indicates whether multiple values are allowed.
        value (T): The value stored in the field.
        type (FieldType): The field's type class, set by `_set_type` during
            __post_init__ (not an init argument).
    """
    name: str
    multiple: bool
    value: T
    type: FieldType = field(init=False)
    def __post_init__(self) -> None:
        """
        After initialization, determine the field type by calling the
        subclass-provided _set_type method.
        """
        self._set_type()
    def _set_type(self) -> None:
        """
        Set the `type` attribute based on the field's value.

        This method must be implemented by subclasses.

        Raises:
            NotImplementedError: If not implemented by a subclass.
        """
        raise NotImplementedError("Subclasses must implement the _set_type method.")
    def to_dict(self) -> dict[str, Any]:
        """
        Convert the metadata field to a dictionary representation suitable
        for the Dataverse native API.

        Returns:
            dict[str, Any]: Dictionary representation of the metadata field.

        Raises:
            NotImplementedError: If not implemented by a subclass.
        """
        raise NotImplementedError("Subclasses must implement the to_dict method.")
@dataclass
class PrimitiveMetadataField(BaseMetadataField[str]):
    """
    Dataverse metadata field holding a single primitive (string) value.
    """
    def _set_type(self) -> None:
        # Primitive fields always map to the "primitive" type class.
        self.type = FieldType.PRIMITIVE
    def to_dict(self) -> dict[str, str | bool]:
        """
        Serialize this field into the Dataverse JSON field structure.

        Returns:
            dict[str, str | bool]: Keys typeName, typeClass, multiple, value.
        """
        return dict(
            typeName=self.name,
            typeClass=self.type.value,
            multiple=self.multiple,
            value=self.value,
        )
@dataclass
class ControlledVocabularyMetadataField(BaseMetadataField[str | list[str]]):
    """
    Dataverse metadata field whose value comes from a controlled vocabulary.
    """
    def _set_type(self) -> None:
        # Vocabulary fields map to the "controlledVocabulary" type class.
        self.type = FieldType.VOCABULARY
    def to_dict(self) -> dict[str, Any]:
        """
        Serialize this field into the Dataverse JSON field structure.

        Returns:
            dict[str, Any]: Keys typeName, typeClass, multiple, value.
        """
        return dict(
            typeName=self.name,
            typeClass=self.type.value,
            multiple=self.multiple,
            value=self.value,
        )
@dataclass
class CompoundMetadataField(
    BaseMetadataField[Sequence[Sequence[PrimitiveMetadataField | ControlledVocabularyMetadataField]]]
):
    """
    Dataverse metadata field composed of groups of primitive/vocabulary
    subfields.
    """
    def _set_type(self) -> None:
        # Compound fields aggregate several subfields per entry.
        self.type = FieldType.COMPOUND
    def to_dict(self) -> dict[str, Any]:
        """
        Serialize this compound field into the Dataverse JSON structure.

        Each inner sequence of subfields becomes one dictionary mapping the
        subfield names to their serialized representations.

        Returns:
            dict[str, Any]: Keys typeName, typeClass, multiple, value.
        """
        rows: list[dict[str, Any]] = []
        for subfield_group in self.value:
            row: dict[str, Any] = {}
            for subfield in subfield_group:
                row[subfield.name] = subfield.to_dict()
            rows.append(row)
        return {
            "typeName": self.name,
            "typeClass": self.type.value,
            "multiple": self.multiple,
            "value": rows
        }
@dataclass
class Person:
    """
    Represents a person (e.g., an author or a PI).

    Attributes:
        family_name (str): Family name of the person.
        given_name (str): Given name of the person.
        orcid (str): ORCID identifier (optional).
        email (str): Email address (optional).
        affiliation (str): Affiliation of the person (optional).
        project (list[str]): List of associated projects.
    """
    family_name: str
    given_name: str
    orcid: str = ""
    email: str = ""
    affiliation: str = ""
    project: list[str] = field(default_factory=list)
    def format_name(self) -> str:
        """
        Return the name in 'Family, Given' order.

        Returns:
            str: Formatted name.
        """
        return ", ".join((self.family_name, self.given_name))
    def author_fields(self) -> list[PrimitiveMetadataField | ControlledVocabularyMetadataField]:
        """
        Build the Dataverse author metadata fields for this person.

        The ORCID identifier fields are appended only when an ORCID is set.

        Returns:
            list: List of metadata fields representing the author.
        """
        author_entry: list[PrimitiveMetadataField | ControlledVocabularyMetadataField] = [
            PrimitiveMetadataField("authorName", False, self.format_name()),
            PrimitiveMetadataField("authorAffiliation", False, self.affiliation),
        ]
        if self.orcid:
            author_entry.append(ControlledVocabularyMetadataField("authorIdentifierScheme", False, "ORCID"))
            author_entry.append(PrimitiveMetadataField("authorIdentifier", False, self.orcid))
        return author_entry
    def dataset_contact_fields(self) -> list[PrimitiveMetadataField]:
        """
        Build the Dataverse dataset contact metadata fields for this person.

        Returns:
            list: List of metadata fields for the dataset contact.
        """
        return [
            PrimitiveMetadataField(type_name, False, content)
            for type_name, content in (
                ("datasetContactName", self.format_name()),
                ("datasetContactAffiliation", self.affiliation),
                ("datasetContactEmail", self.email),
            )
        ]
@dataclass
class License:
    """
    Represents a license with name, URI, and short identifier.

    Attributes:
        name (str): The full name of the license (empty if unknown).
        uri (str): The license URI (empty if unknown).
        short (str): The short identifier of the license (e.g. 'cc-by').
    """
    name: str
    uri: str
    short: str
@dataclass
class Abstract:
    """
    An abstract text together with the service it was retrieved from.

    Attributes:
        text (str): The abstract text.
        source (str): Origin of the abstract: 'crossref', 'openalex', or 'none'.
    """
    text: str
    source: str
    def __post_init__(self):
        # Reject any source outside the known set right at construction time.
        valid_sources = ["crossref", "openalex", "none"]
        if self.source not in valid_sources:
            raise ValueError(f"{self.source} is not valid! Needs to be one of {str(valid_sources)}.")
@dataclass
class ConfigData:
    """
    Represents configuration data loaded from a YAML file.

    Attributes:
        dataverse (dict[str, str]): Dataverse-related configuration
            (URL, API token, etc.).
        phase (dict[str, dict[str, int]]): Mapping of project phase names to
            their 'start'/'end' years.
        pis (list[dict[str, Any]]): List of principal investigator
            configurations.
        default_grants (list[dict[str, str]]): Default grant configurations.
    """
    dataverse: dict[str, str]
    phase: dict[str, dict[str, int]]
    pis: list[dict[str, Any]]
    default_grants: list[dict[str, str]]
class Config:
"""
Singleton class to handle configuration loading and retrieval.
"""
_instance: 'Config | None' = None
_config_data: ConfigData | None = None
def __new__(cls) -> 'Config':
"""
Create and return the singleton instance of Config.
Returns:
Config: The singleton instance.
"""
if cls._instance is None:
cls._instance = super(Config, cls).__new__(cls)
return cls._instance
@classmethod
def load_config(cls, config_path: str | Path | None = None) -> None:
"""
Load configuration from a YAML file.
Args:
config_path (str | Path | None): Path to the configuration file.
If None, the default config.yaml in the script directory is used.
Raises:
FileNotFoundError: If the configuration file does not exist.
ValueError: If any PI email address is invalid.
"""
if config_path is None:
config_path = Path(__file__).parent / "config.yaml"
config_path = Path(config_path)
if not config_path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(config_path, 'r', encoding='utf-8') as f:
config_data = yaml.safe_load(f)
# Validate PI email addresses
pis = config_data.get('pis', [])
for pi in pis:
if email := pi.get('email'):
if not validate_email_address(email):
raise ValueError(f"Configuration Error: Invalid email address for PI {pi.get('given_name', '')} {pi.get('family_name', '')}: {email}")
cls._config_data = ConfigData(
dataverse=config_data.get('dataverse', {}),
phase=config_data.get('phase', {}),
pis=config_data.get('pis', []),
default_grants=config_data.get('default_grants', [])
)
@classmethod
def get_config(cls) -> ConfigData:
"""
Retrieve the loaded configuration data.
Returns:
ConfigData: The configuration data.
Raises:
RuntimeError: If the configuration could not be loaded.
"""
if cls._config_data is None:
cls.load_config()
if cls._config_data is None:
raise RuntimeError("Failed to load configuration")
return cls._config_data
@property
def PHASE(self) -> dict[str, dict[str, int]]:
"""
Get phase configuration.
Returns:
dict[str, dict[str, int]]: Mapping of phases.
"""
return self.get_config().phase
@property
def PIS(self) -> list[dict[str, Any]]:
"""
Get PI configurations.
Returns:
list[dict[str, Any]]: List of PI configurations.
"""
return self.get_config().pis
@property
def DEFAULT_GRANTS(self) -> list[dict[str, str]]:
"""
Get default grant configurations.
Returns:
list[dict[str, str]]: List of default grants.
"""
return self.get_config().default_grants
@property
def DATAVERSE(self) -> dict[str, str]:
"""
Get Dataverse configurations.
Returns:
dict[str, str]: Dataverse configuration.
"""
return self.get_config().dataverse
class APIClient:
    """
    Thin wrapper around requests.Session with project-specific headers.

    Attributes:
        session (requests.Session): The underlying requests session.
    """
    def __init__(self, contact_mail: str | None = None, user_agent: str = f"UDE-Doi2Dataset/{VERSION}", token: str | None = None) -> None:
        """
        Initialize the API client with optional contact mail, user agent, and token.

        Args:
            contact_mail (str | None): Contact email address.
            user_agent (str): User agent string.
            token (str | None): Optional API token.
        """
        self.session = requests.Session()
        self._set_headers(contact_mail, user_agent, token)
    def _set_headers(self, contact_mail: str | None, user_agent: str, token: str | None) -> None:
        """
        Configure session headers from the contact email and token.

        Args:
            contact_mail (str | None): Contact email address; if present it is
                embedded in the User-Agent per the "polite pool" convention.
            user_agent (str): User agent string.
            token (str | None): Optional API token, sent as X-Dataverse-key.
        """
        agent = f"{user_agent} (mailto:{contact_mail})" if contact_mail else user_agent
        headers = {"User-Agent": agent}
        if token:
            headers["X-Dataverse-key"] = token
        self.session.headers.update(headers)
    def make_request(self, url: str, method: str = "GET", **kwargs: Any) -> requests.Response | None:
        """
        Make an HTTP request and return the response.

        Args:
            url (str): The URL to request.
            method (str): HTTP method to use (default: GET).
            **kwargs: Additional arguments for requests.request.

        Returns:
            requests.Response | None: The HTTP response, or None if the
                request failed or returned an error status.
        """
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"\n{ICONS['error']} Request failed: {str(e)}")
            return None
        return response
class NameProcessor:
    """
    Utility methods for normalizing and splitting person names.
    """
    @staticmethod
    def normalize_string(s: str) -> str:
        """
        Lowercase a string and strip accents/diacritics via NFKD -> ASCII.

        Args:
            s (str): The string to normalize.

        Returns:
            str: The normalized ASCII string.
        """
        return unicodedata.normalize("NFKD", s.lower()).encode("ASCII", "ignore").decode("ASCII")
    @staticmethod
    def split_name(full_name: str) -> tuple[str, str]:
        """
        Split a full name into given and family names.

        Handles both "Family, Given" and "Given Family" orderings.

        Args:
            full_name (str): The full name (e.g., "Doe, John" or "John Doe").

        Returns:
            tuple[str, str]: A tuple (given_name, family_name). Both parts
                are empty strings when the input is empty or whitespace-only.
        """
        if "," in full_name:
            surname, given_name = full_name.split(",", 1)
            return given_name.strip(), surname.strip()
        parts = full_name.strip().split()
        if not parts:
            # Guard: an empty/whitespace-only name previously raised
            # IndexError on parts[-1].
            return "", ""
        if len(parts) == 1:
            return "", parts[0]
        return " ".join(parts[:-1]), parts[-1]
class PIFinder:
    """
    Looks up principal investigators (PIs) within a list of Person objects.
    """
    def __init__(self, pis: list[Person]) -> None:
        """
        Initialize with the candidate PI records.

        Args:
            pis (list[Person]): List of Person objects.
        """
        self.pis = pis
    def find_pi(self, family_name: str | None = None, orcid: str | None = None, given_name: str | None = None) -> Person | None:
        """
        Find a PI, preferring ORCID lookup over name lookup.

        Args:
            family_name (str | None): Family name.
            orcid (str | None): ORCID identifier.
            given_name (str | None): Given name.

        Returns:
            Person | None: The matched PI or None if not found.
        """
        if orcid:
            return self._find_by_orcid(normalize_orcid(orcid))
        if family_name:
            return self._find_by_name(family_name, given_name)
        return None
    def _find_by_orcid(self, orcid: str) -> Person | None:
        """
        Find a PI by normalized ORCID.

        Args:
            orcid (str): Normalized ORCID.

        Returns:
            Person | None: The matched PI or None.
        """
        return next(
            (candidate for candidate in self.pis if normalize_orcid(candidate.orcid) == orcid),
            None,
        )
    def _find_by_name(self, family_name: str, given_name: str | None) -> Person | None:
        """
        Find a PI by family name, optionally disambiguated by given name.

        Args:
            family_name (str): Family name.
            given_name (str | None): Given name (optional).

        Returns:
            Person | None: The matched PI or None.

        Raises:
            ValueError: If several PIs share the family name and no given
                name was supplied to disambiguate.
        """
        target_family = NameProcessor.normalize_string(family_name)
        candidates = [
            candidate for candidate in self.pis
            if NameProcessor.normalize_string(candidate.family_name) == target_family
        ]
        if not candidates:
            return None
        if given_name:
            target_given = NameProcessor.normalize_string(given_name)
            for candidate in candidates:
                if NameProcessor.normalize_string(candidate.given_name) == target_given:
                    return candidate
            # Family matched but the given name did not.
            return None
        if len(candidates) == 1:
            return candidates[0]
        raise ValueError("Multiple matches found for family name")
class LicenseProcessor:
    """
    Maps license identifiers found in work metadata to License objects.
    """
    # Known base license identifiers mapped to (URI, display name).
    LICENSE_MAP = {
        "cc-by": ("https://creativecommons.org/licenses/by/4.0/", "CC BY 4.0"),
        "cc-by-sa": ("https://creativecommons.org/licenses/by-sa/4.0/", "CC BY-SA 4.0"),
        "cc-by-nc": ("https://creativecommons.org/licenses/by-nc/4.0/", "CC BY-NC 4.0"),
        "cc-by-nc-sa": ("https://creativecommons.org/licenses/by-nc-sa/4.0/", "CC BY-NC-SA 4.0"),
        "cc-by-nc-nd": ("https://creativecommons.org/licenses/by-nc-nd/4.0/", "CC BY-NC-ND 4.0"),
        "cc-by-nd": ("https://creativecommons.org/licenses/by-nd/4.0/", "CC BY-ND 4.0"),
        "cc0": ("https://creativecommons.org/publicdomain/zero/1.0/", "CC0 1.0"),
        "pd": ("https://creativecommons.org/publicdomain/mark/1.0/", "Public Domain Mark 1.0"),
    }
    @classmethod
    def process_license(cls, data: dict[str, Any]) -> License:
        """
        Derive license information from a work's primary location data.

        Args:
            data (dict[str, Any]): Input data containing license info.

        Returns:
            License: Processed license information; short is 'unknown' when
                no license string is present.
        """
        license_short = data.get("primary_location", {}).get("license", "")
        if not license_short:
            return License(name="", uri="", short="unknown")
        # Only the first path segment identifies the base license family.
        base_license = license_short.split("/")[0].lower()
        if base_license in cls.LICENSE_MAP:
            uri, name = cls.LICENSE_MAP[base_license]
        else:
            uri, name = "", license_short
        return License(name=name, uri=uri, short=license_short)
class AbstractProcessor:
"""
Retrieves and processes abstracts from CrossRef and OpenAlex.
"""
    def __init__(self, api_client: APIClient):
        """
        Initialize with an APIClient instance used for all HTTP requests.

        Args:
            api_client (APIClient): The API client to use for requests.
        """
        self.api_client = api_client
    def get_abstract(self, doi: str, data: dict[str, Any], license: License) -> Abstract:
        """
        Get an abstract based on DOI and license permissions.

        If the license allows derivative works the CrossRef abstract is
        used verbatim; otherwise the abstract is reconstructed from the
        OpenAlex inverted index. Falls back to an empty 'none' abstract.

        Args:
            doi (str): The DOI.
            data (dict[str, Any]): Work data retrieved from OpenAlex.
            license (License): License information.

        Returns:
            Abstract: The abstract with its source.
        """
        # NOTE(review): `console` is a module global defined only inside the
        # __main__ guard; calling this from an importing module would raise
        # NameError — confirm this is only used via the CLI entry point.
        # Short license identifiers that permit reuse of the verbatim text.
        license_ok = {"cc-by", "cc-by-sa", "cc-by-nc", "cc-by-nc-sa", "cc0", "pd"}
        if license.short in license_ok:
            console.print(f"\n{ICONS['info']} License {license.name} allows derivative works. Pulling abstract from CrossRef.", style="info")
            crossref_abstract = self._get_crossref_abstract(doi)
            if crossref_abstract:
                return Abstract(text=crossref_abstract, source="crossref")
            else:
                console.print(f"\n{ICONS['warning']} No abstract found in CrossRef!", style="warning")
        else:
            console.print(f"\n{ICONS['info']} License {license.name} does not allow derivative works. Reconstructing abstract from OpenAlex!", style="info")
            openalex_abstract = self._get_openalex_abstract(data)
            if openalex_abstract:
                return Abstract(text=openalex_abstract, source="openalex")
            else:
                console.print(f"\n{ICONS['warning']} No abstract found in OpenAlex!", style="warning")
        # Reached only when the selected source yielded nothing.
        console.print(f"\n{ICONS['warning']} No abstract found in either CrossRef nor OpenAlex!", style="warning")
        return Abstract(text="", source="none")
def _get_crossref_abstract(self, doi: str) -> str | None:
"""
Retrieve abstract from CrossRef API.
Args:
doi (str): The DOI.
Returns:
str | None: The abstract if found, otherwise None.
"""
url = f"https://api.crossref.org/works/{doi}"
response = self.api_client.make_request(url)
if response and response.status_code == 200:
abstract_raw = response.json().get("message", {}).get("abstract")
return self._clean_jats(abstract_raw)
return None
def _get_openalex_abstract(self, data: dict[str, Any]) -> str | None:
"""
Retrieve abstract from OpenAlex data.
Args:
data (dict[str, Any]): Data from OpenAlex.
Returns:
str | None: The reconstructed abstract, or None if not available.
"""
inv_index = data.get("abstract_inverted_index")
if not inv_index:
return None
word_positions = [(word, pos) for word, positions in inv_index.items() for pos in positions]
sorted_words = sorted(word_positions, key=lambda x: x[1])
return " ".join(word for word, _ in sorted_words)
def _clean_jats(self, text: str | None) -> str:
"""
Clean JATS XML tags in the abstract and convert them to HTML tags.
Args:
text (str | None): The raw abstract text containing JATS tags.
Returns:
str: The cleaned abstract text.
"""
if not text:
return ""
replacements = {
" ",
"",
"
", "
This {type} was published on {publication_date} in {journal} {volume}({issue})
" elif all([journal, publication_date, type]): return f"This {type} was published on {publication_date} in {journal}
" self.console.print(f"{ICONS['warning']} No abstract header added, missing information (journal, publication date and/or document type)", style="warning") return "" def _get_publication_year(self, data: dict[str, Any]) -> str: """ Extract the publication year from the metadata. Args: data (dict[str, Any]): The metadata. Returns: str: The publication year. """ return data.get("publication_year", "") def _build_organization_metadata(self, data: dict[str, Any]) -> dict[str, Any]: """ Build organization metadata fields (phase, project, PI names). Args: data (dict[str, Any]): The metadata. Returns: dict[str, Any]: Organization metadata. """ publication_year = self._get_publication_year(data) if publication_year: phases = self._get_phases(int(publication_year)) else: phases = [] pis = self._get_involved_pis(data) projects: list[str] = [] for pi in pis: for project in pi.project: projects.append(project) pi_names: list[str] = [] for pi in pis: pi_names.append(pi.format_name()) # Deduplicate projects and PI names unique_projects = list(set(projects)) unique_pi_names = list(set(pi_names)) return { "fields": [ ControlledVocabularyMetadataField("crc1430OrgV1Phase", True, phases).to_dict(), ControlledVocabularyMetadataField("crc1430OrgV1Project", True, unique_projects).to_dict(), ControlledVocabularyMetadataField("crc1430OrgV1PI", True, unique_pi_names).to_dict() ] } def _get_phases(self, year: int) -> list[str]: """ Determine the project phases matching a given publication year. Args: year (int): The publication year. Returns: list[str]: List of matching phase names. """ config = Config() matching_phases: list[str] = [] for phase_name, phase_info in config.PHASE.items(): phase = Phase(phase_name, phase_info["start"], phase_info["end"]) if phase.check_year(year): matching_phases.append(phase.name) return matching_phases def _get_involved_pis(self, data: dict[str, Any]) -> list[Person]: """ Identify involved principal investigators from the metadata. 
Args: data (dict[str, Any]): The metadata. Returns: list[Person]: List of PIs. """ involved_pis: list[Person] = [] for authorship in data.get("authorships", []): author = authorship.get("author", {}) if not author: continue display_name = author.get("display_name", "") given_name, family_name = NameProcessor.split_name(display_name) if pi := self.pi_finder.find_pi( family_name=family_name, given_name=given_name, orcid=author.get("orcid") ): involved_pis.append(pi) return involved_pis def _save_output(self, metadata: dict[str, Any]) -> None: """ Save the generated metadata to a file or print it to the console. Args: metadata (dict[str, Any]): The metadata to save. """ if self.output_path: try: with open(self.output_path, "w", encoding="utf-8") as f: json.dump(metadata, f, indent=4, ensure_ascii=False) self.console.print(f"{ICONS['save']} Metadata saved in: {self.output_path}", style="info") except Exception as e: self.console.print(f"{ICONS['error']} Error saving metadata: {str(e)}\n", style="error") raise else: self.console.print(metadata) def sanitize_filename(doi: str) -> str: """ Convert DOI to a valid filename using only alphanumeric characters and underscores. Args: doi (str): The DOI to sanitize. Returns: str: Sanitized filename string. """ # Replace non-alphanumeric characters with underscores sanitized = ''.join(c if c.isalnum() else '_' for c in doi) # Remove consecutive underscores while '__' in sanitized: sanitized = sanitized.replace('__', '_') # Remove leading/trailing underscores return sanitized.strip('_') def print_summary(results: dict[str, list[Any]], console: Console) -> None: """ Print a summary table of processing results to the console. Args: results (dict[str, list[Any]]): Dictionary containing success and failed DOIs. console (Console): Rich console object for output. 
""" table = Table(title="Processing Results") table.add_column("Status", style="bold") table.add_column("Count", justify="right") table.add_column("DOIs", style="dim") table.add_row( f"{ICONS['success']} Success", str(len(results["success"])), ", ".join(results["success"][:3]) + ("..." if len(results["success"]) > 3 else "") ) if results["failed"]: table.add_row( f"{ICONS['error']} Failed", str(len(results["failed"])), ", ".join(doi for doi, _ in results["failed"][:3]) + ("..." if len(results["failed"]) > 3 else "") ) console.print(Panel(table, title="Summary", border_style="blue")) def validate_email_address(email: str): """ Validate an email address and ensure its domain has an MX record. Args: email (str): The email address to validate. Returns: bool: True if the email address is valid and its domain resolves, otherwise False. """ try: # Basic validation valid = validate_email(email) email = valid.normalized # Check domain has MX record domain = email.split('@')[1] dns.resolver.resolve(domain, 'MX') return True except (EmailNotValidError, dns.resolver.NoAnswer, dns.resolver.NXDOMAIN): return False def process_doi_batch( dois: set[str], output_dir: Path, depositor: str | None = None, default_subject: str = "Medicine, Health and Life Sciences", contact_mail: str | None = None, upload: bool = False ) -> dict[str, list[Any]]: """ Process a batch of DOIs and return a summary of results. Args: dois (set[str]): Set of DOIs to process. output_dir (Path): Directory where metadata files will be saved. depositor (str | None): Depositor name. default_subject (str): Default subject for metadata. contact_mail (str | None): Contact email address. upload (bool): Flag indicating whether to upload metadata to Dataverse. Returns: dict[str, list[Any]]: Dictionary with keys 'success' and 'failed'. 
""" results: dict[str, list[Any]] = {"success": [], "failed": []} progress_columns = [ SpinnerColumn(), TextColumn("[bold blue]{task.description:<50}"), BarColumn(bar_width=None), TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), TextColumn("•"), # Separator TimeElapsedColumn(), TextColumn("•"), # Separator TextColumn("[bold]{task.completed}/{task.total}"), ] # Define steps for each DOI processing if upload: doi_total_steps = 4 # Fetch, Build, Upload, Save else: doi_total_steps = 3 # Fetch, Build, Save with Progress( *progress_columns, console=console, transient=True # This makes the progress bar disappear after completion ) as progress: # Add main task main_task = progress.add_task( "[bold blue]Processing DOIs...", total=len(dois) ) # Add status task for current DOI status_task = progress.add_task( "[cyan]Current:", total=None, # Indeterminate progress visible=False # Hidden initially ) status_task = progress.add_task( "[cyan]Current:", total=doi_total_steps, visible=False ) for doi in dois: try: # Update status display progress.update( status_task, description=f"[cyan]Current: [white]{doi[:50]}...", visible=True, completed=0 # Reset progress for new DOI ) # Process the DOI sanitized_filename = sanitize_filename(normalize_doi(doi)) output_path = output_dir / f"{sanitized_filename}_metadata.json" processor = MetadataProcessor( doi=doi, depositor=depositor, output_path=output_path, default_subject=default_subject, contact_mail=contact_mail, upload=upload, console=console, progress=progress, task_id=status_task ) # Process and capture result processor.process() results["success"].append(doi) # Update progress progress.advance(main_task) except Exception as e: # Handle errors results["failed"].append((doi, str(e))) # Show error but keep progress bar progress.console.print( f"{ICONS['error']} Error processing {doi}: {str(e)}", style="error" ) finally: # Clear current status progress.update(status_task, visible=False) # Print final summary 
print_summary(results, console) return results if __name__ == "__main__": console = Console(theme=THEME) try: parser = argparse.ArgumentParser(description="Process DOIs to generate metadata") parser.add_argument( "dois", nargs="*", help="One or more DOIs to process" ) parser.add_argument( "-f", "--file", help="File containing DOIs (one per line)", type=argparse.FileType('r') ) parser.add_argument( "-o", "--output-dir", help="Output directory for metadata files", default="." ) parser.add_argument( "-d", "--depositor", help="Name of the depositor", default=None ) parser.add_argument( "-s", "--subject", help="Default subject", default="Medicine, Health and Life Sciences" ) parser.add_argument( "-m", "--contact-mail", help="Contact email address", default=False ) parser.add_argument( "-u", "--upload", help="Upload to Dataverse", action='store_true' ) args = parser.parse_args() # Ensure we have either DOIs as arguments or a file if not args.dois and not args.file: console.print(f"{ICONS['error']} Error: No DOIs provided. 
Use either command line arguments or -f/--file option.", style="error") parser.print_help() sys.exit(1) # Get DOIs from both direct arguments and file if provided dois = set(args.dois) # Start with directly provided DOIs if args.file: console.print(f"{ICONS['file']} Reading DOIs from file: {args.file.name}", style="info") dois.update(line.strip() for line in args.file if line.strip()) # Create output directory if it doesn't exist output_dir = Path(args.output_dir) try: output_dir.mkdir(parents=True, exist_ok=True) console.print(f"{ICONS['folder']} Output directory: {output_dir}\n", style="info") except Exception as e: console.print(f"Failed to create output directory: {str(e)}\n", style="error") sys.exit(1) if args.contact_mail: if not validate_email_address(args.contact_mail): raise ValueError(f"Not a valid email address: {args.contact_mail}") console.print(f"{ICONS['info']} Exposing contact email <{args.contact_mail}> to API services.\n", style="info") # Process DOIs and track time start_time = datetime.now() results = process_doi_batch( dois=dois, output_dir=output_dir, depositor=args.depositor, default_subject=args.subject, contact_mail=args.contact_mail, upload=args.upload ) except KeyboardInterrupt: console.print(f"\n{ICONS['warning']} Processing interrupted by user", style="warning") sys.exit(1) except Exception as e: console.print(f"\n{ICONS['error']} An unexpected error occurred: {str(e)}", style="error") sys.exit(1)