aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJ08nY2020-03-04 19:53:11 +0100
committerJ08nY2020-03-04 19:53:11 +0100
commitf8345c356e37cb8e277d5bf5262c7cc2e57477bd (patch)
tree7b945b91a7e860697f74efe4011978ba05e44e9e
parenta97f49ebe3c8e28d2a9ba76711555a3378b62341 (diff)
downloadpyecsca-f8345c356e37cb8e277d5bf5262c7cc2e57477bd.tar.gz
pyecsca-f8345c356e37cb8e277d5bf5262c7cc2e57477bd.tar.bz2
pyecsca-f8345c356e37cb8e277d5bf5262c7cc2e57477bd.zip
Redo the TraceSet IO interfaces.
-rw-r--r--pyecsca/ec/efd/shortw/w12-0/scaling/z5
-rw-r--r--pyecsca/ec/efd/shortw/w12-0/scaling/z.op35
-rw-r--r--pyecsca/sca/trace/__init__.py1
-rw-r--r--pyecsca/sca/trace/plot.py0
-rw-r--r--pyecsca/sca/trace/trace.py53
-rw-r--r--pyecsca/sca/trace_set/__init__.py2
-rw-r--r--pyecsca/sca/trace_set/base.py28
-rw-r--r--pyecsca/sca/trace_set/chipwhisperer.py66
-rw-r--r--pyecsca/sca/trace_set/hdf5.py84
-rw-r--r--pyecsca/sca/trace_set/inspector.py155
-rw-r--r--pyecsca/sca/trace_set/pickle.py35
-rw-r--r--test/data/test.h5bin0 -> 6712 bytes
-rw-r--r--test/data/test.picklebin0 -> 463 bytes
-rw-r--r--test/sca/test_align.py4
-rw-r--r--test/sca/test_combine.py6
-rw-r--r--test/sca/test_edit.py2
-rw-r--r--test/sca/test_filter.py2
-rw-r--r--test/sca/test_match.py8
-rw-r--r--test/sca/test_process.py2
-rw-r--r--test/sca/test_sampling.py2
-rw-r--r--test/sca/test_test.py22
-rw-r--r--test/sca/test_trace.py2
-rw-r--r--test/sca/test_traceset.py93
23 files changed, 425 insertions, 152 deletions
diff --git a/pyecsca/ec/efd/shortw/w12-0/scaling/z b/pyecsca/ec/efd/shortw/w12-0/scaling/z
new file mode 100644
index 0000000..02907a4
--- /dev/null
+++ b/pyecsca/ec/efd/shortw/w12-0/scaling/z
@@ -0,0 +1,5 @@
+compute A = 1/Z1
+compute AA = A^2
+compute X3 = X1*A
+compute Y3 = Y1*AA
+compute Z3 = 1
diff --git a/pyecsca/ec/efd/shortw/w12-0/scaling/z.op3 b/pyecsca/ec/efd/shortw/w12-0/scaling/z.op3
new file mode 100644
index 0000000..67652e5
--- /dev/null
+++ b/pyecsca/ec/efd/shortw/w12-0/scaling/z.op3
@@ -0,0 +1,5 @@
+A = 1/Z1
+AA = A^2
+X3 = X1*A
+Y3 = Y1*AA
+Z3 = 1
diff --git a/pyecsca/sca/trace/__init__.py b/pyecsca/sca/trace/__init__.py
index 696e387..4190807 100644
--- a/pyecsca/sca/trace/__init__.py
+++ b/pyecsca/sca/trace/__init__.py
@@ -5,6 +5,7 @@ from .combine import *
from .edit import *
from .filter import *
from .match import *
+from .plot import *
from .process import *
from .sampling import *
from .test import *
diff --git a/pyecsca/sca/trace/plot.py b/pyecsca/sca/trace/plot.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/pyecsca/sca/trace/plot.py
diff --git a/pyecsca/sca/trace/trace.py b/pyecsca/sca/trace/trace.py
index 18df978..99a6814 100644
--- a/pyecsca/sca/trace/trace.py
+++ b/pyecsca/sca/trace/trace.py
@@ -1,23 +1,44 @@
import weakref
+from typing import Optional, Any, Mapping, Sequence
+from copy import copy, deepcopy
+
from numpy import ndarray
+import numpy as np
from public import public
-from typing import Optional, Sequence, Any
@public
class Trace(object):
- """A power trace, which has an optional title, optional data bytes and mandatory samples."""
+ """A trace, which has an optional title, optional data bytes and mandatory samples."""
title: Optional[str]
data: Optional[bytes]
+ meta: Mapping[str, Any]
samples: ndarray
def __init__(self, samples: ndarray, title: Optional[str], data: Optional[bytes],
- trace_set: Any = None):
+ meta: Mapping[str, Any] = None, trace_set: Any = None):
self.title = title
self.data = data
+ self.meta = meta
self.samples = samples
self.trace_set = trace_set
+ def __len__(self):
+ """Length of the trace, in samples."""
+ return len(self.samples)
+
+ def __getitem__(self, index):
+ """Get the sample at `index`."""
+ return self.samples[index]
+
+ def __setitem__(self, key, value):
+ """Set the sample at `key`."""
+ self.samples[key] = value
+
+ def __iter__(self):
+ """Iterate over the samples."""
+ yield from self.samples
+
@property
def trace_set(self) -> Any:
if self._trace_set is None:
@@ -31,17 +52,37 @@ class Trace(object):
else:
self._trace_set = weakref.ref(trace_set)
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ del state["_trace_set"]
+ return state
+
+ def __eq__(self, other):
+ if not isinstance(other, Trace):
+ return False
+ return np.array_equal(self.samples, other.samples) and self.title == other.title and self.data == other.data and self.meta == other.meta
+
+ def with_samples(self, samples: ndarray) -> "Trace":
+ return Trace(samples, deepcopy(self.title), deepcopy(self.data), deepcopy(self.meta), deepcopy(self.trace_set))
+
+ def __copy__(self):
+ return Trace(copy(self.samples), copy(self.title), copy(self.data), copy(self.meta), copy(self.trace_set))
+
+ def __deepcopy__(self, memodict={}):
+ return Trace(deepcopy(self.samples, memo=memodict), deepcopy(self.title, memo=memodict), deepcopy(self.data, memo=memodict), deepcopy(self.meta, memo=memodict), deepcopy(self.trace_set, memo=memodict))
+
def __repr__(self):
return f"Trace(title={self.title!r}, data={self.data!r}, samples={self.samples!r}, trace_set={self.trace_set!r})"
@public
class CombinedTrace(Trace):
- """A power trace that was combined from other traces, `parents`."""
+ """A trace that was combined from other traces, `parents`."""
def __init__(self, samples: ndarray, title: Optional[str], data: Optional[bytes],
- trace_set=None, parents: Sequence[Trace] = None):
- super().__init__(samples, title, data, trace_set=trace_set)
+ meta: Mapping[str, Any] = None, trace_set: Any = None,
+ parents: Sequence[Trace] = None):
+ super().__init__(samples, title, data, meta, trace_set=trace_set)
self.parents = None
if parents is not None:
self.parents = weakref.WeakSet(parents)
diff --git a/pyecsca/sca/trace_set/__init__.py b/pyecsca/sca/trace_set/__init__.py
index cdede67..ee5e08a 100644
--- a/pyecsca/sca/trace_set/__init__.py
+++ b/pyecsca/sca/trace_set/__init__.py
@@ -3,3 +3,5 @@
from .base import *
from .inspector import *
from .chipwhisperer import *
+from .pickle import *
+from .hdf5 import *
diff --git a/pyecsca/sca/trace_set/base.py b/pyecsca/sca/trace_set/base.py
index 2221933..0bab76d 100644
--- a/pyecsca/sca/trace_set/base.py
+++ b/pyecsca/sca/trace_set/base.py
@@ -1,16 +1,21 @@
+from io import RawIOBase, BufferedIOBase
+from pathlib import Path
+from typing import List, Union
+
from public import public
-from typing import List
from ..trace import Trace
@public
class TraceSet(object):
- _traces: List = []
- _keys: List = []
+ _traces: List[Trace]
+ _keys: List
def __init__(self, *traces: Trace, **kwargs):
self._traces = list(traces)
+ for trace in self._traces:
+ trace.trace_set = self
self.__dict__.update(kwargs)
self._keys = list(kwargs.keys())
@@ -22,10 +27,27 @@ class TraceSet(object):
"""Get the trace at `index`."""
return self._traces[index]
+ def __setitem__(self, key, value):
+ if not isinstance(value, Trace):
+ raise TypeError
+ self._traces[key] = value
+ value.trace_set = self
+
def __iter__(self):
"""Iterate over the traces."""
yield from self._traces
+ @classmethod
+ def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "TraceSet":
+ raise NotImplementedError
+
+ @classmethod
+ def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "TraceSet":
+ raise NotImplementedError
+
+ def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]):
+ raise NotImplementedError
+
def __repr__(self):
args = ", ".join(["{}={!r}".format(key, getattr(self, key)) for key in
self._keys])
diff --git a/pyecsca/sca/trace_set/chipwhisperer.py b/pyecsca/sca/trace_set/chipwhisperer.py
index 2b37c6f..e70c457 100644
--- a/pyecsca/sca/trace_set/chipwhisperer.py
+++ b/pyecsca/sca/trace_set/chipwhisperer.py
@@ -1,9 +1,14 @@
-import numpy as np
-from .base import TraceSet
-from os.path import exists, isfile, join
from configparser import ConfigParser
+from io import RawIOBase, BufferedIOBase
+from itertools import zip_longest
+from os.path import exists, isfile, join, basename, dirname
+from pathlib import Path
+from typing import Union
+
+import numpy as np
from public import public
+from .base import TraceSet
from ..trace import Trace
@@ -11,27 +16,54 @@ from ..trace import Trace
class ChipWhispererTraceSet(TraceSet):
"""ChipWhisperer trace set (native) format."""
- def __init__(self, path: str = None, name: str = None):
- if path is None and name is None:
- super().__init__()
+ @classmethod
+ def read(cls,
+ input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "ChipWhispererTraceSet":
+ if isinstance(input, (str, Path)):
+ traces, kwargs = ChipWhispererTraceSet.__read(input)
+ return ChipWhispererTraceSet(*traces, **kwargs)
else:
- data = self.__read_data(path, name)
- trace_data = data["traces"]
- traces = [Trace(trace_samples, None, None, trace_set=self) for trace_samples in trace_data]
- del data["traces"]
- config = self.__read_config(path, name)
- super().__init__(*traces, **data, **config)
-
- def __read_data(self, path, name):
+ raise ValueError
+
+ @classmethod
+ def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "ChipWhispererTraceSet":
+ raise NotImplementedError
+
+ def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]):
+ raise NotImplementedError
+
+ @classmethod
+ def __read(cls, full_path):
+ file_name = basename(full_path)
+ if not file_name.startswith("config_") or not file_name.endswith(".cfg"):
+ raise ValueError
+ path = dirname(full_path)
+ name = file_name[7:-4]
+ data = ChipWhispererTraceSet.__read_data(path, name)
+ traces = []
+ for samples, key, textin, textout in zip_longest(data["traces"], data["keylist"],
+ data["textin"], data["textout"]):
+ traces.append(
+ Trace(samples, None, None, {"key": key, "textin": textin, "textout": textout}))
+ del data["traces"]
+ del data["keylist"]
+ del data["textin"]
+ del data["textout"]
+ config = ChipWhispererTraceSet.__read_config(path, name)
+ return traces, {**data, **config}
+
+ @classmethod
+ def __read_data(cls, path, name):
types = {"keylist": None, "knownkey": None, "textin": None, "textout": None, "traces": None}
for type in types.keys():
- type_path = join(path, name + "_" + type + ".npy")
+ type_path = join(path, name + type + ".npy")
if exists(type_path) and isfile(type_path):
types[type] = np.load(type_path, allow_pickle=True)
return types
- def __read_config(self, path, name):
- config_path = join(path, "config_" + name + "_.cfg")
+ @classmethod
+ def __read_config(cls, path, name):
+ config_path = join(path, "config_" + name + ".cfg")
if exists(config_path) and isfile(config_path):
config = ConfigParser()
config.read(config_path)
diff --git a/pyecsca/sca/trace_set/hdf5.py b/pyecsca/sca/trace_set/hdf5.py
new file mode 100644
index 0000000..0a24393
--- /dev/null
+++ b/pyecsca/sca/trace_set/hdf5.py
@@ -0,0 +1,84 @@
+from copy import copy
+from io import RawIOBase, BufferedIOBase, IOBase
+from pathlib import Path
+from typing import Union, Optional
+import numpy as np
+import h5py
+from public import public
+
+from .base import TraceSet
+from .. import Trace
+
+
+@public
+class HDF5TraceSet(TraceSet):
+ _file: Optional[h5py.File]
+
+ @classmethod
+ def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "HDF5TraceSet":
+ if isinstance(input, (str, Path)):
+ hdf5 = h5py.File(str(input), mode="r")
+ elif isinstance(input, IOBase):
+ hdf5 = h5py.File(input, mode="r")
+ else:
+ raise ValueError
+ kwargs = dict(hdf5.attrs)
+ traces = []
+ for k, v in hdf5.items():
+ meta = dict(hdf5[k].attrs) if hdf5[k].attrs else None
+ samples = hdf5[k]
+ traces.append(Trace(np.array(samples, dtype=samples.dtype), None, None, meta))
+ hdf5.close()
+ return HDF5TraceSet(*traces, **kwargs)
+
+ @classmethod
+ def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "HDF5TraceSet":
+ if isinstance(input, (str, Path)):
+ hdf5 = h5py.File(str(input), mode="a")
+ elif isinstance(input, IOBase):
+ hdf5 = h5py.File(input, mode="a")
+ else:
+ raise ValueError
+ kwargs = dict(hdf5.attrs)
+ traces = []
+ for k, v in hdf5.items():
+ meta = dict(hdf5[k].attrs) if hdf5[k].attrs else None
+ samples = hdf5[k]
+ traces.append(Trace(samples, k, None, meta))
+ return HDF5TraceSet(*traces, **kwargs, _file=hdf5)
+
+ def __setitem__(self, key, value):
+ if not isinstance(value, Trace):
+ raise TypeError
+ if self._file is not None:
+ if str(key) in self._file:
+ del self._file[str(key)]
+ self._file[str(key)] = value.samples
+ if value.meta:
+ for k, v in value.meta.items():
+ self._file[str(key)].attrs[k] = v
+ super().__setitem__(key, value)
+
+ def save(self):
+ if self._file is not None:
+ self._file.flush()
+
+ def close(self):
+ if self._file is not None:
+ self._file.close()
+
+ def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]):
+ if isinstance(output, (str, Path)):
+ hdf5 = h5py.File(str(output), "w")
+ elif isinstance(output, IOBase):
+ hdf5 = h5py.File(output, "w")
+ else:
+ raise ValueError
+ for k in self._keys:
+ hdf5[k] = getattr(self, k)
+ for i, trace in enumerate(self._traces):
+ dset = hdf5.create_dataset(str(i), trace.samples)
+ if trace.meta:
+ for k, v in trace.meta.items():
+ dset.attrs[k] = v
+ hdf5.close()
diff --git a/pyecsca/sca/trace_set/inspector.py b/pyecsca/sca/trace_set/inspector.py
index b07313d..4f45aa0 100644
--- a/pyecsca/sca/trace_set/inspector.py
+++ b/pyecsca/sca/trace_set/inspector.py
@@ -1,10 +1,11 @@
-import numpy as np
import struct
from enum import IntEnum
from io import BytesIO, RawIOBase, BufferedIOBase, UnsupportedOperation
from pathlib import Path
+from typing import Union, Optional
+
+import numpy as np
from public import public
-from typing import Union, Optional, BinaryIO, List
from .base import TraceSet
from ..trace import Trace
@@ -100,7 +101,6 @@ class InspectorTraceSet(TraceSet):
external_clock_frequencty: float = 0
external_clock_time_base: int = 0
- _raw_traces: Optional[List[Trace]] = None
_tag_parsers: dict = {
0x41: ("num_traces", 4, Parsers.read_int, Parsers.write_int),
0x42: ("num_samples", 4, Parsers.read_int, Parsers.write_int),
@@ -136,39 +136,34 @@ class InspectorTraceSet(TraceSet):
0x66: ("external_clock_frequency", 4, Parsers.read_float, Parsers.write_float),
0x67: ("external_clock_time_base", 4, Parsers.read_int, Parsers.write_int)
}
- _set_tags: set = set()
- def __init__(self, input: Optional[Union[str, Path, bytes, RawIOBase, BufferedIOBase]] = None,
- keep_raw_traces: bool = True):
+ @classmethod
+ def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "TraceSet":
"""
Read Inspector trace set from file path, bytes or file-like object.
:param input: Input file path, bytes or file-like object.
- :param keep_raw_traces: Whether to store the raw (unscaled) traces as well.
+ :return:
"""
- traces = None
if isinstance(input, bytes):
- with BytesIO(input) as f:
- traces = self.__read(f)
- elif isinstance(input, (Path, str)):
- with open(input, "rb") as r:
- traces = self.__read(r)
+ with BytesIO(input) as r:
+ traces, tags = InspectorTraceSet.__read(r)
+ elif isinstance(input, (str, Path)):
+ with open(input, "rb") as f:
+ traces, tags = InspectorTraceSet.__read(f)
elif isinstance(input, (RawIOBase, BufferedIOBase)):
- traces = self.__read(input)
- elif input is not None:
- raise ValueError(
- "Cannot parse data, unknown input: {}".format(input))
- if traces is not None:
- super().__init__(*self.__scale(traces))
- else:
- super().__init__()
- if keep_raw_traces:
- self._raw_traces = traces
+ traces, tags = InspectorTraceSet.__read(input)
else:
- del traces
-
- def __read(self, file):
- self._set_tags = set()
+ raise ValueError
+ for trace in traces:
+ new = InspectorTraceSet.__scale(trace.samples, tags["y_scale"])
+ del trace.samples
+ trace.samples = new
+ return InspectorTraceSet(*traces, **tags)
+
+ @classmethod
+ def __read(cls, file):
+ tags = {}
while True:
tag = ord(file.read(1))
length = ord(file.read(1))
@@ -176,35 +171,52 @@ class InspectorTraceSet(TraceSet):
length = Parsers.read_int(file.read(length & 0x7f))
value = file.read(length)
if tag in InspectorTraceSet._tag_parsers:
- tag_name, tag_len, tag_reader, _ = \
- InspectorTraceSet._tag_parsers[tag]
+ tag_name, tag_len, tag_reader, _ = InspectorTraceSet._tag_parsers[tag]
if tag_len is None or length == tag_len:
- setattr(self, tag_name, tag_reader(value))
- self._set_tags.add(tag)
+ tags[tag_name] = tag_reader(value)
elif tag == 0x5f and length == 0:
break
else:
continue
result = []
- for _ in range(self.num_traces):
- title = None if self.title_space == 0 else Parsers.read_str(
- file.read(self.title_space))
- data = None if self.data_space == 0 else file.read(self.data_space)
- dtype = self.sample_coding.dtype()
+ for _ in range(tags["num_traces"]):
+ title = None if "title_space" not in tags else Parsers.read_str(
+ file.read(tags["title_space"]))
+ data = None if "data_space" not in tags else file.read(tags["data_space"])
+ dtype = tags["sample_coding"].dtype()
try:
- samples = np.fromfile(file, dtype, self.num_samples)
+ samples = np.fromfile(file, dtype, tags["num_samples"])
except UnsupportedOperation:
samples = np.frombuffer(
- file.read(dtype.itemsize * self.num_samples), dtype,
- self.num_samples)
- result.append(Trace(samples, title, data, trace_set=self))
- return result
+ file.read(dtype.itemsize * tags["num_samples"]), dtype,
+ tags["num_samples"])
+ result.append(Trace(samples, title, data))
+ return result, tags
+
+ @classmethod
+ def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "PickleTraceSet":
+ raise NotImplementedError
+
+ def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]):
+ """
+ Save this trace set into a file.
+
+ :param output: An output path or file-like object.
+ """
+ if isinstance(output, (str, Path)):
+ with open(output, "wb") as f:
+ self.__write(f)
+ elif isinstance(output, (RawIOBase, BufferedIOBase)):
+ self.__write(output)
+ else:
+ raise ValueError
def __write(self, file):
- for set_tag in self._set_tags:
- tag_name, tag_len, _, tag_writer = InspectorTraceSet._tag_parsers[
- set_tag]
- tag_byte = Parsers.write_int(set_tag, length=1)
+ for tag, tag_tuple in self._tag_parsers.items():
+ tag_name, tag_len, _, tag_writer = tag_tuple
+ if tag_name not in self._keys:
+ continue
+ tag_byte = Parsers.write_int(tag, length=1)
value_bytes = tag_writer(getattr(self, tag_name), tag_len)
length = len(value_bytes)
if length <= 0x7f:
@@ -218,47 +230,32 @@ class InspectorTraceSet(TraceSet):
file.write(value_bytes)
file.write(b"\x5f\x00")
- for trace in self._raw_traces:
+ for trace in self._traces:
if self.title_space != 0 and trace.title is not None:
file.write(Parsers.write_str(trace.title))
if self.data_space != 0 and trace.data is not None:
file.write(trace.data)
+ unscaled = InspectorTraceSet.__unscale(trace.samples, self.y_scale, self.sample_coding)
try:
- trace.samples.tofile(file)
+ unscaled.tofile(file)
except UnsupportedOperation:
- file.write(trace.samples.tobytes())
-
- def __scale(self, traces):
- return list(map(lambda trace: Trace(trace.samples.astype("f4") * self.y_scale, trace.title,
- trace.data, trace_set=self),
- traces))
-
- def save(self, output: Union[Path, str, BinaryIO]):
- """
- Save this trace set into a file.
+ file.write(unscaled.tobytes())
+ del unscaled
- :param output: An output path or file-like object.
- """
- if isinstance(output, (Path, str)):
- with open(output, "wb") as f:
- self.__write(f)
- elif isinstance(output, (RawIOBase, BufferedIOBase)):
- self.__write(output)
- else:
- raise ValueError("Cannot save data, unknown output: {}".format(output))
+ def __setitem__(self, key, value):
+ if not isinstance(value, Trace):
+ raise TypeError
+ if len(value) != self.num_samples or value.samples.dtype != self.sample_coding.dtype():
+ raise ValueError
+ super().__setitem__(key, value)
- def __bytes__(self):
- """Return the byte-representation of this trace set file."""
- with BytesIO() as b:
- self.save(b)
- return b.getvalue()
+ @staticmethod
+ def __scale(samples: np.ndarray, factor: float):
+ return samples.astype("f4") * factor
- @property
- def raw(self) -> Optional[List[Trace]]:
- """The raw (unscaled) traces, as read from the trace set file."""
- if self._raw_traces is None:
- return None
- return list(self._raw_traces)
+ @staticmethod
+ def __unscale(samples: np.ndarray, factor: float, coding: SampleCoding):
+ return (samples * (1 / factor)).astype(coding.dtype())
@property
def sampling_frequency(self) -> int:
@@ -266,7 +263,5 @@ class InspectorTraceSet(TraceSet):
return int(1 / self.x_scale)
def __repr__(self):
- args = ", ".join(
- [f"{self._tag_parsers[set_tag][0]}={getattr(self, self._tag_parsers[set_tag][0])!r}"
- for set_tag in self._set_tags])
+ args = ", ".join([f"{key}={getattr(self, key)!r}" for key in self._keys])
return f"InspectorTraceSet({args})"
diff --git a/pyecsca/sca/trace_set/pickle.py b/pyecsca/sca/trace_set/pickle.py
new file mode 100644
index 0000000..42bc3bb
--- /dev/null
+++ b/pyecsca/sca/trace_set/pickle.py
@@ -0,0 +1,35 @@
+import pickle
+from io import BufferedIOBase, RawIOBase, IOBase
+from pathlib import Path
+from typing import Union
+
+from public import public
+
+from .base import TraceSet
+
+
+@public
+class PickleTraceSet(TraceSet):
+ @classmethod
+ def read(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "PickleTraceSet":
+ if isinstance(input, bytes):
+ return pickle.loads(input)
+ elif isinstance(input, (str, Path)):
+ with open(input, "rb") as f:
+ return pickle.load(f)
+ elif isinstance(input, IOBase):
+ return pickle.load(input)
+ raise ValueError
+
+ @classmethod
+ def inplace(cls, input: Union[str, Path, bytes, RawIOBase, BufferedIOBase]) -> "PickleTraceSet":
+ raise NotImplementedError
+
+ def write(self, output: Union[str, Path, RawIOBase, BufferedIOBase]):
+ if isinstance(output, (str, Path)):
+ with open(output, "wb") as f:
+ pickle.dump(self, f)
+ elif isinstance(output, IOBase):
+ pickle.dump(self, output)
+ else:
+ raise ValueError
diff --git a/test/data/test.h5 b/test/data/test.h5
new file mode 100644
index 0000000..78df1fa
--- /dev/null
+++ b/test/data/test.h5
Binary files differ
diff --git a/test/data/test.pickle b/test/data/test.pickle
new file mode 100644
index 0000000..43f7f07
--- /dev/null
+++ b/test/data/test.pickle
Binary files differ
diff --git a/test/sca/test_align.py b/test/sca/test_align.py
index 96cc458..b595058 100644
--- a/test/sca/test_align.py
+++ b/test/sca/test_align.py
@@ -23,13 +23,13 @@ class AlignTests(TestCase):
@slow
def test_large_align(self):
- example = InspectorTraceSet("test/data/example.trs")
+ example = InspectorTraceSet.read("test/data/example.trs")
result = align_correlation(*example, reference_offset=100000, reference_length=20000, max_offset=15000)
self.assertIsNotNone(result)
@slow
def test_large_dtw_align(self):
- example = InspectorTraceSet("test/data/example.trs")
+ example = InspectorTraceSet.read("test/data/example.trs")
result = align_dtw(*example[:5])
self.assertIsNotNone(result)
diff --git a/test/sca/test_combine.py b/test/sca/test_combine.py
index 7fda1fa..41df9fb 100644
--- a/test/sca/test_combine.py
+++ b/test/sca/test_combine.py
@@ -7,9 +7,9 @@ from pyecsca.sca import Trace, CombinedTrace, average, conditional_average, stan
class CombineTests(TestCase):
def setUp(self):
- self.a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff")
- self.b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff")
- self.c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00")
+ self.a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff", None)
+ self.b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff", None)
+ self.c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00", None)
def test_average(self):
self.assertIsNone(average())
diff --git a/test/sca/test_edit.py b/test/sca/test_edit.py
index 637e9b7..f5b529e 100644
--- a/test/sca/test_edit.py
+++ b/test/sca/test_edit.py
@@ -8,7 +8,7 @@ from pyecsca.sca import Trace, trim, reverse, pad
class EditTests(TestCase):
def setUp(self):
- self._trace = Trace(np.array([10, 20, 30, 40, 50], dtype=np.dtype("i1")), None, None)
+ self._trace = Trace(np.array([10, 20, 30, 40, 50], dtype=np.dtype("i1")), None, None, None)
def test_trim(self):
result = trim(self._trace, 2)
diff --git a/test/sca/test_filter.py b/test/sca/test_filter.py
index 9194fa4..73516d8 100644
--- a/test/sca/test_filter.py
+++ b/test/sca/test_filter.py
@@ -10,7 +10,7 @@ class FilterTests(TestCase):
def setUp(self):
self._trace = Trace(
np.array([5, 12, 15, 13, 15, 11, 7, 2, -4, -8, -10, -8, -13, -9, -11, -8, -5],
- dtype=np.dtype("i1")), None, None)
+ dtype=np.dtype("i1")), None, None, None)
def test_lowpass(self):
result = filter_lowpass(self._trace, 100, 20)
diff --git a/test/sca/test_match.py b/test/sca/test_match.py
index b8663c7..549f143 100644
--- a/test/sca/test_match.py
+++ b/test/sca/test_match.py
@@ -10,21 +10,21 @@ class MatchingTests(TestCase):
def test_simple_match(self):
pattern = Trace(np.array([1, 15, 12, -10, 0, 13, 17, -1, 0], dtype=np.dtype("i1")), None,
- None)
+ None, None)
base = Trace(np.array(
[0, 1, 3, 1, 2, -2, -3, 1, 15, 12, -10, 0, 13, 17, -1, 0, 3, 1],
- dtype=np.dtype("i1")), None, None)
+ dtype=np.dtype("i1")), None, None, None)
filtered = match_part(base, 7, 9)
self.assertListEqual(filtered, [7])
plot(self, base=base, pattern=pad(pattern, (filtered[0], 0)))
def test_multiple_match(self):
pattern = Trace(np.array([1, 15, 12, -10, 0, 13, 17, -1, 0], dtype=np.dtype("i1")), None,
- None)
+ None, None)
base = Trace(np.array(
[0, 1, 3, 1, 2, -2, -3, 1, 18, 10, -5, 0, 13, 17, -1, 0, 3, 1, 2, 5, 13, 8, -8, 1,
11, 15, 0, 1, 5, 2, 4],
- dtype=np.dtype("i1")), None, None)
+ dtype=np.dtype("i1")), None, None, None)
filtered = match_pattern(base, pattern, 0.9)
self.assertListEqual(filtered, [7, 19])
plot(self, base=base, pattern1=pad(pattern, (filtered[0], 0)), pattern2=pad(pattern, (filtered[1], 0)))
diff --git a/test/sca/test_process.py b/test/sca/test_process.py
index f039a95..49d520f 100644
--- a/test/sca/test_process.py
+++ b/test/sca/test_process.py
@@ -7,7 +7,7 @@ from pyecsca.sca import Trace, absolute, invert, threshold, rolling_mean, offset
class ProcessTests(TestCase):
def setUp(self):
- self._trace = Trace(np.array([30, -60, 145, 247], dtype=np.dtype("i2")), None, None)
+ self._trace = Trace(np.array([30, -60, 145, 247], dtype=np.dtype("i2")), None, None, None)
def test_absolute(self):
result = absolute(self._trace)
diff --git a/test/sca/test_sampling.py b/test/sca/test_sampling.py
index 40eaa7d..c08128c 100644
--- a/test/sca/test_sampling.py
+++ b/test/sca/test_sampling.py
@@ -29,7 +29,7 @@ class SamplingTests(TestCase):
def test_downsample_decimate(self):
trace = Trace(np.array([20, 30, 55, 18, 15, 10, 35, 24, 21, 15, 10, 8, -10, -5,
-8, -12, -15, -18, -34, -21, -17, -10, -5, -12, -6, -2,
- 4, 8, 21, 28], dtype=np.dtype("i1")), None, None)
+ 4, 8, 21, 28], dtype=np.dtype("i1")), None, None, None)
result = downsample_decimate(trace, 2)
self.assertIsNotNone(result)
self.assertIsInstance(result, Trace)
diff --git a/test/sca/test_test.py b/test/sca/test_test.py
index 5277c20..f012f7f 100644
--- a/test/sca/test_test.py
+++ b/test/sca/test_test.py
@@ -8,19 +8,19 @@ from pyecsca.sca import Trace, welch_ttest, student_ttest, ks_test
class TTestTests(TestCase):
def setUp(self):
- self.a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff")
- self.b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff")
- self.c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00")
- self.d = Trace(np.array([98, 36], dtype=np.dtype("i1")), None, b"\x00")
+ self.a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff", None)
+ self.b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff", None)
+ self.c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00", None)
+ self.d = Trace(np.array([98, 36], dtype=np.dtype("i1")), None, b"\x00", None)
def test_welch_ttest(self):
self.assertIsNotNone(welch_ttest([self.a, self.b], [self.c, self.d]))
a = Trace(np.array([19.8, 20.4, 19.6, 17.8, 18.5, 18.9, 18.3, 18.9, 19.5, 22.0]), None,
- None)
+ None, None)
b = Trace(np.array([28.2, 26.6, 20.1, 23.3, 25.2, 22.1, 17.7, 27.6, 20.6, 13.7]), None,
- None)
+ None, None)
c = Trace(np.array([20.2, 21.6, 27.1, 13.3, 24.2, 20.1, 11.7, 25.6, 26.6, 21.4]), None,
- None)
+ None, None)
result = welch_ttest([a, b], [b, c])
self.assertIsNotNone(result)
@@ -35,8 +35,8 @@ class KolmogorovSmirnovTests(TestCase):
def test_ks_test(self):
self.assertIsNone(ks_test([], []))
- a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff")
- b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff")
- c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00")
- d = Trace(np.array([98, 36], dtype=np.dtype("i1")), None, b"\x00")
+ a = Trace(np.array([20, 80], dtype=np.dtype("i1")), None, b"\xff", None)
+ b = Trace(np.array([30, 42], dtype=np.dtype("i1")), None, b"\xff", None)
+ c = Trace(np.array([78, 56], dtype=np.dtype("i1")), None, b"\x00", None)
+ d = Trace(np.array([98, 36], dtype=np.dtype("i1")), None, b"\x00", None)
self.assertIsNotNone(ks_test([a, b], [c, d]))
diff --git a/test/sca/test_trace.py b/test/sca/test_trace.py
index 3399c09..d09c4f5 100644
--- a/test/sca/test_trace.py
+++ b/test/sca/test_trace.py
@@ -6,7 +6,7 @@ from pyecsca.sca import Trace
class TraceTests(TestCase):
def test_basic(self):
- trace = Trace(np.array([10, 15, 24], dtype=np.dtype("i1")), "Name", b"\xff\xaa")
+ trace = Trace(np.array([10, 15, 24], dtype=np.dtype("i1")), "Name", b"\xff\xaa", None)
self.assertIsNotNone(trace)
self.assertIn("Trace", str(trace))
self.assertIsNone(trace.trace_set)
diff --git a/test/sca/test_traceset.py b/test/sca/test_traceset.py
index 6d48707..a3ff526 100644
--- a/test/sca/test_traceset.py
+++ b/test/sca/test_traceset.py
@@ -1,8 +1,17 @@
import os.path
+import shutil
import tempfile
from unittest import TestCase
-from pyecsca.sca import TraceSet, InspectorTraceSet, ChipWhispererTraceSet
+import numpy as np
+
+from pyecsca.sca import (TraceSet, InspectorTraceSet, ChipWhispererTraceSet, PickleTraceSet,
+ HDF5TraceSet, Trace)
+
+EXAMPLE_TRACES = [Trace(np.array([20, 40, 50, 50, 10], dtype=np.dtype("i1")), None, None),
+ Trace(np.array([1, 2, 3, 4, 5], dtype=np.dtype("i1")), None, None),
+ Trace(np.array([6, 7, 8, 9, 10], dtype=np.dtype("i1")), None, None)]
+EXAMPLE_KWARGS = {"num_traces": 3, "thingy": "abc"}
class TraceSetTests(TestCase):
@@ -11,12 +20,14 @@ class TraceSetTests(TestCase):
self.assertIsNotNone(TraceSet())
self.assertIsNotNone(InspectorTraceSet())
self.assertIsNotNone(ChipWhispererTraceSet())
+ self.assertIsNotNone(PickleTraceSet())
+ self.assertIsNotNone(HDF5TraceSet())
class InspectorTraceSetTests(TestCase):
def test_load_fname(self):
- result = InspectorTraceSet("test/data/example.trs")
+ result = InspectorTraceSet.read("test/data/example.trs")
self.assertIsNotNone(result)
self.assertEqual(result.global_title, "Example trace set")
self.assertEqual(len(result), 10)
@@ -27,36 +38,76 @@ class InspectorTraceSetTests(TestCase):
def test_load_file(self):
with open("test/data/example.trs", "rb") as f:
- self.assertIsNotNone(InspectorTraceSet(f))
+ self.assertIsNotNone(InspectorTraceSet.read(f))
def test_load_bytes(self):
with open("test/data/example.trs", "rb") as f:
- self.assertIsNotNone(InspectorTraceSet(f.read()))
-
- def test_get_bytes(self):
- self.assertIsNotNone(bytes(InspectorTraceSet("test/data/example.trs")))
-
- def test_keep_traces(self):
- trace_set = InspectorTraceSet("test/data/example.trs")
- self.assertIsNotNone(trace_set.raw)
- trace_set = InspectorTraceSet("test/data/example.trs", keep_raw_traces=False)
- self.assertIsNone(trace_set.raw)
+ self.assertIsNotNone(InspectorTraceSet.read(f.read()))
def test_save(self):
- trace_set = InspectorTraceSet("test/data/example.trs")
+ trace_set = InspectorTraceSet.read("test/data/example.trs")
with tempfile.TemporaryDirectory() as dirname:
path = os.path.join(dirname, "out.trs")
- trace_set.save(path)
+ trace_set.write(path)
self.assertTrue(os.path.exists(path))
- self.assertIsNotNone(InspectorTraceSet(path))
+ self.assertIsNotNone(InspectorTraceSet.read(path))
- with self.assertRaises(ValueError):
- trace_set.save(None)
-
-class ChipWhispererTraceSetTest(TestCase):
+class ChipWhispererTraceSetTests(TestCase):
def test_load_fname(self):
- result = ChipWhispererTraceSet("test/data/", "chipwhisperer")
+ result = ChipWhispererTraceSet.read("test/data/config_chipwhisperer_.cfg")
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
+
+
+class PickleTraceSetTests(TestCase):
+
+ def test_load_fname(self):
+ result = PickleTraceSet.read("test/data/test.pickle")
+ self.assertIsNotNone(result)
+
+ def test_load_file(self):
+ with open("test/data/test.pickle", "rb") as f:
+ self.assertIsNotNone(PickleTraceSet.read(f))
+
+ def test_save(self):
+ trace_set = PickleTraceSet(*EXAMPLE_TRACES, **EXAMPLE_KWARGS)
+ with tempfile.TemporaryDirectory() as dirname:
+ path = os.path.join(dirname, "out.pickle")
+ trace_set.write(path)
+ self.assertTrue(os.path.exists(path))
+ self.assertIsNotNone(PickleTraceSet.read(path))
+
+
+class HDF5TraceSetTests(TestCase):
+
+ def test_load_fname(self):
+ result = HDF5TraceSet.read("test/data/test.h5")
+ self.assertIsNotNone(result)
+
+ def test_load_file(self):
+ with open("test/data/test.h5", "rb") as f:
+ self.assertIsNotNone(HDF5TraceSet.read(f))
+
+ def test_inplace(self):
+ with tempfile.TemporaryDirectory() as dirname:
+ path = os.path.join(dirname, "test.h5")
+ shutil.copy("test/data/test.h5", path)
+ trace_set = HDF5TraceSet.inplace(path)
+ self.assertIsNotNone(trace_set)
+ test_trace = Trace(np.array([6, 7], dtype=np.dtype("i1")), None, None, meta={"thing": "ring"})
+ trace_set[0] = test_trace
+ trace_set.save()
+ trace_set.close()
+
+ test_set = HDF5TraceSet.read(path)
+ self.assertEquals(test_set[0], test_trace)
+
+ def test_save(self):
+ trace_set = HDF5TraceSet(*EXAMPLE_TRACES, **EXAMPLE_KWARGS)
+ with tempfile.TemporaryDirectory() as dirname:
+ path = os.path.join(dirname, "out.h5")
+ trace_set.write(path)
+ self.assertTrue(os.path.exists(path))
+ self.assertIsNotNone(HDF5TraceSet.read(path))