import numpy as np import pandas as pd import matplotlib.pyplot as plt import re import telnetlib3 import csv import time Z_table = [ [ 0.00007362 , 0.00007713 ], [ 0.00004428 , 0.00013248 ], [ 0.00007899 , 0.0002094 ], [ 0.00019893 , 0.00025827 ], [ 0.00039363 , 0.00020892 ], [ 0.00047154 , 0.00012468 ], [ 0.00050655 , 0.00009711 ], [ 0.00051549 , 0.00004863 ], [ 0.00051693 , 0.00003609 ], [ 0.00052323 , 0.00003429 ], [ 0.00053727 , 0.00004557 ], [ 0.00055146 , 0.00006702 ], [ 0.00058626 , 0.00009252 ], [ 0.00066285 , 0.00008859 ], [ 0.00066801 , 0.00004833 ], [ 0.00068286 , 0.00005697 ], [ 0.00073578 , 0.00000705 ], [ 0.0005028 , -0.00007668 ] ] Z2_table = [ #[ 0.000141451 , -0.001248057 ], [ 0.000482934 , -0.000453644 ], [ 0.000522226 , -0.000231466 ], [ 0.000532942 , -0.000115733 ], [ 0.000533657 , -5.21512E-05 ], [ 0.000528656 , -0.000028576 ], [ 0.000525084 , -1.28592E-05 ], [ 0.000522941 , 7.144E-07 ], [ 0.000527227 , 1.00016E-05 ], [ 0.000532228 , 1.85744E-05 ], [ 0.000540801 , 3.35768E-05 ], [ 0.000557232 , 5.85808E-05 ], [ 0.00059581 , 8.50136E-05 ], [ 0.000670822 , 8.71568E-05 ], [ 0.000706542 , 0.000060724 ] ] data_rows = [] # global 2-D list state = { "remaining_receive_lines": 0, "freq": 10000.0, "freq_step_multiply": 0.92, "stop_freq": 0.1, "initializing": 1 } PATTERN = re.compile( r"Va=(?P-?\d+\.?\d*)\s+" r"Vp=(?P-?\d+\.?\d*)\s+\|\s+" r"Ia=(?P-?\d+\.?\d*)\s+" r"Ip=(?P-?\d+\.?\d*)\s+\|\s+" r"ZR=(?P-?\d+\.?\d*)\s+" r"ZX=(?P-?\d+\.?\d*)" ) def dump_csv(filename="data.csv"): df = pd.DataFrame( data_rows, columns=["Freq", "Va", "Vp", "Ia", "Ip", "ZR", "ZX"] ) df.to_csv(filename, index=False) def append_line_to_file(column_nr, filename): # Extract freq column (index 0) # Extract ZR column (index 5) # Extract ZX column (index 6) zr_values = [row[column_nr] for row in data_rows if len(row) > column_nr] if not zr_values: print("No ZR data to write") return with open(filename, "a", newline="") as f: writer = csv.writer(f) writer.writerow(zr_values) def extract_to_dataframe(line): # Extract Va, Vp, Ia, Ip, ZR, ZX from a line and append them as a row to the global data_rows list. global state match = PATTERN.search(line) if not match: return # silently ignore malformed lines row = [ float(state["freq"]), float(match.group("Va")), float(match.group("Vp")), float(match.group("Ia")), float(match.group("Ip")), float(match.group("ZR")), float(match.group("ZX")), ] data_rows.append(row) def process_line(line): extract_to_dataframe(line) # Example: print last row if data_rows: print("Last row:", data_rows[-1]) def get_dataframe(): return pd.DataFrame( data_rows, columns=["Va", "Vp", "Ia", "Ip", "ZR", "ZX"] ) class TelnetReader: def __init__(self, host, port=23, timeout=10): self.host = host self.port = port self.timeout = timeout self.tn = None def connect(self): print(f"Connecting to {self.host}:{self.port} ...") self.tn = telnetlib3.Telnet(self.host, self.port, self.timeout) print("Connected.") def disconnect(self): if self.tn: self.tn.close() self.tn = None print("Disconnected.") def read_loop(self): global state # Main loop: reads incoming lines forever try: while state["freq"] > state["stop_freq"]: line = self.tn.read_until(b"\n") if not line: break decoded = line.decode("utf-8", errors="ignore").strip() self.process_line(decoded) except KeyboardInterrupt: print("Interrupted by user.") print(data_rows) dump_csv("measurements.csv") def process_line(self, line): global state # Override or extend this method to extract data. # print(f"RAW: {line}") if state["remaining_receive_lines"] > 0 : state["remaining_receive_lines"] -= 1 # we are waiting for settling - just skip the line else: # prepare new frequency measurement if not state["initializing"] == 1: extract_to_dataframe(line) # capture the measurement if state["freq"] > 0.7: state["freq"] *= state["freq_step_multiply"] # calc next freq else: if state["freq"] > 0.4: state["freq"] *= state["freq_step_multiply"] ** 2 # skip 1/2 steps to speed up else: state["freq"] *= state["freq_step_multiply"] ** 4 # skip 3/4 steps to speed up if state["freq"] <= state["stop_freq"]: print("Reached stop frequency") else: if state["freq"] > 1: response = f"\rf{state["freq"]:.1f}\r" # new freq else: response = f"\rf{state["freq"]:.3f}\r" # new freq self.tn.write(response.encode("utf-8")) bandwidth = state["freq"]/10 if bandwidth > 1 : bandwidth = 1 response = f"b{bandwidth:.3f}\r" # new freq self.tn.write(response.encode("utf-8")) state["remaining_receive_lines"] = 1 + 3/bandwidth print(line) print(state) state["initializing"] = 0 # Example: # value = self.extract_value(line) # self.store_value(value) # Example placeholder methods def extract_value(self, line): pass def store_value(self, value): pass if __name__ == "__main__": reader = TelnetReader(host="10.1.122.152", port=2002) try: reader.connect() reader.read_loop() finally: reader.disconnect() Z_points = [complex(r, i) for r, i in Z_table] Zp = np.array(Z_points) Z2_points = [complex(r, i) for r, i in Z2_table] Z2p = np.array(Z2_points) plt.plot( Zp.real, Zp.imag, marker='x', color='blue', label="shunt-300u" ) plt.plot( Z2p.real, Z2p.imag, marker='x', color='red', label="clamp" ) plt.xlabel("Re{Z} [Ω]") plt.ylabel("-Im{Z} [Ω]") plt.title("Nyquist Plot") plt.grid(True) plt.legend() plt.show()