-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathosc_class.py
More file actions
102 lines (94 loc) · 4.2 KB
/
osc_class.py
File metadata and controls
102 lines (94 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from moku.instruments import Oscilloscope
import pandas as pd
import numpy as np
from scipy import signal
class osc():
    """Wrapper around a Moku ``Oscilloscope`` that keeps a rolling sample buffer.

    Each call to :meth:`get_osc_input` acquires one frame from the instrument,
    appends it to an internal buffer capped at ``MAX_SAMPLES`` samples, and
    returns the buffered (optionally low-pass filtered) trace together with
    the mean of the most recent frame.
    """

    # Number of samples a single acquired frame is expected to contain.
    FRAME_LEN = 1024
    # Maximum number of samples kept in the rolling buffer (30 frames).
    MAX_SAMPLES = 30720

    def __init__(self, ip_address, timebase, input_ch):
        """Connect to the instrument and initialise empty collectors.

        Args:
            ip_address: Address passed to ``Oscilloscope``.
            timebase: Two-element sequence; its items are forwarded to
                ``set_timebase`` by :meth:`config`.
            input_ch: 1-based oscilloscope channel number to read.
        """
        self.ip_address = ip_address
        self.timebase = timebase
        self.input_ch = input_ch
        # force_connect=True takes the instrument over even if another
        # client currently holds it.
        self.obj = Oscilloscope(self.ip_address, force_connect=True)
        # Sample rate derived from the last frame's time axis; populated by
        # get_one_ch_data().
        self.fs = None
        self.osc_input_collector = []
        self.osc_time_collector = []
        # Number of frames acquired since the last clear_collector().
        self.c = 0

    def config(self):
        """Apply trigger, timebase, acquisition and source settings."""
        # Trigger on Input1, auto mode, 0 V level.
        self.obj.set_trigger(mode='Auto', level=0, source='Input1')
        # Set timebase from the two values supplied at construction.
        self.obj.set_timebase(self.timebase[0], self.timebase[1])
        # Set acquisition mode.
        self.obj.set_acquisition_mode(mode="Precision")
        # Set the source of the selected channel to Input1.
        self.obj.set_source(self.input_ch, 'Input1')
        # Roll mode is explicitly turned OFF (roll=False).
        self.obj.enable_rollmode(roll=False)
        # Set interpolation.
        self.obj.set_interpolation(interpolation='Linear')

    def get_osc_input(self, append, dith_freq, filtered):
        """Acquire one frame and return the buffered trace and recent mean.

        Args:
            append: If truthy, return the whole rolling buffer; otherwise
                return only the most recent ``FRAME_LEN`` samples.
            dith_freq: Dither frequency in Hz; when ``filtered`` is set the
                trace is low-pass filtered with a cutoff of ``2 * dith_freq``.
            filtered: Whether to apply the FIR low-pass filter.

        Returns:
            A ``(trace, sig_mean)`` tuple of DataFrames with columns
            ``'time'`` and ``'ch'``. ``sig_mean`` has ``FRAME_LEN`` rows whose
            ``'ch'`` value is the mean of the last ``FRAME_LEN`` samples of
            the (possibly filtered) trace.

        Raises:
            ValueError: If the acquired frame does not contain ``FRAME_LEN``
                samples, or the selected channel is missing from the data.
        """
        frame_len = self.FRAME_LEN
        frame = self.get_one_ch_data()
        samples = frame['ch'].values.tolist()
        if len(samples) != frame_len:
            # The synthetic time axis below is built in FRAME_LEN steps, so a
            # different frame size would desynchronise the two collectors.
            raise ValueError(
                "expected a frame of %d samples, got %d"
                % (frame_len, len(samples))
            )
        # Query the sample rate once; every call is an instrument round-trip.
        sample_rate = self.obj.get_samplerate()['sample_rate']

        self.osc_input_collector.extend(samples)
        if len(self.osc_input_collector) > self.MAX_SAMPLES:
            # Drop the oldest frame from both collectors. The time collector
            # is extended right below, so both end up the same length again.
            del self.osc_input_collector[0:frame_len]
            del self.osc_time_collector[0:frame_len]

        # NOTE(review): the counter is incremented before use, so the first
        # frame's time axis starts at sample index FRAME_LEN rather than 0.
        # Kept as in the original; confirm whether that offset is intended.
        self.c = self.c + 1
        n_start = self.c * frame_len
        new_times = np.arange(n_start, n_start + frame_len) / sample_rate  # in s
        self.osc_time_collector.extend(new_times.tolist())

        t_axis = np.reshape(np.array(self.osc_time_collector), (-1, 1))
        values = np.reshape(np.array(self.osc_input_collector), (-1, 1))
        output = pd.DataFrame(
            np.concatenate((t_axis, values), axis=1), columns=['time', 'ch']
        )

        if filtered:
            myfilter = signal.firwin(1000, dith_freq * 2, fs=sample_rate)
            output['ch'] = signal.lfilter(myfilter, [1.0], output['ch'])

        # Mean of the most recent frame, repeated along that frame's time axis.
        recent = output.iloc[-frame_len:]
        mean_col = np.mean(recent['ch']) * np.ones((frame_len, 1))
        time_col = np.reshape(np.array(recent['time']), (-1, 1))
        sig_mean = pd.DataFrame(
            np.concatenate((time_col, mean_col), axis=1), columns=['time', 'ch']
        )

        if not append:
            output_short = output.iloc[-frame_len:, :].reset_index(drop=True)
            return output_short, sig_mean
        return output, sig_mean

    def get_one_ch_data(self):
        """Fetch one frame and return the selected channel as a DataFrame.

        Also updates ``self.fs`` from the spacing of the first two time
        stamps of the frame.

        Returns:
            DataFrame with columns ``'time'`` and ``'ch'``.

        Raises:
            ValueError: If the frame has no ``ch<input_ch>`` entry.
        """
        data = self.obj.get_data()
        self.fs = 1 / (data['time'][1] - data['time'][0])
        # Select the column by name. The previous truthiness test
        # (`if self.input_ch:`) matched every non-zero channel, so channel 2
        # silently returned channel 1 data.
        column = "ch%s" % (self.input_ch,)
        if column not in data:
            raise ValueError(
                "channel %r not present in oscilloscope data" % (self.input_ch,)
            )
        signal_df = pd.DataFrame(data)[['time', column]]
        return signal_df.rename({column: 'ch'}, axis=1)

    def clear_collector(self):
        """Empty both rolling collectors and reset the frame counter."""
        self.osc_input_collector.clear()
        self.osc_time_collector.clear()
        self.c = 0

    def stop(self):
        """Disable the instrument input for the configured channel."""
        # NOTE(review): the instrument connection itself is not released
        # here; confirm whether the caller is expected to do that.
        self.obj.disable_input(channel=self.input_ch)