压力测试工具-locust使用

locust简介

Locust直译是“蝗虫”的意思,意在压测时产生的压力就像是漫天蝗虫一样,铺天盖地。

Locust是用Python实现的开源性能测试框架,不同于其他压测工具基于进程/线程产生压力,Locust是完全基于事件,支持分布式,一个Locust节点可以在一个进程中轻松支持上千并发用户。

locust特点

  • 基于协程 ,低成本实现更多并发;
  • 脚本增强(“测试即代码”),不支持脚本录制,通过编写代码,提供更大的灵活性;
  • HttpUser类使用了requests发送http请求,FastHttpUser类基于geventhttpclient实现,性能更高;
  • 分布式压测更加简单,可以在同一服务器上使用分布式,也可以在多台服务器上进行分布式压测;
  • 使用Flask 提供WebUI,同时提供无界面压测;
  • 有第三方插件、 易于扩展,只要有相关协议的包,可以随机扩压测压。

locust安装

pip install locust # 查看安装版本 locust -v

locust使用帮助:locust --help

压力测试工具-locust使用

测试脚本执行事件顺序

由于许多设置和清除操作是相互依赖的,因此以下是它们的执行顺序:

  1. Locust setup (一次)
  2. TaskSet setup (一次)
  3. TaskSet on_start (每个locust一次)
  4. TaskSet tasks…
  5. TaskSet on_stop (每个locust一次)
  6. TaskSet teardown (一次)
  7. Locust teardown (一次) 通常,setup和teardown方法应该是互补的。


http协议示例

  1. 创建测试用户类,用于继承HttpUser或者FastHttpUser;
  2. 创建测试类,继承TaskSet类;
  3. __init__()方法进行数据初始化;
  4. on_start方法进行测试用例执行前准备,如执行登录;
  5. on_end方法测试结束后执行;
  6. 测试用例编写,待测用例方法前加@locust.task(weight: int=1)装饰器
  7. 使用self.client进行get/post/put/delete等请求
import hashlib
import json
import logging
from urllib import parse
from locust import TaskSet, task, between
from locust.contrib.fasthttp import FastHttpUser
logging.basicConfig(level=logging.INFO,  # filename="info.log", filemode="w+",
                    format="%(asctime)s %(filename)s %(funcName)s %(levelname)s Line:%(lineno)s :%(message)s")
logger = logging.getLogger(__name__)

HOST = "10.209.1.12"  
PORT = "8081"

def get_hash_md5(text):
    # 返回MD5值
    text = text.encode('utf-8') if isinstance(text, str) else text  # 转换bytes
    try:
        # m = hashlib.md5(b'credit')
        m = hashlib.md5()
        m.update(text)
        return m.hexdigest()
    except Exception as e:
        return False

def dict_to_params_str(di):
    s = ""
    for k, v in di.items():
        if isinstance(v, dict):
            v = str(v)
        s += "{}={}&".format(k, v)
    return s[:-1]


def get_data_result(data):
    data = data.decode('utf-8') if isinstance(data, bytes) else data
    data = json.loads(data) if isinstance(data, str) else data
    if int(data.get("errno")) == 0:
        return True
    return False

class UserTask(TaskSet):

    def __init__(self, parent):
        super().__init__(parent)
        self._cookies = ""
        self._headers = headers

    def on_start(self):
        logger.info("测试开始...")
        headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Encoding": "gzip, deflate",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Connection": "keep-alive",
            "Cookie": "sidebarStatus=1",
            "Host": "{}:{}".format(HOST, PORT),
            "Origin": "http://{}:{}".format(HOST, PORT),
            "Referer": "http://{}:{}/dist/".format(HOST, PORT),
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36",
        }

        data = {
            "username": "admin",
            "password": "62638c576e044bffcbe34ff77554d43e"}
        resp = self.client.post("/login", data=dict_to_params_str(data), name='登录', 
                                headers=headers, catch_response=True)
        if resp.status_code == 200:
            res = get_data_result(resp.content.decode('utf-8'))
            if res:
                self._cookies = resp.headers['Set-Cookie']
                logger.info(self._cookies)
                self._headers["Cookie"] = self._cookies
                logger.info("登录成功!")
        else:
            logger.warning("登录失败:{}".format(resp.content.decode('utf-8')))
            return

    def on_stop(self):
        logger.info("测试结束!")

    @task
    def test_host_getlist_terminal(self):
        params = {
            "limit": "10",
            "page": "1",
            "prodcombo": "360exthost/pcinfo",
            "filter": '{"id": "-1"}'
        }
        with self.client.get("/host/getlist", name='终端概况', data=dict_to_params_str(params),
                             headers=self._headers, catch_response=True) as resp:
            if resp.status_code == 200:
                if get_data_result(resp.content.decode('utf-8')):
                    resp.success()
                else:
                    resp.failure("resp data error:{}".format(resp.content.decode('utf-8')))
            else:
                resp.failure('request status error:{}'.format(resp.status_code))

