通过工具,将能够根据需要监控cpu、内存状态。脚本默认每隔10s钟获取一次当前的cpu、内存状态并记录,设定的时间到了后,停止获取并开始分析数据,最后自动发出结果邮件。运行结果截图如下
2.1使用场景
在手工测试app的时候,可同步监测系统、该app的性能
在做自动化测试的时候,可同步监测系统、指定的apps的性能
在做monkey测试的时候,可同步监测系统、指定的apps的性能
2.2运行环境
2.2.1 .ubuntu系统,安装好python2.7、adb工具,pychart插件
2.2.2 手机需要支持top、procrank命令
2.3.1根据需要修改systemmonitor.conf配置文件,配置项有:
[runtime]#脚步运行时长,以分钟为单位
rumtime=1
[updatetime]#刷新间隔,默认为10秒
updatetime=10
[packagenamelist]#过滤进程名,多个进程以逗号分隔
packagenamelist=com.android.email,com.android.providers.calendar,
[mailto_list]#收件人,以逗号分隔
mailto_list= ***@163.com
[mailinfo]#邮件服务器参数
mailserver=***
user=***
password=*********
me=guojianbiao@le.com
python systemmonitor.py
3.实际代码
分为三个文件,下面将逐个展现。
[runtime]#运行时长,以分钟为单位
rumtime=10
[updatetime]#刷新间隔,默认为10秒
updatetime=10
[packagenamelist]#过滤进程名,多个进程以逗号分隔
packagenamelist=com.android.providers.calendar,
[mailto_list]#收件人,以逗号分隔
mailto_list=***@163.com
[mailinfo]#邮件服务器参数
mailserver=***
user=***
password=***
me=***@163.com
#encoding:utf-8
import os
import sys
import datetime
from threading import thread
import threading
import subprocess
import configparser
from analysisinfo import *
from pychartdir import *
import smtplib
from email.mime.image import mimeimage
from email.mimetext import mimetext
from email.mimemultipart import mimemultipart
import time
def gettime():
now = datetime.datetime.now()
return now.strftime("%y-%m-%d %h:%m:%s")
def run_cmd(cmd):
process = subprocess.popen(cmd , shell = true, stdout=subprocess.pipe)
return process.stdout.read().strip()
def timestamp_datetime(value):
value = time.localtime(value)
dt = time.strftime('%y-%m-%d %h:%m:%s', value)
return dt
#获取设备型号
def get_model():
return run_cmd('adb shell getprop ro.product.model')
#获取android版本
def get_androidversion():
return run_cmd('adb shell getprop ro.build.version.release')
#获取cpu型号
def get_cpuinfo():
return run_cmd('adb shell getprop ro.product.cpu.abi')
#获取系统编译版本增量
def get_buildversionincremental():
return run_cmd('adb shell getprop ro.build.version.incremental')
#获取系统编译时间
def get_builddate():
return run_cmd('adb shell getprop ro.build.date.utc')
#生成表
#title:标题
#lable:
#ytitle
#data
#chartpath
#title_list
#style
def get_chart(title,label,ytitle,data,chartpath,title_list,style):
color_list = [0x008800,0xff0000,0x8a2be2,0xa52a2a,0x7fff00,0xff7f50,0xdc143c,0x8b008b,0xff1493,0xffd700]
c = xychart(1400, 600, 0xcceeff, 0x000000, 1)
c.addtitle(title, "simsun.ttc", 18).setbackground(0xcceeff, 0x000000, glasseffect())
c.setplotarea(50, 55, 1300, 500)
c.addlegend(50, 28, 0, "arialbd.ttf", 10).setbackground(transparent)
c.xaxis().setlabels(label)
c.yaxis().settickdensity(30)
c.xaxis().setlabelstyle("arialbd.ttf", 8)
c.yaxis().setlabelstyle("arialbd.ttf", 8)
c.xaxis().setwidth(2)
c.yaxis().setwidth(2)
c.yaxis().settitle(ytitle, "arialbd.ttf", 10)
layer = c.addlinelayer2()
layer.setlinewidth(3)
#c.addtext(1280, 70,"最大值:111\n最小值:111").setalignment(bottomright)
for i in range(0,len(data)):
layer.adddataset(data[i], color_list[i],title_list[i]).setdatasymbol(circleshape, 1)
#layer.setdatalabelstyle("",10)
#layer.setdatalabelformat(none)
c.setdefaultfonts("simsun.ttc")
c.makechart(chartpath)
#发送邮件
def send_mail_image(mailserver,user,password,me,mailto_list,title,bodyinfo,system_file_list,system_maxminavg,appinfo_list,app_file_list):
msg = mimemultipart('related')
msg['subject']=title
msg['from']=me
msg['to']=';'.join(mailto_list)
content = ""
content =""
content =""
content =""
content =""
content =""
content =""
content ="运行时长(分):".decode('utf-8').encode('gbk') str(bodyinfo[9]) ""
content = "
"
content ="开始时间:".decode('utf-8').encode('gbk') bodyinfo[0] ""
content = "
"
content ="结束时间:".decode('utf-8').encode('gbk') bodyinfo[1] ""
content = "
"
content ="刷新次数:".decode('utf-8').encode('gbk') str(bodyinfo[2]) ""
content = "
"
content ="刷新频率(秒):".decode('utf-8').encode('gbk') str(bodyinfo[3]) ""
content = "
"
content ="设备名称:".decode('utf-8').encode('gbk') str(bodyinfo[4]) ""
content = "
"
content ="系统版本:".decode('utf-8').encode('gbk') str(bodyinfo[5]) ""
content = "
"
content ="cpu信息:".decode('utf-8').encode('gbk') str(bodyinfo[6]) ""
content = "
"
content ="系统编译增量:".decode('utf-8').encode('gbk') str(bodyinfo[7]) ""
content = "
"
content ="系统编译日期:".decode('utf-8').encode('gbk') str(bodyinfo[8]) ""
content = "
"
content = "
"
content ="名词解释:".decode('utf-8').encode('gbk')
content = "
"
content ="pss:实际使用的物理内存".decode('utf-8').encode('gbk')
content = "
"
content ="uss:进程独自占用的物理内存".decode('utf-8').encode('gbk')
content = "
"
content = "
"
content ="system性能趋势:".decode('utf-8').encode('gbk')
content ="" system_maxminavg.decode('utf-8').encode('gbk') ""
content = "
"
for i in system_file_list:
pic_name = i.split(sep)[-1]
content = ""
content = "
"
content = "
"
for i in range(0,len(appinfo_list)):
content = "
"
content ="appinfo:" appinfo_list[i].decode('utf-8').encode('gbk') ""
content = "
"
for j in app_file_list[i]:
pic_name = j.split(sep)[-1]
content = ""
content = "
"
content = "
"
content =""
content =""
msg.attach(mimetext(content,_subtype='html',_charset='gb2312'))
for i in system_file_list:
file_name = i.split(sep)[-1]
fp = open(i,'rb')
img = mimeimage(fp.read())
img.add_header('content-id',file_name)
msg.attach(img)
for i in app_file_list:
for j in i:
file_name = j.split(sep)[-1]
fp = open(j,'rb')
img = mimeimage(fp.read())
img.add_header('content-id',file_name)
msg.attach(img)
try:
ott_mail=smtplib.smtp()
ott_mail.connect(mailserver)
ott_mail.login(user, password)
ott_mail.sendmail(me,mailto_list,msg.as_string())
ott_mail.close()
return true
except exception,e:
print e
return false
#同步获取cpu线程
#path:写cpu信息的文件路径
#count:获取次数
#updatetime:刷新次数
class cputhread(threading.thread):
def __init__(self,path,count,updatetime):
threading.thread.__init__(self)
self.thread_stop = false
self.path = path
self.count = count
self.updatetime = int(updatetime)
def run(self):
if not self.thread_stop:
print "cpu thread is start"
self.output = open(self.path, 'w ')
for i in range(self.count):
print "cpu thread run count:" str(i) " " time.strftime('%y-%m-%d %x', time.localtime( time.time())) "\n"
self.output.write(run_cmd("adb shell top -n 1 -d " str(self.updatetime)) "\n")
#time.sleep(self.updatetime)
self.output.close()
print "cpu thread is stop"
def stop(self):
self.thread_stop = true
#同步获取mem线程
#path:写mem信息的文件路径
#count:获取次数
#updatetime:刷新次数
class memthread(threading.thread):
def __init__(self,path,count,updatetime):
threading.thread.__init__(self)
self.thread_stop = false
self.path = path
self.count = count
self.updatetime = int(updatetime)
def run(self):
if not self.thread_stop:
print "mem thread is start"
self.output = open(self.path, 'w ')
for i in range(self.count):
print "mem thread run count:" str(i) " " time.strftime('%y-%m-%d %x', time.localtime( time.time())) "\n"
self.output.write(run_cmd("adb shell procrank") "\n")
time.sleep(self.updatetime)
self.output.close()
print "mem thread is stop"
def stop(self):
self.thread_stop = true
def analyse_log_and_sendmail(path,sep,starttime,endtime,getcount,updatetime,model,androidversion,cpuinfo,buildversionincremental,builddate,runtime,packagenamelist,mailserver,user,password,me,mailto_list):
system_file_list = []
system_cpu_png = path sep "system_cpu.png"
system_mem_png = path sep "system_mem.png"
system_cpu = analysissystemcpu(cpu_log_path)
x_list = []
x_list.append(starttime)
for i in range(0,len(system_cpu[0]) - 2):
x_list.append("")
x_list.append(endtime)
get_chart("system_cpu trend",x_list,"the numerical",[system_cpu[0],system_cpu[1]],system_cpu_png,['user cpu(%)','system cpu(%)'],"{value|0}%")
system_mem = analysissystemmem(mem_log_path)
x_list = []
x_list.append(starttime)
for i in range(0,len(system_mem) - 2):
x_list.append("")
x_list.append(endtime)
get_chart("system_mem trend",x_list,"the numerical",[system_mem],system_mem_png,['mem(m)'],"{value|2}")
system_maxminavg = "
user cpu(%):max=" str(max(system_cpu[0])) ",min=" str(min(system_cpu[0])) ",avg=" str(round(sum(system_cpu[0])/len(system_cpu[0]),2))\
"
system cpu(%):max=" str(max(system_cpu[1])) ",min=" str(min(system_cpu[1])) ",avg=" str(round(sum(system_cpu[1])/len(system_cpu[1]),2))\
"
mem(m):max=" str(max(system_mem)) ",min=" str(min(system_mem)) ",avg=" str(round(sum(system_mem)/len(system_mem),2));
system_file_list.append(system_cpu_png)
system_file_list.append(system_mem_png)
bodyinfo = [starttime,endtime,getcount,updatetime,model,androidversion,cpuinfo,buildversionincremental,str(builddate),runtime]
appinfo_list = []
cpumemchatfile_list = []
#分析日志
for packagename in packagenamelist:
cpu_png = path sep packagename "_cpu.png"
mem_png = path sep packagename "_mem.png"
cpu_list = analysisappcpu(cpu_log_path,packagename)
if len(cpu_list) == 0:
continue
x_list = []
x_list.append(starttime)
for i in range(0,len(cpu_list) - 2):
x_list.append("")
x_list.append(endtime)
get_chart(packagename "_cpu trend",x_list,"the numerical",[cpu_list],cpu_png,['cpu(%)'],"{value|0}%")
mem_list = analysisappmem(mem_log_path,packagename)
x_list = []
x_list.append(starttime)
for i in range(0,len(mem_list) - 2):
x_list.append("")
x_list.append(endtime)
get_chart(packagename "_pss_uss trend",x_list,"the numerical",[mem_list[2],mem_list[3]],mem_png,['pss(m)','uss(m)'],"{value|2}")
appinfo_list.append(packagename "
cpu(%):max=" str(max(cpu_list)) ",min=" str(min(cpu_list)) ",avg=" str(round(sum(cpu_list)/len(cpu_list),2))\
"
pss(m):max=" str(max(mem_list[2])) ",min=" str(min(mem_list[2])) ",avg=" str(round(sum(mem_list[2])/len(mem_list[2]),2))\
"
uss(m):max=" str(max(mem_list[3])) ",min=" str(min(mem_list[3])) ",avg=" str(round(sum(mem_list[3])/len(mem_list[3]),2)))
cpumemchatfile_list.append([cpu_png,mem_png])
#print appinfo_list
#print cpumemchatfile_list
print "send mail!!!"
print send_mail_image(mailserver,user,password,me,mailto_list,model "-system性能测试报告-" starttime,bodyinfo,system_file_list,system_maxminavg,appinfo_list,cpumemchatfile_list)
if __name__=='__main__':
sep = os.path.sep #获取系统分隔符
path = sys.path[0] #获取脚本路径
conf_path = path sep "systemmonitor.conf" #监控配置文件
cpu_log_path = path sep "monitor_cpu.txt" #cpu日志文件
mem_log_path = path sep "monitor_mem.txt" #mem日志文件
#读取配置文件
cf = configparser.configparser()
cf.read(conf_path)
runtime = cf.get("runtime","rumtime")
updatetime = cf.get("updatetime","updatetime")
packagenamelist = cf.get("packagenamelist","packagenamelist").split(",")
mailserver = cf.get("mailinfo","mailserver")
user = cf.get("mailinfo","user")
password = cf.get("mailinfo","password")
me = cf.get("mailinfo","me")
mailto_list = cf.get("mailto_list","mailto_list").split(',')
#检查配置文件是否存在
if not os.path.exists(conf_path):
print "systemmonitor.conf is null"
sys.exit()
#生成cpu日志文件,如果存在,则先删除
if os.path.exists(cpu_log_path):
os.remove(cpu_log_path)
#生成mem日志文件,如果存在,则先删除
if os.path.exists(mem_log_path):
os.remove(mem_log_path)
#删除当前目录下的.png文件
for parent,dirnames,filenames in os.walk(path):
for filename in filenames:
if filename[-4:] == ".png":
os.remove(os.path.join(parent,filename))
#检查环境配置
if runtime == "" or runtime == 0:
print "runtime is null"
sys.exit()
if updatetime == "" or updatetime == 0:
print "updatetime is null"
sys.exit()
if packagenamelist == "":
print "packagenamelist is null"
sys.exit()
#获取设备信息
model = get_model()
androidversion = get_androidversion()
cpuinfo = get_cpuinfo()
buildversionincremental = get_buildversionincremental()
builddate = timestamp_datetime(float(get_builddate()))
#计算top命令-n参数的值,即刷新多少次
getcount = int(runtime) * 60/int(updatetime)
print "runtime:" runtime
print "updatetime:" updatetime
print "count:" str(getcount)
starttime = gettime()
print "starttime:" starttime
threads = []
#开始执行获取cpu线程
t1 = cputhread(cpu_log_path,getcount,updatetime)
#开始执行获取mem线程
t2 = memthread(mem_log_path,getcount,updatetime)
threads.append(t2)
threads.append(t1)
#开始执行所有线程
for t in threads:
t.start()
#等待所有线程结束
for t in threads:
t.join()
endtime = gettime()
print "endtime:" endtime
analyse_log_and_sendmail(path,sep,starttime,endtime,getcount,updatetime,model,androidversion,cpuinfo,buildversionincremental,str(builddate),runtime,packagenamelist,mailserver,user,password,me,mailto_list)
#encoding:utf-8
import subprocess
import threading
import time
#分析系统cpu
def analysissystemcpu(filepath):
fp = open(filepath)
try:
lines = fp.readlines()
finally:
fp.close()
cpu_user_list = []
cpu_system_list = []
for line in lines:
if "user" in line and "system" in line and "iow" in line and "irq" in line:
line_split = line.strip().rstrip('\r').lstrip().split(' ')
cpu_user_list.append(int(line_split[1].replace('%,','')))
cpu_system_list.append(int(line_split[3].replace('%,','')))
return cpu_user_list,cpu_system_list
#分析app cpu信息
def analysisappcpu(filepath,packagename):
fp = open(filepath)
try:
lines = fp.readlines()
finally:
fp.close()
cpu_list = []
for line in lines:
if packagename in line:
line_split = line.strip().rstrip('\r').lstrip().split(' ')
for j in line_split:
if "%" in j:
cpu_list.append(int(j.replace('%','')))
return cpu_list
#分析系统mem信息
def analysissystemmem(filepath):
fp = open(filepath)
try:
lines = fp.readlines()
finally:
fp.close()
mem_system_list = []
for line in lines:
if "total" in line and "free" in line and "buffers" in line and "cached" in line:
line_split = line.strip().rstrip('\r').lstrip().split(' ')
system_mem = int(line_split[1].replace('k','')) - int(line_split[3].replace('k',''))
mem_system_list.append(round(system_mem/float(1024),2))
return mem_system_list
#分析app mem信息
def analysisappmem(filepath,packagename):
fp = open(filepath)
try:
lines = fp.readlines()
finally:
fp.close()
vss_list = []
rss_list = []
pss_list = []
uss_list = []
result_list = []
count = 0
for line in lines:
if packagename in line:
line_split = line.strip().rstrip('\r').lstrip().split(' ')
temp_list = []
for j in line_split:
if "k" in j:
temp_list.append(j.replace('k',''))
count = 1
result_list.append(temp_list)
if count % 4 == 0:
for i in result_list:
vss_list.append(round(float(i[0])/float(1024),2))
rss_list.append(round(float(i[1])/float(1024),2))
pss_list.append(round(float(i[2])/float(1024),2))
uss_list.append(round(float(i[3])/float(1024),2))
return vss_list,rss_list,pss_list,uss_list
else:
return vss_list,rss_list,pss_list,uss_list
def analysisinfo_bak(filepath,packagename):
fp = open(filepath)
lines = fp.readlines()
fp.close()
cpu_list = []
vss_list = []
rss_list = []
cpu_col = 0
vss_col = 0
rss_col = 0
for line in lines:
if packagename in line:
line_split = line.strip().rstrip('\r').lstrip().split(' ')
for j in line_split:
if "%" in j:
cpu_col = line_split.index(j)
if "k" in j:
vss_col = line_split.index(j)
break
for j in line_split[vss_col 1:]:
if "k" in j:
rss_col = line_split.index(j)
break
break
for line in lines:
if packagename in line:
line_split = line.strip().rstrip('\r').lstrip().split(' ')
cpu_list.append(line_split[cpu_col].replace('%',''))
vss_list.append(int(line_split[vss_col].replace('k',''))/float(1024))
rss_list.append(line_split[rss_col].replace('k',''))
return cpu_list,vss_list,rss_list
def run_cmd(cmd):
process = subprocess.popen(cmd , shell = true, stdout=subprocess.pipe)
return process.stdout.read().strip()
if __name__=='__main__':
#print analysissystemmem("d:\monitor_mem.txt")
#print analysissystemcpu("d:\monitor_cpu.txt")
#print analysisappcpu("d:\monitor_cpu.txt","com.bestv.ott")
#print analysisappmem("d:\monitor_mem.txt","com.bestv.ott")
pass