# Source code for rocrate_validator.rocrate

# Copyright (c) 2024-2025 CRS4
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import io
import json
import struct
import zipfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional, Union
from urllib.parse import unquote

from rdflib import Graph

from rocrate_validator import log as logging
from rocrate_validator.errors import ROCrateInvalidURIError
from rocrate_validator.utils import URI, HttpRequester, validate_rocrate_uri

# set up logging
logger = logging.getLogger(__name__)


class ROCrateEntity:
    """
    An entity described in the metadata of an RO-Crate.

    The object wraps the raw JSON-LD dictionary of the entity and keeps a
    reference to the metadata object it belongs to.
    """

    def __init__(self, metadata: ROCrateMetadata, raw_data: object) -> None:
        """
        Initialize the entity.

        :param metadata: the metadata object the entity belongs to
        :type metadata: ROCrateMetadata

        :param raw_data: the raw JSON-LD dictionary describing the entity
        :type raw_data: object
        """
        self._raw_data = raw_data
        self._metadata = metadata

    @property
    def id(self) -> str:
        """The value of the ``@id`` key of the entity (``None`` if missing)."""
        return self._raw_data.get('@id')

    @property
    def type(self) -> Union[str, list[str]]:
        """The value of the ``@type`` key of the entity (``None`` if missing)."""
        return self._raw_data.get('@type')

    @property
    def name(self) -> str:
        """The value of the ``name`` key of the entity (``None`` if missing)."""
        return self._raw_data.get('name')

    @property
    def metadata(self) -> ROCrateMetadata:
        """The metadata object the entity belongs to."""
        return self._metadata

    @property
    def ro_crate(self) -> ROCrate:
        """The RO-Crate referenced by the metadata of the entity."""
        return self.metadata.ro_crate

    def is_remote(self) -> bool:
        """Check if the identifier of the entity denotes a remote resource."""
        return self.id_as_uri.is_remote_resource()

    @classmethod
    def get_id_as_path(cls, entity_id: str, ro_crate: Optional[ROCrate] = None) -> Path:
        """
        Convert an entity identifier to a path.

        :param entity_id: the identifier of the entity
        :type entity_id: str

        :param ro_crate: the RO-Crate whose path is used as the base
            for relative identifiers (optional)
        :type ro_crate: Optional[ROCrate]

        :return: the path to the entity
        :rtype: Path
        """
        return cls.get_path_from_identifier(entity_id, ro_crate.uri.as_path() if ro_crate else None)

    @staticmethod
    def get_path_from_identifier(identifier: str, rocrate_path: Optional[Union[str, Path]] = None) -> Path:
        """
        Get the path from an identifier.

        The identifier is first interpreted literally; if the resulting path
        does not exist, it is interpreted again after percent-decoding.

        :param identifier: the identifier of the entity
        :type identifier: str

        :param rocrate_path: the path to the RO-Crate, used as the base
            for relative identifiers (defaults to ``./``)
        :type rocrate_path: Optional[Union[str, Path]]

        :return: the path to the entity
        :rtype: Path

        """
        def __define_path__(path: str, decode: bool = False) -> Path:
            # ensure the path is a string and remove the file:// prefix
            path = str(path).replace('file://', '')
            # Decode the path if required
            if decode:
                path = unquote(path)
            # Convert the path to a Path object
            path = Path(path)
            # if the path is absolute, return it
            if path.is_absolute():
                return path
            try:
                # set the base path
                base_path = rocrate_path
                if base_path is None:
                    base_path = Path('./')
                elif not isinstance(base_path, Path):
                    base_path = Path(base_path)
                # Check if the path is the root of the RO-Crate
                if path == Path('./'):
                    return base_path
                # if the path already starts with the base path, keep it as is
                # (`relative_to` raises a ValueError otherwise)
                return base_path / path.relative_to(base_path)
            except ValueError:
                # the path is not under the base path: append it to the base path
                return base_path / path
        # Define the path based on the identifier
        path = __define_path__(identifier)
        if not path.exists():
            # retry interpreting the identifier as percent-encoded
            path = __define_path__(identifier, decode=True)
        return path

    @property
    def id_as_path(self) -> Path:
        """The identifier of the entity as a path resolved against the RO-Crate."""
        return self.get_id_as_path(self.id, self.ro_crate)

    @classmethod
    def get_id_as_uri(cls, entity_id: str, ro_crate: ROCrate) -> URI:
        """
        Convert an entity identifier to a URI.

        Identifiers starting with ``http`` are used as they are; any other
        identifier is first converted to a path.

        :param entity_id: the identifier of the entity
        :type entity_id: str

        :param ro_crate: the RO-Crate the entity belongs to
        :type ro_crate: ROCrate

        :return: the URI of the entity
        :rtype: URI
        """
        assert entity_id, "Entity ID cannot be None"
        if entity_id.startswith("http"):
            return URI(entity_id)
        return URI(cls.get_id_as_path(entity_id, ro_crate))

    @property
    def id_as_uri(self) -> URI:
        """The identifier of the entity as a URI."""
        return self.get_id_as_uri(self.id, self.ro_crate)

    def has_absolute_path(self) -> bool:
        """Check if the identifier of the entity, taken on its own, is an absolute path."""
        # the RO-Crate is deliberately not passed: a relative identifier
        # would otherwise be resolved against the path of the crate
        return self.get_id_as_path(self.id).is_absolute()

    def has_relative_path(self) -> bool:
        """Check if the identifier of the entity is not an absolute path."""
        return not self.has_absolute_path()

    def has_local_identifier(self) -> bool:
        """
        Check if the identifier is a fragment identifier (``#...``),
        either bare or prefixed by the URI of the RO-Crate.
        """
        has_local_id = self.id.startswith('#') or \
            f"{self.ro_crate.uri}/#" in self.id or \
            f"file://{self.ro_crate.uri}/#" in self.id
        logger.debug("Identifier '%s' is %s a local identifier", self.id,
                     "" if has_local_id else " not")
        return has_local_id

    def has_type(self, entity_type: str) -> bool:
        """
        Check if the entity has the specified type.

        :param entity_type: the type to look for
        :type entity_type: str

        :return: `True` if the type is one of the types of the entity
        :rtype: bool
        """
        assert isinstance(entity_type, str), "Entity type must be a string"
        e_types = self.type if isinstance(self.type, list) else [self.type]
        return entity_type in e_types

    def has_types(self, entity_types: list[str], all_types: bool = False) -> bool:
        """
        Check if the entity has any or all of the specified types.

        :param entity_types: the types to look for
        :type entity_types: list[str]

        :param all_types: if `True`, all the types are required;
            otherwise one matching type is enough
        :type all_types: bool

        :return: `True` if the entity matches the required types
        :rtype: bool
        """
        assert isinstance(entity_types, list), "Entity types must be a list"
        e_types = self.type if isinstance(self.type, list) else [self.type]
        if all_types:
            return all(t in e_types for t in entity_types)
        return any(t in e_types for t in entity_types)

    def __process_property__(self, name: str, data: object) -> object:
        """
        Convert a property value which references an entity
        (a dictionary with an ``@id`` key) to an entity object.
        Any other value is returned unchanged.
        """
        if isinstance(data, dict) and '@id' in data:
            entity = self.metadata.get_entity(data['@id'])
            if entity is None:
                # the referenced entity is not described in the metadata:
                # wrap the reference itself, bound to the same metadata object
                return ROCrateEntity(self.metadata, data)
            return entity
        return data

    def get_property(self, name: str, default=None) -> Union[str, ROCrateEntity]:
        """
        Get the value of a property of the entity.

        References to other entities are returned as entity objects;
        lists are processed item by item.

        :param name: the name of the property
        :type name: str

        :param default: the value to use if the property is missing

        :return: the (processed) value of the property or `None`
        """
        data = self._raw_data.get(name, default)
        if data is None:
            return None
        if isinstance(data, list):
            return [self.__process_property__(name, _) for _ in data]
        return self.__process_property__(name, data)

    @property
    def raw_data(self) -> object:
        """The raw JSON-LD dictionary of the entity."""
        return self._raw_data

    def is_local(self) -> bool:
        """Check if the identifier of the entity does not denote a remote resource."""
        return not self.is_remote()

    def is_available(self) -> bool:
        """
        Check if the resource denoted by the entity is available.

        Any error raised while checking is logged (in debug mode)
        and reported as `False`.

        :return: `True` if the resource is available, `False` otherwise
        :rtype: bool

        :raises ROCrateInvalidURIError: if no check is applicable to the entity
        """
        try:
            # check if the entity points to an external file
            if self.id.startswith("http"):
                return ROCrate.get_external_file_size(self.id) > 0

            # check if the entity is part of the local RO-Crate
            if self.ro_crate.uri.is_local_resource():
                # check if the file exists in the local file system
                if isinstance(self.ro_crate, ROCrateLocalFolder):
                    logger.debug("Checking the availability of a local entity in a local folder")
                    return self.ro_crate.has_file(self.id_as_path) \
                        or self.ro_crate.has_directory(self.id_as_path)
                # check if the file exists in the local zip file
                if isinstance(self.ro_crate, ROCrateLocalZip):
                    logger.debug("Checking the availability of a local entity in a local zip file")
                    # Skip the check for the root of a ZIP archive
                    if self.id == "./":
                        logger.debug("Skipping the check for the presence of the Data Entity '%s' within the RO-Crate "
                                     "as it is the root of a ZIP archive", self.id)
                        return True
                    return self.ro_crate.get_entry(str(self.id)) is not None

            # check if the entity is part of the remote RO-Crate
            if self.ro_crate.uri.is_remote_resource():
                return self.ro_crate.get_file_size(Path(self.id)) > 0
        except Exception as e:
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(e)
            return False

        raise ROCrateInvalidURIError(uri=self.id, message="Could not determine the availability of the entity")

    def get_size(self) -> int:
        """
        Get the size of the file denoted by the entity.

        :return: the size of the file, or 0 if it cannot be determined
        :rtype: int
        """
        try:
            return self.metadata.ro_crate.get_file_size(Path(self.id))
        except Exception as e:
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(e)
            return 0

    def __str__(self) -> str:
        return f"Entity({self.id})"

    def __repr__(self) -> str:
        return str(self)

    def __eq__(self, other: object) -> bool:
        # two entities are equal when they have the same identifier
        if not isinstance(other, ROCrateEntity):
            return False
        return self.id == other.id

    def __hash__(self) -> int:
        # keep the hash consistent with __eq__ (defining __eq__ alone
        # would make the entities unhashable)
        return hash(self.id)

