"""Provides a traceset implementation based on Riscure's Inspector traceset format (``.trs``)."""
import struct
from enum import IntEnum
from io import BytesIO, RawIOBase, BufferedIOBase, UnsupportedOperation
from pathlib import Path
from typing import Union, Optional, BinaryIO
import numpy as np
from public import public
from .base import TraceSet
from ..trace import Trace
@public
class SampleCoding(IntEnum):
    """Sample coding byte of an Inspector trace set.

    As decoded by :meth:`dtype`, the low nibble of the value is the sample
    size in bytes and bit ``0x10`` selects floating point (set) or integer
    (clear) samples.
    """

    Int8 = 0x01
    Int16 = 0x02
    Int32 = 0x04
    Float8 = 0x11
    Float16 = 0x12
    Float32 = 0x14

    def dtype(self) -> np.dtype:
        """Return the little-endian numpy dtype that matches this coding.

        :return: ``np.dtype`` built from the type character (``"f"`` or
            ``"i"``) and the byte size stored in the low nibble.
        """
        # NOTE(review): Float8 yields "<f1"; NumPy presumably has no such
        # type and will raise here -- confirm before relying on Float8.
        char = "f" if self.value & 0x10 else "i"
        return np.dtype(f"<{char}{self.value & 0x0F}")
