MQTT自动化测试——So Easy

摘要

现在智慧城市是国家的重点鼓励扶持项目,当下物联网越来越盛行。而在物联网中,硬件与平台的沟通通常情况都采用MQTT协议。而怎么对平台做自动化测试,就是需要我们测试人员需要考虑的问题。

万物互联怎么做物联网项目的自动化测试呢?首先还是需要开发的支持,需要结合开发进行测试的自动化。比如增加必要的结果记录查询MQTT协议接口,增加用于调试的通信通道,因为笔者作为测试兼需求,所以直接在需求中就考虑了这一点,大家也可以向自己的leader提出对应的需求。当这些都准备好后,就需要我们对自动化测试工具选型了。这里我建议大家选用robotframework框架,robotframework的介绍可以参看文章《自动化框架》一文,按照robotframework官网搭建好环境。然后再下载并安装MQTT客户端支持库http://www.eclipse.org/paho/clients/python/,这样就把MQTT自动化测试环境搭建起来了。因为笔者更擅于python语言,所以笔者在ubuntu15.10下使用python作为自动化脚本语言。

要想MQTT自动化,肯定需要根据ROBOT框架的思想,编写自己的测试关键字。当然MQTT库已经提供了一些方法,所以我们首先要去了解这些方法,MQTT Client自带库的文档说明见http://www.eclipse.org/paho/clients/python/docs/,纯英文文档,但不难,相信大家认真读,或者借助在线翻译工具都能读懂。

下面就是编写关键字库了,具体编写字库可以参考本网站文章robot之python扩展库编写》。里面讲了具体如何用python语言编写自己的关键字库。其中的示例即是MQTT关键字的编写。这里我针对于MQTT关键字的开发,结合文档提出其中几个大家可能需要认真理解的地方,否则会在关键字开发过程中占用大量的时间,毕竟是笔者的经验教训。

1、如关键词 “openDebug(self,deviceNo,qos=1):……”中qos传递过来的不是数字类型,是字符串。如果调用的方法需要的是整型,需要对其进行强制转换。如qos=int(qos)。

2、注意on_connect和on_message的用法,笔者尝试了各种方式,最后还是只能使用官网提供的使用方式,否则都不能成功,被打败。

def on_connect(client, userdata, flags, rc):  

    print("Connected with result code " + str(rc))  //打印返回结果码

    client.subscribe(sub)  //订阅消息,sub为订阅消息头

# 消息推送回调函数  

def on_message(client, userdata, msg):

   //DO:自己具体的逻辑,可以为检查到某个指定关键字,则PASS并结束

client = mqtt.Client()

client.username_pw_set(username, password)

client.connect(host, port=int(port), keepalive=120, bind_address="") //bind_address为空时,即随机生成client的唯一标识

client.on_connect = on_connect  //如果要订阅,必须这样写

client.on_message = on_message //如果要进行消息的订阅,必须这样写

3、当没有超时处理这个概念时,使用loop_forver()作为阻塞线程接收消息,如果要终端,比如见到都某个关键字,只需要在上面2的on_message逻辑中添加关键字检查,并调用disconnect()就可以了。

如:

def on_message(client, userdata, msg):

    if((msg.payload).find(keyword) >= 0):

        print("Success:find the keyword>" + keyword) 

        print(msg.topic+" "+str(msg.payload))

        client.disconnect()

    else:

        print("Failed:not find the keyword>" + keyword)

        raise AssertionError(msg.topic+" "+str(msg.payload))

4、当然,往往我们需要超时这个概念,所以此时就不能使用loop_forover()了,此时我们需要使用loop()。笔者因为使用的python版本为2.7,没有定时器,这里使用while循环作为超时检查。所以代码如下:

count = int(timeout) + 5 //这里具体为什么为5,试出来的。当然,你可以使用python3.0以上版本,使用定时器实现超时报错功能。

j = 1

while (j < count):

     client.loop()

     j = j + 1

     raise AssertionError("Failed:Timeout>" + str(timeout) + "second")

下面为一个完整的关键字代码:

    def checkSubContent(self,topic,keyword,timeout=30):

        """说明:检查指定订阅消息的内容中是否包含关键字keyword

 topic - 指定订阅消息主题

 keyword - 关键字,可以是字符或字符串

 timeout - 检查超时时间,超过该时间后,默认本次检查内容失败"""

上一页12下一页


留言