class ROCrateMetadata:
    """
    The metadata of an RO-Crate, read from the metadata file descriptor
    (``ro-crate-metadata.json``) of the crate.

    The JSON text, the parsed dictionary and the RDF graph are loaded
    on first use and cached.
    """

    METADATA_FILE_DESCRIPTOR = 'ro-crate-metadata.json'

    def __init__(self, ro_crate: ROCrate) -> None:
        """
        Initialize the metadata object.

        :param ro_crate: the RO-Crate the metadata belongs to
        :type ro_crate: ROCrate
        """
        self._ro_crate = ro_crate
        # caches filled on demand by as_dict, as_json and as_graph
        self._dict: Optional[dict] = None
        self._json: Optional[str] = None
        self._graph: Optional[Graph] = None

    @property
    def ro_crate(self) -> ROCrate:
        """The RO-Crate the metadata belongs to."""
        return self._ro_crate

    @property
    def size(self) -> int:
        """The length of the metadata JSON text, or 0 if it cannot be read."""
        try:
            return len(self.as_json())
        except Exception as e:
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(e)
            return 0

    def get_file_descriptor_entity(self) -> ROCrateEntity:
        """
        Get the entity describing the metadata file descriptor.

        :raises ValueError: if the entity is not in the metadata
        """
        metadata_file_descriptor = self.get_entity(self.METADATA_FILE_DESCRIPTOR)
        if not metadata_file_descriptor:
            raise ValueError("no metadata file descriptor in crate")
        return metadata_file_descriptor

    def get_root_data_entity(self) -> ROCrateEntity:
        """
        Get the entity referenced by the ``about`` property
        of the metadata file descriptor.

        :raises ValueError: if the descriptor or its ``about`` property is missing
        """
        metadata_file_descriptor = self.get_file_descriptor_entity()
        main_entity = metadata_file_descriptor.get_property('about')
        if not main_entity:
            raise ValueError("no main entity in metadata file descriptor")
        return main_entity

    def get_root_data_entity_conforms_to(self) -> Optional[list[str]]:
        """
        Get the identifiers listed by the ``conformsTo`` property
        of the root data entity.

        :return: the list of identifiers, or `None` if it cannot be determined
        :rtype: Optional[list[str]]
        """
        try:
            root_data_entity = self.get_root_data_entity()
            result = root_data_entity.get_property('conformsTo', [])
            if result is None:
                return None
            if not isinstance(result, list):
                result = [result]
            return [_.id for _ in result]
        except Exception as e:
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(e)
            return None

    def get_main_workflow(self) -> ROCrateEntity:
        """
        Get the entity referenced by the ``mainEntity`` property
        of the root data entity.

        :raises ValueError: if the property is missing
        """
        root_data_entity = self.get_root_data_entity()
        main_workflow = root_data_entity.get_property('mainEntity')
        if not main_workflow:
            raise ValueError("no main workflow in metadata file descriptor")
        return main_workflow

    def get_entity(self, entity_id: str) -> Optional[ROCrateEntity]:
        """
        Get the entity of the ``@graph`` with the specified identifier.

        :param entity_id: the identifier of the entity
        :type entity_id: str

        :return: the first matching entity, or `None` if there is no match
        :rtype: Optional[ROCrateEntity]
        """
        for entity in self.as_dict().get('@graph', []):
            if entity.get('@id') == entity_id:
                return ROCrateEntity(self, entity)
        return None

    def get_entities(self) -> list[ROCrateEntity]:
        """Get all the entities of the ``@graph``."""
        return [ROCrateEntity(self, entity) for entity in self.as_dict().get('@graph', [])]

    def get_entities_by_type(self, entity_type: Union[str, list[str]]) -> list[ROCrateEntity]:
        """
        Get the entities having at least one of the specified types.

        :param entity_type: a single type or a list of types
        :type entity_type: Union[str, list[str]]

        :return: the matching entities
        :rtype: list[ROCrateEntity]
        """
        # `ROCrateEntity.has_types` only accepts lists: wrap a single type
        entity_types = [entity_type] if isinstance(entity_type, str) else list(entity_type)
        return [e for e in self.get_entities() if e.has_types(entity_types)]

    def get_dataset_entities(self) -> list[ROCrateEntity]:
        """Get the entities of type ``Dataset``."""
        return self.get_entities_by_type('Dataset')

    def get_file_entities(self) -> list[ROCrateEntity]:
        """Get the entities of type ``File``."""
        return self.get_entities_by_type('File')

    def get_data_entities(self, exclude_web_data_entities: bool = False) -> list[ROCrateEntity]:
        """
        Get the entities of type ``Dataset`` or ``File``.

        :param exclude_web_data_entities: if `True`, skip the remote entities
        :type exclude_web_data_entities: bool
        """
        if not exclude_web_data_entities:
            return self.get_entities_by_type(['Dataset', 'File'])
        return [e for e in self.get_entities_by_type(['Dataset', 'File'])
                if not e.is_remote()]

    def get_web_data_entities(self) -> list[ROCrateEntity]:
        """Get the remote entities of type ``File`` or ``Dataset``."""
        entities = []
        for entity in self.get_entities():
            if entity.has_type('File') or entity.has_type('Dataset'):
                if entity.is_remote():
                    entities.append(entity)
        return entities

    def get_conforms_to(self) -> Optional[list[str]]:
        """
        Get the identifiers listed by the ``conformsTo`` property
        of the metadata file descriptor.

        :return: the list of identifiers, or `None` if it cannot be determined
        :rtype: Optional[list[str]]
        """
        try:
            file_descriptor = self.get_file_descriptor_entity()
            result = file_descriptor.get_property('conformsTo', [])
            if result is None:
                return None
            if not isinstance(result, list):
                result = [result]
            return [_.id for _ in result]
        except Exception as e:
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(e)
            return None

    def as_json(self) -> str:
        """Get the metadata as JSON text (read from the RO-Crate on first use)."""
        if self._json is None:
            self._json = self.ro_crate.get_file_content(
                Path(self.METADATA_FILE_DESCRIPTOR), binary_mode=False)
        return self._json

    def as_dict(self) -> dict:
        """Get the metadata as a dictionary (parsed on first use)."""
        if self._dict is None:
            # if the dictionary is not cached, load it
            self._dict = json.loads(self.as_json())
        return self._dict

    def as_graph(self, publicID: str = None) -> Graph:
        """
        Get the metadata as an RDF graph.

        The graph is built on the first call and cached: `publicID`
        is ignored by the following calls.

        :param publicID: the base of the graph (defaults to the URI of the RO-Crate)
        :type publicID: str

        :return: the graph
        :rtype: Graph
        """
        if self._graph is None:
            # if the graph is not cached, load it
            graph = Graph(base=publicID or self.ro_crate.uri)
            graph.parse(data=self.as_json(), format='json-ld')
            # cache the graph only once it has been parsed successfully
            self._graph = graph
        return self._graph

    def __str__(self) -> str:
        return f"Metadata({self.ro_crate})"

    def __repr__(self) -> str:
        return str(self)

    def __eq__(self, other: ROCrateMetadata) -> bool:
        # two metadata objects are equal when they refer to equal RO-Crates
        if not isinstance(other, ROCrateMetadata):
            return False
        return self.ro_crate == other.ro_crate


