Source code for omidb.parser

import datetime
import dataclasses
import re
import pathlib
import json
from typing import List, Dict, Optional, Iterator, Any, Sequence, Union
from loguru import logger
from . import utilities
from .client import Client, Site
from .episode import Episode
from . import episode
from .study import Study
from .series import Series
from .image import Image
from .mark import (
    BenignClassification,
    Conspicuity,
    MassClassification,
    BoundingBox,
    Mark,
)

from .events import (
    Event,
    Events,
    BreastScreeningData,
    Screening,
    BaseEvent,
    Opinion,
    SideOpinion,
)


class DB:
    """
    OMI-DB parser

    :param root_dir: Root directory of the OMI-DB. The `data` and `images`
        directories should sit somewhere below the root, and can be nested
        under sub-directories, e.g. ``data/sample/omi-db/(data|images)`` is
        fine.
    :param ignore_missing_images: If ``True``, the existence of dicom images
        belonging to a series will not be checked: parsing is based entirely
        on the JSON representations of the DICOM headers. Set to ``False`` to
        parse only those images for which you have *both* JSON and DICOM
        files.
    :param clients: Only parse these clients, if they exist
    :param exclude_clients: Exclude these clients, even if they are listed in
        ``clients``
    :param distinct_event_study_links: Only match events to imaging studies
        when the link is distinct (a one-to-one mapping)
    """

    def __init__(
        self,
        root_dir: Union[str, pathlib.Path],
        ignore_missing_images: bool = True,
        clients: Optional[Sequence[str]] = None,
        exclude_clients: Optional[Sequence[str]] = None,
        distinct_event_study_links: bool = True,
    ):
        root_dir = pathlib.Path(root_dir)
        self.ignore_missing_images = ignore_missing_images
        self.distinct_event_study_links = distinct_event_study_links

        # The parent of the DATA/IMAGES directories
        self.real_root_dir = pathlib.Path()
        self._data_dir = pathlib.Path()
        self._image_dir = pathlib.Path()

        data_found = False
        for sub_dir in root_dir.glob("**/"):
            if sub_dir.name in ("data", "DATA"):
                self.real_root_dir = sub_dir.parent
                self._data_dir = sub_dir
                data_found = True
            if sub_dir.name in ("images", "IMAGES"):
                self._image_dir = sub_dir

        if not data_found:
            raise FileNotFoundError(
                f"Failed to parse the given directory. Does {root_dir} "
                "contain the omidb 'data' directory?"
            )

        if not clients:
            clients = []
            for client_path in self._data_dir.glob("*"):
                if client_path.is_dir() and client_path.name[:4] in ("demd", "optm"):
                    clients.append(client_path.name)

        self.clients = set(clients)
        if exclude_clients:
            self.clients = set(clients) - set(exclude_clients)
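    # A minimal construction sketch (the path and client IDs below are
    # hypothetical, purely to illustrate the filtering performed above):
    #
    #     db = DB("/data/omi-db", clients=["demd1", "optm2"], exclude_clients=["optm2"])
    #     print(db.clients)  # {"demd1"}
    #     for client in db:  # iterates via __iter__ below
    #         print(client.id, client.site)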
    def __iter__(self) -> Iterator[Client]:
        """
        Iterates over all parsable clients found in the OMI-DB directory.

        :return: client_it: A :class:`omidb.client.Client` iterator
        """
        for client in self.clients:
            _data_dir = self._data_dir / client
            studies: List[str] = []
            for study in _data_dir.glob("*/**"):
                # Extract the study ID from the path
                if study.is_dir():
                    match = re.match(r"\d+.", study.name)
                    if match:
                        studies.append(study.name)

            yield self._parse_client(client, studies)

    def _parse_client(self, client: str, studies: List[str]) -> Client:
        try:
            episodes = self._episodes(client, studies)
            site: Site = utilities.enum_lookup(  # type: ignore
                self._imagedb(client)["Site"], Site
            )
            return Client(id=client, episodes=episodes, site=site)
        except Exception as e:  # noqa
            logger.exception(f"omidb: Failed to parse {client}")
            raise e

    def _nbss_path(self, client_id: str) -> pathlib.Path:
        """Path of the NBSS json file corresponding to the client with ID
        `client_id`
        """
        p1 = self._data_dir / client_id / ("nbss_" + client_id + ".json")

        if not p1.exists():
            p1 = self._data_dir / client_id / ("NBSS_" + client_id + ".json")

        return p1

    def _nbss(self, client_id: str) -> Dict[str, Any]:
        """NBSS data corresponding to the client with ID `client_id`"""
        with open(self._nbss_path(client_id)) as f:
            nbss: Dict[str, Any] = json.load(f)
        return nbss

    def _imagedb(self, client_id: str) -> Dict[str, Any]:
        """IMAGEDB data corresponding to the client with ID `client_id`"""
        p1 = self._data_dir / client_id / ("imagedb_" + client_id + ".json")

        if not p1.exists():
            p1 = self._data_dir / client_id / ("IMAGEDB_" + client_id + ".json")

        with open(p1) as f:
            imagedb: Dict[str, Any] = json.load(f)
        return imagedb

    def _has_studies(self, client_id: str) -> bool:
        imagedb = self._imagedb(client_id)
        if ("STUDIES" not in imagedb) or (not isinstance(imagedb["STUDIES"], dict)):
            logger.error(f"No studies listed in IMAGEDB for client {client_id}")
            return False
        return True

    def _parse_mark(self, mark_data: Dict[str, Any]) -> Mark:
        args: Dict[str, Any] = {}

        for param_name, key in zip(
            (
                "architectural_distortion",
                "dystrophic_calcification",
                "fat_necrosis",
                "focal_asymmetry",
                "mass",
                "suspicious_calcifications",
                "milk_of_calcium",
                "other_benign_cluster",
                "plasma_cell_mastitis",
                "benign_skin_feature",
                "calcifications",
                "suture_calcification",
                "vascular_feature",
            ),
            (
                "ArchitecturalDistortion",
                "Dystrophic",
                "FatNecrosis",
                "FocalAsymmetry",
                "Mass",
                "SuspiciousCalcifications",
                "MilkOfCalcium",
                "OtherBenignCluster",
                "PlasmaCellMastitis",
                "Skin",
                "WithCalcification",
                "SutureCalcification",
                "Vascular",
            ),
        ):
            args[param_name] = True if mark_data.get(key) else None

        args["benign_classification"] = utilities.enum_lookup(
            str(mark_data.get("BenignClassification")), BenignClassification
        )

        args["conspicuity"] = utilities.enum_lookup(
            str(mark_data.get("Conspicuity")), Conspicuity
        )

        args["mass_classification"] = utilities.enum_lookup(
            str(mark_data.get("MassClassification")), MassClassification
        )

        args["lesion_id"] = str(mark_data["LinkedNBSSLesionNumber"])
        args["id"] = str(mark_data["MarkID"])

        args["boundingBox"] = BoundingBox(
            x1=int(mark_data["X1"]),
            y1=int(mark_data["Y1"]),
            x2=int(mark_data["X2"]),
            y2=int(mark_data["Y2"]),
        )

        return Mark(**args)
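    # Shape of the IMAGEDB mark record consumed by _parse_mark, as a sketch
    # with hypothetical values (keys follow the lookup tables above):
    #
    #     mark_data = {
    #         "MarkID": 2,
    #         "LinkedNBSSLesionNumber": 1,
    #         "Mass": "Y",  # any truthy value maps args["mass"] to True
    #         "X1": "10", "Y1": "20", "X2": "110", "Y2": "220",
    #     }
    #     mark = self._parse_mark(mark_data)
    #     # mark.boundingBox == BoundingBox(x1=10, y1=20, x2=110, y2=220)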
    def _parse_series(
        self, client_id: str, study: str, study_data: Dict[str, Any]
    ) -> List[Series]:
        series_list = []
        for series, series_dic in study_data.items():
            if not isinstance(series_dic, dict):
                continue

            image_list = list(series_dic.keys())

            images = []
            for image in image_list:
                dcm_path = self._image_dir / client_id / study / (image + ".dcm")
                json_path = self._data_dir / client_id / study / (image + ".json")

                # Due to inconsistency in file naming
                if not json_path.exists():
                    json_path = json_path.with_suffix(".dcm.json")

                if not json_path.exists():
                    logger.error(
                        f"Image metadata {json_path} does not exist, skipping."
                    )
                    continue

                # Skip this image if there is no dcm file
                if not self.ignore_missing_images:
                    if not dcm_path.is_file():
                        logger.error(f"Image {dcm_path} does not exist, skipping.")
                        continue

                marks: List[Mark] = []
                image_marks_data = series_dic.get(image)
                if isinstance(image_marks_data, dict):
                    for mark_id, mark_data in image_marks_data.items():
                        try:
                            marks.append(self._parse_mark(mark_data))
                        except Exception:
                            logger.exception(
                                f"Failed to parse mark data of {client_id} "
                                f"{image}.dcm"
                            )
                            continue

                images.append(Image(image, dcm_path, json_path, marks))

            series_list.append(Series(id=series, images=images))

        return series_list

    def _parse_events(self, episode_data: Dict[str, Any]) -> Events:
        event_kwargs = {
            "screening": episode_data.get("SCREENING", None),
            "assessment": episode_data.get("ASSESSMENT", None),
            "biopsy_wide": episode_data.get("BIOPSYWIDE", None),
            "biopsy_fine": episode_data.get("BIOPSYFINE", None),
            "clinical": episode_data.get("CLINICAL", None),
            "surgery": episode_data.get("SURGERY", None),
        }

        for key, value in event_kwargs.items():
            if not value:
                continue

            # SCREENING must exist in order to parse OTHERSCREENING
            if key == "screening":
                event_kwargs["screening"] = [self._parse_screening_event(value)]
                other_screening = episode_data.get("OTHERSCREENING", None)
                if other_screening is not None:
                    for screen_id, screen in other_screening.items():
                        event_kwargs["screening"].append(
                            self._parse_screening_event(screen)
                        )
            else:
                event_kwargs[key] = self._parse_base_event(value)

        return Events(**event_kwargs)

    def _parse_base_event(self, data: Dict[str, Any]) -> BaseEvent:
        dates: List[datetime.date] = []
        for side in ("L", "R"):
            side_data = data.get(side)
            if not side_data:
                continue
            for lesion_id, lesion in side_data.items():
                if lesion.get("DatePerformed"):
                    date = utilities.str_to_date(lesion["DatePerformed"])
                    if date not in dates:
                        dates.append(date)

        # Some events may not have lesions (due to a null ID), so add the
        # higher-level DatePerformed to improve linking
        if data.get("dateperformed"):
            date = utilities.str_to_date(data["dateperformed"])
            if date not in dates:
                dates.append(date)

        left_opinion: SideOpinion = utilities.enum_lookup(  # type: ignore
            str(data.get("left_opinion")), SideOpinion
        )
        right_opinion: SideOpinion = utilities.enum_lookup(  # type: ignore
            str(data.get("right_opinion")), SideOpinion
        )

        return BaseEvent(
            left_opinion=left_opinion, right_opinion=right_opinion, dates=dates
        )

    def _parse_screening_event(self, data: Dict[str, Any]) -> Screening:
        breast_data: List[Optional[BreastScreeningData]] = [None, None]
        dates: List[datetime.date] = []
        for i, side in enumerate(["L", "R"]):
            side_data = data.get(side)
            if not side_data:
                continue

            date: Optional[datetime.date] = None
            if side_data.get("DateTaken"):
                date = utilities.str_to_date(side_data["DateTaken"])
                if date not in dates:
                    dates.append(date)

            opinion: Optional[Opinion] = utilities.enum_lookup(  # type: ignore
                side_data.get("Opinion"), Opinion
            )

            breast_data[i] = BreastScreeningData(
                date=date,
                equipment_make_model=side_data.get("EquipmentMakeModel"),
                opinion=opinion,
            )

        left_opinion: Optional[SideOpinion] = utilities.enum_lookup(  # type: ignore
            str(data.get("left_opinion")), SideOpinion
        )
        right_opinion: Optional[SideOpinion] = utilities.enum_lookup(  # type: ignore
            str(data.get("right_opinion")), SideOpinion
        )

        return Screening(
            left_opinion=left_opinion,
            right_opinion=right_opinion,
            left=breast_data[0],
            right=breast_data[1],
            dates=dates,
        )
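    # Sketch of the NBSS screening record consumed by _parse_screening_event
    # (values are hypothetical; the date format accepted by
    # utilities.str_to_date is an assumption here):
    #
    #     data = {
    #         "L": {"DateTaken": "2010-01-01", "EquipmentMakeModel": "X"},
    #         "R": {"DateTaken": "2010-01-01", "EquipmentMakeModel": "X"},
    #     }
    #     screening = self._parse_screening_event(data)
    #     # screening.dates holds the single de-duplicated date; the side
    #     # opinions are unset since no Opinion fields are present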
    def _episodes(self, client_id: str, studies: Optional[List[str]]) -> List[Episode]:
        """
        Prepare and instantiate all objects required for this client and the
        associated studies. Clinical data will be extracted from NBSS even if
        there are no imaging studies listed in IMAGEDB.
        """

        nbss = self._nbss(client_id)
        imagedb = self._imagedb(client_id)

        # Events for each episode
        events: Dict[str, Events] = {}
        for episode_id, nbss_episode in nbss.items():
            if isinstance(nbss_episode, dict):
                events[episode_id] = self._parse_events(nbss_episode)

        studies = [] if studies is None else studies

        # Studies for each episode
        episode_studies: Dict[str, List[Study]] = {}
        if self._has_studies(client_id):
            for study_id, study_data in imagedb["STUDIES"].items():
                if study_id not in studies:  # These exist locally
                    if not study_id:
                        logger.error(
                            f"Empty study listed in IMAGEDB for client "
                            f"{client_id}...skipping"
                        )
                    else:
                        logger.error(
                            f"{client_id}/{study_id} directory does not exist "
                            "but is listed in IMAGEDB...skipping"
                        )
                    continue

                series_list = self._parse_series(client_id, study_id, study_data)

                study_date = (
                    utilities.str_to_date(study_data["StudyDate"])
                    if study_data.get("StudyDate")
                    else None
                )

                if study_date is None:
                    logger.warning(f"{client_id}/{study_id} has no study date")

                # Try to extract the episode ID by matching the study date to
                # an event date
                elif (
                    ("EpisodeID" not in study_data)
                    or (not study_data["EpisodeID"])
                    or (study_data["EpisodeID"] not in nbss)
                ):
                    logger.warning(
                        f"Episode {study_data.get('EpisodeID')} not found in NBSS "
                        f"for {client_id}/{study_id}, "
                        f"attempting link via event dates (will replace episode ID)"
                    )

                    for episode_id, episode_events in events.items():
                        for field in dataclasses.fields(episode_events):
                            event = getattr(episode_events, field.name)
                            if event is None:
                                continue

                            if isinstance(event, list):
                                event_dates = [
                                    date for e in event for date in e.dates
                                ]
                            else:
                                event_dates = event.dates

                            if event and (study_date in event_dates):
                                study_data["EpisodeID"] = episode_id
                                logger.info(
                                    f"Linked study {study_id} to episode "
                                    f"{episode_id}"
                                )
                                break

                # If there is still no episode ID, skip
                if ("EpisodeID" not in study_data) or (not study_data["EpisodeID"]):
                    logger.error(
                        f"EpisodeID not found for {client_id}/{study_id}, skipping"
                    )
                    continue

                matched_events: List[Event] = []
                if study_data["EpisodeID"] in events:
                    episode_events = events[study_data["EpisodeID"]]

                    for field in dataclasses.fields(episode_events):
                        event_list = getattr(episode_events, field.name)
                        if not event_list:
                            break
                        if not isinstance(event_list, list):
                            event_list = [event_list]
                        for e in event_list:
                            if study_date in e.dates:
                                matched_events.append(getattr(Event, field.name))

                    if not matched_events:
                        logger.warning(
                            f"No events in episode "
                            f"{study_data['EpisodeID']} match study date of "
                            f"{client_id}/{study_id}"
                        )

                        for field in dataclasses.fields(episode_events):
                            event = getattr(episode_events, field.name)
                            if event is None:
                                continue
                            matched_events.append(getattr(Event, field.name))
                            logger.warning(
                                f"Linked {field.name} event to "
                                f"{client_id}/{study_id} via episode "
                                f"{study_data['EpisodeID']} only"
                            )

                    if len(matched_events) > 1:
                        logger.warning(
                            f"{client_id}/{study_id}: Multiple events linked"
                        )
                        if self.distinct_event_study_links:
                            logger.info("Dropping matched events as not distinct")
                            matched_events = []
                else:
                    logger.warning(
                        f"Episode {study_data['EpisodeID']} has no events "
                        "(episode not found in NBSS)"
                    )
                    continue
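                # To summarise the linking rules above (with hypothetical
                # IDs): a study dated 2010-01-01 under episode "E1" collects
                # every event type in E1 whose dates include 2010-01-01; if
                # none match, all of E1's events are linked as a fallback;
                # and when more than one event matches while
                # distinct_event_study_links is set, the links are dropped.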
                # Now add studies to the episode
                study = Study(
                    id=study_id,
                    series=series_list,
                    date=study_date,
                    event_type=matched_events,
                )

                if study_data["EpisodeID"] not in episode_studies:
                    episode_studies[study_data["EpisodeID"]] = [study]
                else:
                    episode_studies[study_data["EpisodeID"]].append(study)

        # Complete the episodes
        out = []
        for episode_id, episode_events in events.items():
            this_episodes_studies = episode_studies.get(episode_id, None)

            if not this_episodes_studies:
                logger.warning(f"{episode_id} in NBSS but not in IMAGEDB")
            elif self.distinct_event_study_links:
                for idx, study1 in enumerate(this_episodes_studies):
                    for study2 in this_episodes_studies[idx:]:
                        if study1 == study2:
                            continue

                        # Two studies have the same date: drop any links
                        if study1.date == study2.date:
                            logger.warning(
                                "Dropping events linked to "
                                f"{client_id}/{study1.id} and "
                                f"{client_id}/{study2.id} as they share a date"
                            )
                            study1.event_type = []
                            study2.event_type = []
                            continue

                        # Two studies have different dates but are linked to
                        # the same event
                        if (
                            study1.event_type
                            and study1.event_type == study2.event_type
                        ):
                            event_list = getattr(
                                episode_events, study1.event_type[0].name
                            )
                            if not isinstance(event_list, list):
                                event_list = [event_list]

                            # Pool the dates if there are multiple screens
                            event_dates = [
                                date for e in event_list for date in e.dates
                            ]

                            if study1.date not in event_dates:
                                logger.warning(
                                    "Dropping events linked to "
                                    f"{client_id}/{study1.id}"
                                )
                                study1.event_type = []

                            if study2.date not in event_dates:
                                logger.warning(
                                    "Dropping events linked to "
                                    f"{client_id}/{study2.id}"
                                )
                                study2.event_type = []

            nbss_episode = nbss[episode_id]

            is_closed = nbss_episode.get("EpisodeIsClosed") == "Y"

            ep_type: Optional[episode.Type] = utilities.enum_lookup(  # type: ignore
                nbss_episode.get("EpisodeType"), episode.Type
            )

            ep_action: Optional[episode.Action] = utilities.enum_lookup(  # type: ignore
                nbss_episode.get("EpisodeAction"), episode.Action
            )

            opened = (
                utilities.str_to_date(nbss_episode["EpisodeOpenedDate"])
                if nbss_episode.get("EpisodeOpenedDate")
                else None
            )

            closed = (
                utilities.str_to_date(nbss_episode["EpisodeClosedDate"])
                if nbss_episode.get("EpisodeClosedDate")
                else None
            )

            out.append(
                Episode(
                    id=episode_id,
                    events=episode_events,
                    studies=this_episodes_studies,
                    type=ep_type,
                    action=ep_action,
                    opened_date=opened,
                    closed_date=closed,
                    is_closed=is_closed,
                )
            )

        return out
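# A minimal end-to-end sketch (the path below is hypothetical):
#
#     db = DB("/path/to/omi-db", ignore_missing_images=True)
#     for client in db:
#         for ep in client.episodes:
#             for study in ep.studies or []:
#                 print(client.id, ep.id, study.id, study.event_type)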