nixview-python/nixview/file_handler.py

427 lines
17 KiB
Python

import os
import nixio as nix
import numpy as np
from enum import Enum
import datetime as dt
from numpy.core.defchararray import not_equal
from nixview.constants import io_chunksize as chunksize
class ItemDescriptor():
    """Plain description of one item (entity, section, property, ...) of a file.

    The descriptor only carries values that were copied from the underlying
    object, so it can be handed around without keeping the file entity itself.
    """

    def __init__(self, name=None, id=None, type=None, value=None, definition=None, block_id=None,
                 entity_type=None, shape=None, metadata=None, data_type=None, source_id=None,
                 created_at=None, updated_at=None) -> None:
        """Store the given values.

        Args:
            name: name of the described item.
            id: id of the described item.
            type: type string of the described item.
            value: display value of the item.
            definition: definition text of the item.
            block_id: id of the block the item belongs to.
            entity_type: kind of item; ``to_html`` prints its ``value``
                attribute when it has one (e.g. an Enum member).
            shape: shape of the item's data.
            metadata: id of the linked metadata section, stored as
                ``metadata_id``.
            data_type: data type of the item's data.
            source_id: id of the linked source.
            created_at: creation time; ``to_html`` passes it to
                ``datetime.fromtimestamp``.
            updated_at: modification time; ``to_html`` passes it to
                ``datetime.fromtimestamp``.
        """
        super().__init__()
        self.name = name
        self.type = type
        self.id = id
        self.block_id = block_id
        self.definition = definition
        self.value = value
        self.entity_type = entity_type
        self.data_type = data_type
        self.shape = shape
        self.metadata_id = metadata
        self.source_id = source_id
        # Keep the timestamps that were passed in instead of discarding them.
        self.created_at = created_at
        self.updated_at = updated_at

    @staticmethod
    def _format_timestamp(timestamp):
        """Return the timestamp as a date string, or "" when it is unset (falsy)."""
        if not timestamp:
            return ""
        return str(dt.datetime.fromtimestamp(timestamp))

    def to_html(self):
        """Return an HTML snippet that lists the descriptor's fields."""
        if self.entity_type is None:
            # No entity type given (the constructor default): print an empty field.
            entity_type = ""
        else:
            # Enum members are shown by their value, anything else as it is.
            entity_type = getattr(self.entity_type, "value", self.entity_type)

        descr = "<html><h4>%s: %s</h4>" % (self.type, self.name)
        descr += "<ol style='list-style-type:none'>"
        descr += "<li><small><b>id:</b> %s</small></li>" % (self.id)
        descr += "<li><small><b>entity type:</b> %s</small></li>" % (entity_type)
        descr += "<li><small><b>data type:</b> %s</small></li>" % (str(self.data_type))
        descr += "<li><small><b>data shape:</b> %s</small></li>" % (str(self.shape))
        descr += "<hr>"
        descr += "<p><small><b>definition:</b> %s</small></p>" % (self.definition)
        descr += "<hr>"
        descr += "<li><small><b>metadata id:</b> %s</small></li>" % (self.metadata_id)
        descr += "<li><small><b>source id:</b> %s</small></li>" % (self.source_id)
        descr += "<hr>"
        descr += "<li><small><b>created at:</b> %s</small></li>" % (self._format_timestamp(self.created_at))
        descr += "<li><small><b>updated at:</b> %s</small></li>" % (self._format_timestamp(self.updated_at))
        descr += "</ol>"
        descr += "</html>"
        return descr