class ROCrate(ABC):
    """
    Base class for representing and interacting with a Research Object Crate (RO-Crate).
    """

    def __init__(self, uri: Union[str, Path, URI]):
        """
        Initialize the RO-Crate.

        :param uri: the URI of the RO-Crate
        :type uri: Union[str, Path, URI]

        :raises ROCrateInvalidURIError: if the URI is invalid
        """
        # store the path to the crate
        self._uri = URI(uri)

        # cache the list of files
        self._files = None

        # initialize variables to cache the data
        self._dict: dict = None
        self._graph = None

        self._metadata = None

    @property
    def uri(self) -> URI:
        """
        The URI of the RO-Crate.
        """
        return self._uri

    @property
    def metadata(self) -> ROCrateMetadata:
        """
        An ROCrateMetadata object representing the RO-Crate metadata.
        The object is created on first access and cached.

        :return: the metadata object
        :rtype: ROCrateMetadata
        """
        if not self._metadata:
            self._metadata = ROCrateMetadata(self)
        return self._metadata

    # `size` is a property and `list_files` a method, matching the way
    # the concrete subclasses define and use them
    @property
    @abstractmethod
    def size(self) -> int:
        """
        The size of the RO-Crate.

        :return: the size of the RO-Crate
        :rtype: int
        """
        pass

    @abstractmethod
    def list_files(self) -> list[Path]:
        """
        List all the files in the RO-Crate.

        :return: a list of file paths
        :rtype: list[Path]
        """
        pass

    def __parse_path__(self, path: Path) -> Path:
        """
        Resolve a path against the path of the RO-Crate.
        """
        assert path, "Path cannot be None"
        return ROCrateEntity.get_path_from_identifier(str(path), rocrate_path=self.uri.as_path())

    def has_descriptor(self) -> bool:
        """
        Check if the RO-Crate has a metadata descriptor file.

        :return: `True` if the RO-Crate has a metadata descriptor file, `False` otherwise
        :rtype: bool
        """
        return (self.uri.as_path().absolute() / self.metadata.METADATA_FILE_DESCRIPTOR).is_file()

    def has_file(self, path: Path) -> bool:
        """
        Check if the RO-Crate has a file.
        Any error raised by the check is logged (in debug mode) and reported as `False`.

        :param path: the path to the file
        :type path: Path

        :return: `True` if the RO-Crate has the file, `False` otherwise
        :rtype: bool
        """
        try:
            return self.__parse_path__(path).is_file()
        except Exception as e:
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(e)
            return False

    def has_directory(self, path: Path) -> bool:
        """
        Check if the RO-Crate has a directory.
        Any error raised by the check is logged (in debug mode) and reported as `False`.

        :param path: the path to the directory
        :type path: Path

        :return: `True` if the RO-Crate has the directory, `False` otherwise
        :rtype: bool
        """
        try:
            return self.__parse_path__(path).is_dir()
        except Exception as e:
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(e)
            return False

    @abstractmethod
    def get_file_size(self, path: Path) -> int:
        """
        Get the size of a file in the RO-Crate.

        :param path: the path to the file
        :type path: Path

        :return: the size of the file
        :rtype: int
        """
        pass

    @abstractmethod
    def get_file_content(self, path: Path, binary_mode: bool = True) -> Union[str, bytes]:
        """
        Get the content of a file in the RO-Crate.

        :param path: the path to the file
        :type path: Path

        :param binary_mode: if `True`, return the file as a `bytes` object;
            otherwise, return it as a `str`
        :type binary_mode: bool

        :return: the content of the file
        :rtype: Union[str, bytes]
        """
        pass

    @staticmethod
    def get_external_file_content(uri: str, binary_mode: bool = True) -> Union[str, bytes]:
        """
        Get the content of an external file.

        :param uri: the URI of the file
        :type uri: str

        :param binary_mode: if `True`, return the file as a `bytes` object;
            otherwise, return it as a `str`
        :type binary_mode: bool

        :return: the content of the file
        :rtype: Union[str, bytes]
        """
        response = HttpRequester().get(str(uri))
        response.raise_for_status()
        return response.content if binary_mode else response.text

    @staticmethod
    def get_external_file_size(uri: str) -> int:
        """
        Get the size of an external file, as reported by the
        `Content-Length` header of the response to a HEAD request.

        :param uri: the URI of the file
        :type uri: str

        :return: the size of the file
        :rtype: int

        :raises requests.HTTPError: if the request fails
        """
        response = HttpRequester().head(str(uri))
        response.raise_for_status()
        # NOTE(review): when the header is missing `get` presumably returns None
        # and the conversion fails - confirm that the callers expect the error
        return int(response.headers.get('Content-Length'))

    @staticmethod
    def new_instance(uri: Union[str, Path, URI]) -> 'ROCrate':
        """
        Create a new instance of the RO-Crate based on the URI.

        :param uri: the URI of the RO-Crate
        :type uri: Union[str, Path, URI]

        :return: a new instance of the RO-Crate
        :rtype: ROCrate

        :raises ROCrateInvalidURIError: if the URI is invalid
        """
        # check if the URI is valid
        validate_rocrate_uri(uri, silent=False)
        # create a new instance based on the URI
        if not isinstance(uri, URI):
            uri = URI(uri)
        # check if the URI is a local directory
        if uri.is_local_directory():
            return ROCrateLocalFolder(uri)
        # check if the URI is a local zip file
        if uri.is_local_file():
            return ROCrateLocalZip(uri)
        # check if the URI is a remote zip file
        if uri.is_remote_resource():
            return ROCrateRemoteZip(uri)
        # if the URI is not supported, raise an error
        raise ROCrateInvalidURIError(uri=uri, message="Unsupported RO-Crate URI")
