Created
          May 9, 2025 21:30 
        
      - 
      
- 
        Save inf3rtil/207b830241716a5855a6b5484d91ef7c to your computer and use it in GitHub Desktop. 
    Rigol DP800
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | """Use a Rigol DP800 to monitor load and store test on database.""" | |
| import pyvisa | |
| import time | |
| import mysql.connector | |
| from datetime import datetime, timezone | |
| def connect_to_database(): | |
| """Connect to mysql database.""" | |
| try: | |
| connection = mysql.connector.connect( | |
| host="host", | |
| user="user", | |
| password="pass", | |
| database="database" | |
| ) | |
| print("Successfully connected to the database") | |
| return connection | |
| except mysql.connector.Error as err: | |
| print(f"Error connecting to MySQL: {err}") | |
| return None | |
| def insert_measurements(connection, power, current, test_pass): | |
| """Insert measurements on database.""" | |
| cursor = connection.cursor() | |
| timestamp = datetime.now(timezone.utc) | |
| query = "INSERT INTO measurements (power, current, timestamp, pass) VALUES (%s, %s, %s, %s)" | |
| values = (power, current, timestamp, test_pass) | |
| try: | |
| cursor.execute(query, values) | |
| connection.commit() | |
| print(f"Successfully inserted: Power={power}, Current={current}, Timestamp={timestamp}, Pass={test_pass}") | |
| except mysql.connector.Error as err: | |
| print(f"Error inserting data: {err}") | |
| connection.rollback() | |
| finally: | |
| if cursor: | |
| cursor.close() | |
| class RigolDP800: | |
| """Rigol DP800 programable power supply.""" | |
| def connect(self): | |
| """List and connect to first resource.""" | |
| self.rm = pyvisa.ResourceManager('@py') | |
| print(self.rm.list_resources()) | |
| self.device = self.rm.open_resource(self.rm.list_resources()[0]) | |
| print(self.device.query('*IDN?')) | |
| def setup(self): | |
| """Define test parameters.""" | |
| self.writeOnScreen('Carregando Parametros...') | |
| time.sleep(2) | |
| self.device.write('SOUR1:CURR 0.5') | |
| self.device.write('SOUR1:VOLT 12') | |
| self.device.write('OUTP CH1,ON') | |
| def waitConnection(self): | |
| """Check if device is connected.""" | |
| current = self.device.query('MEAS:CURR? CH1') | |
| current = int(current.replace('.', '')) | |
| if (current != 0): | |
| return True | |
| else: | |
| return False | |
| def endTest(self): | |
| """Power off the power supply.""" | |
| self.device.write('DISP:TEXT "Teste finalizado",20,60') | |
| time.sleep(1) | |
| def getTest(self): | |
| """Return values read values from power supply.""" | |
| self.curr = self.device.query('MEAS:CURR? CH1') | |
| self.power = self.device.query('MEAS:POWE? CH1') | |
| return self.curr, self.power | |
| def writeOnScreen(self, message): | |
| """Write on DP800 Screen.""" | |
| self.device.write(f'DISP:TEXT "{message}",20,60') | |
| def main(): | |
| connection = connect_to_database() | |
| if not connection: | |
| return | |
| print("Database connected") | |
| dp800 = RigolDP800() | |
| dp800.connect() | |
| dp800.setup() | |
| while (True): | |
| counter = 0 | |
| current_acc = 0 | |
| power_acc = 0 | |
| test_pass = 0 | |
| while (not dp800.waitConnection()): | |
| dp800.writeOnScreen('Conecte dispositivo') | |
| continue | |
| dp800.writeOnScreen('Dispositivo conectado!') | |
| while (counter < 10): | |
| counter = counter + 1 | |
| time.sleep(1) | |
| current_read, power_read = dp800.getTest() | |
| current_acc += int(current_read.replace('.', '')) | |
| power_acc += int(power_read.replace('.', '')) | |
| dp800.writeOnScreen(f'Testando... {current_read.rstrip()} A, {power_read.rstrip()} W') | |
| power = int(power_acc/counter) | |
| current = int(current_acc/counter) | |
| if (current > 1000): | |
| test_pass = 1 | |
| dp800.writeOnScreen(f'Nao aprovado: {current/10000}mA.') | |
| else: | |
| dp800.writeOnScreen(f'Aprovado: {current/10000}mA') | |
| time.sleep(1) | |
| insert_measurements(connection, power, current, test_pass) | |
| while (dp800.waitConnection()): | |
| dp800.writeOnScreen('Finalizado! Remova a carga.') | |
| dp800.endTest() | |
| if __name__ == "__main__": | |
| main() | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment