forked from fossasia/pslab-python
/
logic_analyzer.py
784 lines (675 loc) · 28.6 KB
/
logic_analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
"""Classes and functions related to the PSLab's logic analyzer instrument.
Example
-------
>>> from pslab import LogicAnalyzer
>>> la = LogicAnalyzer()
>>> t = la.capture(channels=2, events=1600, modes=["falling", "any"])
"""
import time
from collections import OrderedDict
from typing import Dict, List, Tuple, Union
import numpy as np
import pslab.protocol as CP
from pslab.instrument.digital import DigitalInput, DIGITAL_INPUTS, MODES
from pslab.serial_handler import ADCBufferMixin, SerialHandler
class LogicAnalyzer(ADCBufferMixin):
"""Investigate digital signals on up to four channels simultaneously.
Parameters
----------
device : :class:`SerialHandler`, optional
Serial connection to PSLab device. If not provided, a new one will be
created.
Attributes
----------
trigger_channel : str
See :meth:`configure_trigger`.
trigger_mode : str
See :meth:`configure_trigger`.
"""
_PRESCALERS = {
0: 1,
1: 8,
2: 64,
3: 256,
}
# When capturing multiple channels, there is a two clock cycle
# delay between channels.
_CAPTURE_DELAY = 2
def __init__(self, device: SerialHandler = None):
self._device = SerialHandler() if device is None else device
self._channels = {d: DigitalInput(d) for d in DIGITAL_INPUTS}
self.trigger_channel = "LA1"
self._trigger_channel = self._channels["LA1"]
self.trigger_mode = "disabled"
self._trigger_mode = 0
self._prescaler = 0
self._channel_one_map = "LA1"
self._channel_two_map = "LA2"
self._trimmed = 0
def measure_frequency(
self, channel: str, simultaneous_oscilloscope: bool = False, timeout: float = 1
) -> float:
"""Measure the frequency of a signal.
Parameters
----------
channel : {"LA1", "LA2", "LA3", "LA4"}
Name of the digital input channel in which to measure the
frequency.
simultaneous_oscilloscope: bool, optional
Set this to True if you need to use the oscilloscope at the same
time. Uses firmware instead of software to measure the frequency,
which may fail and return 0. Will not give accurate results above
10 MHz. The default value is False.
timeout : float, optional
Timeout in seconds before cancelling measurement. The default value
is 1 second.
Returns
-------
frequency : float
The signal's frequency in Hz.
"""
if simultaneous_oscilloscope:
return self._measure_frequency_firmware(channel, timeout)
else:
tmp = self._channel_one_map
self._channel_one_map = channel
t = self.capture(1, 2, modes=["sixteen rising"], timeout=timeout)[0]
self._channel_one_map = tmp
try:
period = (t[1] - t[0]) * 1e-6 / 16
frequency = period**-1
except IndexError:
frequency = 0
if frequency >= 1e7:
frequency = self._get_high_frequency(channel)
return frequency
def _measure_frequency_firmware(
self, channel: str, timeout: float, retry: bool = True
) -> float:
self._device.send_byte(CP.COMMON)
self._device.send_byte(CP.GET_FREQUENCY)
self._device.send_int(int(timeout * 64e6) >> 16)
self._device.send_byte(self._channels[channel].number)
self._device.wait_for_data(timeout)
error = self._device.get_byte()
t = [self._device.get_long() for a in range(2)]
self._device.get_ack()
edges = 16
period = (t[1] - t[0]) / edges / CP.CLOCK_RATE
if error or period == 0:
# Retry once.
if retry:
return self._measure_frequency_firmware(channel, timeout, False)
else:
return 0
else:
return period**-1
def _get_high_frequency(self, channel: str) -> float:
"""Measure high frequency signals using firmware.
The input frequency is fed to a 32 bit counter for a period of 100 ms.
The value of the counter at the end of 100 ms is used to calculate the
frequency.
"""
self._device.send_byte(CP.COMMON)
self._device.send_byte(CP.GET_ALTERNATE_HIGH_FREQUENCY)
self._device.send_byte(self._channels[channel].number)
scale = self._device.get_byte()
counter_value = self._device.get_long()
self._device.get_ack()
return scale * counter_value / 1e-1 # 100 ms sampling
def measure_interval(
self, channels: List[str], modes: List[str], timeout: float = 1
) -> float:
"""Measure the time between two events.
This method cannot be used at the same time as the oscilloscope.
Parameters
----------
channels : List[str]
A pair of digital inputs, LA1, LA2, LA3, or LA4. Both can be the
same.
modes : List[str]
Type of logic event to listen for on each channel. See
:class:`DigitalInput` for available modes.
timeout : float, optional
Timeout in seconds before cancelling measurement. The default value
is 1 second.
Returns
-------
interval : float
Time between events in microseconds. A negative value means that
the event on the second channel happend first.
"""
tmp_trigger = self._trigger_channel.name
self.configure_trigger(channels[0], self.trigger_mode)
tmp_map = self._channel_one_map, self._channel_two_map
self._channel_one_map = channels[0]
self._channel_two_map = channels[1]
if channels[0] == channels[1]:
# 34 edges contains 17 rising edges, i.e two
# 'every sixteenth rising edge' events.
t = self.capture(1, 34, modes=["any"], timeout=timeout)[0]
initial = self.get_initial_states()[self._channel_one_map]
t1 = self._get_first_event(t, modes[0], initial)
if modes[0] == modes[1]:
idx = 1 if modes[1] == "any" else 2
initial = initial if idx == 2 else not initial
t2 = self._get_first_event(t[idx:], modes[1], initial)
else:
t2 = self._get_first_event(t, modes[1], initial)
else:
t1, t2 = self.capture(2, 1, modes=modes, timeout=timeout)
t1, t2 = t1[0], t2[0]
self.configure_trigger(tmp_trigger, self.trigger_mode)
self._channel_one_map = tmp_map[0]
self._channel_two_map = tmp_map[1]
return t2 - t1
@staticmethod
def _get_first_event(events: np.ndarray, mode: str, initial: bool) -> np.ndarray:
if mode == "any":
return events[0]
elif mode == "rising":
return events[int(initial)]
elif mode == "falling":
return events[int(not initial)]
elif mode == "four rising":
return events[initial::2][3]
elif mode == "sixteen rising":
return events[initial::2][15]
def measure_duty_cycle(self, channel: str, timeout: float = 1) -> Tuple[float]:
"""Measure duty cycle and wavelength.
This method cannot be used at the same time as the oscilloscope.
Parameters
----------
channel : {"LA1", "LA2", "LA3", "LA4"}
Digital input on which to measure.
timeout : float, optional
Timeout in seconds before cancelling measurement. The default value
is 1 second.
Returns
-------
wavelength : float
Wavelength in microseconds.
duty_cycle : float
Duty cycle as a value between 0 - 1.
"""
tmp_trigger_mode = self.trigger_mode
tmp_trigger_channel = self._trigger_channel.name
self.configure_trigger(trigger_channel=channel, trigger_mode="rising")
tmp_map = self._channel_one_map
self._channel_one_map = channel
t = self.capture(1, 3, modes=["any"], timeout=timeout)[0]
self._channel_one_map = tmp_map
self.configure_trigger(tmp_trigger_channel, tmp_trigger_mode)
period = t[2] - t[0]
# First change is HIGH -> LOW since we trigger on rising.
duty_cycle = 1 - (t[1] - t[0]) / period
return period, duty_cycle
def capture(
self,
channels: Union[int, str, List[str]],
events: int = CP.MAX_SAMPLES // 4,
timeout: float = 1,
modes: List[str] = 4 * ("any",),
e2e_time: float = None,
block: bool = True,
) -> Union[List[np.ndarray], None]:
"""Capture logic events.
This method cannot be used at the same time as the oscilloscope.
Parameters
----------
channels : {1, 2, 3, 4} or str or list of str
Number of channels to capture events on. Events will be captured on
LA1, LA2, LA3, and LA4, in that order. Alternatively, the name of
of a single digital input, or a list of two names of digital inputs
can be provided. In that case, events will be captured only on that
or those specific channels.
events : int, optional
Number of logic events to capture on each channel. The default and
maximum value is 2500.
timeout : float, optional
Timeout in seconds before cancelling measurement in blocking mode.
If the timeout is reached, the events captured up to that point
will be returned. The default value is 1 second.
modes : List[str], optional
List of strings specifying the type of logic level change to
capture on each channel. See :class:`DigitalInput` for available
modes. The default value is ("any", "any", "any", "any").
e2e_time : float, optional
The maximum time between events in seconds. This is only required
in three and four channel mode, which uses 16-bit counters as
opposed to 32-bit counters which are used in one and two channel
mode. The 16-bit counter normally rolls over after 1024 µs, so if
the time between events is greater than that the timestamp
calculations will be incorrect. By setting this to a value greater
than 1024 µs, the counter will be slowed down by a prescaler, which
can extend the maximum allowed event-to-event time gap to up to
262 ms. If the time gap is greater than that, three and four
channel mode cannot be used. One and two channel mode supports
timegaps up to 67 seconds.
block : bool, optional
Whether to block while waiting for events to be captured. If this
is False, this method will return None immediately and the captured
events must be manually fetched by calling :meth:`fetch_data`. The
default value is True.
Returns
-------
events : list of numpy.ndarray or None
List of numpy.ndarrays containing timestamps in microseconds when
logic events were detected, or None if block is False. The length
of the list is equal to the number of channels that were used to
capture events, and each list element corresponds to a channel.
Raises
------
ValueError if too many events are requested, or
ValueError if too many channels are selected.
"""
channels = self._check_arguments(channels, events)
self.stop()
self._prescaler = 0
self.clear_buffer(CP.MAX_SAMPLES)
self._invalidate_buffer()
self._configure_trigger(channels)
modes = [MODES[m] for m in modes]
start_time = time.time()
for e, c in enumerate(
[self._channel_one_map, self._channel_two_map, "LA3", "LA4"][:channels]
):
c = self._channels[c]
c.events_in_buffer = events
c.datatype = "long" if channels < 3 else "int"
c.buffer_idx = 2500 * e * (1 if c.datatype == "int" else 2)
c._logic_mode = modes[e]
if channels == 1:
self._capture_one()
elif channels == 2:
self._capture_two()
else:
self._capture_four(e2e_time)
if block:
# Discard 4:th channel if user asked for 3.
timestamps = self.fetch_data()[:channels]
progress = min([len(t) for t in timestamps])
while progress < events:
timestamps = self.fetch_data()[:channels]
progress = min([len(t) for t in timestamps])
if time.time() - start_time >= timeout:
break
if progress >= CP.MAX_SAMPLES // 4 - self._trimmed:
break
else:
return
for e, t in enumerate(timestamps):
timestamps[e] = t[:events] # Don't surprise the user with extra events.
return timestamps
def _check_arguments(self, channels: Union[int, str, List[str]], events: int):
if isinstance(channels, str):
self._channel_one_map = channels
channels = 1
if isinstance(channels, list):
self._channel_one_map = channels[0]
self._channel_two_map = channels[1]
channels = 2
max_events = CP.MAX_SAMPLES // 4
if events > max_events:
raise ValueError(f"Events must be fewer than {max_events}.")
elif channels < 0 or channels > 4:
raise ValueError("Channels must be between 1-4.")
return channels
def _capture_one(self):
self._channels[self._channel_one_map]._prescaler = 0
self._device.send_byte(CP.TIMING)
self._device.send_byte(CP.START_ALTERNATE_ONE_CHAN_LA)
self._device.send_int(CP.MAX_SAMPLES // 4)
self._device.send_byte(
(self._channels[self._channel_one_map].number << 4)
| self._channels[self._channel_one_map]._logic_mode
)
self._device.send_byte(
(self._channels[self._channel_one_map].number << 4) | self._trigger_mode
)
self._device.get_ack()
def _capture_two(self):
for c in list(self._channels.values())[:2]:
c._prescaler = 0
self._device.send_byte(CP.TIMING)
self._device.send_byte(CP.START_TWO_CHAN_LA)
self._device.send_int(CP.MAX_SAMPLES // 4)
self._device.send_byte((self._trigger_channel.number << 4) | self._trigger_mode)
self._device.send_byte(
self._channels[self._channel_one_map]._logic_mode
| (self._channels[self._channel_two_map]._logic_mode << 4)
)
self._device.send_byte(
self._channels[self._channel_one_map].number
| (self._channels[self._channel_two_map].number << 4)
)
self._device.get_ack()
def _capture_four(self, e2e_time: float):
rollover_time = (2**16 - 1) / CP.CLOCK_RATE
e2e_time = 0 if e2e_time is None else e2e_time
if e2e_time > rollover_time * self._PRESCALERS[3]:
raise ValueError("Timegap too big for four channel mode.")
elif e2e_time > rollover_time * self._PRESCALERS[2]:
self._prescaler = 3
elif e2e_time > rollover_time * self._PRESCALERS[1]:
self._prescaler = 2
elif e2e_time > rollover_time:
self._prescaler = 1
else:
self._prescaler = 0
self._device.send_byte(CP.TIMING)
self._device.send_byte(CP.START_FOUR_CHAN_LA)
self._device.send_int(CP.MAX_SAMPLES // 4)
self._device.send_int(
self._channels["LA1"]._logic_mode
| (self._channels["LA2"]._logic_mode << 4)
| (self._channels["LA3"]._logic_mode << 8)
| (self._channels["LA4"]._logic_mode << 12)
)
self._device.send_byte(self._prescaler)
try:
trigger = {
0: 4,
1: 8,
2: 16,
}[self._trigger_channel.number] | self._trigger_mode
except KeyError:
e = "Triggering is only possible on LA1, LA2, or LA3."
raise NotImplementedError(e)
self._device.send_byte(trigger)
self._device.get_ack()
def fetch_data(self) -> List[np.ndarray]:
"""Collect captured logic events.
It is possible to call fetch_data while the capture routine is still running.
Doing so will not interrupt the capture process. In multi-channel mode, the
number of timestamps may differ between channels when fetch_data is called
before the capture is complete.
Returns
-------
data : list of numpy.ndarray
List of numpy.ndarrays holding timestamps in microseconds when logic events
were detected. The length of the list is equal to the number of channels
that were used to capture events, and each list element corresponds to a
channel.
"""
counter_values = []
channels = list(
OrderedDict.fromkeys(
[self._channel_one_map, self._channel_two_map, "LA3", "LA4"]
)
)
for c in channels:
c = self._channels[c]
if c.events_in_buffer:
if c.datatype == "long":
raw_timestamps = self._fetch_long(c)
else:
raw_timestamps = self._fetch_int(c)
counter_values.append(raw_timestamps)
prescaler = [1 / 64, 1 / 8, 1.0, 4.0][self._prescaler]
timestamps = []
capture_delay = self._CAPTURE_DELAY if self._prescaler == 0 else 0
for e, cv in enumerate(counter_values):
adjusted_counter = cv + e * capture_delay
timestamps.append(adjusted_counter * prescaler)
return timestamps
def _fetch_long(self, channel: DigitalInput) -> np.ndarray:
self._device.send_byte(CP.TIMING)
self._device.send_byte(CP.FETCH_LONG_DMA_DATA)
self._device.send_int(CP.MAX_SAMPLES // 4)
self._device.send_byte(channel.buffer_idx // 5000)
raw = self._device.read(CP.MAX_SAMPLES)
self._device.get_ack()
raw_timestamps = [
CP.Integer.unpack(raw[a * 4 : a * 4 + 4])[0]
for a in range(CP.MAX_SAMPLES // 4)
]
raw_timestamps = np.array(raw_timestamps)
raw_timestamps = np.trim_zeros(raw_timestamps, "b")
pretrim = len(raw_timestamps)
raw_timestamps = np.trim_zeros(raw_timestamps, "f")
self._trimmed = pretrim - len(raw_timestamps)
return raw_timestamps
def _fetch_int(self, channel: DigitalInput) -> np.ndarray:
raw_timestamps = self.fetch_buffer(CP.MAX_SAMPLES // 4, channel.buffer_idx)
raw_timestamps = np.array(raw_timestamps)
raw_timestamps = np.trim_zeros(raw_timestamps, "b")
pretrim = len(raw_timestamps)
raw_timestamps = np.trim_zeros(raw_timestamps, "f")
self._trimmed = pretrim - len(raw_timestamps)
for i, diff in enumerate(np.diff(raw_timestamps)):
if diff <= 0: # Counter has rolled over.
raw_timestamps[i + 1 :] += 2**16 - 1
return raw_timestamps
def get_progress(self) -> int:
"""Return the number of captured events per channel held in the buffer.
Returns
-------
progress : int
Number of events held in buffer. If multiple channels have events
in buffer, the lowest value will be returned.
"""
active_channels = []
a = 0
for c in self._channels.values():
if c.events_in_buffer:
active_channels.append(a * (1 if c.datatype == "int" else 2))
a += 1
p = CP.MAX_SAMPLES // 4
progress = self._get_initial_states_and_progress()[0]
for a in active_channels:
p = min(progress[a], p)
return p
def get_initial_states(self) -> Dict[str, bool]:
"""Return the initial state of each digital input at the beginning of capture.
Returns
-------
dict of four str: bool pairs
Dictionary containing pairs of channel names and the corresponding initial
state, e.g. {'LA1': True, 'LA2': True, 'LA3': True, 'LA4': False}.
True means HIGH, False means LOW.
"""
before, after = self._get_initial_states_and_progress()[1:]
initial_states = {
"LA1": (before & 1 != 0),
"LA2": (before & 2 != 0),
"LA3": (before & 4 != 0),
"LA4": (before & 8 != 0),
}
if before != after:
disagreements = before ^ after
uncertain_states = [
disagreements & 1 != 0,
disagreements & 2 != 0,
disagreements & 4 != 0,
disagreements & 8 != 0,
]
timestamps = self.fetch_data()
if len(timestamps) == 1:
# One channel mode does not record states after start.
return initial_states
channels = ["LA1", "LA2", "LA3", "LA4"]
for i, state in enumerate(uncertain_states[: len(timestamps)]):
if state:
if timestamps[i][0] > 0.1: # µs
# States captured immediately after start are usually
# better than immediately before, except when an event
# was captures within ~100 ns of start.
initial_states[channels[i]] = not initial_states[channels[i]]
return initial_states
def get_xy(
self, timestamps: List[np.ndarray], initial_states: Dict[str, bool] = None
) -> List[np.ndarray]:
"""Turn timestamps into plottable data.
Parameters
----------
timestamps : list of numpy.ndarray
List of timestamps as returned by :meth:`capture` or
:meth:`fetch_data`.
initial_states : dict of str, bool
Initial states of digital inputs at beginning of capture, as
returned by :meth:`get_initial_states`. If no additional capture
calls have been issued before calling :meth:`get_xy`, this can be
omitted.
Returns
-------
list of numpy.ndarray
List of x, y pairs suitable for plotting using, for example,
matplotlib.pyplot.plot. One pair of x and y values is returned for
each list of timestamps given as input.
"""
xy = []
initial_states = (
initial_states if initial_states is not None else self.get_initial_states()
)
for e, c in enumerate(
[self._channel_one_map, self._channel_two_map, "LA3", "LA4"][
: len(timestamps)
]
):
c = self._channels[c]
if c.events_in_buffer:
x, y = c._get_xy(initial_states[c.name], timestamps[e])
xy.append(x)
xy.append(y)
return xy
def _get_initial_states_and_progress(self) -> Tuple[int, int, int]:
self._device.send_byte(CP.TIMING)
self._device.send_byte(CP.GET_INITIAL_DIGITAL_STATES)
initial = self._device.get_int()
progress = [0, 0, 0, 0]
progress[0] = (self._device.get_int() - initial) // 2
progress[1] = (self._device.get_int() - initial) // 2 - CP.MAX_SAMPLES // 4
progress[2] = (self._device.get_int() - initial) // 2 - 2 * CP.MAX_SAMPLES // 4
progress[3] = (self._device.get_int() - initial) // 2 - 3 * CP.MAX_SAMPLES // 4
states_immediately_before_start = self._device.get_byte()
states_immediately_after_start = self._device.get_byte()
self._device.get_ack()
for e, i in enumerate(progress):
if i == 0:
progress[e] = CP.MAX_SAMPLES // 4
elif i < 0:
progress[e] = 0
return progress, states_immediately_before_start, states_immediately_after_start
def configure_trigger(self, trigger_channel: str, trigger_mode: str):
"""Set up trigger channel and trigger condition.
Parameters
----------
trigger_channel : {"LA1", "LA2", "LA3", "LA4"}
The digital input on which to trigger.
trigger_mode : {"disabled", "falling", "rising"}
The type of logic level change on which to trigger.
"""
self.trigger_channel = trigger_channel
self._trigger_channel = self._channels[trigger_channel]
self.trigger_mode = trigger_mode
def _configure_trigger(self, channels: int):
# For some reason firmware uses different values for trigger_mode
# depending on number of channels.
if channels == 1:
self._trigger_mode = {
"disabled": 0,
"any": 1,
"falling": 2,
"rising": 3,
"four rising": 4,
"sixteen rising": 5,
}[self.trigger_mode]
elif channels == 2:
self._trigger_mode = {
"disabled": 0,
"falling": 3,
"rising": 1,
}[self.trigger_mode]
elif channels == 4:
self._trigger_mode = {
"disabled": 0,
"falling": 1,
"rising": 3,
}[self.trigger_mode]
def stop(self):
"""Stop a running :meth:`capture` function."""
self._device.send_byte(CP.TIMING)
self._device.send_byte(CP.STOP_LA)
self._device.get_ack()
def get_states(self) -> Dict[str, bool]:
"""Return the current state of the digital inputs.
Returns
-------
dict of four str: bool pairs
Dictionary containing pairs of channel names and the corresponding
current state, e.g. {'LA1': True, 'LA2': True, 'LA3': True,
'LA4': False}. True means HIGH, False means LOW.
"""
self._device.send_byte(CP.DIN)
self._device.send_byte(CP.GET_STATES)
s = self._device.get_byte()
self._device.get_ack()
return {
"LA1": (s & 1 != 0),
"LA2": (s & 2 != 0),
"LA3": (s & 4 != 0),
"LA4": (s & 8 != 0),
}
def count_pulses(
self, channel: str = "FRQ", interval: float = 1, block: bool = True
) -> Union[int, None]:
"""Count pulses on a digital input.
The counter is 16 bits, so it will roll over after 65535 pulses. This
method can be used at the same time as the oscilloscope.
Parameters
----------
channel : {"LA1", "LA2", "LA3", "LA4", "FRQ"}, optional
Digital input on which to count pulses. The default value is "FRQ".
interval : float, optional
Time in seconds during which to count pulses. The default value is
1 second.
block : bool, optional
Whether to block while counting pulses or not. If False, this
method will return None, and the pulses must be manually fetched
using :meth:`fetch_pulse_count`. Additionally, the interval
argument has no meaning if block is False; the counter will keep
counting even after the interval time has expired. The default
value is True.
Returns
-------
Union[int, None]
Number of pulses counted during the interval, or None if block is
False.
"""
self._reset_prescaler()
self._device.send_byte(CP.COMMON)
self._device.send_byte(CP.START_COUNTING)
self._device.send_byte(self._channels[channel].number)
self._device.get_ack()
if block:
time.sleep(interval)
else:
return
return self.fetch_pulse_count()
def fetch_pulse_count(self) -> int:
"""Return the number of pulses counted since calling :meth:`count_pulses`.
Returns
-------
int
Number of pulses counted since calling :meth:`count_pulses`.
"""
self._device.send_byte(CP.COMMON)
self._device.send_byte(CP.FETCH_COUNT)
count = self._device.get_int()
self._device.get_ack()
return count
def _reset_prescaler(self):
self._device.send_byte(CP.TIMING)
self._device.send_byte(CP.START_FOUR_CHAN_LA)
self._device.send_int(0)
self._device.send_int(0)
self._device.send_byte(0)
self._device.send_byte(0)
self._device.get_ack()
self.stop()
self._prescaler = 0
def _invalidate_buffer(self):
for c in self._channels.values():
c.events_in_buffer = 0
c.buffer_idx = None