class WebsiteUser(FastHttpUser):  # HttpUser
    host = 'http://10.209.1.12:8081'
    wait_time = between(0, 0.01)  # 每个请求间隔/秒
    tasks = [UserTask]

if __name__ == '__main__':
    # os.system("locust -f web_api.py --host=http://10.173.220.46:8081 --csv=req --logfile=info.log")
    os.system("locust -f web_api.py  --loglevel=INFO")

Websocket协议示例

class WebSocketClient(InstanceClass):
    def __init__(self):
        headers = ["Accept-Encoding:gzip, deflate, br",
                   "Accept-Language:zh-CN,zh;q=0.9",
                   "Cache-Control:no-cache",
                   "Connection:Upgrade",
                   "Host:{}:36060".format(HOST),
                   "Origin:file://",
                   "Pragma:no-cache",
                   "Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits",
                   "Sec-WebSocket-Key:{}".format(get_websocket_key()),
                   "Sec-WebSocket-Version:13",
                   "Upgrade:websocket",
                   "User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.366",
                   ]

        self.ws = create_connection(WS_URL, header=headers, sslopt={"cert_reqs": ssl.CERT_NONE})

    def on_message(self, message):
        message = message.decode('utf-8') if isinstance(message, bytes) else message
        logger.info("接收数据: {}".format(message))

    def on_error(self, error):
        logger.info("错误信息: {}".format(error))

    def on_close(self):
        logger.info("### closed ###")

    def on_open(self):
        logger.info("####on openen####")


class WebSocketLocust(User):
    abstract = True

    def __init__(self, parent):
        super(WebSocketLocust, self).__init__(parent)
        self.s_client = WebSocketClient()
        
class UserBehavior(TaskSet):

    def __init__(self, parent):
        super().__init__(parent)
        self.wss = self.parent.s_client.ws

    def get_m2_info(self):
        try:
            param = self.parent.q.get()
            # self.parent.q.put_nowait(param)  # 获取完数据重新加入队列
            return param
        except queue.Empty:
            logger.error("queue is Empty!!!")
            exit(0)

    def on_start(self):
        logger.info(f"发送AUTH数据...")
        self.client_auth()
        logger.info(f"资产上报...")
        self.test_asset_push()
        # self.wss.ping()
        self.wss.pong()
        threading.Thread(target=self.recv).start()
        # self.recv()

    def on_stop(self):
        logger.info('---------- task stop ------------')
        self.wss.close()

    def recv(self):
        logger.info("进入接收数据方法...")
        while True:
            res = self.wss.recv()
            try:
                data = json.loads(res)
                logger.info("接收数据receive:{}".format(data))
            except websocket.WebSocketTimeoutException as e:
                logger.exception("接收数据异常:{}".format(str(e)))
            except websocket.WebSocketConnectionClosedException as e:
                logger.warning("websocket close!!!")

    def assert_result(self, name, result, start_time, msg=""):
        if result:
            events.request_success.fire(
                request_type=name,
                name=name + "_succ",
                response_time=int(time.time()) - start_time,
                response_length=0)
        else:
            events.request_failure.fire(request_type=name, name=name + "_fail",
                                        response_time=int(time.time()) - start_time,
                                        exception=msg, response_length=1)

    def sends(self, msg, name):
        # logger.info("发送类型:{}, 数据类型:{} 数据: {}!".format(name, type(msg), msg))
        start_time = int(time.time())
        try:
            self.wss.send(json.dumps(msg))
            self.assert_result(name, True, start_time)
        except Exception as e:
            logger.error(f"发送错误: {str(e)}")
            self.assert_result(name, False, start_time, msg=str(e))

    def client_auth(self):
        # 客户端认证
        req = {"msgType": 0, "epGuid": self.uuid, "authData": get_sha256(self.uuid + ".eppmc")}
        logger.info("客户端认证,发送AUTH数据: {}".format(req))
        self.sends(req, "auth_conn")
        
    @task
    def policy_type_task(self):
        # 3. 获取策略 POLICY_REQUEST: 101
        _policy_type = random.choice(policy_type)
        name = sys._getframe().f_code.co_name
        req = {"epGuid": self.uuid, "_os": 1, "msgType": 104,
               "_guid": self.uuid, "policyHash": get_str_md5(self.uuid),
               "policyType": "SetSecurityReinforce"}
        self.sends(req, name=name)

NSQ协议示例

import queue
import gnsq
from locust import between, task, User, TaskSet, events, tag

class InstanceClass(object):
    _instance = None
    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(InstanceClass, cls).__new__(cls, *args, **kwargs)
        return cls._instance

