#!/usr/bin/python3

"""Parse status data from the Sunpower PVS6 and turn it into Influx format,
for use with Telegraf.

PVS6 data comes from its endpoint '/cgi-bin/dl_cgi?Command=DeviceList'.
The PVS6 itself can be the server for this, or a Raspberry Pi or something
running as a proxy.

Output contains lots of data from the Sunpower monitors. Computer status,
production and consumption wattages etc (as measured by CT sensors), also
power data from each individual inverter. Also produces some summary
statistics rolling up info about all the inverters.

By Nelson Minar. I have no intent to support this or turn it into a
reusable product.
"""

import json
import os.path
import re
import statistics
import sys
import time
import urllib.request

_PVS6_URL = 'http://192.168.3.140/cgi-bin/dl_cgi?Command=DeviceList'

# Not used by the functions below (parse() builds its own local list); kept so
# the module's global names stay unchanged.
output = []


def extract_fields_tags(record, field_keys, tag_keys, ignore_keys):
    """Split one JSON record into field, tag and leftover dicts.

    Args:
        record: dict holding one device record.
        field_keys: comma-separated key names to return as fields.
        tag_keys: comma-separated key names to return as tags.
        ignore_keys: comma-separated key names to drop silently.

    Returns:
        A (fields, tags, others) tuple of dicts. ``others`` holds every
        entry of ``record`` whose key is in none of the three lists. Each
        dict keeps the iteration order of ``record``.
    """
    wanted_fields = set(field_keys.split(','))
    wanted_tags = set(tag_keys.split(','))
    # Build the "known" set once instead of concatenating lists per key.
    known = wanted_fields | wanted_tags | set(ignore_keys.split(','))

    fields = {k: v for k, v in record.items() if k in wanted_fields}
    tags = {k: v for k, v in record.items() if k in wanted_tags}
    others = {k: v for k, v in record.items() if k not in known}
    return fields, tags, others


_strip_whitespace = re.compile(r'\s+')
_line_protocol_specials = re.compile(r'([,=])')


def _s(s):
    """Make a tag key, tag value or field key safe for an Influx line.

    The input is converted with str(), every run of whitespace becomes a
    single underscore, and commas and equals signs are backslash-escaped
    because they are separators in the line format.
    """
    text = _strip_whitespace.sub("_", str(s))
    return _line_protocol_specials.sub(r'\\\1', text)


def _v(v):
    """Turn the input into a field value for influx format.

    Anything float() accepts is returned as a float. Everything else is
    converted with str() and returned as a double-quoted string, with
    backslashes and double quotes escaped.
    """
    try:
        return float(v)
    except (TypeError, ValueError):
        # TypeError covers None and containers, ValueError non-numeric text.
        pass
    escaped = str(v).replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'


# Some rollup data populated by the parsing system
summary = {}
inverter_kws = []
inverter_temps = []


def init_summary():
    """Reset the module-level rollup state before parsing a new document."""
    global inverter_kws, inverter_temps, summary
    inverter_kws = []
    inverter_temps = []
    summary = {
        'count_not_working': 0,
        'count_not_working_inverters': 0,
        'avg_inverter_kw': 0,
        'sd_inverter_kw': 0,
        'count_inverter_low_kw': 0,
        'avg_inverter_temp': 0,
        'sd_inverter_temp': 0,
    }


def _mean_and_sd(values):
    """Return (mean, population standard deviation), or (0, 0) if empty."""
    if not values:
        # statistics.fmean/pstdev raise StatisticsError on empty input.
        return 0, 0
    return statistics.fmean(values), statistics.pstdev(values)


def create_summary():
    """Fill in the rollup statistics and return them as an output record.

    Returns:
        A ("pvs6_summary", summary, {}, {}) tuple in the same
        (measurement, fields, tags, others) shape the parse_* functions
        return. With no inverter readings the averages and deviations are 0.
    """
    summary['avg_inverter_kw'], summary['sd_inverter_kw'] = _mean_and_sd(inverter_kws)
    summary['avg_inverter_temp'], summary['sd_inverter_temp'] = _mean_and_sd(inverter_temps)

    # Count "low" inverters
    mean_kw = summary['avg_inverter_kw']
    if mean_kw < 0.02:
        # if no power is being generated then skip this check
        low = 0
    else:
        # low means less than 80% of average
        low_factor = 0.8
        threshold = low_factor * mean_kw
        low = sum(1 for kw in inverter_kws if kw < threshold)
    summary['count_inverter_low_kw'] = low

    return "pvs6_summary", summary, {}, {}


def _influx_line(measurement, fields, tags, ts):
    """Format one record as a single line of Influx output."""
    # Sanitize tag/field names and values and format as k=v,k=v lists
    tag_set = ','.join(f"{_s(k)}={_s(v)}" for k, v in tags.items())
    field_set = ','.join(f"{_s(k)}={_v(v)}" for k, v in fields.items())
    # Only print this delimiter if there are tags
    d = ',' if tag_set else ''
    return f'{measurement}{d}{tag_set} {field_set} {ts}'


