Python+485继电器 实现自动化上下电稳定性测试

需求:

之前公司做上下电稳定性测试使用定时开关(到了设定时间就断电上电),这样就有一个缺点,就是假如某次上下电系统启动后设备出现异常或某些模块没有正常工作,就不容易发现或保留现场,就需要自动化脚本灵活控制上下电稳定性测试,并监控设备各项指标是否正常

代码:

import paramikoimport time,reimport osimport requestsimport jsonimport datetimefrom openpyxl import Workbookfrom openpyxl import load_workbookimport threadingimport serialdef rv_env(ip,system):        #-------------------------判断设备连接--------------------------    user = "ssh账号"    password = "ssh密码"    #创建一个ssh的客户端,用来连接服务器    ssh = paramiko.SSHClient()    #创建一个ssh的白名单    know_host = paramiko.AutoAddPolicy()    #加载创建的白名单    ssh.set_missing_host_key_policy(know_host)    try:        ssh.connect(hostname = "%s"%ip,port = 22,username = "%s"%user,password = "%s"%password)        print ("IP:%s连接成功.."%ip)                try:                            if system == "linux-2.0":                stdin,stdout,stderr = ssh.exec_command("命令")                  stdin,stdout,stderr = ssh.exec_command("命令")                                             elif system == "linux-2.1":                stdin,stdout,stderr = ssh.exec_command("命令")                  stdin,stdout,stderr = ssh.exec_command("命令")             print ("测试数据准完成")        except Exception as e:            print ("测试数据准备异常",e)        finally:            pass    except Exception as e:        print ("ssh登录失败")    #关闭ssh连接    ssh.close()# 上下电重启def updown_power(updown_power_com,down_power_time,up_power_time):    try:        ser = serial.Serial(port= updown_power_com, baudrate='9600', timeout=10)        print ("开始测试",ser.portstr,ser.isOpen())        print ("下电")        ser.write(serial.to_bytes([0x01, 0x0F, 0x00, 0x00, 0x00, 0x04, 0x01, 0x0F, 0x7E, 0x92]))        print ("下电时间间隔",down_power_time)        time.sleep(down_power_time)        print ("上电")        # 全部 01 0F 00 00 00 04 01 00 3E 96        ser.write(serial.to_bytes([0x01, 0x0F, 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x3E, 0x96]))        print ("上电时间间隔",up_power_time)        time.sleep(up_power_time)    except Exception as e:        print (e)def rv_check(ip,system):    #-------------------------判断设备连接--------------------------    user = "ssh账号"    password = "ssh密码"    #创建一个ssh的客户端,用来连接服务器    ssh = paramiko.SSHClient()    #创建一个ssh的白名单    know_host = paramiko.AutoAddPolicy()    #加载创建的白名单    ssh.set_missing_host_key_policy(know_host)    try:        ssh.connect(hostname = "%s"%ip,port = 22,username = "%s"%user,password = "%s"%password)        #print ("IP:%s连接成功.."%ip)        try:            #print ('
------------IR摄像头帧率------------')            stdin,stdout,stderr = ssh.exec_command(" 命令")            data = stdout.read().decode("utf-8").strip()            data1 = data.split(':')[1]            time.sleep(10)            stdin,stdout,stderr = ssh.exec_command("命令")            data = stdout.read().decode("utf-8").strip()            data2 = data.split(':')[1]            if int(data2) > int(data1):                ir = "pass-IR摄像头帧率正常:%s,%s"%(int(data1),int(data2))            else:                ir = "fail-%s,%s:ir帧率未更新,请检查摄像头是否异常"%(int(data1),int(data2))        except Exception as e:            ir = "ir数据获取失败或摄像头异常"        finally:            pass        try:            #print ('
------------RGB摄像头帧率------------')            stdin,stdout,stderr = ssh.exec_command("命令")            data = stdout.read().decode("utf-8").strip()            data1 = data.split(':')[1]            time.sleep(10)            stdin,stdout,stderr = ssh.exec_command("命令")            data = stdout.read().decode("utf-8").strip()            data2 = data.split(':')[1]            if int(data2) > int(data1):                rgb = "pass-RGB摄像头帧率正常:%s,%s"%(int(data1),int(data2))            else:                rgb = "fail-%s,%s:rgb帧率未更新,请检查摄像头是否异常"%(int(data1),int(data2))        except Exception as e:            rgb = "rgb获取数据失败或摄像头异常"        finally:            pass        if system == "linux-2.0":            #print ("linux-2.0")            try:                stdin,stdout,stderr = ssh.exec_command("命令")                                data = stdout.read().decode("utf-8").strip()                if len(data) > 0:                    conf = "fail-配置文件重启前后不一致:%s"%data                else:                    conf = "pass-配置文件正常"                            except Exception as e:                conf = "conf获取数据异常%s"%e            finally:                pass        elif system == "linux-2.1":            try:                stdin,stdout,stderr = ssh.exec_command("命令")                                data = stdout.read().decode("utf-8").strip()                if len(data) > 0:                    conf = "fail-配置文件重启前后不一致:%s"%data                else:                    conf = "pass-配置文件正常"                            except Exception as e:                conf = "conf获取数据异常%s"%e            finally:                pass                try:            #print ('
------------系统时间差------------')            stdin,stdout,stderr = ssh.exec_command("命令")                        data = stdout.read().decode("utf-8").strip()            CST_FORMAT = "%a %b %d %H:%M:%S CST %Y"            devicetime = datetime.datetime.strptime(data.strip("
"),CST_FORMAT) #            devicetime_str = str(devicetime)  # devicetime为datetime.datetime类型            device_time = time.mktime(time.strptime(devicetime_str, '%Y-%m-%d %H:%M:%S'))            now_time = time.time()            time_lag = now_time-device_time            if int(time_lag) > 10:                result_time = "fail-设备时间异常:%s"%time_lag            else:                result_time = "pass-设备时间正常:%s"%time_lag                    except Exception as e:            result_time = "date获取数据异常%s"%e        finally:            pass        try:            #print ('
------------外网连通情况------------')            stdin,stdout,stderr = ssh.exec_command("命令")                                    data = stdout.read().decode("utf-8").strip()            if 'ttl' in data:                internet = "pass-外网连通正常:%s"%data            else:                internet = "fail-外网连通失败或丢包:%s"%data        except Exception as e:            internet = "ping获取数据异常%s"%e        finally:            pass        try:            #print ('
------------写入的数据------------')            #stdin,stdout,stderr = ssh.exec_command("命令")  # rv1109            #stdin,stdout,stderr = ssh.exec_command("命令")  # 海思            stdin,stdout,stderr = ssh.exec_command("命令")                        data = stdout.read().decode("utf-8").strip()            if data == "write data test":                userdata = "pass-写入的数据正常:%s"%data            else:                userdata = "fail-写入的数据不一致:%s"%data        except Exception as e:            userdata = "cat获取数据异常或数据丢失%s"%e        finally:            pass        check_result = [ir,rgb,conf,result_time,internet,userdata]        #print (check_result)        return check_result        except Exception as e:        print ("ssh登录失败")        test = 'ssh登录失败'        check_result = [test]        return check_result    #关闭ssh连接    ssh.close()def write_excel_append(tabal_title,tabal_content,excel_file_name):    if not os.path.exists(excel_file_name):                mybook = Workbook()                wa = mybook.active        #tabal_title = ["重启次数","重启时间","IR摄像头帧率","RGB摄像头帧率","配置文件检验","系统时间差","外网连通情况","写入的数据"]        wa.append(tabal_title)                    mybook.save(excel_file_name)    workbook=load_workbook(excel_file_name)    sheet=workbook.active    sheet.append(tabal_content)    workbook.save(excel_file_name)    def main(updown_power_total_num,updown_power_ip_list,updown_power_pwd,down_power_time,up_power_time,updown_power_system,updown_power_com):    num = 1    while True:            try:            #环境搭建            for updown_power_ip in updown_power_ip_list:                rv_env(ip=updown_power_ip,system=updown_power_system)            #重启            start_time = time.strftime('%Y-%m-%d_%H:%M:%S',time.localtime())            print ("上电时间:",start_time)            updown_power(updown_power_com,down_power_time,up_power_time)            # 检查信息            for updown_power_ip in updown_power_ip_list:                check_result = rv_check(ip=updown_power_ip,system=updown_power_system)                # 写入excel                tabal_title = ["IP","重启次数","重启时间","IR摄像头帧率","RGB摄像头帧率","配置文件检验","系统时间差","外网连通情况","写入的数据"]                                tabal_content = [updown_power_ip,num,start_time]+check_result                print (tabal_content)                excel_file_name = "上下电测试结果_%s.xlsx"%updown_power_ip                write_excel_append(tabal_title,tabal_content,excel_file_name)        except Exception as e:            print ("异常",e)                    if num == updown_power_total_num:            break        num = num+1if __name__ == '__main__':    updown_power_com = "COM3"    updown_power_ip_list = ["192.168.x.x","192.168.x.x"]    updown_power_pwd = "test1234"    updown_power_system = "linux-2.1"    down_power_time = 5    up_power_time = 60        updown_power_total_num = 3    # 多单台设备    main(updown_power_total_num,updown_power_ip_list,updown_power_pwd,down_power_time,up_power_time,updown_power_system,updown_power_com)    print ("测试结束")


环境:

1、485继电器一个(某宝上可购买,会配套调试工具,里面的调试命令会用到)



2、485通讯线


执行:


测试结果:


发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章