@public
class Parsers:
    """Little-endian readers and writers for trace set header tag values.

    Readers take the raw value bytes and return a Python object. Writers take
    the Python object plus the expected encoded length (``None`` for
    variable-length values) and return the encoded bytes.
    """

    @staticmethod
    def read_int(bytes) -> int:
        """Decode an unsigned little-endian integer of any length."""
        return int.from_bytes(bytes, byteorder="little")

    @staticmethod
    def read_bool(bytes) -> bool:
        """Decode a boolean; only the integer value ``1`` counts as true."""
        return Parsers.read_int(bytes) == 1

    @staticmethod
    def read_float(bytes) -> float:
        """Decode a little-endian 4-byte float."""
        return struct.unpack("<f", bytes)[0]

    @staticmethod
    def read_str(bytes) -> str:
        """Decode an ASCII string."""
        return bytes.decode("ascii")

    @staticmethod
    def write_int(i, length=1):
        """Encode an unsigned integer as ``length`` little-endian bytes.

        :param i: The non-negative integer to encode.
        :param length: Number of bytes to produce. ``None`` (used for
            variable-length tags) selects the fewest bytes that hold ``i``,
            with a minimum of one.
        :return: The encoded bytes.
        :raises OverflowError: If ``i`` is negative or does not fit in ``length`` bytes.
        """
        if length is None:
            # ``int.to_bytes`` rejects ``None``; zero still needs one byte.
            length = max(1, (i.bit_length() + 7) // 8)
        return int.to_bytes(i, length=length, byteorder="little")

    @staticmethod
    def write_bool(b, length=1):
        """Encode a boolean as an integer (``True`` -> 1, ``False`` -> 0)."""
        return Parsers.write_int(b, length=length)

    @staticmethod
    def write_float(f, length=None):
        """Encode a float as little-endian bytes.

        :param f: The value to encode.
        :param length: ``2`` selects a half-precision (2 byte) encoding, any
            other value a single-precision (4 byte) one.
        :return: The encoded bytes.
        """
        return struct.pack(f"<{'e' if length == 2 else 'f'}", f)

    @staticmethod
    def write_str(s, length=None):
        """Encode a string as ASCII; ``length`` is accepted but not used."""
        return s.encode("ascii")
@public
class InspectorTraceSet(TraceSet):
    """Riscure Inspector trace set format (.trs).

    The class attributes below are the defaults for header values; values
    parsed from a file are passed to the constructor as keyword arguments.
    """

    num_traces: int
    num_samples: int
    sample_coding: SampleCoding
    data_space: int = 0
    title_space: int = 0
    global_title: str = "title"
    description: Optional[str] = None
    x_offset: int = 0
    x_label: Optional[str] = None
    y_label: Optional[str] = None
    x_scale: float = 1
    y_scale: float = 1
    trace_offset: int = 0
    log_scale: int = 0
    scope_range: float = 0
    scope_coupling: int = 0
    scope_offset: float = 0
    scope_impedance: float = 0
    scope_id: Optional[str] = None
    filter_type: int = 0
    filter_frequency: float = 0
    filter_range: float = 0
    external_clock: bool = False
    external_clock_threshold: float = 0
    external_clock_multiplier: int = 0
    external_clock_phase_shift: int = 0
    external_clock_resampler_mask: int = 0
    external_clock_resampler_enabled: bool = False
    external_clock_frequency: float = 0
    # Historical misspelling of ``external_clock_frequency``; kept so that
    # existing references to the old name do not break.
    external_clock_frequencty: float = 0
    external_clock_time_base: int = 0

    # Header tag byte -> (attribute name, value length in bytes or None for
    # variable length, reader, writer).
    _tag_parsers: dict = {
        0x41: ("num_traces", 4, Parsers.read_int, Parsers.write_int),
        0x42: ("num_samples", 4, Parsers.read_int, Parsers.write_int),
        0x43: (
            "sample_coding",
            1,
            lambda bytes: SampleCoding(Parsers.read_int(bytes)),
            lambda coding, length: Parsers.write_int(coding.value, length=length),
        ),
        0x44: ("data_space", 2, Parsers.read_int, Parsers.write_int),
        0x45: ("title_space", 1, Parsers.read_int, Parsers.write_int),
        0x46: ("global_title", None, Parsers.read_str, Parsers.write_str),
        0x47: ("description", None, Parsers.read_str, Parsers.write_str),
        # Fixed 4-byte integer; a ``None`` length cannot be passed to the
        # integer writer.
        0x48: ("x_offset", 4, Parsers.read_int, Parsers.write_int),
        0x49: ("x_label", None, Parsers.read_str, Parsers.write_str),
        0x4A: ("y_label", None, Parsers.read_str, Parsers.write_str),
        0x4B: ("x_scale", 4, Parsers.read_float, Parsers.write_float),
        0x4C: ("y_scale", 4, Parsers.read_float, Parsers.write_float),
        0x4D: ("trace_offset", 4, Parsers.read_int, Parsers.write_int),
        0x4E: ("log_scale", 1, Parsers.read_int, Parsers.write_int),
        0x55: ("scope_range", 4, Parsers.read_float, Parsers.write_float),
        0x56: ("scope_coupling", 4, Parsers.read_int, Parsers.write_int),
        0x57: ("scope_offset", 4, Parsers.read_float, Parsers.write_float),
        0x58: ("scope_impedance", 4, Parsers.read_float, Parsers.write_float),
        0x59: ("scope_id", None, Parsers.read_str, Parsers.write_str),
        0x5A: ("filter_type", 4, Parsers.read_int, Parsers.write_int),
        0x5B: ("filter_frequency", 4, Parsers.read_float, Parsers.write_float),
        0x5C: ("filter_range", 4, Parsers.read_float, Parsers.write_float),
        0x60: ("external_clock", 1, Parsers.read_bool, Parsers.write_bool),
        0x61: ("external_clock_threshold", 4, Parsers.read_float, Parsers.write_float),
        0x62: ("external_clock_multiplier", 4, Parsers.read_int, Parsers.write_int),
        0x63: ("external_clock_phase_shift", 4, Parsers.read_int, Parsers.write_int),
        0x64: ("external_clock_resampler_mask", 4, Parsers.read_int, Parsers.write_int),
        0x65: (
            "external_clock_resampler_enabled",
            1,
            Parsers.read_bool,
            Parsers.write_bool,
        ),
        0x66: ("external_clock_frequency", 4, Parsers.read_float, Parsers.write_float),
        0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int),
    }

    @classmethod
    def read(cls, input: Union[str, Path, bytes, BinaryIO]) -> "InspectorTraceSet":
        """
        Read Inspector trace set from file path, bytes or file-like object.

        :param input: Input file path, bytes or file-like object.
        :return: The trace set; samples are converted to ``float32`` and
            multiplied by the ``y_scale`` header value.
        :raises TypeError: If ``input`` is of an unsupported type.
        :raises ValueError: If the input ends before the header is complete.
        """
        if isinstance(input, bytes):
            with BytesIO(input) as r:
                traces, tags = InspectorTraceSet.__read(r)
        elif isinstance(input, (str, Path)):
            with open(input, "rb") as f:
                traces, tags = InspectorTraceSet.__read(f)
        elif isinstance(input, (RawIOBase, BufferedIOBase, BinaryIO)):
            traces, tags = InspectorTraceSet.__read(input)
        else:
            raise TypeError
        # The y_scale tag may be absent from the header; use the class default then.
        y_scale = tags.get("y_scale", cls.y_scale)
        for trace in traces:
            new = InspectorTraceSet.__scale(trace.samples, y_scale)
            del trace.samples
            trace.samples = new
        return InspectorTraceSet(*traces, **tags)

    @classmethod
    def __read(cls, file):
        """Parse the tag/length/value header and then the traces.

        :param file: Binary file-like object positioned at the start of the trace set.
        :return: Tuple of (list of traces with raw samples, dict of parsed header values).
        :raises ValueError: If the input ends inside the header.
        """
        tags = {}
        while True:
            tag_byte = file.read(1)
            length_byte = file.read(1)
            if not tag_byte or not length_byte:
                # Without this check ``ord`` fails with an opaque TypeError.
                raise ValueError("Unexpected end of input in trace set header.")
            tag = ord(tag_byte)
            length = ord(length_byte)
            if length & 0x80:
                # High bit set: the low 7 bits give the size of the real length field.
                length = Parsers.read_int(file.read(length & 0x7F))
            value = file.read(length)
            if tag in InspectorTraceSet._tag_parsers:
                tag_name, tag_len, tag_reader, _ = InspectorTraceSet._tag_parsers[tag]
                # Values whose length differs from the expected one are skipped.
                if tag_len is None or length == tag_len:
                    tags[tag_name] = tag_reader(value)
            elif tag == 0x5F and length == 0:
                # End-of-header marker; unknown tags are ignored.
                break
        result = []
        for _ in range(tags["num_traces"]):
            title = (
                None
                if "title_space" not in tags
                else Parsers.read_str(file.read(tags["title_space"]))
            )
            data = None if "data_space" not in tags else file.read(tags["data_space"])
            dtype = tags["sample_coding"].dtype()
            try:
                samples = np.fromfile(file, dtype, tags["num_samples"])
            except UnsupportedOperation:
                # Objects without a real file descriptor (e.g. BytesIO) end up here.
                samples = np.frombuffer(
                    file.read(dtype.itemsize * tags["num_samples"]),
                    dtype,
                    tags["num_samples"],
                )
            result.append(Trace(samples, {"title": title, "data": data}))
        return result, tags

    @classmethod
    def inplace(cls, input: Union[str, Path, bytes, BinaryIO]) -> "InspectorTraceSet":
        """Not implemented for this format.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError

    def write(self, output: Union[str, Path, BinaryIO]):
        """
        Save this trace set into a file.

        :param output: An output path or file-like object.
        :raises TypeError: If ``output`` is of an unsupported type.
        """
        if isinstance(output, (str, Path)):
            with open(output, "wb") as f:
                self.__write(f)
        elif isinstance(output, (RawIOBase, BufferedIOBase, BinaryIO)):
            self.__write(output)
        else:
            raise TypeError

    def __write(self, file):
        """Write the header tags followed by every trace to ``file``.

        Only tags whose attribute name is in ``self._keys`` are written.

        :param file: Binary file-like object to write to.
        """
        for tag, tag_tuple in self._tag_parsers.items():
            tag_name, tag_len, _, tag_writer = tag_tuple
            if tag_name not in self._keys:
                continue
            tag_byte = Parsers.write_int(tag, length=1)
            value_bytes = tag_writer(getattr(self, tag_name), tag_len)
            length = len(value_bytes)
            if length <= 0x7F:
                length_bytes = Parsers.write_int(length, length=1)
            else:
                # Long form: 0x80 | size of the length field, then the length itself.
                length_data = Parsers.write_int(
                    length, length=(length.bit_length() + 7) // 8
                )
                length_bytes = Parsers.write_int(0x80 | len(length_data)) + length_data
            file.write(tag_byte)
            file.write(length_bytes)
            file.write(value_bytes)
        # End-of-header marker: tag 0x5F with zero length.
        file.write(b"\x5f\x00")
        for trace in self._traces:
            # NOTE(review): title/data are written as-is, not padded or cut to
            # title_space/data_space -- confirm callers keep them consistent.
            if self.title_space != 0:
                # ``.get`` so traces without this meta key are skipped, not a KeyError.
                title = trace.meta.get("title")
                if title is not None:
                    file.write(Parsers.write_str(title))
            if self.data_space != 0:
                data = trace.meta.get("data")
                if data is not None:
                    file.write(data)
            unscaled = InspectorTraceSet.__unscale(
                trace.samples, self.y_scale, self.sample_coding
            )
            try:
                unscaled.tofile(file)
            except UnsupportedOperation:
                file.write(unscaled.tobytes())
            del unscaled

    @staticmethod
    def __scale(samples: np.ndarray, factor: float):
        """Convert raw samples to ``float32`` and multiply them by ``factor``."""
        return samples.astype("f4") * factor

    @staticmethod
    def __unscale(samples: np.ndarray, factor: float, coding: SampleCoding):
        """Divide samples by ``factor`` and convert them to the dtype of ``coding``."""
        dtype = coding.dtype()
        raw = samples * (1 / factor)
        if dtype.kind == "i":
            # Round before the integer cast: plain truncation turns e.g.
            # 2.9999998 (float error from scaling) into 2 instead of 3.
            raw = np.rint(raw)
        return raw.astype(dtype)

    @property
    def sampling_frequency(self) -> int:
        """Return the sampling frequency of the trace set."""
        return int(1 / self.x_scale)

    def __repr__(self):
        """Return a representation listing every key in ``self._keys`` with its value."""
        args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys])
        return f"InspectorTraceSet({args})"