class WebNsqclient(InstanceClass):
    """nsq连接"""
    def __init__(self):
        super(WebNsqclient, self).__init__()
        """建立nsq连接"""
        self.producer_list = []
        self.topic = "CloudSafeLine.EEPP"
        # self.cwpp_topic = "CloudSafeLine.CCPP"
        # _nsq_addr = f"{random.choice(nsq_host_list)}:{nsq_port}"
        self.producer = gnsq.Producer(nsq_addr)
        self.producer.start()

    @property
    def producers(self):
        return random.choice(self.producer_list)

    def on_message(self, data):
        """nsq消息发送,发送成功返回OK"""
        zip_data = data_compress(data)
        res = self.producer.publish(self.topic, zip_data)  # 报文发送
        return res

    def on_end(self):
        """nsq连接关闭"""
        self.producer.close()

class WebNsqLocust(User):
    """自定义locust User类"""
    abstract = True

    def __init__(self, parent):
        super(WebNsqLocust, self).__init__(parent)
        self.client = WebNsqclient()


class UserTask(TaskSet):

    def __init__(self, parent):
        super().__init__(parent)
        monitor_name = "monitor"

    def get_m2_info(self):
        try:
            param = self.parent.q.get()  # 参数化
            # self.parent.q.put_nowait(param)  # 获取完数据重新加入队列
            return param
        except queue.Empty:
            logger.error("queue is Empty!!!")
            exit(0)

    def on_start(self):
        logger.debug("开始测试...")
        # threading.Thread(target=monitor_redis).start()
        global START_FLAG
        if not START_FLAG:
            START_FLAG = True
            threading.Thread(target=monitor_redis).start()  # redis监控

            monitor_name = "monitor"
            monitor_time = 5  # 分钟 minutes
            easynmon_monitor_start(easynmon_ip_port, name=monitor_name, monitor_time=monitor_time)  # easynmon监控

    def on_end(self):
        logger.debug("测试结束!!!")
        self.client.on_end()
        easynmon_monitor_stop(easynmon_ip_port)

    def assert_result(self, name, result, start_time):
        if result.decode() == "OK":
            events.request_success.fire(
                request_type=name,
                name=name + "_succ",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0)
        else:
            events.request_failure.fire(request_type=name, name=name + "_fail",
                                        response_time=int((time.time() - start_time) * 1000),
                                        exception=result, response_length=1)

    @task
    @tag("pluginmgr", "test")
    def pluginmgr_task(self):
        name = sys._getframe().f_code.co_name
        param = self.get_m2_info()
        data = pluginmgr_info(param)
        start_time = time.time()
        res = self.client.on_message(data)
        self.assert_result(name, res, start_time)

locust扩展增强

1、参数化

  • 引入队列的概念 queue ,实现方式是将参数推入队列,测试时依次取出,全部取完后 locust 会自动停止。若是使用参数循环压测,需要将取出的参数再推入队尾。

2、检查点

  • 使用self.client提供的catch_response=True`参数, 添加locust提供的ResponseContextManager类的上下文方法手动设置检查点。
  • ResponseContextManager里面的有两个方法来声明成功和失败,分别是success和failure。其中failure方法需要我们传入一个参数,内容就是失败的原因。

3、思考时间

  • wait_time = between(0, 0.01)
  • wait_time = constant(1)
  • wait_time = constant_pacing/1)

4、权重

  • 类中测试方法执行比例通过@task(int)进行控制,默认1;
  • 一个文件多个类中可以通过weight=1进行控制;

5、集合点:模拟一定数量的用户,同时并发请求。

from locust import HttpUser, TaskSet, task,between,events
from gevent._semaphore import Semaphore

all_locusts_spawned = Semaphore()  # 创建集合点
all_locusts_spawned.acquire()  # 阻塞线程


# 注册事件
@events.spawning_complete.add_listener
def on_hatch_complete(**kwargs):
    # Select_task类的钩子方法
    # 挂在到locust钩子函数(所有的Locust实例产生完成时触发
    all_locusts_spawned.release()

num = 0
class UserTask(TaskSet):
    def login(self):
        global num
        num += 1
        print("%s个虚拟用户开始启动,并登录"%n)
        self.client.post("/login", data={"username":"wantest001gwu", "password":"test00123"})

    def logout(self):
        print("退出登录")

    def on_start(self):
        self.login()
        all_locusts_spawned.wait()

    @task(4)
    def test1(self):
        param = {"limit":1, "offset":0}
        with self.client.get("/list", params=param, headers={}, catch_response = True) as response:
            print("用户浏览首页商品列表")

6、分布式

locust -f locust.py --master --web-host=0.0.0.0 --loglevel=ERROR

locust -f locust.py --worker --master-host=10.20.14.24 --loglevel=ERROR

7、无界面测试


压力测试工具-locust使用

总结

locust不支持录制,使用python进行脚本开发,虽然需要一定编码基础,但是灵活性更大;可以通过代码组合方式实现更多场景!

通过代码增强,测试脚本中日常用到的检查点、参数化、集合、思考时间等都可以实现,而且分布式压测特别方便!

缺点是压测过程中,对服务器的检测需要借助其他工具进行!

压力测试工具-locust使用

#学习##python##性能测试#

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章