def parse(data, timestamp):
    """Parse a DeviceList document and print it to stdout in Influx format.

    Args:
        data: decoded JSON dict; its 'devices' entry is the list of records.
        timestamp: time of the reading in seconds (float or int).

    Records of unknown type, unknown data items and records without any
    field are reported on stderr. A summary record is always emitted.
    """
    output = []
    skipped = 0
    init_summary()

    # Parse every record in the JSON input
    for record in data['devices']:
        device_type = record.get("DEVICE_TYPE")
        if device_type == 'Power Meter' and record.get('TYPE') == 'PVS5-METER-P':
            output.append(parse_production(record))
        elif device_type == 'Power Meter' and record.get('TYPE') == 'PVS5-METER-C':
            output.append(parse_consumption(record))
        elif device_type == 'Inverter':
            output.append(parse_inverter(record))
        elif device_type == 'PVS':
            output.append(parse_pvs(record))
        else:
            skipped += 1
    if skipped > 0:
        sys.stderr.write(f'Skipped {skipped} records of unknown type.\n')

    # Make a rollup from the collected data
    output.append(create_summary())

    # Format timestamp into nanoseconds; it is the same for every record
    ts = int(1_000_000_000 * timestamp)

    # Emit the records in Influx format
    for measurement, fields, tags, others in output:
        if others:
            sys.stderr.write(f'Ignoring {len(others)} unknown data items in {measurement}: {others}\n')
        if not fields:
            # A line with an empty field set is not valid Influx output.
            sys.stderr.write(f'No fields found for {measurement}, skipping record.\n')
            continue
        print(_influx_line(measurement, fields, tags, ts))


def _note_state(fields):
    """Count the record as not working unless its STATE is 'working'.

    Returns True when the record is working. A missing STATE counts as not
    working instead of raising KeyError.
    """
    working = fields.get('STATE') == 'working'
    if not working:
        summary['count_not_working'] += 1
    return working


def _append_reading(readings, fields, key):
    """Append fields[key] as a float to readings if present and numeric.

    Missing or non-numeric values are skipped on purpose, so one incomplete
    record does not abort the whole run; it just stays out of the rollup.
    """
    try:
        readings.append(float(fields[key]))
    except (KeyError, TypeError, ValueError):
        pass


def parse_pvs(record):
    """Parse the record of the PVS computer itself."""
    fields, tags, others = extract_fields_tags(
        record,
        "STATE,SWVER,dl_comm_err,dl_cpu_load,dl_err_count,dl_flash_avail,dl_mem_used,dl_scan_time,dl_skipped_scans,dl_untransmitted,dl_uptime",
        "SERIAL",
        "CURTIME,DATATIME,DETAIL,DEVICE_TYPE,HWVER,MODEL,STATEDESCR,panid")
    _note_state(fields)
    return "pvs6_computer", fields, tags, others


def parse_production(record):
    """Parse the record of the production power meter."""
    fields, tags, others = extract_fields_tags(
        record,
        "STATE,freq_hz,net_ltea_3phsum_kwh,p_3phsum_kw,q_3phsum_kvar,s_3phsum_kva,tot_pf_rto",
        "SERIAL",
        "CAL0,CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,OPERATION,PORT,STATEDESCR,SWVER,TYPE,ct_scl_fctr,interface,origin,subtype"
    )
    _note_state(fields)
    return "pvs6_production", fields, tags, others


def parse_consumption(record):
    """Parse the record of the consumption power meter."""
    fields, tags, others = extract_fields_tags(
        record,
        "STATE,freq_hz,i1_a,i2_a,neg_ltea_3phsum_kwh,net_ltea_3phsum_kwh,p1_kw,p2_kw,p_3phsum_kw,pos_ltea_3phsum_kwh,q_3phsum_kvar,s_3phsum_kva,tot_pf_rto,v12_v,v1n_v,v2n_v",
        "SERIAL",
        "CAL0,CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,OPERATION,PORT,STATEDESCR,SWVER,TYPE,ct_scl_fctr,interface,origin,subtype"
    )
    _note_state(fields)
    return "pvs6_consumption", fields, tags, others


def parse_inverter(record):
    """Parse one inverter record and collect its readings for the rollup.

    The p_3phsum_kw and t_htsnk_degc values are added to the module-level
    lists used by create_summary() when they are present and numeric.
    """
    fields, tags, others = extract_fields_tags(
        record,
        "STATE,freq_hz,i_3phsum_a,i_mppt1_a,ltea_3phsum_kwh,p_3phsum_kw,p_mppt1_kw,stat_ind,t_htsnk_degc,v_mppt1_v,vln_3phavg_v",
        "SERIAL",
        "CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,MOD_SN,NMPLT_SKU,OPERATION,PANEL,PORT,STATEDESCR,SWVER,TYPE,hw_version,interface,origin"
    )
    if not _note_state(fields):
        summary['count_not_working_inverters'] += 1
    _append_reading(inverter_kws, fields, 'p_3phsum_kw')
    _append_reading(inverter_temps, fields, 't_htsnk_degc')
    return "pvs6_inverter", fields, tags, others


def load_from_file(fn):
    """Parse a saved DeviceList JSON file, using its mtime as the timestamp.

    A file that cannot be read or decoded is reported on stderr and skipped.
    """
    try:
        ts = os.path.getmtime(fn)
        with open(fn) as fp:
            data = json.load(fp)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: bad JSON or encoding.
        sys.stderr.write(f'Error parsing JSON file {fn}, skipping.\n')
        return
    parse(data, ts)


def load_from_url(url):
    """Fetch DeviceList JSON from url and parse it with the current time."""
    # PVS6 often takes 10-15 seconds to respond
    with urllib.request.urlopen(url, timeout=50) as fp:
        data = json.load(fp)
    parse(data, time.time())


if __name__ == '__main__':
    if len(sys.argv) > 1:
        for fn in sys.argv[1:]:
            load_from_file(fn)
    else:
        load_from_url(_PVS6_URL)