Skip to content

Instantly share code, notes, and snippets.

@inf3rtil
Created May 9, 2025 21:30
Show Gist options
  • Save inf3rtil/207b830241716a5855a6b5484d91ef7c to your computer and use it in GitHub Desktop.
Save inf3rtil/207b830241716a5855a6b5484d91ef7c to your computer and use it in GitHub Desktop.
Rigol DP800
"""Use a Rigol DP800 to monitor load and store test on database."""
import pyvisa
import time
import mysql.connector
from datetime import datetime, timezone
def connect_to_database(host="host", user="user", password="pass", database="database"):
    """Open a connection to the MySQL database.

    Args:
        host: Database server host name.
        user: User name used to log in.
        password: Password for ``user``.
        database: Name of the schema to select.

    Returns:
        The open connection object, or ``None`` when the connection attempt
        raised ``mysql.connector.Error`` (the error is printed).
    """
    # NOTE(review): the defaults look like placeholders — pass real settings
    # (or load them from configuration) instead of editing them in place.
    try:
        connection = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=database,
        )
    except mysql.connector.Error as err:
        print(f"Error connecting to MySQL: {err}")
        return None
    print("Successfully connected to the database")
    return connection
def insert_measurements(connection, power, current, test_pass):
    """Store one measurement row in the ``measurements`` table.

    The row is stamped with the current UTC time. On success the transaction
    is committed and a confirmation is printed; on ``mysql.connector.Error``
    the error is printed and the transaction is rolled back. The cursor is
    closed in both cases.

    Args:
        connection: Open database connection providing ``cursor()``,
            ``commit()`` and ``rollback()``.
        power: Value written to the ``power`` column.
        current: Value written to the ``current`` column.
        test_pass: Value written to the ``pass`` column.
    """
    db_cursor = connection.cursor()
    measured_at = datetime.now(timezone.utc)
    sql = "INSERT INTO measurements (power, current, timestamp, pass) VALUES (%s, %s, %s, %s)"
    row = (power, current, measured_at, test_pass)
    try:
        db_cursor.execute(sql, row)
        connection.commit()
    except mysql.connector.Error as db_error:
        print(f"Error inserting data: {db_error}")
        connection.rollback()
    else:
        print(f"Successfully inserted: Power={power}, Current={current}, Timestamp={measured_at}, Pass={test_pass}")
    finally:
        if db_cursor:
            db_cursor.close()
class RigolDP800:
    """Rigol DP800 programmable power supply controlled through PyVISA.

    All instrument traffic goes through ``self.device``, which is created by
    ``connect()``; every other method requires a prior ``connect()`` call.
    """

    def __init__(self):
        """Create an unconnected instance; call ``connect()`` before use."""
        self.rm = None      # PyVISA resource manager, set by connect()
        self.device = None  # opened instrument resource, set by connect()

    def connect(self):
        """List the available VISA resources and open the first one.

        The resource list and the instrument's ``*IDN?`` reply are printed.

        Raises:
            IndexError: If no VISA resource is found.
        """
        self.rm = pyvisa.ResourceManager('@py')
        # Query the list once so the printed list and the opened resource
        # cannot disagree.
        resources = self.rm.list_resources()
        print(resources)
        if not resources:
            raise IndexError("No VISA resources found; is the power supply connected?")
        self.device = self.rm.open_resource(resources[0])
        print(self.device.query('*IDN?'))

    def setup(self, voltage=12, current_limit=0.5):
        """Program the test parameters and switch channel 1 on.

        Args:
            voltage: Value sent with ``SOUR1:VOLT``.
            current_limit: Value sent with ``SOUR1:CURR``.
        """
        self.writeOnScreen('Carregando Parametros...')
        time.sleep(2)  # keep the message visible before reconfiguring
        self.device.write(f'SOUR1:CURR {current_limit}')
        self.device.write(f'SOUR1:VOLT {voltage}')
        self.device.write('OUTP CH1,ON')

    def waitConnection(self):
        """Return True when the measured channel 1 current is non-zero.

        A non-zero current is taken as the sign that a load is attached.
        """
        reading = self.device.query('MEAS:CURR? CH1')
        # float() accepts surrounding whitespace and exponent notation, so the
        # check does not depend on the exact number format of the reply.
        return float(reading) != 0

    def endTest(self):
        """Show the end-of-test message on the display for one second.

        The output is left untouched; this method only writes to the screen.
        """
        self.writeOnScreen('Teste finalizado')
        time.sleep(1)

    def getTest(self):
        """Read current and power of channel 1.

        Returns:
            Tuple ``(current, power)`` holding the raw reply strings exactly
            as returned by the instrument; they are also stored in
            ``self.curr`` and ``self.power``.
        """
        self.curr = self.device.query('MEAS:CURR? CH1')
        self.power = self.device.query('MEAS:POWE? CH1')
        return self.curr, self.power

    def writeOnScreen(self, message):
        """Show ``message`` on the DP800 screen via ``DISP:TEXT``."""
        # NOTE(review): 20,60 are presumably the x/y position of the text —
        # confirm against the programming guide.
        self.device.write(f'DISP:TEXT "{message}",20,60')