class ROCrateLocalFolder(ROCrate):
    """
    An RO-Crate stored as a directory of the local file system.
    """

    def __init__(self, path: Union[str, Path, URI]):
        """
        Initialize the RO-Crate.

        :param path: the path to the directory of the RO-Crate
        :type path: Union[str, Path, URI]

        :raises ROCrateInvalidURIError: if the path is not a directory
        """
        super().__init__(path)

        # cache the list of files
        self._files = None

        # check if the path is a directory
        if not self.has_directory(self.uri.as_path()):
            raise ROCrateInvalidURIError(uri=path)

    @property
    def size(self) -> int:
        """The sum of the sizes of the files of the RO-Crate."""
        return sum(f.stat().st_size for f in self.list_files() if f.is_file())

    def list_files(self) -> list[Path]:
        """
        List all the files in the RO-Crate (the list is computed once and cached).

        :return: a list of file paths
        :rtype: list[Path]
        """
        if self._files is None:
            base_path = self.uri.as_path()
            # the paths yielded by `rglob` already start with `base_path`:
            # joining them to it again would duplicate a relative base path
            self._files = [file for file in base_path.rglob('*') if file.is_file()]
        return self._files

    def get_file_size(self, path: Path) -> int:
        """
        Get the size of a file in the RO-Crate.

        :raises FileNotFoundError: if the file is not in the RO-Crate
        """
        path = self.__parse_path__(path)
        if not self.has_file(path):
            raise FileNotFoundError(f"File not found: {path}")
        return path.stat().st_size

    def get_file_content(self, path: Path, binary_mode: bool = True) -> Union[str, bytes]:
        """
        Get the content of a file in the RO-Crate,
        as `bytes` if `binary_mode` is `True` and as `str` otherwise.

        :raises FileNotFoundError: if the file is not in the RO-Crate
        """
        path = self.__parse_path__(path)
        if not self.has_file(path):
            raise FileNotFoundError(f"File not found: {path}")
        return path.read_bytes() if binary_mode else path.read_text()


