"""Split a sampled digital capture into signals made of marks and spaces.

The input is a CSV file whose first row holds field headings and whose
remaining rows hold one sample each; the selected channel column contains
the strings '0' or '1'.  Runs of high/low periods are grouped into marks,
the low stretches between them into spaces, and spaces of at least
0.38 seconds separate one signal from the next.

The code runs unchanged on Python 2 and Python 3.
"""

import csv
import sys

SPACE = 0  # Tag for a stretch of low samples between marks.
MARK = 1   # Tag for a high/low period (pattern) or a summed run of them.


class StreamWithPush(object):
    """Iterator wrapper that lets already-read items be pushed back.

    Pushed items are returned before anything else is read from the
    wrapped iterator; the most recently pushed item comes out first.
    """

    def __init__(self, data):
        self.data = data    # The wrapped iterator.
        self.buffer = []    # Pushed-back items; the next one is at the end.

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next item, preferring pushed-back items."""
        if self.buffer:
            return self.buffer.pop()
        return next(self.data)

    next = __next__  # Python 2 iterator protocol.

    def push(self, x):
        """Push a single item back; it is the next one returned."""
        self.buffer.append(x)

    def push_l(self, l):
        """Push a list of items back so they are returned in list order."""
        # The buffer is consumed from its end, so store the items reversed;
        # a plain extend() would replay them back to front.
        self.buffer.extend(reversed(list(l)))


class Signal(object):
    """One signal: its frequency, duty cycle and alternating timings.

    data is a list of (tag, seconds) tuples that starts and ends with a
    MARK entry and alternates MARK, SPACE, MARK, ...
    """

    def __init__(self, frequency, duty_cycle, data):
        self.frequency = frequency
        self.duty_cycle = duty_cycle
        self.data = data

    def __str__(self):
        """Return a summary line followed by a table of on/off times."""
        assert len(self.data) % 2 == 1, 'Uh oh.'
        lines = ['On time (periods)\tOff time (periods)\n']
        tm = 0
        # Pair each on time with the off time after it; zip() stops before
        # the unpaired final on time, which is emitted separately below.
        for (on_entry, off_entry) in zip(self.data[0::2], self.data[1::2]):
            on_tm = on_entry[1]
            off_tm = off_entry[1]
            lines.append('%.3fms (%.1f)\t\t%.3fms (%.1f)\n'
                         % (on_tm * 1000, on_tm * self.frequency,
                            off_tm * 1000, off_tm * self.frequency))
            tm += on_tm + off_tm
        last_tm = self.data[-1][1]
        lines.append('%.3fms (%.1f)' % (last_tm * 1000,
                                         last_tm * self.frequency))
        # NOTE: the total in the header sums the on/off pairs only; the
        # final on time is not part of it.
        header = '%.2fkHz, %.3fms, %d%%\n' % (self.frequency * 0.001,
                                              tm * 1000,
                                              int(self.duty_cycle * 100))
        return header + ''.join(lines)


def get_freq_dcycle(mark_data, sample_tm_ns):
    """Return (frequency in Hz, duty cycle) averaged over mark_data.

    mark_data is a list of (high samples, low samples) tuples, one per
    period, and sample_tm_ns is the time between samples in nanoseconds.
    Returns (0.0, 0.0) when mark_data covers no samples at all.
    """
    # TODO Check mark_data is consistent, i.e. all same frequency and duty
    # cycle (or even same shape).
    tot = on = 0
    for (h, l) in mark_data:
        tot += h + l
        on += h
    if tot == 0:
        # Nothing was measured; avoid dividing by zero.
        return (0.0, 0.0)
    return (len(mark_data) / (tot * sample_tm_ns * 1e-9), float(on) / tot)


def get_signals(pattern, sample_tm_ns):
    """Group a mark/space pattern into a list of Signal objects.

    pattern holds (MARK, high samples, low samples) and
    (SPACE, samples) tuples.  Consecutive marks are summed into a single
    on time; a space of at least NEW_SIG_TM seconds ends the current
    signal.  Stretches that contain no mark at all produce no signal.
    """
    # I measure a gap of ~391ms between signals.
    # TODO learn this from the data.
    NEW_SIG_TM = 0.38
    signals = []
    i_pattern = iter(pattern)
    done = False
    while not done:
        signal_data = []
        mark_data = []
        l_mark = 0
        while True:
            try:
                packet = next(i_pattern)
            except StopIteration:
                signal_data.append((MARK, l_mark * sample_tm_ns * 1e-9))
                done = True
                break
            if packet[0] == SPACE:
                signal_data.append((MARK, l_mark * sample_tm_ns * 1e-9))
                l_mark = 0
                if packet[1] * sample_tm_ns * 1e-9 >= NEW_SIG_TM:
                    # End of this signal.
                    break
                signal_data.append((SPACE, packet[1] * sample_tm_ns * 1e-9))
            else:  # packet[0] == MARK
                l_mark += packet[1] + packet[2]
                mark_data.append((packet[1], packet[2]))
        if not mark_data:
            # Empty pattern, or nothing but spaces since the last signal:
            # there is no frequency to measure, so emit nothing.
            continue
        (frequency, duty_cycle) = get_freq_dcycle(mark_data, sample_tm_ns)
        signals.append(Signal(frequency, duty_cycle, signal_data))
    return signals


def _read_period(csv_data, channel, l_last_period, slippage):
    """Measure one high/low period whose first high sample was already read.

    Returns (n_ones, n_zeros, still_marking).  still_marking is False when
    the low part outlasted the previous period by more than slippage
    samples, which is taken as the end of the mark; the surplus low samples
    are pushed back onto csv_data so the caller counts them as a space.
    """
    n_ones = 1   # The high sample the caller consumed.
    n_zeros = 0
    for row in csv_data:
        if row[channel] != '1':
            n_zeros = 1  # The sample that ended the high run counts as low.
            break
        n_ones += 1
    else:
        # The capture ended while the line was still high.
        return (n_ones, n_zeros, True)

    overrun = []  # Low samples beyond the length of the previous period.
    for row in csv_data:
        if row[channel] != '0':
            csv_data.push(row)
            break
        if l_last_period > 0:
            # Samples by which this period, including the current row,
            # exceeds the previous one.
            d = n_ones + n_zeros + 1 - l_last_period
            if d > 0:
                overrun.append(row)
                if d > slippage:
                    csv_data.push_l(overrun)
                    # NOTE(review): this can go negative when the high run
                    # alone is much longer than the previous period.
                    n_zeros -= slippage
                    return (n_ones, n_zeros, False)
        n_zeros += 1
    return (n_ones, n_zeros, True)


def markate(csv_data, channel):
    """Turn sample rows into a pattern of marks and spaces.

    csv_data must be a StreamWithPush yielding rows and channel is the
    index of the column to read, whose values are the strings '0' and '1'.

    Returns (sample_tm_ns, pattern), where pattern is a list of
    (MARK, high samples, low samples) and (SPACE, samples) tuples with the
    leading space before the first mark removed.
    """
    pattern = []
    marking = False
    l_not_marking = l_last_period = 0
    slippage = 15  # samples.
    # nanoseconds between samples. TODO learn this from data and verify all
    # samples consistent.
    sample_tm_ns = 333.333333333
    for row in csv_data:
        v = row[channel]
        if not marking:
            if v == '0':
                l_not_marking += 1
            else:
                marking = True
                pattern.append((SPACE, l_not_marking))
                l_not_marking = l_last_period = 0
                csv_data.push(row)  # Re-read it as the start of a period.
        elif v == '1':
            (n_ones, n_zeros, marking) = _read_period(
                csv_data, channel, l_last_period, slippage)
            pattern.append((MARK, n_ones, n_zeros))
            l_last_period = n_ones + n_zeros
        else:
            # Shouldn't happen and causes us to lose data.
            sys.stderr.write('Uh oh\n')
            marking = False
            if v == '0':
                csv_data.push(row)
            # A sample that is neither '0' nor '1' is dropped: pushing it
            # back would start a mark and land here again, forever.
    # Drop the leading SPACE recorded before the first mark.
    return (sample_tm_ns, pattern[1:])


def main(argv=None):
    """Print every signal found in channel argv[2] of CSV file argv[1].

    Exits with status 1 after a usage message unless exactly two
    arguments are given.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) != 3:
        sys.stderr.write('Usage: %s CSV_FILE CHANNEL\n' % argv[0])
        sys.exit(1)
    with open(argv[1]) as csv_file:
        csv_data = StreamWithPush(csv.reader(csv_file))
        next(csv_data, None)  # Field headings; tolerate an empty file.
        # Column 0 is not a channel, hence the + 1.
        (sample_tm_ns, pattern) = markate(csv_data, int(argv[2]) + 1)
    signals = get_signals(pattern, sample_tm_ns)
    for (i, sig) in enumerate(signals):
        print('Signal %d : %s\n\n' % (i, sig))


if __name__ == '__main__':
    sys.exit(main())