def _run_test_cycle(dp800, connection):
    """Run one test cycle: wait for a load, sample it, store and show the result.

    Args:
        dp800: Connected and configured power-supply object.
        connection: Open database connection used to store the result.
    """
    sample_count = 10
    current_acc = 0
    power_acc = 0
    test_pass = 0

    # Poll until the supply measures a non-zero current, i.e. a load is attached.
    while not dp800.waitConnection():
        dp800.writeOnScreen('Conecte dispositivo')
    dp800.writeOnScreen('Dispositivo conectado!')

    # Take one reading per second and accumulate them for averaging.
    for _ in range(sample_count):
        time.sleep(1)
        current_read, power_read = dp800.getTest()
        # The replies are decimal strings; dropping the '.' turns each one
        # into an integer scaled by its number of decimal places.
        current_acc += int(current_read.replace('.', ''))
        power_acc += int(power_read.replace('.', ''))
        dp800.writeOnScreen(f'Testando... {current_read.rstrip()} A, {power_read.rstrip()} W')

    power = int(power_acc / sample_count)
    current = int(current_acc / sample_count)

    # NOTE(review): the flag stored in the `pass` column is 1 on the rejected
    # branch and 0 on the approved branch — confirm this polarity is intended.
    # NOTE(review): the displayed value is the scaled average divided by 10000
    # but labelled mA — confirm the scaling against the instrument's reply
    # resolution.
    if current > 1000:
        test_pass = 1
        dp800.writeOnScreen(f'Nao aprovado: {current/10000}mA.')
    else:
        dp800.writeOnScreen(f'Aprovado: {current/10000}mA')
    time.sleep(1)

    insert_measurements(connection, power, current, test_pass)

    # Hold the result on screen until the load is removed (current back to 0).
    while dp800.waitConnection():
        dp800.writeOnScreen('Finalizado! Remova a carga.')
    # NOTE(review): assumed to run at the end of every cycle — confirm.
    dp800.endTest()


def main():
    """Connect to the database and the power supply, then run test cycles forever.

    Returns immediately when the database connection cannot be opened. The
    loop only ends through an exception (including Ctrl-C); the VISA session
    and the database connection are released on the way out.
    """
    connection = connect_to_database()
    if not connection:
        return
    print("Database connected")
    try:
        dp800 = RigolDP800()
        dp800.connect()
        try:
            dp800.setup()
            while True:
                _run_test_cycle(dp800, connection)
        finally:
            # Release the instrument session even if a cycle raised.
            dp800.device.close()
            dp800.rm.close()
    finally:
        connection.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment