nixview-python/file_handler.py

162 lines
5.3 KiB
Python

import os
import nixio as nix
from enum import Enum
class ItemDescriptor():
def __init__(self, name=None, id=None, type=None, value=None, definition=None, entity_type=None) -> None:
super().__init__()
self.name = name
self.type = type
self.id = id
self.definition = definition
self.value = value
self.entity_type = entity_type
class NodeType(Enum):
Root = "root"
Section = "section"
Block = "block"
DataArray = "data_array"
DataFrame = "data_frame"
Property = "property"
Dimension = "dimension"
Source = "source"
Tag = "tag"
MultiTag = "multi_tag"
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class FileHandler(metaclass=Singleton):
def __init__(self) -> None:
super().__init__()
self._filename = None
self._nix_file = None
self._file_requests = []
def open(self, filename):
"""[summary]
Args:
filename ([type]): [description]
"""
self.close()
if not os.path.exists(filename):
return False, "File %s could not be found!" % filename
try:
self._nix_file = nix.File.open(filename, nix.FileMode.ReadOnly)
self._filename = filename
return True, "Successfully opened file %s." % filename.split(os.sep)[-1]
except RuntimeError as e:
return False, "Failed to open file %s! \n Error message is: %s" % (filename, e)
except OSError as e:
return False, "Failed to open file %s! \n Error message is: %s\n Probably no nix file?!" % (filename, e)
def close(self):
# TODO check if there are any pending file requests!
if self._nix_file is not None and self._nix_file.is_open():
self._nix_file.close()
self._nix_file = None
self._file_requests = []
def file_descriptor(self):
return ItemDescriptor()
@property
def is_valid(self):
return self._nix_file is not None and self._nix_file.is_open()
@property
def filename(self):
return self._filename
def request_metadata(self, root_id=None, depth=1):
"""[summary]
Args:
root_id ([type], optional): [description]. Defaults to None.
depth (int, optional): [description]. Defaults to 1.
"""
def get_subsections(section):
sub_sections = []
for s in section.sections:
sub_sections.append(ItemDescriptor(s.name, s.id, s.type, definition=s.definition, entity_type="Section"))
return sub_sections
def get_properties(section):
props = []
for p in section.props:
value = "unset"
props.append(ItemDescriptor(p.name, p.id, value=value, entity_type="Property"))
return props
sections = []
properties = []
if root_id is None:
sections = get_subsections(self._nix_file)
else:
found_section = self._nix_file.find_sections(lambda s: s.id == root_id)
for fs in found_section:
sections.extend(get_subsections(fs))
properties.extend(get_properties(fs))
return sections, properties
def _entity_info(self, entities, entity_type):
infos = []
for e in entities:
itd = ItemDescriptor(e.name, e.id, e.type, definition=e.definition, entity_type=entity_type)
infos.append(itd)
return infos
def request_blocks(self):
return self._entity_info(self._nix_file.blocks, "Block")
def request_data_arrays(self, block_id):
return self._entity_info(self._nix_file.blocks[block_id].data_arrays, "DataArray")
def request_tags(self, block_id):
tags = self._entity_info(self._nix_file.blocks[block_id].tags, "Tag")
tags.extend(self._entity_info(self._nix_file.blocks[block_id].multi_tags), "MultiTag")
return tags
def request_references(self, block_id, tag_id, is_mtag):
b = self._nix_file.blocks[block_id]
t = None
if is_mtag:
t = b.multi_tags[tag_id]
else:
t = b.tags[tag_id]
return self._entity_info(t.references, "DataArray")
def request_features(self, block_id, tag_id, is_mtag):
b = self._nix_file.blocks[block_id]
t = None
if is_mtag:
t = b.multi_tags[tag_id]
else:
t = b.tags[tag_id]
feats = []
for f in t.features:
itd = ItemDescriptor(f.data.name, f.id, f.type, definition=f.data.definition, entity_type="Feature")
feats.append(itd)
return feats
def request_dimensions(self, block_id, array_id):
dimensions = []
for i, d in enumerate(self._nix_file.blocks[block_id].data_arrays[array_id].dimensions):
dim_name = "%i. dim: %s" % (i+1, d.label if hasattr(d, "label") else "")
dim_type= "%s %s" % (d.dimension_type, "dimension")
dimensions.append(ItemDescriptor(dim_name, type=dim_type, entity_type="Dimension"))
return dimensions