#####################
#                   #
#   TimeSeries.py   #
#   03/2023         #
#   ENIB/ZG2        #
#   G. desmeulles   #
#                   #
#####################
"""Procedural helpers for loading, merging, repairing and plotting time series.

A time series is a ``TimeSeries`` object carrying two attributes:

* ``data``   -- list of rows, each row a list whose column 0 is the timestamp;
* ``labels`` -- list of column names, aligned with the columns of ``data``.

Cells whose value is unknown hold the string ``'missing'``.
"""

import copy
import csv

try:
    import matplotlib.pyplot as plt
except ImportError:  # plotting is optional: everything except plot() works without it
    plt = None

# Placeholder stored in a cell whose value is unknown.
MISSING = 'missing'


class TimeSeries:
    """Plain container; ``create`` attaches its ``data`` and ``labels`` attributes."""


# constructor
def create(filename=None, time_stamp_column_number=0):
    """Build a time series, optionally loading it from a CSV file.

    Args:
        filename: path of a comma-separated file whose first row holds the
            column labels. When ``None`` an empty series is returned.
        time_stamp_column_number: index of the timestamp column in the file;
            when non-zero that column is moved to index 0.

    Returns:
        A ``TimeSeries`` whose cells have all been converted to ``float``.

    Raises:
        IndexError: if the file contains no row at all.
        ValueError: if a cell cannot be converted to ``float``.
    """
    ts = TimeSeries()
    ts.data = []
    ts.labels = []
    if filename is not None:
        with open(filename, newline='') as csvfile:
            reader = csv.reader(csvfile, delimiter=',', quotechar='"')
            # Blank lines come back as empty rows; they carry no sample.
            ts.data = [row for row in reader if row]
        ts.labels = ts.data.pop(0)
        # NOTE(review): xlabel is captured before the timestamp column is
        # moved, so it names the file's first column -- confirm this is intended.
        ts.xlabel = ts.labels[0]
        if time_stamp_column_number:
            swap_column(ts, 0, time_stamp_column_number)
        # str to float
        for row in ts.data:
            row[:] = [float(cell) for cell in row]
    return ts


# accessors / mutators
def get_data(ts):
    """Return a deep copy of the rows of ``ts``."""
    return copy.deepcopy(ts.data)


def get_labels(ts):
    """Return a copy of the column labels of ``ts``."""
    return ts.labels[:]


def get_value(ts, index, column_number):
    """Return the cell at row ``index``, column ``column_number``."""
    return ts.data[index][column_number]


def get_size(ts):
    """Return the number of rows of ``ts``."""
    return len(ts.data)


def get_nb_column(ts):
    """Return the number of columns of ``ts``.

    The width of the first row is used; a series without rows falls back to
    the number of labels instead of failing.
    """
    if ts.data:
        return len(ts.data[0])
    return len(ts.labels)


def set_data(ts, data):
    """Replace the rows of ``ts`` with a deep copy of ``data``."""
    ts.data = copy.deepcopy(data)


def set_labels(ts, labels):
    """Replace the labels of ``ts`` with a copy of ``labels``."""
    ts.labels = labels[:]


def set_label(ts, column_number, label):
    """Rename column ``column_number`` to ``label``."""
    ts.labels[column_number] = label


def set_value(ts, index, column_number, value):
    """Store ``value`` at row ``index``, column ``column_number``."""
    ts.data[index][column_number] = value


# operations
def swap_column(ts, n1, n2):
    """Move column ``n2`` to position ``n1`` in every row and in the labels.

    Despite the name this is a move, not an exchange: the columns between the
    two positions keep their relative order and shift by one.
    """
    for row in ts.data:
        row.insert(n1, row.pop(n2))
    ts.labels.insert(n1, ts.labels.pop(n2))