class ROCrateLocalZip(ROCrate):
    """
    An RO-Crate stored as a ZIP archive of the local file system.
    """

    def __init__(self, path: Union[str, Path, URI], init_zip: bool = True):
        """
        Initialize the RO-Crate.

        :param path: the path to the ZIP archive
        :type path: Union[str, Path, URI]

        :param init_zip: if `True`, open the archive immediately
        :type init_zip: bool
        """
        super().__init__(path)

        # initialize the zip reference
        self._zipref = None
        if init_zip:
            self.__init_zip_reference__()

        # cache the list of files
        self._files = None

    def __del__(self):
        # `_zipref` is missing if the initialization failed before setting it
        zipref = getattr(self, '_zipref', None)
        if zipref is not None and zipref.fp is not None:
            zipref.close()

    @property
    def size(self) -> int:
        """The size of the ZIP archive."""
        return self.uri.as_path().stat().st_size

    def __init_zip_reference__(self):
        """
        Open the ZIP archive.

        :raises ROCrateInvalidURIError: if the path is not a file
            or its suffix is not `.zip`
        """
        path = self.uri.as_path()
        # check if the path is a file
        if not path.is_file():
            raise ROCrateInvalidURIError(uri=path)
        # check if the file is a zip file
        if not path.suffix == '.zip':
            raise ROCrateInvalidURIError(uri=path)
        self._zipref = zipfile.ZipFile(path)
        logger.debug("Initialized zip reference: %s", self._zipref)

    def __get_file_info__(self, path: Path) -> zipfile.ZipInfo:
        """
        Get the ZipInfo object of a member of the archive.

        :raises KeyError: if the archive has no such member
        """
        name = str(path)
        try:
            return self._zipref.getinfo(name)
        except KeyError:
            # the names of the directories end with a slash in the archive,
            # but `Path` objects drop it: retry with the trailing slash
            if name.endswith('/'):
                raise
            return self._zipref.getinfo(f"{name}/")

    def has_descriptor(self) -> bool:
        """
        Check if the archive has a member named like the metadata
        file descriptor (only the final component of the name is compared).
        """
        return ROCrateMetadata.METADATA_FILE_DESCRIPTOR in [str(_.name) for _ in self.list_files()]

    def has_file(self, path: Path) -> bool:
        """
        Check if the archive has the specified member and it is not a directory.
        """
        if Path(path) in self.list_files():
            info = self.__get_file_info__(path)
            return not info.is_dir()
        return False

    def has_directory(self, path: Path) -> bool:
        """
        Check if the archive has the specified member and it is a directory.
        """
        if Path(path) in self.list_files():
            info = self.__get_file_info__(path)
            return info.is_dir()
        return False

    def list_files(self) -> list[Path]:
        """
        List the members of the archive (the list is computed once and cached).

        :return: a list of paths
        :rtype: list[Path]
        """
        if self._files is None:
            self._files = [Path(file) for file in self._zipref.namelist()]
        return self._files

    def list_entries(self) -> list[zipfile.ZipInfo]:
        """
        List the ZipInfo objects of the members of the archive.
        """
        return self._zipref.infolist()

    def get_entry(self, path: Path) -> zipfile.ZipInfo:
        """
        Return the ZipInfo object for the specified path.

        :raises KeyError: if the archive has no such member
        """
        return self.__get_file_info__(path)

    def get_file_size(self, path: Path) -> int:
        """
        Get the uncompressed size of a member of the archive.

        :raises KeyError: if the archive has no such member
        """
        return self._zipref.getinfo(str(path)).file_size

    def get_file_content(self, path: Path, binary_mode: bool = True) -> Union[str, bytes]:
        """
        Get the content of a member of the archive, as `bytes` if
        `binary_mode` is `True` and as UTF-8 decoded `str` otherwise.

        :raises FileNotFoundError: if the archive has no such file
        """
        if not self.has_file(path):
            raise FileNotFoundError(f"File not found: {path}")
        data = self._zipref.read(str(path))
        return data if binary_mode else data.decode('utf-8')


