aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJ08nY2020-02-07 23:51:52 +0100
committerJ08nY2020-02-07 23:51:52 +0100
commit8fa9f4bb7455b8153c8d50313c432a1d8bc8f020 (patch)
tree3a48c54280ad5c83eb74fb890565423664bd5032
parentab5d3d0884d50cadb7afaa9e519c5c86ffa6b67f (diff)
downloadpyecsca-8fa9f4bb7455b8153c8d50313c432a1d8bc8f020.tar.gz
pyecsca-8fa9f4bb7455b8153c8d50313c432a1d8bc8f020.tar.bz2
pyecsca-8fa9f4bb7455b8153c8d50313c432a1d8bc8f020.zip
Add docs to Scope class, add support for pico-python.
-rw-r--r--.travis.yml6
-rw-r--r--pyecsca/sca/scope/__init__.py32
-rw-r--r--pyecsca/sca/scope/base.py86
-rw-r--r--pyecsca/sca/scope/chipwhisperer.py44
-rw-r--r--pyecsca/sca/scope/picoscope_alt.py57
-rw-r--r--pyecsca/sca/scope/picoscope_sdk.py (renamed from pyecsca/sca/scope/picoscope.py)116
-rw-r--r--pyecsca/sca/trace/test.py1
-rw-r--r--pyecsca/sca/trace_set/inspector.py6
-rw-r--r--setup.py3
9 files changed, 302 insertions, 49 deletions
diff --git a/.travis.yml b/.travis.yml
index b171891..0bac320 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,6 +15,10 @@ addons:
- libps6000
before_install:
+ - git clone https://github.com/colinoflynn/pico-python
+ - cd pico-python
+ - python setup.py install
+ - cd ..
- git clone https://github.com/picotech/picosdk-python-wrappers
- cd picosdk-python-wrappers
- python setup.py install
@@ -22,7 +26,7 @@ before_install:
install:
- pip install codecov
- - pip install -e ".[picoscope, chipwhisperer, test, typecheck]"
+ - pip install -e ".[picoscope_sdk, picoscope_alt, chipwhisperer, test, typecheck]"
script:
- make -i typecheck
diff --git a/pyecsca/sca/scope/__init__.py b/pyecsca/sca/scope/__init__.py
index 25cd0ca..d11ed82 100644
--- a/pyecsca/sca/scope/__init__.py
+++ b/pyecsca/sca/scope/__init__.py
@@ -1,11 +1,39 @@
+from typing import Type
+
+from .base import *
+
+has_picoscope = False
+has_picosdk = False
+has_chipwhisperer = False
+
+try:
+ import picoscope
+
+ has_picoscope = True
+except ImportError:
+ pass
+
try:
import picosdk
- from .picoscope import *
+
+ has_picosdk = True
except ImportError:
pass
try:
import chipwhisperer
- from .chipwhisperer import *
+
+ has_chipwhisperer = True
except ImportError:
pass
+
+PicoScope: Type[Scope]
+if has_picoscope:
+ from .picoscope_alt import *
+ PicoScope = PicoScopeAlt
+elif has_picosdk:
+ from .picoscope_sdk import *
+ PicoScope = PicoScopeSdk
+
+if has_chipwhisperer:
+ from .chipwhisperer import *
diff --git a/pyecsca/sca/scope/base.py b/pyecsca/sca/scope/base.py
index 3bade40..2979a7d 100644
--- a/pyecsca/sca/scope/base.py
+++ b/pyecsca/sca/scope/base.py
@@ -1,5 +1,89 @@
+from typing import Tuple, Sequence, Optional
+import numpy as np
+from public import public
+
+@public
class Scope(object):
"""An oscilloscope."""
- pass
+
+ def open(self) -> None:
+ """Open the connection to the scope."""
+ raise NotImplementedError
+
+ @property
+ def channels(self) -> Sequence[str]:
+ """A list of channels available on this scope."""
+ raise NotImplementedError
+
+ def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]:
+ """
+ Setup the frequency and sample count for the measurement. The scope might not support
+ the requested values and will adjust them to get the next best frequency and the largest
+ supported number of samples (or the number of samples requested).
+
+ :param frequency: The requested frequency in Hz.
+ :param samples: The requested number of samples to measure.
+ :return: A tuple of the actual set frequency and the actual number of samples.
+ """
+ raise NotImplementedError
+
+ def setup_channel(self, channel: str, coupling: str, range: float, enable: bool) -> None:
+ """
+ Setup a channel to use the coupling method and measure the given voltage range.
+
+ :param channel: The channel to measure.
+ :param coupling: The coupling method ("AC" or "DC).
+ :param range: The voltage range to measure.
+ :param enable: Whether to enable or disable the channel.
+ """
+ raise NotImplementedError
+
+ def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int,
+ timeout: int, enable: bool) -> None:
+ """
+ Setup a trigger on a particular `channel`, the channel has to be set up and enabled.
+ The trigger will fire based on the `threshold` and `direction`, if enabled, the trigger
+ will capture after `delay` ticks pass. If trigger conditions do not hold it will fire
+ automatically after `timeout` milliseconds.
+
+ :param channel: The channel to trigger on.
+ :param threshold: The value to trigger on.
+ :param direction: The direction to trigger on ("above", "below", "rising", "falling").
+ :param delay: The delay for capture after trigger (clock ticks).
+ :param timeout: The timeout in milliseconds.
+ :param enable: Whether to enable or disable this trigger.
+ """
+ raise NotImplementedError
+
+ def setup_capture(self, channel: str, enable: bool) -> None:
+ """
+ Setup the capture for a channel.
+
+ :param channel: The channel to capture.
+ :param enable: Whether to enable or disable capture.
+ """
+ raise NotImplementedError
+
+ def arm(self) -> None:
+ """Arm the scope, it will listen for the trigger after this point."""
+ raise NotImplementedError
+
+ def capture(self, channel: str, timeout: Optional[int] = None) -> Optional[np.ndarray]:
+ """
+ Wait for the trace to capture, this will block until the scope has a trace.
+
+ :param channel: The channel to retrieve the trace from.
+ :param timeout: A time in milliseconds to wait for the trace, returns `None` if it runs out.
+ :return: The trace, or if timed out, None.
+ """
+ raise NotImplementedError
+
+ def stop(self) -> None:
+ """Stop the capture, if any."""
+ raise NotImplementedError
+
+ def close(self) -> None:
+ """Close the connection to the scope."""
+ raise NotImplementedError
diff --git a/pyecsca/sca/scope/chipwhisperer.py b/pyecsca/sca/scope/chipwhisperer.py
index 2eb2747..60fe7cf 100644
--- a/pyecsca/sca/scope/chipwhisperer.py
+++ b/pyecsca/sca/scope/chipwhisperer.py
@@ -1,5 +1,47 @@
+from typing import Optional, Tuple, Sequence
+
+import numpy as np
+from chipwhisperer.capture.scopes.base import ScopeTemplate
+from public import public
+
from .base import Scope
+@public
class ChipWhispererScope(Scope):
"""A ChipWhisperer based scope."""
- pass \ No newline at end of file
+
+ def __init__(self, scope: ScopeTemplate):
+ self.scope = scope
+
+ def open(self) -> None:
+ self.scope.con()
+
+ @property
+ def channels(self) -> Sequence[str]:
+ return ["tio1", "tio2", "tio3", "tio4"]
+
+ def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]:
+ pass
+
+ def setup_channel(self, channel: str, coupling: str, range: float, enable: bool) -> None:
+ pass
+
+ def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int,
+ timeout: int, enable: bool) -> None:
+ pass
+
+ def setup_capture(self, channel: str, enable: bool) -> None:
+ pass
+
+ def arm(self) -> None:
+ pass
+
+ def capture(self, channel: str, timeout: Optional[int] = None) -> Optional[np.ndarray]:
+ pass
+
+ def stop(self) -> None:
+ pass
+
+ def close(self) -> None:
+ pass
+
diff --git a/pyecsca/sca/scope/picoscope_alt.py b/pyecsca/sca/scope/picoscope_alt.py
new file mode 100644
index 0000000..69b705a
--- /dev/null
+++ b/pyecsca/sca/scope/picoscope_alt.py
@@ -0,0 +1,57 @@
+from time import time_ns, sleep
+from typing import Optional, Tuple, Sequence, Union
+
+import numpy as np
+from picoscope.ps4000 import PS4000
+from picoscope.ps6000 import PS6000
+from public import public
+
+from .base import Scope
+
+
+@public
+class PicoScopeAlt(Scope):
+
+ def __init__(self, ps: Union[PS4000, PS6000]):
+ self.ps = ps
+
+ def open(self) -> None:
+ self.ps.open()
+
+ @property
+ def channels(self) -> Sequence[str]:
+ return list(self.ps.CHANNELS.keys())
+
+ def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]:
+ actual_frequency, max_samples = self.ps.setSamplingFrequency(frequency, samples)
+ if max_samples < samples:
+ samples = max_samples
+ return actual_frequency, samples
+
+ def setup_channel(self, channel: str, coupling: str, range: float, enable: bool) -> None:
+ self.ps.setChannel(channel, coupling, range, 0.0, enable)
+
+ def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int,
+ timeout: int, enable: bool) -> None:
+ self.ps.setSimpleTrigger(channel, threshold, direction, delay, timeout, enable)
+
+ def setup_capture(self, channel: str, enable: bool) -> None:
+ pass
+
+ def arm(self) -> None:
+ self.ps.runBlock()
+
+ def capture(self, channel: str, timeout: Optional[int] = None) -> Optional[np.ndarray]:
+ start = time_ns()
+ while not self.ps.isReady():
+ sleep(0.001)
+ if timeout is not None and (time_ns() - start) / 1e6 >= timeout:
+ return None
+
+ return self.ps.getDataV(channel)
+
+ def stop(self) -> None:
+ self.ps.stop()
+
+ def close(self) -> None:
+ self.ps.close()
diff --git a/pyecsca/sca/scope/picoscope.py b/pyecsca/sca/scope/picoscope_sdk.py
index 8497c64..d44d8c8 100644
--- a/pyecsca/sca/scope/picoscope.py
+++ b/pyecsca/sca/scope/picoscope_sdk.py
@@ -1,7 +1,7 @@
import ctypes
-from enum import IntEnum
+from time import time_ns, sleep
from math import log2, floor
-from typing import Mapping, Optional, MutableMapping, Union
+from typing import Mapping, Optional, MutableMapping, Union, Tuple
import numpy as np
from picosdk.functions import assert_pico_ok
@@ -13,13 +13,6 @@ from public import public
from .base import Scope
-class TriggerType(IntEnum): # pragma: no cover
- ABOVE = 1
- BELOW = 2
- RISING = 3
- FALLING = 4
-
-
def adc2volt(adc: Union[np.ndarray, ctypes.c_int16],
volt_range: float, adc_minmax: int) -> Union[np.ndarray, float]: # pragma: no cover
if isinstance(adc, ctypes.c_int16):
@@ -35,7 +28,8 @@ def volt2adc(volt: Union[np.ndarray, float],
return (volt / volt_range) * adc_minmax
-class PicoScope(Scope): # pragma: no cover
+@public
+class PicoScopeSdk(Scope): # pragma: no cover
"""A PicoScope based scope."""
MODULE: Library
PREFIX: str
@@ -45,18 +39,28 @@ class PicoScope(Scope): # pragma: no cover
MIN_ADC_VALUE: int
COUPLING: Mapping
TIME_UNITS: Mapping
+ TRIGGERS: Mapping = {
+ "above": 0,
+ "below": 1,
+ "rising": 2,
+ "falling": 3
+ }
def __init__(self):
self.handle: ctypes.c_int16 = ctypes.c_int16()
- self.frequency: Optional[float] = None
+ self.frequency: Optional[int] = None
self.samples: Optional[int] = None
self.timebase: Optional[int] = None
self.buffers: MutableMapping = {}
self.ranges: MutableMapping = {}
- def open(self):
+ def open(self) -> None:
assert_pico_ok(self.__dispatch_call("OpenUnit", ctypes.byref(self.handle)))
+ @property
+ def channels(self):
+ return list(self.CHANNELS.keys())
+
def get_variant(self):
info = (ctypes.c_int8 * 6)()
size = ctypes.c_int16()
@@ -64,6 +68,9 @@ class PicoScope(Scope): # pragma: no cover
ctypes.byref(size), 3))
return "".join(chr(i) for i in info[:size])
+ def setup_frequency(self, frequency: int, samples: int) -> Tuple[int, int]:
+ return self.set_frequency(frequency, samples)
+
# channel setup (ranges, coupling, which channel is scope vs trigger)
def set_channel(self, channel: str, enabled: bool, coupling: str, range: float):
assert_pico_ok(
@@ -71,17 +78,20 @@ class PicoScope(Scope): # pragma: no cover
self.COUPLING[coupling], self.RANGES[range]))
self.ranges[channel] = range
+ def setup_channel(self, channel: str, coupling: str, range: float, enable: bool):
+ self.set_channel(channel, enable, coupling, range)
+
def _set_freq(self, frequency: int, samples: int, period_bound: float, timebase_bound: int,
- low_freq: int, high_freq: int, high_subtract: int):
+ low_freq: int, high_freq: int, high_subtract: int) -> Tuple[int, int]:
period = 1 / frequency
if low_freq == 0 or period > period_bound:
tb = floor(high_freq / frequency + high_subtract)
- actual_frequency = high_freq / (tb - high_subtract)
+ actual_frequency = high_freq // (tb - high_subtract)
else:
tb = floor(log2(low_freq) - log2(frequency))
if tb > timebase_bound:
tb = timebase_bound
- actual_frequency = low_freq / 2 ** tb
+ actual_frequency = low_freq // 2 ** tb
max_samples = ctypes.c_int32()
assert_pico_ok(self.__dispatch_call("GetTimebase", self.handle, tb, samples, None, 0,
ctypes.byref(max_samples), 0))
@@ -93,44 +103,61 @@ class PicoScope(Scope): # pragma: no cover
return actual_frequency, samples
# frequency setup
- def set_frequency(self, frequency: int, samples: int):
+ def set_frequency(self, frequency: int, samples: int) -> Tuple[int, int]:
raise NotImplementedError
+ def setup_trigger(self, channel: str, threshold: float, direction: str, delay: int,
+ timeout: int, enable: bool):
+ self.set_trigger(direction, enable, threshold, channel, delay, timeout)
+
# triggering setup
- def set_trigger(self, type: TriggerType, enabled: bool, value: float, channel: str,
- range: float, delay: int, timeout: int):
+ def set_trigger(self, type: str, enabled: bool, value: float, channel: str,
+ delay: int, timeout: int):
assert_pico_ok(
self.__dispatch_call("SetSimpleTrigger", self.handle, enabled,
self.CHANNELS[channel],
- volt2adc(value, range, self.MAX_ADC_VALUE),
- type.value, delay, timeout))
+ volt2adc(value, self.ranges[channel], self.MAX_ADC_VALUE),
+ self.TRIGGERS[type], delay, timeout))
+
+ def setup_capture(self, channel: str, enable: bool):
+ self.set_buffer(channel, enable)
# buffer setup
- def set_buffer(self, channel: str):
+ def set_buffer(self, channel: str, enable: bool):
if self.samples is None:
raise ValueError
- buffer = (ctypes.c_int16 * self.samples)()
- self.buffers[channel] = buffer
- assert_pico_ok(self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel],
- ctypes.byref(buffer), self.samples))
+ if enable:
+ if channel in self.buffers:
+ del self.buffers[channel]
+ buffer = (ctypes.c_int16 * self.samples)()
+ assert_pico_ok(self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel],
+ ctypes.byref(buffer), self.samples))
+ self.buffers[channel] = buffer
+ else:
+ assert_pico_ok(self.__dispatch_call("SetDataBuffer", self.handle, self.CHANNELS[channel],
+ None, self.samples))
+ del self.buffers[channel]
- # collection
- def collect(self):
+ def arm(self):
if self.samples is None or self.timebase is None:
raise ValueError
assert_pico_ok(
self.__dispatch_call("RunBlock", self.handle, 0, self.samples, self.timebase, 0,
None,
0, None, None))
+
+ def capture(self, channel: str, timeout: Optional[int] = None) -> Optional[np.ndarray]:
+ start = time_ns()
+ if self.samples is None:
+ raise ValueError
ready = ctypes.c_int16(0)
check = ctypes.c_int16(0)
while ready.value == check.value:
+ sleep(0.001)
assert_pico_ok(self.__dispatch_call("IsReady", self.handle, ctypes.byref(ready)))
+ if timeout is not None and (time_ns() - start) / 1e6 >= timeout:
+ return None
- # get the data
- def retrieve(self, channel: str) -> np.ndarray:
- if self.samples is None:
- raise ValueError
actual_samples = ctypes.c_int32(self.samples)
overflow = ctypes.c_int16()
assert_pico_ok(
@@ -155,7 +182,7 @@ class PicoScope(Scope): # pragma: no cover
@public
-class PS4000Scope(PicoScope): # pragma: no cover
+class PS4000Scope(PicoScopeSdk): # pragma: no cover
MODULE = ps4000
PREFIX = "ps4000"
CHANNELS = {
@@ -200,7 +227,7 @@ class PS4000Scope(PicoScope): # pragma: no cover
@public
-class PS6000Scope(PicoScope): # pragma: no cover
+class PS6000Scope(PicoScopeSdk): # pragma: no cover
MODULE = ps6000
PREFIX = "ps6000"
CHANNELS = {
@@ -241,15 +268,24 @@ class PS6000Scope(PicoScope): # pragma: no cover
self.COUPLING[coupling], self.RANGES[range], 0,
ps6000.PS6000_BANDWIDTH_LIMITER["PS6000_BW_FULL"]))
- def set_buffer(self, channel: str):
+ def set_buffer(self, channel: str, enable: bool):
if self.samples is None:
raise ValueError
- buffer = (ctypes.c_int16 * self.samples)()
- self.buffers[channel] = buffer
- assert_pico_ok(
- ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel],
- ctypes.byref(buffer),
- self.samples, 0))
+ if enable:
+ if channel in self.buffers:
+ del self.buffers[channel]
+ buffer = (ctypes.c_int16 * self.samples)()
+ assert_pico_ok(
+ ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel],
+ ctypes.byref(buffer),
+ self.samples, 0))
+ self.buffers[channel] = buffer
+ else:
+ assert_pico_ok(
+ ps6000.ps6000SetDataBuffer(self.handle, self.CHANNELS[channel],
+ None,
+ self.samples, 0))
+ del self.buffers[channel]
def set_frequency(self, frequency: int, samples: int):
return self._set_freq(frequency, samples, 3.2e-9, 4, 5_000_000_000, 156_250_000, 4)
diff --git a/pyecsca/sca/trace/test.py b/pyecsca/sca/trace/test.py
index e192048..92cda68 100644
--- a/pyecsca/sca/trace/test.py
+++ b/pyecsca/sca/trace/test.py
@@ -49,6 +49,7 @@ def ks_test(first_set: Sequence[Trace], second_set: Sequence[Trace]) -> Optional
"""
Perform the Kolmogorov-Smirnov two sample test on equality of distributions sample wise on
two sets of traces `first_set` and `second_set`.
+
:param first_set:
:param second_set:
:return: Kolmogorov-Smirnov test statistic values (samplewise)
diff --git a/pyecsca/sca/trace_set/inspector.py b/pyecsca/sca/trace_set/inspector.py
index 4964f85..f9475c0 100644
--- a/pyecsca/sca/trace_set/inspector.py
+++ b/pyecsca/sca/trace_set/inspector.py
@@ -138,7 +138,7 @@ class InspectorTraceSet(TraceSet):
}
_set_tags: set = set()
- def __init__(self, input: Optional[Union[str, Path, bytes, BinaryIO]] = None,
+ def __init__(self, input: Optional[Union[str, Path, bytes, RawIOBase, BufferedIOBase]] = None,
keep_raw_traces: bool = True):
"""
Read Inspector trace set from file path, bytes or file-like object.
@@ -151,8 +151,8 @@ class InspectorTraceSet(TraceSet):
with BytesIO(input) as f:
traces = self.__read(f)
elif isinstance(input, (Path, str)):
- with open(input, "rb") as f:
- traces = self.__read(f)
+ with open(input, "rb") as r:
+ traces = self.__read(r)
elif isinstance(input, (RawIOBase, BufferedIOBase)):
traces = self.__read(input)
elif input is not None:
diff --git a/setup.py b/setup.py
index 9cffcb8..60941fc 100644
--- a/setup.py
+++ b/setup.py
@@ -34,7 +34,8 @@ setup(
"asn1crypto"
],
extras_require={
- "picoscope": ["picosdk"],
+ "picoscope_sdk": ["picosdk"],
+ "picoscope_alt": ["picoscope"],
"chipwhisperer": ["chipwhisperer"],
"typecheck": ["mypy"],
"test": ["nose2", "parameterized", "green", "coverage"]