class FileDescriptor():
    """Summary of an opened file: location, format, size and content counts."""

    def __init__(self, filename, format, version, created_at, updated_at, size) -> None:
        """Store the file properties; all content counters start at zero.

        ``size`` is printed by ``toHtml`` with the unit "MB".
        """
        super().__init__()
        self.name = filename
        self.size = size
        self.format = format
        self.version = version
        self.created_at = created_at
        self.updated_at = updated_at
        self.block_count = 0
        self.data_array_count = 0
        self.tag_count = 0
        self.group_count = 0
        self.data_frame_count = 0

    def toHtml(self):
        """Return an HTML snippet that describes the file and its contents."""
        # Everything before the last path separator is the location, the rest the name.
        directory, _, basename = self.name.rpartition(os.sep)
        location = directory if len(directory) > 1 else "."
        created = str(dt.datetime.fromtimestamp(self.created_at))
        updated = str(dt.datetime.fromtimestamp(self.updated_at))

        fragments = [
            "<html><h4>%s</h4>" % basename,
            "<ol style='list-style-type:none'>",
            "<li><small><b>location:</b> %s</small></li>" % (location),
            "<li><small><b>format:</b> %s</small></li>" % (self.format),
            "<li><small><b>nix format version:</b> %s</small></li>" % (str(self.version)),
            "<li><small><b>file size:</b> %.2f MB</small></li>" % (self.size),
            "<hr>",
            "<li>File contents</li>",
            "<li><small><b>blocks:</b> %i</small></li>" % self.block_count,
            "<li><small><b>groups:</b> %i</small></li>" % self.group_count,
            "<li><small><b>data arrays:</b> %i</small></li>" % self.data_array_count,
            "<li><small><b>data frames:</b> %i</small></li>" % self.data_frame_count,
            "<li><small><b>tags:</b> %i</small></li>" % self.tag_count,
            "<hr>",
            "<li><small><b>created at:</b> %s</small></li>" % (created),
            "<li><small><b>updated at:</b> %s</small></li>" % (updated),
            "</ol>",
            "</html>",
        ]
        return "".join(fragments)
class NodeType(Enum):
    """Kinds of items an ItemDescriptor can describe; the values are display labels."""

    Root = "root"
    Section = "Section"
    Block = "Block"
    DataArray = "Data Array"
    DataFrame = "Data Frame"
    Property = "Property"
    Dimension = "Dimension"
    Source = "Source"
    Tag = "Tag"
    MultiTag = "Multi Tag"
    Group = "Group"
    Feature = "Feature"