class ROCrateRemoteZip(ROCrateLocalZip):
    """
    An RO-Crate stored as a remote ZIP archive, accessed through HTTP requests.
    """

    def __init__(self, path: Union[str, Path, URI]):
        """
        Initialize the RO-Crate.

        :param path: the URI of the remote ZIP archive
        :type path: Union[str, Path, URI]
        """
        super().__init__(path, init_zip=False)

        # the `size` property issues a HEAD request (and may raise),
        # whatever the logging level is
        logger.debug("Size: %s", self.size)

        # initialize the zip reference
        self.__init_zip_reference__()

    def __init_zip_reference__(self):
        """
        Build the zip reference from the data fetched from the remote archive.

        :raises ROCrateInvalidURIError: if the URI is not available
        """
        url = str(self.uri)

        # check if the URI is available
        if not self.uri.is_available():
            raise ROCrateInvalidURIError(uri=url, message="URI is not available")

        # Step 1: Fetch the last 22 bytes to find the EOCD record
        # NOTE(review): the header built here is 'bytes=-22-', while RFC 9110 spells
        # a suffix range as 'bytes=-22' - confirm how the servers handle this form
        eocd_data = self.__fetch_range__(url, -22, '')

        # Step 2: Find the EOCD record
        eocd_offset = self.__find_eocd__(eocd_data)

        # Step 3: Fetch the EOCD and parse it
        # NOTE(review): both the bounds of this range are negative - confirm
        eocd_full_data = self.__fetch_range__(url, -22 - eocd_offset, -1)
        central_directory_offset, central_directory_size = self.__parse_eocd__(eocd_full_data)

        # Step 4: Fetch the central directory
        central_directory_data = self.__fetch_range__(url, central_directory_offset,
                                                      central_directory_offset + central_directory_size - 1)

        # Step 5: Parse the central directory and return the zip file
        # NOTE(review): only the bytes fetched at step 4 are handed to ZipFile;
        # confirm that they make up a readable archive in practice
        self._zipref = zipfile.ZipFile(io.BytesIO(central_directory_data))

    @property
    def size(self) -> int:
        """
        The size of the remote archive, as reported by the
        `Content-Length` header of the response to a HEAD request.

        :raises Exception: if the header is missing
        """
        response = HttpRequester().head(str(self.uri))
        response.raise_for_status()  # Check if the request was successful
        file_size = response.headers.get('Content-Length')
        if file_size is not None:
            return int(file_size)
        else:
            raise Exception("Could not determine the file size from the headers")

    @staticmethod
    def __fetch_range__(uri: str, start, end):
        """
        Fetch the content returned by a GET request
        with the header `Range: bytes=<start>-<end>`.
        """
        headers = {'Range': f'bytes={start}-{end}'}
        response = HttpRequester().get(uri, headers=headers)
        response.raise_for_status()
        return response.content

    @staticmethod
    def __find_eocd__(data):
        """
        Get the offset of the last occurrence of the signature of the
        End Of Central Directory (EOCD) record in `data`.

        :raises Exception: if the signature is not found
        """
        eocd_signature = b'PK\x05\x06'
        eocd_offset = data.rfind(eocd_signature)
        if eocd_offset == -1:
            raise Exception("EOCD not found")
        return eocd_offset

    @staticmethod
    def __parse_eocd__(data):
        """
        Unpack the trailing bytes of `data` as an EOCD record.

        :return: the offset and the size of the central directory
        """
        # little endian: 4-byte signature, four 16-bit fields,
        # two 32-bit fields (size and offset) and a final 16-bit field
        eocd_size = struct.calcsize('<4s4H2LH')
        eocd = struct.unpack('<4s4H2LH', data[-eocd_size:])
        central_directory_size = eocd[5]
        central_directory_offset = eocd[6]
        return central_directory_offset, central_directory_size