"""Log protocol (:class:`cbclib.LogProtocol`) together with log container (:class:`cbclib.LogContainer`)
provide an interface to retrieve the data from the log files, which contain the readouts from the
motors and other instrument during the experiment.
Examples:
Generate a default built-in log protocol:
>>> import cbclib as cbc
>>> cbc.LogProtocol()
LogProtocol(datatypes={'exposure': 'float', 'n_points': 'int', 'n_steps': 'int', 'scan_type':
'str', 'step_size': 'float', 'x_sample': 'float', 'y_sample': 'float', 'z_sample': 'float',
'r_sample': 'float'}, log_keys={'exposure': ['Exposure'], 'n_points': ['Points count'],
'n_steps': ['Steps count'], 'scan_type': ['Device'], 'step_size': ['Step size'], 'x_sample':
['X-SAM', 'SAM-X', 'SCAN-X'], 'y_sample': ['Y-SAM', 'SAM-Y', 'SCAN-Y'], 'z_sample': ['Z-SAM',
'SAM-Z', 'SCAN-Z'], 'r_sample': ['R-SAM', 'SAM-R', 'SCAN-R']}, part_keys={'exposure':
'Type: Method', 'n_points': 'Type: Scan', 'n_steps': 'Type: Scan', 'scan_type': 'Type: Scan',
'step_size': 'Type: Scan', 'x_sample': 'Session logged attributes', 'y_sample':
'Session logged attributes', 'z_sample': 'Session logged attributes', 'r_sample':
'Session logged attributes'})
Generate a default log data container:
>>> cbc.LogContainer()
LogContainer(protocol=LogProtocol(datatypes={'exposure': 'float', 'n_points': 'int', 'n_steps':
'int', 'scan_type': 'str', 'step_size': 'float', 'x_sample': 'float', 'y_sample': 'float',
'z_sample': 'float', 'r_sample': 'float'}, log_keys={'exposure': ['Exposure'], 'n_points':
['Points count'], 'n_steps': ['Steps count'], 'scan_type': ['Device'], 'step_size': ['Step size'],
'x_sample': ['X-SAM', 'SAM-X', 'SCAN-X'], 'y_sample': ['Y-SAM', 'SAM-Y', 'SCAN-Y'], 'z_sample':
['Z-SAM', 'SAM-Z', 'SCAN-Z'], 'r_sample': ['R-SAM', 'SAM-R', 'SCAN-R']}, part_keys={'exposure':
'Type: Method', 'n_points': 'Type: Scan', 'n_steps': 'Type: Scan', 'scan_type': 'Type: Scan',
'step_size': 'Type: Scan', 'x_sample': 'Session logged attributes', 'y_sample':
'Session logged attributes', 'z_sample': 'Session logged attributes', 'r_sample':
'Session logged attributes'}), log_attr={}, log_data={}, idxs=None, translations=None)
"""
from __future__ import annotations
from dataclasses import dataclass, field
import os
import re
from typing import Any, ClassVar, Dict, Iterable, List, Optional, Tuple, TypeVar
import numpy as np
from .data_container import DataContainer, INIContainer
from .cbc_setup import Sample, ScanSamples, ScanSetup
LOG_PROTOCOL = os.path.join(os.path.dirname(__file__), 'config/log_protocol.ini')
L = TypeVar('L', bound='LogContainer')
[docs]@dataclass
class LogProtocol(INIContainer):
"""Log file protocol class. Contains log file keys to retrieve and the data types of the
corresponding values.
Args:
datatypes : Dictionary with attributes' datatypes. 'float', 'int', 'bool', or 'str' are
allowed.
log_keys : Dictionary with attributes' log file keys.
part_keys : Dictionary with the part names inside the log file where the attributes are
stored.
"""
__ini_fields__ = {'datatypes': 'datatypes', 'log_keys': 'log_keys', 'part_keys': 'part_keys'}
datatypes : Dict[str, str]
log_keys : Dict[str, List[str]]
part_keys : Dict[str, str]
known_types : ClassVar[Dict[str, Any]] = {'int': int, 'bool': bool, 'float': float, 'str': str}
unit_dict : ClassVar[Dict[str, float]] = {'mm': 1e-3, 'mdeg': 1.7453292519943296e-05,
'µm,um': 1e-6, 'udeg,µdeg': 1.7453292519943296e-08,
'nm': 1e-9, 'ndeg': 1.7453292519943296e-11,
'pm': 1e-12, 'pdeg': 1.7453292519943296e-14,
'percent': 1e-2}
def __post_init__(self):
self.log_keys = {attr: self.str_to_list(val)
for attr, val in self.log_keys.items() if attr in self.datatypes}
self.part_keys = {attr: val for attr, val in self.part_keys.items()
if attr in self.datatypes}
[docs] @classmethod
def import_default(cls) -> LogProtocol:
"""Return the default :class:`LogProtocol` object.
Returns:
A :class:`LogProtocol` object with the default parameters.
"""
return cls.import_ini(LOG_PROTOCOL)
@classmethod
def _get_unit(cls, key: str) -> float:
for unit_key in cls.unit_dict:
units = unit_key.split(',')
for unit in units:
if unit in key:
return cls.unit_dict[unit_key]
return 1.0
@classmethod
def _has_unit(cls, key: str) -> bool:
has_unit = False
for unit_key in cls.unit_dict:
units = unit_key.split(',')
for unit in units:
has_unit |= (unit in key)
return has_unit
[docs] def load_attributes(self, path: str) -> Dict[str, Dict[str, Any]]:
"""Return attributes' values from a log file at the given `path`.
Args:
path : Path to the log file.
Returns:
Dictionary with the attributes retrieved from the log file.
"""
if not isinstance(path, str):
raise ValueError('path must be a string')
with open(path, 'r') as log_file:
log_str = ''
for line in log_file:
if line.startswith('# '):
log_str += line.strip('# ')
else:
break
# List all the sector names
part_keys = list(self.part_keys.values())
# Divide log into sectors
parts_list = [part for part in re.split('(' + '|'.join(part_keys) + \
'|--------------------------------)\n*', log_str) if part]
# Rearange sectors into a dictionary
parts = {}
for idx, part in enumerate(parts_list):
if part in part_keys:
if part == 'Session logged attributes':
attr_keys, attr_vals = parts_list[idx + 1].strip('\n').split('\n')
parts['Session logged attributes'] = ''
for key, val in zip(attr_keys.split(';'), attr_vals.split(';')):
parts['Session logged attributes'] += key + ': ' + val + '\n'
else:
val = parts_list[idx + 1]
match = re.search(r'Device:.*\n', val)
if match:
name = match[0].split(': ')[-1][:-1]
parts[part + ', ' + name] = val
# Populate attributes dictionary
attr_dict = {part_name: {} for part_name in parts}
for part_name, part in parts.items():
for attr, part_key in self.part_keys.items():
if part_key in part_name:
for log_key in self.log_keys[attr]:
# Find the attribute's mention and divide it into a key and value pair
match = re.search(log_key + r'.*\n', part)
if match:
raw_str = match[0]
raw_val = raw_str.strip('\n').split(': ')[1]
# Extract a number string
val_num = re.search(r'[-]*\d+[.]*\d*', raw_val)
dtype = self.known_types[self.datatypes[attr]]
attr_dict[part_name][attr] = dtype(val_num[0] if val_num else raw_val)
# Apply unit conversion if needed
if np.issubdtype(dtype, np.floating):
attr_dict[part_name][attr] *= self._get_unit(raw_str)
return attr_dict
[docs] def load_data(self, path: str, idxs: Optional[Iterable[int]]=None,
return_idxs=False) -> Tuple[Dict[str, np.ndarray], np.ndarray]:
"""Retrieve the main data array from the log file.
Args:
path : Path to the log file.
idxs : Array of data indices to load. Loads info for all the frames by default.
return_idxs : Return an array of indices of the scan steps read from the log file
if True.
Returns:
A tuple of two elements:
* Dictionary with data fields and their names retrieved from the log file.
* An array of indices of the scan steps read from the log file.
"""
if idxs is not None:
idxs = np.asarray(idxs)
idxs.sort()
line_count = 0
with open(path, 'r') as log_file:
for line_idx, line in enumerate(log_file):
if line.startswith('# '):
if 'WARNING' not in line:
keys_line = line.strip('# ')
else:
data_line = line
if idxs is None:
skiprows = line_idx
max_rows = None
break
if idxs.size == 0:
skiprows = line_idx
max_rows = 0
break
if line_count == idxs[0]:
skiprows = line_idx
if line_count == idxs[-1]:
max_rows = line_idx - skiprows + 1
break
line_count += 1
keys = keys_line.strip('\n').split(';')
data_strings = data_line.strip('\n').split(';')
dtypes = {'names': [], 'formats': []}
converters = {}
for idx, (key, val) in enumerate(zip(keys, data_strings)):
dtypes['names'].append(key)
unit = self._get_unit(key)
if 'float' in key:
dtypes['formats'].append(np.dtype(float))
converters[idx] = lambda item, unit=unit: unit * float(item)
elif 'int' in key:
if self._has_unit(key):
converters[idx] = lambda item, unit=unit: unit * float(item)
dtypes['formats'].append(np.dtype(float))
else:
dtypes['formats'].append(np.dtype(int))
elif 'Array' in key:
dtypes['formats'].append(np.ndarray)
func = lambda part, unit=unit: unit * float(part)
conv = lambda item, func=func: np.asarray(list(map(func, item.strip(b' []').split(b','))))
converters[idx] = conv
else:
dtypes['formats'].append('<S' + str(len(val)))
converters[idx] = lambda item: item.strip(b' []')
txt_dict = {}
txt_tuple = np.loadtxt(path, delimiter=';', converters=converters,
dtype=dtypes, unpack=True, skiprows=skiprows,
max_rows=max_rows)
if idxs is None:
txt_dict.update(zip(keys, txt_tuple))
idxs = np.arange(txt_tuple[0].size)
elif idxs.size == 0:
txt_dict.update(zip(keys, txt_tuple))
else:
txt_dict.update({key: np.atleast_1d(data)[idxs - np.min(idxs)]
for key, data in zip(keys, txt_tuple)})
if return_idxs:
return txt_dict, idxs
return txt_dict
[docs]@dataclass
class LogContainer(DataContainer):
"""Log data container class. Takes a log protocol :class:`cbclib.LogProtocol` and provides
an interface to read the log files and generate a an array of sample translations and a set
of scan samples :class:`cbclib.ScanSamples`.
Args:
protocol : A log protocol object
log_attr : A dictionary of log attributes imported from a log file.
log_data : A dictionary of log data imported from a log file.
idxs : A set of indices of the scan steps imported from a log file.
translations : An array of sample translations.
"""
protocol : LogProtocol = field(default_factory=LogProtocol.import_default)
log_attr : Dict[str, Dict[str, Any]] = field(default_factory=dict)
log_data : Dict[str, Any] = field(default_factory=dict)
idxs : Optional[np.ndarray] = None
translations : Optional[np.ndarray] = None
_no_data_exc : ClassVar[ValueError] = ValueError('No log data in the container')
def __len__(self) -> int:
return 0 if self.idxs is None else self.idxs.size
[docs] def read_logs(self: L, log_path: str, idxs: Optional[Iterable[int]]=None) -> L:
"""Read a log file under the path `log_path`. Read out only the frame indices defined by
``idxs``. If ``idxs`` is None, read the whole log file.
Args:
log_path : Path to the log file.
idxs : List of indices to read. Read the whole log file if None.
Returns:
A new log container with ``log_attr``, ``log_data``, and ``idxs`` updated.
"""
log_attr = self.protocol.load_attributes(log_path)
log_data, idxs = self.protocol.load_data(log_path, idxs=idxs, return_idxs=True)
return LogContainerFull(**dict(self, log_attr=log_attr, log_data=log_data, idxs=idxs))
[docs] def find_log_part_key(self, attr: str) -> Optional[str]:
"""Find a name of the log dictionary corresponding to an attribute name `attr`.
Args:
attr : A name of the attribute to find.
Returns:
A name of the log dictionary, corresponding to the given attribute name `attr`.
"""
log_keys = self.protocol.log_keys.get(attr, [])
for part in self.log_attr:
for log_key in log_keys:
if log_key in part:
return part
return None
[docs] def find_log_attribute(self, attr: str, part_key: Optional[str]=None) -> Optional[Any]:
"""Find a value in the log attributes corresponding to an attribute name `attr`.
Args:
attr : A name of the attribute to find.
part_key : Search in the given part of the log dictionary if provided.
Returns:
Value of the log attribute. Returns None if nothing is found.
"""
if part_key is None:
part_key = self.protocol.part_keys.get(attr, '')
part_dict = self.log_attr.get(part_key, {})
value = part_dict.get(attr, None)
return value
[docs] def find_log_dataset(self, attr: str) -> Optional[np.ndarray]:
"""Find a dataset in the log data corresponding to an attribute name `attr`.
Args:
attr : A name of the attribute to find.
Returns:
Dataset for the given attribute. Returns None if nothing is found.
"""
log_keys = self.protocol.log_keys.get(attr, [])
for data_key, log_dset in self.log_data.items():
for log_key in log_keys:
if log_key in data_key:
return log_dset
return None
[docs] def simulate_translations(self: L) -> L:
"""Simulate sample translations based on the log attributes.
Raises:
ValueError : If ``log_attr`` is missing.
Returns:
A new log container with ``translations`` updated.
"""
raise self._no_data_exc
[docs] def read_translations(self: L) -> L:
"""Generate sample translations based on the log data.
Raises:
ValueError : If ``log_data`` is missing.
Returns:
A new log container with ``translations`` updated.
"""
raise self._no_data_exc
[docs] def generate_samples(self: L, dist: float, setup: ScanSetup) -> L:
"""Generate a :class:`cbclib.ScanSamples` object from the sample translations.
Args:
dist : Initial focus-to-sample distance in meters.
setup : Experimental setup.
Raises:
ValueError : If ``translations`` is missing.
Returns:
A scan samples object.
"""
raise self._no_data_exc
@dataclass
class LogContainerFull(LogContainer):
protocol : LogProtocol
log_attr : Dict[str, Dict[str, Any]]
log_data : Dict[str, Any]
idxs : Optional[np.ndarray] = None
translations : Optional[np.ndarray] = None
def _is_log_translations(self) -> bool:
return (self.find_log_attribute('x_sample') is not None and
self.find_log_attribute('y_sample') is not None and
self.find_log_attribute('z_sample') is not None and
self.find_log_attribute('r_sample') is not None and
(self.find_log_dataset('x_sample') is not None or
self.find_log_dataset('y_sample') is not None or
self.find_log_dataset('z_sample') is not None or
self.find_log_dataset('r_sample') is not None))
def _is_sim_translations(self) -> bool:
return (self.find_log_attribute('x_sample') is not None and
self.find_log_attribute('y_sample') is not None and
self.find_log_attribute('z_sample') is not None and
self.find_log_attribute('r_sample') is not None and
(self.find_log_part_key('x_sample') is not None or
self.find_log_part_key('y_sample') is not None or
self.find_log_part_key('z_sample') is not None or
self.find_log_part_key('r_sample') is not None))
def simulate_translations(self) -> LogContainerFull:
if not self._is_sim_translations():
raise ValueError('The necessary data is not found')
translations = np.tile((self.find_log_attribute('x_sample'),
self.find_log_attribute('y_sample'),
self.find_log_attribute('z_sample'),
self.find_log_attribute('r_sample')), (len(self), 1))
translations = np.nan_to_num(translations)
step_sizes, n_steps = [], []
for scan_motor, unit_vec in zip(['x_sample', 'y_sample',
'z_sample', 'r_sample'], np.eye(4, 4)):
part_key = self.find_log_part_key(scan_motor)
if part_key is not None:
step_sizes.append(self.log_attr[part_key].get('step_size') * unit_vec)
n_steps.append(self.log_attr[part_key].get('n_points'))
steps = np.tensordot(np.stack(np.mgrid[[slice(0, n) for n in n_steps]], axis=0),
np.stack(step_sizes, axis=0), (0, 0)).reshape(-1, 4)
return self.replace(translations=translations + steps)
def read_translations(self) -> LogContainerFull:
if not self._is_log_translations():
raise ValueError('The necessary data is not found')
translations = np.tile((self.find_log_attribute('x_sample'),
self.find_log_attribute('y_sample'),
self.find_log_attribute('z_sample'),
self.find_log_attribute('r_sample')), (len(self), 1))
translations = np.nan_to_num(translations)
for idx, scan_motor in enumerate(['x_sample', 'y_sample', 'z_sample', 'r_sample']):
dset = self.find_log_dataset(scan_motor)
if dset is not None:
translations[:dset.size, idx] = dset
return self.replace(translations=translations)
def generate_samples(self, dist: float, setup: ScanSetup) -> ScanSamples:
if self.translations is None:
raise ValueError('No translations in the container')
samples = {}
for frame, translation in zip(self.idxs, self.translations):
samples[frame] = Sample(setup.tilt_rotation(translation[3] - self.translations[0, 3]),
translation[2] - self.translations[0, 2] + dist)
return ScanSamples(samples)