class DataView():
    """Chunk-wise loaded, in-memory view on the data of one item.

    The data is requested from the file handler in chunks that cover all
    dimensions completely except the longest one, along which the chunk length
    is limited so that one chunk holds at most about ``chunksize`` entries.
    """

    def __init__(self, item_descriptor, file_handler) -> None:
        """Set up the view and load the first chunk.

        Args:
            item_descriptor: descriptor of the item; its ``shape`` is taken as
                the full data shape.
            file_handler: object providing
                ``request_data(item_descriptor, offset, count)``.
        """
        super().__init__()
        self._item_descriptor = item_descriptor
        self._file_handler = file_handler
        self._buffer = None
        self._full_shape = item_descriptor.shape
        self._offset = np.zeros(len(self._full_shape), dtype=int)
        self._count = None
        self._max_dim = None
        self.init_buffer()
        self.request_more()

    def request_more(self):
        """Load the next chunk and append it to the buffer.

        Does nothing when all data is already buffered or when the file
        handler returns no data.
        """
        if self.fully_loaded:
            return
        data = self._file_handler.request_data(self._item_descriptor, self._offset, self._count)
        if data is None:
            return
        if self._buffer is None:
            self._buffer = data
        else:
            # Later chunks extend the buffer along the chunked dimension.
            self._buffer = np.concatenate((self._buffer, data), axis=self._max_dim)
        # Only the chunked dimension advances; all other dimensions are always
        # read completely and therefore keep a zero offset.
        next_offset = np.zeros(len(self._full_shape), dtype=int)
        if self._max_dim is not None:
            next_offset[self._max_dim] = self._buffer.shape[self._max_dim]
        self._offset = next_offset

    def init_buffer(self):
        """Work out the shape of one chunk (``_count``) and the chunked dimension."""
        shape = [int(d) for d in self._full_shape]
        chunk_shape = np.array(shape, dtype=int)
        if len(shape) == 0:
            # Nothing to chunk for dimensionless data.
            self._count = chunk_shape
            self._max_dim = None
            return
        max_dim = int(np.argmax(shape))
        # Number of entries in one slice perpendicular to the longest dimension.
        # Empty dimensions count as 1 so the division below cannot fail.
        slice_size = 1
        for i, d in enumerate(shape):
            if i != max_dim:
                slice_size *= max(d, 1)
        # Integer arithmetic, at least one slice per chunk, never more than exists.
        chunk_shape[max_dim] = min(shape[max_dim], max(1, int(chunksize) // slice_size))
        self._count = chunk_shape
        self._max_dim = max_dim

    @property
    def fully_loaded(self):
        """True when the buffer has the same shape as the complete data."""
        if self._buffer is None:
            return False
        return tuple(self._full_shape) == tuple(self._buffer.shape)

    def __str__(self) -> str:
        """Return a multi-line summary; the buffer line is omitted while nothing is loaded."""
        lines = [self._item_descriptor.name + " " + str(self._item_descriptor.entity_type)]
        if self._buffer is not None:
            lines.append("buffer size: " + str(self._buffer.shape))
        lines.append("chunk size:" + str(self._count))
        lines.append("is fully loaded: " + str(self.fully_loaded))
        return "\n".join(lines)
class Singleton(type):
    """Metaclass that creates each of its classes only once.

    The first call of a class constructs the instance; every later call
    returns that same instance and ignores the arguments it was given.
    """

    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the stored instance of ``cls``, constructing it on first use."""
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        created = super().__call__(*args, **kwargs)
        registry[cls] = created
        return created
class EntityBuffer():
    """Dictionary-backed store of entities, keyed by their ``id`` attribute."""

    def __init__(self) -> None:
        super().__init__()
        self._buffer = {}

    def put(self, entity):
        """Store ``entity`` unless it has no ``id`` or that id is already stored."""
        if hasattr(entity, "id"):
            # setdefault keeps the entity stored first under this id.
            self._buffer.setdefault(entity.id, entity)

    def has(self, id):
        """Return True when an entity is stored under ``id``."""
        return id in self._buffer

    def get(self, id):
        """Return the entity stored under ``id``, or None when there is none."""
        return self._buffer.get(id)

    def clear(self):
        """Remove all stored entities."""
        self._buffer.clear()
class FileHandler(metaclass=Singleton):
    """Single access point to the currently opened nix file.

    The handler opens the file read-only, keeps the entities it has handed out
    in an EntityBuffer and answers requests with ItemDescriptor objects (or
    with data, see ``request_data``).
    """

    def __init__(self) -> None:
        super().__init__()
        self._filename = None
        self._nix_file = None
        self._file_requests = []
        self._entity_buffer = EntityBuffer()
        self._file_descriptor = None
        self._file_version = None

    def open(self, filename):
        """Close the current file and open ``filename`` read-only.

        Returns:
            tuple ``(success, message)``. On failure the handler is left in
            the closed state.
        """
        self.close()
        if not os.path.exists(filename):
            return False, "File %s could not be found!" % filename
        try:
            self._nix_file = nix.File.open(filename, nix.FileMode.ReadOnly)
            self._filename = filename
            descriptor = FileDescriptor(self.filename, self._nix_file.format, self._nix_file.version,
                                        self._nix_file.created_at, self._nix_file.updated_at,
                                        os.path.getsize(self.filename)/1e+6)
            self._file_descriptor = descriptor
            descriptor.block_count = len(self._nix_file.blocks)
            for b in self._nix_file.blocks:
                descriptor.data_array_count += len(b.data_arrays)
                descriptor.group_count += len(b.groups)
                # tags and multi tags are counted together
                descriptor.tag_count += len(b.tags)
                descriptor.tag_count += len(b.multi_tags)
                if hasattr(b, "data_frames"):
                    descriptor.data_frame_count += len(b.data_frames)
            self._file_version = self._nix_file.version
            return True, "Successfully opened file %s." % filename.split(os.sep)[-1]
        except RuntimeError as e:
            # Do not keep a half-initialised handler around.
            self.close()
            return False, "Failed to open file %s! \n Error message is: %s" % (filename, e)
        except OSError as e:
            self.close()
            return False, "Failed to open file %s! \n Error message is: %s\n Probably no nix file?!" % (filename, e)

    def close(self):
        """Close the file, if one is open, and reset all state of the handler."""
        if self._nix_file is not None and self._nix_file.is_open():
            self._nix_file.close()
        self._nix_file = None
        self._filename = None
        self._file_requests = []
        self._entity_buffer.clear()
        self._file_descriptor = None
        self._file_version = None

    @property
    def file_descriptor(self):
        """FileDescriptor of the opened file, None when no file is open."""
        return self._file_descriptor

    @property
    def is_valid(self):
        """True when a file is set and reports to be open."""
        return self._nix_file is not None and self._nix_file.is_open()

    @property
    def filename(self):
        """Name of the opened file, None when no file is open."""
        return self._filename

    def request_data(self, entity_descriptor, offset=None, count=None):
        """Read a segment of the data of a buffered entity.

        Args:
            entity_descriptor: descriptor whose ``id`` identifies the entity.
            offset: start index per dimension; None means the beginning.
            count: number of entries per dimension; None means everything
                from ``offset`` on. Counts reaching beyond the data are
                clipped. The passed sequence is not modified.

        Returns:
            the result of indexing the entity with the segment, or None when
            the entity is not in the entity buffer.
        """
        entity = self._entity_buffer.get(entity_descriptor.id)
        if entity is None:
            return None
        shape = entity.shape
        if offset is None:
            offset = [0] * len(shape)
        if count is None:
            count = [s - o for s, o in zip(shape, offset)]
        seg = []
        for o, c, s in zip(offset, count, shape):
            start = int(o)
            # clip the segment to the extent of the data
            length = max(0, min(int(c), int(s) - start))
            seg.append(slice(start, start + length))
        return entity[tuple(seg)]

    def _find_section(self, section_id):
        """Return the section with the given id from the buffer or the file, else None."""
        section = self._entity_buffer.get(section_id)
        if section is None:
            found = self._nix_file.find_sections(lambda s: s.id == section_id)
            section = found[0] if len(found) > 0 else None
        return section

    def request_section_descriptor(self, id):
        """Return an ItemDescriptor of the section with the given id, or None."""
        fs = self._find_section(id)
        if fs is None:
            return None
        return ItemDescriptor(fs.name, fs.id, fs.type, definition=fs.definition, entity_type=NodeType.Section)

    def _section_descriptors(self, parent):
        """Buffer the direct subsections of ``parent`` and return their descriptors."""
        sub_sections = []
        for s in parent.sections:
            self._entity_buffer.put(s)
            sub_sections.append(ItemDescriptor(s.name, s.id, s.type, definition=s.definition,
                                               entity_type=NodeType.Section))
        return sub_sections

    def _property_descriptors(self, section):
        """Return descriptors of the properties of ``section``.

        The descriptor value is the text form of the property values followed
        by the unit, if there is one.
        """
        props = []
        for p in section.props:
            # NOTE(review): depending on the nixio version the values seem to be
            # either wrapper objects with a ``value`` attribute or the plain
            # values - accept both; confirm against the supported versions.
            raw = [getattr(v, "value", v) for v in p.values]
            if self._file_version < (1, 1, 1) and len(raw) == 1:
                # older files: a single value is shown without brackets
                value = str(raw[0])
            else:
                value = "[" + ",".join(map(str, raw)) + "]"
            if p.unit is not None:
                value += " " + p.unit
            props.append(ItemDescriptor(p.name, p.id, value=value, entity_type=NodeType.Property))
        return props

    def request_metadata(self, root_id=None, depth=1):
        """Return the subsections and properties found below a section.

        Args:
            root_id: id of the parent section; None requests the top-level
                sections of the file.
            depth (int, optional): currently not used. Defaults to 1.

        Returns:
            tuple ``(sections, properties)`` of ItemDescriptor lists. Both are
            empty when the section cannot be found; ``properties`` is empty
            for the top level.
        """
        sections = []
        properties = []
        if root_id is None:
            sections = self._section_descriptors(self._nix_file)
        else:
            fs = self._find_section(root_id)
            if fs is None:
                return sections, properties
            sections.extend(self._section_descriptors(fs))
            properties.extend(self._property_descriptors(fs))
        return sections, properties

    def _entity_info(self, entities, block_id, entity_type):
        """Buffer the given entities and return one ItemDescriptor for each.

        Attributes an entity does not have are left as None in the descriptor.
        """
        infos = []
        for e in entities:
            self._entity_buffer.put(e)
            itd = ItemDescriptor(e.name, e.id, e.type, definition=e.definition, entity_type=entity_type,
                                 block_id=block_id)
            section = getattr(e, "metadata", None)
            itd.metadata_id = section.id if section is not None else None
            itd.data_type = getattr(e, "data_type", None)
            itd.created_at = getattr(e, "created_at", None)
            itd.updated_at = getattr(e, "updated_at", None)
            itd.shape = getattr(e, "shape", None)
            src = getattr(e, "source", None)
            itd.source_id = src.id if src is not None else None
            infos.append(itd)
            if entity_type == NodeType.DataArray:
                itd.value = "%s %s entries" % (str(e.shape), e.dtype)
            elif entity_type == NodeType.Tag:
                point_or_segment = "segment" if e.extent else "point"
                start = str(e.position)
                end = ("to " + str(tuple(np.array(e.position) + np.array(e.extent)))) if e.extent else ""
                itd.value = "tags %s %s %s" % (point_or_segment, start, end)
            # TODO set the value to something meaningful for the various entity types
        return infos

    def request_blocks(self):
        """Return descriptors of all blocks of the file."""
        return self._entity_info(self._nix_file.blocks, None, NodeType.Block)

    def get_block(self, id):
        """Return the block with the given id, from the buffer if it is stored there."""
        b = self._entity_buffer.get(id)
        if b is None:
            b = self._nix_file.blocks[id]
        return b

    def request_data_arrays(self, block_id):
        """Return descriptors of the data arrays of the given block."""
        b = self.get_block(block_id)
        return self._entity_info(b.data_arrays, block_id, NodeType.DataArray)

    def request_tags(self, block_id):
        """Return descriptors of the tags, followed by the multi tags, of the given block."""
        b = self.get_block(block_id)
        tags = self._entity_info(b.tags, block_id, NodeType.Tag)
        tags.extend(self._entity_info(b.multi_tags, block_id, NodeType.MultiTag))
        return tags

    def _get_tag(self, block_id, tag_id, is_mtag):
        """Return the (multi) tag with the given id from the buffer or the block."""
        t = self._entity_buffer.get(tag_id)
        if t is None:
            b = self.get_block(block_id)
            t = b.multi_tags[tag_id] if is_mtag else b.tags[tag_id]
        return t

    def request_references(self, block_id, tag_id, is_mtag):
        """Return descriptors of the references of a tag or multi tag."""
        t = self._get_tag(block_id, tag_id, is_mtag)
        return self._entity_info(t.references, block_id, NodeType.DataArray)

    def request_features(self, block_id, tag_id, is_mtag):
        """Return descriptors of the features of a tag or multi tag."""
        t = self._get_tag(block_id, tag_id, is_mtag)
        feats = []
        for f in t.features:
            itd = ItemDescriptor(f.data.name, f.id, f.link_type, definition=f.data.definition,
                                 block_id=block_id, entity_type=NodeType.Feature)
            feats.append(itd)
        return feats

    def request_dimensions(self, block_id, array_id):
        """Return descriptors of the dimensions of the given data array."""
        da = self._entity_buffer.get(array_id)
        if da is None:
            b = self.get_block(block_id)
            da = b.data_arrays[array_id]
        dimensions = []
        for i, d in enumerate(da.dimensions):
            # a missing or unset label is shown as empty text
            label = getattr(d, "label", None)
            dim_name = "%i. dim: %s" % (i+1, label if label is not None else "")
            dim_type = "%s %s" % (d.dimension_type, "dimension")
            dimensions.append(ItemDescriptor(dim_name, type=dim_type, entity_type=NodeType.Dimension,
                                             block_id=block_id))
        return dimensions

    def request_data_frames(self, block_id):
        """Return descriptors of the data frames of the given block.

        Returns an empty list when the file version is below 1.2 or the block
        offers no ``data_frames``.
        """
        # Compare major and minor together so that a future major version is
        # not mistaken for an old file.
        if tuple(self._nix_file.version[:2]) >= (1, 2):
            b = self.get_block(block_id)
            if hasattr(b, "data_frames"):
                return self._entity_info(b.data_frames, block_id, NodeType.DataFrame)
        return []

    def request_groups(self, block_id):
        """Return descriptors of the groups of the given block."""
        b = self.get_block(block_id)
        return self._entity_info(b.groups, block_id, NodeType.Group)

    def request_sources(self, block_id, parent_source_id=None):
        """Return descriptors of sources of the given block.

        Args:
            block_id: id of the block.
            parent_source_id: None requests the sources of the block itself,
                otherwise the direct children of every source with that id.
        """
        b = self.get_block(block_id)
        if parent_source_id is None:
            return self._entity_info(b.sources, block_id, NodeType.Source)
        sources = []
        for src in b.find_sources(lambda s: s.id == parent_source_id):
            for s in src.sources:
                self._entity_buffer.put(s)
                # keep the block id so the children of this source can be requested as well
                sources.append(ItemDescriptor(s.name, s.id, s.type, definition=s.definition,
                                              block_id=block_id, entity_type=NodeType.Source))
        return sources