def plot(ts, x_label=None, y_label="", title="", filename=None, show=True):
    """Draw every column of ``ts`` against column 0 with matplotlib.

    Args:
        x_label: label of the x axis; defaults to the label of column 0.
        y_label: label of the y axis.
        title: title of the figure.
        filename: when given, the figure is also saved to this path.
        show: when true, the figure is displayed with ``plt.show()``.

    Raises:
        ImportError: if matplotlib is not installed.
    """
    if plt is None:
        raise ImportError("matplotlib is required to plot a time series")
    if x_label is None:
        x_label = ts.labels[0]
    nb_columns = len(ts.data[0])
    columns = [[float(row[j]) for row in ts.data] for j in range(nb_columns)]
    fig, ax = plt.subplots()
    for j in range(1, nb_columns):
        ax.plot(columns[0], columns[j], label=ts.labels[j])
    ax.set(xlabel=x_label, ylabel=y_label, title=title)
    ax.legend()
    # Save before showing: once an interactive window has been closed the
    # figure may no longer render to a file.
    if filename:
        fig.savefig(filename)
    if show:
        plt.show()


def dump(ts, filename):
    """Write ``ts`` to ``filename`` as CSV: one label row, then one row per sample."""
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow(ts.labels)
        for row in ts.data:
            writer.writerow([str(cell) for cell in row])


def compute_period(ts, column_number):
    """Return the mean time between successive local maxima of a column.

    A sample is a local maximum when it is strictly greater than both of its
    neighbours; the first and last rows are never candidates.

    Returns:
        The mean spacing of the maxima, or ``None`` when fewer than two maxima
        exist (no period can be measured).
    """
    rows = ts.data
    peak_times = [
        rows[i][0]
        for i in range(1, len(rows) - 1)
        if rows[i][column_number] > rows[i - 1][column_number]
        and rows[i][column_number] > rows[i + 1][column_number]
    ]
    if len(peak_times) < 2:
        return None
    periods = [later - earlier for earlier, later in zip(peak_times, peak_times[1:])]
    return sum(periods) / len(periods)


# New functions
def clone(ts_in):
    """Return a new series holding copies of the rows and labels of ``ts_in``."""
    ts_out = create()
    ts_out.data = get_data(ts_in)
    ts_out.labels = get_labels(ts_in)
    return ts_out


def round_data(ts, column_number, precision):
    """Round every cell of a column to ``precision`` decimals, in place."""
    for row in ts.data:
        row[column_number] = round(row[column_number], precision)


def fuse(ts1, ts2):
    """Merge two series into a new one, ordered by timestamp.

    Column 0 is the timestamp in both inputs and both are expected to be
    sorted by it. Rows sharing a timestamp are joined; a row present in only
    one input gets ``'missing'`` in the columns of the other. The inputs are
    left untouched.

    Returns:
        A new ``TimeSeries`` whose columns are the timestamp, the value
        columns of ``ts1``, then the value columns of ``ts2``.
    """
    ts_out = create()
    ts_out.labels = ts1.labels + ts2.labels[1:]
    d1 = ts1.data
    d2 = ts2.data
    # Number of value columns on each side (the timestamp is not counted).
    width_1 = len(d1[0]) - 1 if d1 else max(len(ts1.labels) - 1, 0)
    width_2 = len(d2[0]) - 1 if d2 else max(len(ts2.labels) - 1, 0)
    missing_data_1 = [MISSING] * width_1
    missing_data_2 = [MISSING] * width_2

    merged = []
    i1 = 0
    i2 = 0
    # Two-pointer merge; no sentinel row is needed, so any timestamp range
    # (negative or zero included) is handled.
    while i1 < len(d1) and i2 < len(d2):
        row_1 = d1[i1]
        row_2 = d2[i2]
        if row_1[0] == row_2[0]:
            # two samples at the same timestamp
            merged.append(row_1 + row_2[1:])
            i1 += 1
            i2 += 1
        elif row_1[0] < row_2[0]:
            # next value is in d1
            merged.append(row_1 + missing_data_2)
            i1 += 1
        else:
            # next value is in d2
            merged.append([row_2[0]] + missing_data_1 + row_2[1:])
            i2 += 1
    # At most one of the two tails is non-empty.
    merged.extend(row + missing_data_2 for row in d1[i1:])
    merged.extend([row[0]] + missing_data_1 + row[1:] for row in d2[i2:])

    ts_out.data = merged
    return ts_out


