Skip to content

Instantly share code, notes, and snippets.

@NelsonMinar
Created January 20, 2022 18:59
Show Gist options
  • Save NelsonMinar/37208cc6e33e76a6ee63cf0a9d0b4b71 to your computer and use it in GitHub Desktop.
Save NelsonMinar/37208cc6e33e76a6ee63cf0a9d0b4b71 to your computer and use it in GitHub Desktop.

Revisions

  1. NelsonMinar created this gist Jan 20, 2022.
    7 changes: 7 additions & 0 deletions README.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,7 @@
    Data imports for a custom solar monitoring system I build using Sunpower's PVS6, Telegraf, Influx, and Grafana.

    See http://nelsonslog.wordpress.com/ for details

    This code is provided without license or support. If you do something interested and related let me know!

    [email protected]
    26 changes: 26 additions & 0 deletions forecast-to-influx.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,26 @@
    #!/usr/bin/env python3
    """Get a solar forecast from forecast.solar and emit it in Influx format
    Nelson Minar <[email protected]>
    """

    import json, sys, urllib.request
    import pendulum

    forecast_url = 'https://api.forecast.solar/estimate/watts/39.2/-121.1/35/70/11.5'


    def parse(fp):
    data = json.load(fp)
    tz_name = data['message']['info']['timezone']
    for date, watts in data.get('result', {}).items():
    dt = pendulum.parse(date, tz=tz_name)
    ts = dt.int_timestamp * 1_000_000_000
    kw = watts / 1000.0
    print(f'forecast_dot_solar kw={kw:.4} {ts}')


    if __name__ == '__main__':
    if len(sys.argv) == 1:
    parse(urllib.request.urlopen(forecast_url, timeout=120))
    for fn in sys.argv[1:]:
    parse(open(fn))
    28 changes: 28 additions & 0 deletions pge-to-influx.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,28 @@
    #!/usr/bin/env python3
    """Import CSV Green Button usage files from PG&E and emit them in Influx format
    Nelson Minar <[email protected]>
    """

    import csv, sys, gzip
    import pendulum


    def parse(fn: str):
    fp = gzip.open(fn, mode='rt', encoding='utf8') if fn.endswith('.gz') else open(fn)

    for _ in range(5):
    fp.readline()
    rows = csv.DictReader(fp)
    for row in rows:
    assert row['TYPE'] == 'Electric usage'
    assert row['UNITS'] == 'kWh'
    dt = pendulum.parse(f'{row["DATE"]} {row["END TIME"]}', tz='America/Los_Angeles')
    ts = dt.int_timestamp * 1_000_000_000
    kwh = row["USAGE"]
    cost = float(row["COST"].replace('$', ''))
    print(f'pge kwh={kwh},cost={cost:.4} {ts}')


    if __name__ == '__main__':
    for fn in sys.argv[1:]:
    parse(fn)
    203 changes: 203 additions & 0 deletions pvs6-to-influx.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,203 @@
    #!/usr/bin/python3
    """Parse status data from the Sunpower PVS6 and turn it into Influx format, for use with Telegraf.
    PVS6 data comes from its endpoint /cgi-bin/dl_cgi?Command=DeviceList'
    The PVS6 itself can be the server for this, or a Raspberry Pi or something running as a proxy.
    Output contains lots of data from the Sunpower mointors.
    Computer status, production and consumption wattages etc (as measured by CT sensors),
    also power data from each individual inverter.
    Also produces some summary statistics rolling up info about all the inverters.
    By Nelson Minar <[email protected]>.
    I have no intent to support this or turn it into a resuable product.
    """
    import json, sys, os.path, re, statistics, urllib.request, time

    _PVS6_URL = 'http://192.168.3.140/cgi-bin/dl_cgi?Command=DeviceList'

    output = []


    def extract_fields_tags(record, field_keys, tag_keys, ignore_keys):
    """Given a JSON blob, return three dicts.
    One containing all keys in field_keys, one for tag_keys, and one for things not in fields, tags, or ignore."""
    field_keys = field_keys.split(',')
    tag_keys = tag_keys.split(',')
    ignore_keys = ignore_keys.split(',')
    fields = {k: v for k, v in record.items() if k in field_keys}
    tags = {k: v for k, v in record.items() if k in tag_keys}
    others = {k: v for k, v in record.items() if not (k in (field_keys + tag_keys + ignore_keys))}

    return fields, tags, others


    _strip_whitespace = re.compile(r'\s+')


    def _s(s):
    "Remove all whitespace from string"
    return _strip_whitespace.sub("_", s)


    def _v(v):
    "Turn the input into a string for influx format. Quotes strings, converts numbers"
    try:
    return float(v)
    except ValueError:
    pass
    return f'"{v}"'


    # Some rollup data populated by the parsing system
    summary = {}
    inverter_kws = []
    inverter_temps = []


    def init_summary():
    global inverter_kws, inverter_temps, summary
    inverter_kws = []
    inverter_temps = []
    summary = {
    'count_not_working': 0,
    'count_not_working_inverters': 0,
    'avg_inverter_kw': 0,
    'sd_inverter_kw': 0,
    'count_inverter_low_kw': 0,
    'avg_inverter_temp': 0,
    'sd_inverter_temp': 0,
    }


    def create_summary():
    summary['avg_inverter_kw'] = statistics.fmean(inverter_kws)
    summary['sd_inverter_kw'] = statistics.pstdev(inverter_kws)
    summary['avg_inverter_temp'] = statistics.fmean(inverter_temps)
    summary['sd_inverter_temp'] = statistics.pstdev(inverter_temps)

    # Count "low" inverters
    mean_kw = summary['avg_inverter_kw']
    if mean_kw < 0.02: # if no power is being generated then skip this check
    low = 0
    else:
    # low means less than 80% of average
    low_factor = 0.8
    low = sum([1 for x in inverter_kws if x < low_factor * summary['avg_inverter_kw']])
    summary['count_inverter_low_kw'] = low

    return "pvs6_summary", summary, {}, {}


    def parse(data, timestamp):
    output = []
    skipped = 0
    init_summary()

    # Parse every record in the JSON input
    for record in data['devices']:
    device_type = record.get("DEVICE_TYPE")
    if device_type == 'Power Meter' and record.get('TYPE') == 'PVS5-METER-P':
    output.append(parse_production(record))
    elif device_type == 'Power Meter' and record.get('TYPE') == 'PVS5-METER-C':
    output.append(parse_consumption(record))
    elif device_type == 'Inverter':
    output.append(parse_inverter(record))
    elif device_type == 'PVS':
    output.append(parse_pvs(record))
    else:
    skipped += 1
    if skipped > 0:
    sys.stderr.write(f'Skipped {skipped} records of unknown type.\n')

    # Make a rollup from the collected data
    output.append(create_summary())

    # Emit the records in Influx format

    for measurement, fields, tags, others in output:
    if len(others) != 0:
    sys.stderr.write(f'Ignoring {len(others)} unknown data items in {measurement}: {others}\n')

    # Strip whitespace from tag names and values and format as k=v,k=v lists for Influx
    tag_set = ','.join(f"{_s(k)}={_s(v)}" for k, v in tags.items())
    field_set = ','.join(f"{_s(k)}={_v(v)}" for k, v in fields.items())

    # Format timestamp into nanoseconds
    ts = int(1_000_000_000 * timestamp)

    # Only print this delimeter if there are tags
    d = ',' if len(tag_set) > 0 else ''

    # Emit the Influx record
    print(f'{measurement}{d}{tag_set} {field_set} {ts}')


    def parse_pvs(record):
    fields, tags, others = extract_fields_tags(
    record,
    "STATE,SWVER,dl_comm_err,dl_cpu_load,dl_err_count,dl_flash_avail,dl_mem_used,dl_scan_time,dl_skipped_scans,dl_untransmitted,dl_uptime",
    "SERIAL", "CURTIME,DATATIME,DETAIL,DEVICE_TYPE,HWVER,MODEL,STATEDESCR,panid")
    if fields['STATE'] != 'working':
    summary['count_not_working'] += 1
    return "pvs6_computer", fields, tags, others


    def parse_production(record):
    fields, tags, others = extract_fields_tags(
    record, "STATE,freq_hz,net_ltea_3phsum_kwh,p_3phsum_kw,q_3phsum_kvar,s_3phsum_kva,tot_pf_rto", "SERIAL",
    "CAL0,CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,OPERATION,PORT,STATEDESCR,SWVER,TYPE,ct_scl_fctr,interface,origin,subtype"
    )
    if fields['STATE'] != 'working':
    summary['count_not_working'] += 1
    return "pvs6_production", fields, tags, others


    def parse_consumption(record):
    fields, tags, others = extract_fields_tags(
    record,
    "STATE,freq_hz,i1_a,i2_a,neg_ltea_3phsum_kwh,net_ltea_3phsum_kwh,p1_kw,p2_kw,p_3phsum_kw,pos_ltea_3phsum_kwh,q_3phsum_kvar,s_3phsum_kva,tot_pf_rto,v12_v,v1n_v,v2n_v",
    "SERIAL",
    "CAL0,CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,OPERATION,PORT,STATEDESCR,SWVER,TYPE,ct_scl_fctr,interface,origin,subtype"
    )
    if fields['STATE'] != 'working':
    summary['count_not_working'] += 1
    return "pvs6_consumption", fields, tags, others


    def parse_inverter(record):
    fields, tags, others = extract_fields_tags(
    record,
    "STATE,freq_hz,i_3phsum_a,i_mppt1_a,ltea_3phsum_kwh,p_3phsum_kw,p_mppt1_kw,stat_ind,t_htsnk_degc,v_mppt1_v,vln_3phavg_v",
    "SERIAL",
    "CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,MOD_SN,NMPLT_SKU,OPERATION,PANEL,PORT,STATEDESCR,SWVER,TYPE,hw_version,interface,origin"
    )
    if fields['STATE'] != 'working':
    summary['count_not_working'] += 1
    summary['count_not_working_inverters'] += 1
    inverter_kws.append(float(fields['p_3phsum_kw']))
    inverter_temps.append(float(fields['t_htsnk_degc']))
    return "pvs6_inverter", fields, tags, others


    def load_from_file(fn):
    ts = os.path.getmtime(fn)
    try:
    data = json.load(open(fn))
    except:
    sys.stderr.write(f'Error parsing JSON file {fn}, skipping.\n')
    return
    parse(data, ts)


    def load_from_url(url):
    # PVS6 often takes 10-15 seconds to respond
    fp = urllib.request.urlopen(url, timeout=50)
    data = json.load(fp)
    parse(data, time.time())


    if __name__ == '__main__':
    if len(sys.argv) > 1:
    for fn in sys.argv[1:]:
    load_from_file(fn)
    else:
    load_from_url(_PVS6_URL)