MQTT自动化测试——So Easy

        try:

            host = self.host

            port = self.port

            username = self.username

            password = self.password

            # 连接成功回调函数

            sub = str(topic)

            print sub  

            def on_connect(client, userdata, flags, rc):  

                print("Connected with result code " + str(rc))  

                client.subscribe(sub)  

            # 消息推送回调函数  

            def on_message(client, userdata, msg):

                if((msg.payload).find(keyword) >= 0):

                    print("Success:find the keyword>" + keyword) 

                    print(msg.topic+" "+str(msg.payload))

                    sys.exit(0)

                else:

                    print("Failed:not find the keyword>" + keyword)

                    raise AssertionError(msg.topic+" "+str(msg.payload))

            client = mqtt.Client()

            client.username_pw_set(username, password)

            client.connect(host, port=int(port), keepalive=120, bind_address="")

            client.on_connect = on_connect  

            client.on_message = on_message

            count = int(timeout) + 5

            j = 1

            while (j < count):

                client.loop()

                j = j + 1

            raise AssertionError("Failed:Timeout>" + str(timeout) + "second")

        except SystemExit:           

            pass

现在你觉得编写一个MQTT的关键字还难嘛?So easy,怎么样,当关键字都写好后,就是根据关键字来组装你的自动化测试用例库了,现在开始你的MQTT自动化之旅吧。

上一页12下一页


留言