def interpolate_missing_data(ts):
    """Replace every ``'missing'`` cell of ``ts`` with an estimate, in place.

    For each value column: leading gaps take the first known value, trailing
    gaps take the last known value, and inner gaps are linearly interpolated
    over the timestamps (column 0) of the nearest known neighbours. A column
    without any known value is left as it is.
    """
    data = ts.data
    if not data:
        return
    for column_number in range(1, len(data[0])):
        known = [i for i, row in enumerate(data) if row[column_number] != MISSING]
        if not known:
            continue
        first = known[0]
        last = known[-1]
        # fill first values
        for i in range(first):
            data[i][column_number] = data[first][column_number]
        # fill last values
        for i in range(last + 1, len(data)):
            data[i][column_number] = data[last][column_number]
        # interpolate the gaps between consecutive known values
        for left, right in zip(known, known[1:]):
            if right - left < 2:
                continue
            t_prev = data[left][0]
            previous_value = data[left][column_number]
            t_next = data[right][0]
            next_value = data[right][column_number]
            # slope of the line joining the two known samples
            m = (next_value - previous_value) / (t_next - t_prev)
            for i in range(left + 1, right):
                data[i][column_number] = m * (data[i][0] - t_prev) + previous_value


def shift(ts, time_shift):
    """Add ``time_shift`` to every timestamp of ``ts``, in place."""
    for row in ts.data:
        row[0] += time_shift


def truncate(ts, t_min, t_max):
    """Drop, in place, the rows whose timestamp is < ``t_min`` or > ``t_max``."""
    ts.data[:] = [
        row for row in ts.data
        if not (row[0] < t_min or row[0] > t_max)
    ]


def compute_error(ts, column_number_1, column_number_2, error_label="error curve (rad/s)"):
    """Append a column holding ``column_number_2 - column_number_1`` for each row.

    The new column is labelled ``error_label``.
    """
    for row in ts.data:
        row.append(row[column_number_2] - row[column_number_1])
    ts.labels.append(error_label)


def compute_area(ts, column_number):
    """Integrate the absolute value of a column over time.

    Each inner sample contributes ``|value| * (t[i+1] - t[i-1]) / 2``.
    NOTE(review): the first and last samples get no weight, so this is the
    trapezoid rule without its two end terms -- confirm this is intended.
    """
    rows = ts.data
    area = 0
    for i in range(1, len(rows) - 1):
        d_t = (rows[i + 1][0] - rows[i - 1][0]) / 2.
        area += d_t * abs(rows[i][column_number])
    return area


def pop(ts, column_number):
    """Remove column ``column_number`` from every row and from the labels."""
    for row in ts.data:
        row.pop(column_number)
    ts.labels.pop(column_number)


def remove_outliers(ts, column_number):
    """Mark as ``'missing'`` the samples that jump away from the next sample.

    A sample is an outlier when its ratio to the following sample is above 5
    or below 0.2 (a sign change therefore counts). The first and last rows
    are never tested.

    Raises:
        ZeroDivisionError: if the sample following a tested one is 0.
    """
    rows = ts.data
    for i in range(1, len(rows) - 1):
        rate = rows[i][column_number] / rows[i + 1][column_number]
        if rate > 5.0 or rate < 0.2:
            rows[i][column_number] = MISSING


def apply_to_column(ts, column_number, f):
    """Replace every cell of a column with ``f(cell)``, in place."""
    for row in ts.data:
        row[column_number] = f(row[column_number])


if __name__ == '__main__':

    # ZG2 week 1
    def test_1():
        ts = create('HCSR04_data4_ressort_2022_03_10.csv', 3)
        print(ts)
        plot(ts, y_label='distance(mm)', title='Courbe ZG2!!', filename='test.png')

    # ZG2 week 2
    def test_2():
        ts = create('HCSR04_data4_ressort_2022_03_10.csv', 3)
        dump(ts, 'test.csv')

    # ZG2 week 3
    def test_3():
        ts1 = create('ts1.csv')
        ts2 = create('ts2.csv')
        round_data(ts1, 0, 5)
        round_data(ts2, 0, 5)
        dump(ts1, 'test1.csv')
        dump(ts2, 'test2.csv')
        ts3 = fuse(ts1, ts2)
        dump(ts3, 'test3.csv')
        interpolate_missing_data(ts3)
        plot(ts3)

    print('test TimeSeries.py')
    # test_1()
    # test_2()
    test_3()