Make persistence tests easier to use for other plugins.

pull/2558/merge
Roger A. Light 3 years ago
parent 86b314c624
commit ef078b31b5

@ -15,7 +15,7 @@ test : utest
ptest : utest
$(MAKE) -C broker ptest
$(MAKE) -C lib ptest
$(MAKE) -C client test
$(MAKE) -C client ptest
utest :
$(MAKE) -C unit test

@ -4,7 +4,7 @@
# QoS 2 flow. Is it received?
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')

@ -4,7 +4,7 @@
# QoS 2 flow. Is it received?
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')

@ -1,24 +1,23 @@
#!/usr/bin/env python3
from mosq_test_helper import *
import sqlite3
import sqlite_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
persist_help.write_config(conf_file, port)
rc = 1
keepalive = 10
sqlite_help.init(port)
persist_help.init(port)
client_id = "sqlite-cmsg-out-dup-v3-1-1"
client_id = "persist-cmsg-out-dup-v3-1-1"
payload = "queued message 1"
payload_b = payload.encode("UTF-8")
qos = 2
topic = "client-msg/test"
source_id = "sqlite-cmsg-v3-1-1-helper"
source_id = "persist-cmsg-v3-1-1-helper"
proto_ver = 4
keepalive = 10
@ -47,15 +46,12 @@ broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=Tr
con = None
try:
#con = sqlite3.connect(f"file:{port}/mosquitto.sqlite3?mode=ro", uri=True)
#cur = con.cursor()
# Connect and set up subscription, then disconnect
sock = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=5, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
sock.close()
#sqlite_help.check_counts(cur, clients=1, client_msgs=0, base_msgs=0, retains=0, subscriptions=1)
#persist_help.check_counts(port, clients=1, client_msgs=0, base_msgs=0, retains=0, subscriptions=1)
# Helper - send message then disconnect
sock = mosq_test.do_client_connect(connect2_packet, connack2_packet, timeout=5, port=port)
@ -63,13 +59,13 @@ try:
mosq_test.do_send_receive(sock, pubrel_packet, pubcomp_packet, "pubcomp")
sock.close()
#sqlite_help.check_counts(cur, clients=1, client_msgs=1, base_msgs=1, retains=0, subscriptions=1)
#persist_help.check_counts(port, clients=1, client_msgs=1, base_msgs=1, retains=0, subscriptions=1)
# Reconnect, receive publish, disconnect
sock = mosq_test.do_client_connect(connect1_packet, connack1_packet2, timeout=5, port=port)
mosq_test.expect_packet(sock, "publish 1", publish_packet_r1)
#sqlite_help.check_counts(cur, clients=1, client_msgs=1, base_msgs=1, retains=0, subscriptions=1)
#persist_help.check_counts(port, clients=1, client_msgs=1, base_msgs=1, retains=0, subscriptions=1)
# Reconnect, receive publish, disconnect - dup should now be set
sock = mosq_test.do_client_connect(connect1_packet, connack1_packet2, timeout=5, port=port)
@ -86,21 +82,19 @@ try:
(stdo, stde) = broker.communicate()
broker = None
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
cur = con.cursor()
sqlite_help.check_counts(cur, clients=1, client_msgs=1, base_msgs=1, retains=0, subscriptions=1)
persist_help.check_counts(port, clients=1, client_msgs=1, base_msgs=1, retains=0, subscriptions=1)
# Check client
sqlite_help.check_client(cur, client_id, None, 0, 0, port, 0, 2, 1, -1, 0)
persist_help.check_client(port, client_id, None, 0, 0, port, 0, 2, 1, -1, 0)
# Check subscription
sqlite_help.check_subscription(cur, client_id, topic, qos, 0)
persist_help.check_subscription(port, client_id, topic, qos, 0)
# Check stored message
store_id = sqlite_help.check_store_msg(cur, 0, topic, payload_b, source_id, None, len(payload_b), source_mid, port, qos, 0)
store_id = persist_help.check_store_msg(port, 0, topic, payload_b, source_id, None, len(payload_b), source_mid, port, qos, 0)
# Check client msg
sqlite_help.check_client_msg(cur, client_id, store_id, 1, sqlite_help.dir_out, 1, qos, 0, sqlite_help.ms_wait_for_pubrec)
persist_help.check_client_msg(port, client_id, store_id, 1, persist_help.dir_out, 1, qos, 0, persist_help.ms_wait_for_pubrec)
rc = broker_terminate_rc
finally:
@ -113,10 +107,9 @@ finally:
if con is not None:
con.close()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
rc += persist_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -4,7 +4,7 @@
# different client, restore, reconnect, check it is received.
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')

@ -1,35 +1,35 @@
#!/usr/bin/env python3
# Connect a client, add a subscription, disconnect, send a message with a
# different client, restore, reconnect, check it is received.
from mosq_test_helper import *
import sqlite3
import sqlite_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
persist_help.write_config(conf_file, port)
rc = 1
keepalive = 10
sqlite_help.init(port)
persist_help.init(port)
client_id = "sqlite-cmsg-v3-1-1"
client_id = "persist-cmsg-v3-1-1"
payload = "queued message 1"
payload_b = payload.encode("UTF-8")
qos = 1
topic = "client-msg/test"
source_id = "sqlite-cmsg-v3-1-1-helper"
source_id = "persist-cmsg-v3-1-1-helper"
proto_ver = 4
keepalive = 10
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False)
connect_packet = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, qos=qos, proto_ver=proto_ver)
connect2_packet = mosq_test.gen_connect(source_id, keepalive=keepalive, proto_ver=proto_ver)
connect2_packet = mosq_test.gen_connect(source_id, proto_ver=proto_ver)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
mid = 18
@ -56,23 +56,20 @@ try:
(stdo, stde) = broker.communicate()
broker = None
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
cur = con.cursor()
sqlite_help.check_counts(cur, clients=1, client_msgs=1, base_msgs=1, retains=0, subscriptions=1)
persist_help.check_counts(port, clients=1, client_msgs=1, base_msgs=1, retains=0, subscriptions=1)
# Check client
sqlite_help.check_client(cur, client_id, None, 0, 0, port, 0, 2, 1, -1, 0)
persist_help.check_client(port, client_id, None, 0, 0, port, 0, 2, 1, -1, 0)
# Check subscription
sqlite_help.check_subscription(cur, client_id, topic, qos, 0)
persist_help.check_subscription(port, client_id, topic, qos, 0)
# Check stored message
store_id = sqlite_help.check_store_msg(cur, 0, topic, payload_b, source_id, None, len(payload_b), mid, port, qos, 0)
store_id = persist_help.check_store_msg(port, 0, topic, payload_b, source_id, None, len(payload_b), mid, port, qos, 0)
# Check client msg
sqlite_help.check_client_msg(cur, client_id, store_id, 0, sqlite_help.dir_out, 1, qos, 0, sqlite_help.ms_queued)
persist_help.check_client_msg(port, client_id, store_id, 0, persist_help.dir_out, 1, qos, 0, persist_help.ms_queued)
con.close()
rc = broker_terminate_rc
finally:
if broker is not None:
@ -81,10 +78,8 @@ finally:
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
if con is not None:
con.close()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
rc += persist_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))

@ -4,7 +4,7 @@
# different client, restore, reconnect, check it is received.
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
@ -93,7 +93,7 @@ finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)

@ -4,7 +4,7 @@
# different client, restore, reconnect, check it is received.
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')

@ -3,7 +3,7 @@
# Connect a client, check it is restored, clear the client, check it is not there.
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')

@ -3,7 +3,7 @@
# Connect a client, check it is restored, clear the client, check it is not there.
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
@ -18,6 +18,7 @@ client_id = "persist-client-v5-0"
proto_ver = 5
connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
connect_props += mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MAXIMUM_PACKET_SIZE, 10000)
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False, properties=connect_props)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
@ -40,6 +41,9 @@ try:
print("broker not terminated")
broker_terminate_rc = 1
persist_help.check_counts(port, clients=1, client_msgs=0, base_msgs=0, retains=0, subscriptions=0)
persist_help.check_client(port, "persist-client-v5-0", None, 0, 1, port, 10000, 2, 1, 60, 0)
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
@ -64,6 +68,8 @@ try:
print("broker not terminated")
broker_terminate_rc = 1
persist_help.check_counts(port, clients=0, client_msgs=0, base_msgs=0, retains=0, subscriptions=0)
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
@ -72,12 +78,13 @@ try:
mosq_test.do_ping(sock)
sock.close()
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)

@ -3,7 +3,7 @@
# Publish a retained messages, check they are restored, with properties attached
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
@ -24,10 +24,10 @@ props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_PAYLOAD_FORMAT_INDICATOR, 1)
props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_CONTENT_TYPE, "plain/text")
props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_RESPONSE_TOPIC, "/dev/null")
#props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_CORRELATION_DATA, "2357289375902345")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value4")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value3")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value2")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value1")
publish_packet = mosq_test.gen_publish(topic, qos=qos, payload="retained message 1", retain=True, proto_ver=proto_ver, properties=props)
mid = 1

@ -1,41 +1,41 @@
#!/usr/bin/env python3
# Publish a retained messages, check they are restored
# Publish retained messages, check they are restored, delete one
from mosq_test_helper import *
import sqlite_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
persist_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
persist_help.init(port)
topic1 = "test/retain1"
topic2 = "test/retain2"
source_id = "persist-retain-v5-0"
source_id = "persist-retain-delete"
qos = 0
payload1 = "retained message 1"
payload2 = "retained message 2"
proto_ver = 5
proto_ver = 4
connect_packet = mosq_test.gen_connect(source_id, proto_ver=proto_ver, clean_session=True)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
publish1_packet = mosq_test.gen_publish(topic1, qos=qos, payload=payload1, retain=True, proto_ver=proto_ver)
publish2_packet = mosq_test.gen_publish(topic2, qos=qos, payload=payload2, retain=True, proto_ver=proto_ver)
publish2_clear_packet = mosq_test.gen_publish(topic2, qos=qos, payload="", retain=True, proto_ver=proto_ver)
publish2_clear_echo = mosq_test.gen_publish(topic2, qos=qos, payload="", retain=False, proto_ver=proto_ver)
publish2_clear_packet = mosq_test.gen_publish(topic2, qos=qos, retain=True, proto_ver=proto_ver)
publish2_clear_echo = mosq_test.gen_publish(topic2, qos=qos, retain=False, proto_ver=proto_ver)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "#", 0, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, qos=0, proto_ver=proto_ver)
subscribe_packet = mosq_test.gen_subscribe(mid, "#", 0, proto_ver=4)
suback_packet = mosq_test.gen_suback(mid, qos=0, proto_ver=4)
mid = 2
unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "#", proto_ver=proto_ver)
unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=proto_ver)
unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "#", proto_ver=4)
unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=4)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
@ -56,12 +56,22 @@ try:
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Subscribe
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Check retained messages exist
mosq_test.receive_unordered(sock, publish1_packet, publish2_packet, "publish 1 / 2")
mosq_test.receive_unordered(sock, publish1_packet, publish2_packet, "publish 1a / 2")
mosq_test.do_ping(sock)
# Clear retained (and wait for the publish to avoid race condition)
@ -81,8 +91,9 @@ try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Subscribe
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Check single retained message exists
mosq_test.expect_packet(sock, "publish 1", publish1_packet)
# Check retained message exist
mosq_test.expect_packet(sock, "publish 1b", publish1_packet)
# If the other retained message still exists, this will cause an error
mosq_test.do_ping(sock)
rc = broker_terminate_rc
@ -94,7 +105,7 @@ finally:
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
rc += persist_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))

@ -3,7 +3,7 @@
# Publish a retained messages, check they are restored
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')

@ -3,7 +3,7 @@
# Publish a retained messages, check they are restored
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')

@ -4,7 +4,7 @@
# message with a different client, check it is received.
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
def helper(port, packets):
helper_id = "persist-subscription-v3-1-1-helper"

@ -4,7 +4,7 @@
# message with a different client, check it is received.
from mosq_test_helper import *
import persist_help
persist_help = persist_module()
def helper(port, packets):
helper_id = "persist-subscription-v5-0-helper"

@ -1,44 +0,0 @@
#!/usr/bin/env python3
# Check whether the sqlite plugin cleans everything up before closing - this
# means the WAL journal file will not exist when it has closed.
from mosq_test_helper import *
import json
import shutil
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
proto_ver = 4
connect_packet = mosq_test.gen_connect("sqlite-clean-shutdown", proto_ver=4)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=4)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
try:
# Check broker is running
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
sock.close()
except mosq_test.TestError:
pass
finally:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc = sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,96 +0,0 @@
#!/usr/bin/env python3
# Connect a client, start a QoS 2 flow, disconnect, restore, carry on with the
# QoS 2 flow. Is it received?
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
client_id = "persist-client-msg-in-v3-1-1"
proto_ver = 4
helper_id = "persist-client-msg-in-v3-1-1-helper"
topic = "client-msg-in/2"
qos = 2
connect_packet = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
publish1_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message1", mid=mid, proto_ver=proto_ver)
pubrec1_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel1_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp1_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
mid = 2
publish2_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message2", mid=mid, proto_ver=proto_ver)
pubrec2_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel2_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp2_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=proto_ver, clean_session=True)
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos=qos, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid=mid, qos=qos, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client, start flow, disconnect
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
mosq_test.do_send_receive(sock, publish1_packet, pubrec1_packet, "pubrec1 send")
mosq_test.do_send_receive(sock, publish2_packet, pubrec2_packet, "pubrec2 send")
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect helper and subscribe
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port, connack_error="helper connack")
mosq_test.do_send_receive(helper, subscribe_packet, suback_packet, "suback helper")
# Complete the flow
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
mosq_test.do_send_receive(sock, pubrel1_packet, pubcomp1_packet, "pubrel1 send")
mosq_test.do_send_receive(sock, pubrel2_packet, pubcomp2_packet, "pubrel2 send")
mosq_test.expect_packet(helper, "publish1 receive", publish1_packet)
mosq_test.expect_packet(helper, "publish2 receive", publish2_packet)
helper.send(pubrec1_packet)
mosq_test.do_receive_send(helper, pubrel1_packet, pubcomp1_packet, "pubcomp1 receive")
helper.send(pubrec2_packet)
mosq_test.do_receive_send(helper, pubrel2_packet, pubcomp2_packet, "pubcomp2 receive")
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,97 +0,0 @@
#!/usr/bin/env python3
# Connect a client, start a QoS 2 flow, disconnect, restore, carry on with the
# QoS 2 flow. Is it received?
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
client_id = "persist-client-msg-in-v5-0"
proto_ver = 5
helper_id = "persist-client-msg-in-v5-0-helper"
topic = "client-msg-in/2"
qos = 2
connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
connect_packet = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False, properties=connect_props)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
publish1_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message1", mid=mid, proto_ver=proto_ver)
pubrec1_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel1_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp1_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
mid = 2
publish2_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message2", mid=mid, proto_ver=proto_ver)
pubrec2_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel2_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp2_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=proto_ver, clean_session=True)
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos=qos, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid=mid, qos=qos, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client, start flow, disconnect
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
mosq_test.do_send_receive(sock, publish1_packet, pubrec1_packet, "pubrec1 send")
mosq_test.do_send_receive(sock, publish2_packet, pubrec2_packet, "pubrec2 send")
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect helper and subscribe
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port, connack_error="helper connack")
mosq_test.do_send_receive(helper, subscribe_packet, suback_packet, "suback helper")
# Complete the flow
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
mosq_test.do_send_receive(sock, pubrel1_packet, pubcomp1_packet, "pubrel1 send")
mosq_test.do_send_receive(sock, pubrel2_packet, pubcomp2_packet, "pubrel2 send")
mosq_test.expect_packet(helper, "publish1 receive", publish1_packet)
mosq_test.expect_packet(helper, "publish2 receive", publish2_packet)
helper.send(pubrec1_packet)
mosq_test.do_receive_send(helper, pubrel1_packet, pubcomp1_packet, "pubcomp1 receive")
helper.send(pubrec2_packet)
mosq_test.do_receive_send(helper, pubrel2_packet, pubcomp2_packet, "pubcomp2 receive")
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,107 +0,0 @@
#!/usr/bin/env python3
# Connect a client, add a subscription, disconnect, send a message with a
# different client, restore, reconnect, check it is received.
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
keepalive = 10
client_id = "persist-client-msg-v3-1-1"
proto_ver = 4
helper_id = "persist-client-msg-v3-1-1-helper"
topic0 = "client-msg/0"
topic1 = "client-msg/1"
topic2 = "client-msg/2"
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
connect_packet_helper = mosq_test.gen_connect(helper_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
mid = 1
publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mid = 2
publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client, subscribe, disconnect
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port)
mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0")
mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1")
mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2")
sock.close()
# Connect helper and publish
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
helper.send(publish_packet0)
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
helper.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
# Does the client get the messages
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2")
mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2")
sock.close()
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
# If there are messages, the ping will fail
mosq_test.do_ping(sock)
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,107 +0,0 @@
#!/usr/bin/env python3
# Connect a client, add a subscription, disconnect, send a message with a
# different client, restore, reconnect, check it is received.
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
keepalive = 10
client_id = "persist-client-msg-v3-1-1"
proto_ver = 4
helper_id = "persist-client-msg-v3-1-1-helper"
topic0 = "client-msg/0"
topic1 = "client-msg/1"
topic2 = "client-msg/2"
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
connect_packet_helper = mosq_test.gen_connect(helper_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
mid = 1
publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mid = 2
publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client, subscribe, disconnect
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port)
mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0")
mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1")
mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2")
sock.close()
# Connect helper and publish
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
helper.send(publish_packet0)
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
helper.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
# Does the client get the messages
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2")
mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2")
sock.close()
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
# If there are messages, the ping will fail
mosq_test.do_ping(sock)
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,107 +0,0 @@
#!/usr/bin/env python3
# Connect a client, add a subscription, disconnect, send a message with a
# different client, restore, reconnect, check it is received.
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
client_id = "persist-client-msg-v5-0"
proto_ver = 5
helper_id = "persist-client-msg-v5-0-helper"
topic0 = "client-msg/0"
topic1 = "client-msg/1"
topic2 = "client-msg/2"
connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
connect_packet = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False, properties=connect_props)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=proto_ver, clean_session=True)
publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
mid = 1
publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mid = 2
publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client, subscribe, disconnect
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0")
mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1")
mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2")
sock.close()
# Connect helper and publish
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port, connack_error="helper connack")
helper.send(publish_packet0)
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
helper.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
# Does the client get the messages
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2")
mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2")
sock.close()
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 3")
# If there are messages, the ping will fail
mosq_test.do_ping(sock)
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,94 +0,0 @@
#!/usr/bin/env python3
# Connect a client, check it is restored, clear the client, check it is not there.
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
keepalive = 10
client_id = "persist-client-v3-1-1"
proto_ver = 4
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
connect_packet_clean = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
mosq_test.do_ping(sock)
sock.close()
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 3")
mosq_test.do_ping(sock)
sock.close()
# Clear the client
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 4")
mosq_test.do_ping(sock)
sock.close()
# Connect client, it should not have a session
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 5")
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client, it should not have a session
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 6")
mosq_test.do_ping(sock)
sock.close()
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (3)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,74 +0,0 @@
#!/usr/bin/env python3
# Connect a single client with session expiry interval > 0 and check the
# persisted DB is correct
from mosq_test_helper import *
import sqlite3
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
keepalive = 10
sqlite_help.init(port)
keepalive = 10
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
props += mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MAXIMUM_PACKET_SIZE, 10000)
connect_packet = mosq_test.gen_connect("sqlite-client-v5-0", keepalive=keepalive, proto_ver=5, properties=props)
props = mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_TOPIC_ALIAS_MAXIMUM, 10)
props += mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20)
#props += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_MAXIMUM_QOS, 1)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=props, property_helper=False)
connect_packet_clean = mosq_test.gen_connect("sqlite-client-v5-0-clean", keepalive=keepalive, proto_ver=5)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port, connack_error="connack 1")
sock.close()
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet, timeout=5, port=port, connack_error="connack 2")
sock.close()
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
(stdo, stde) = broker.communicate()
broker = None
# Verify sqlite db
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
cur = con.cursor()
sqlite_help.check_counts(cur, clients=1, client_msgs=0, base_msgs=0, retains=0, subscriptions=0)
# Check client
sqlite_help.check_client(cur, "sqlite-client-v5-0", None, 0, 1, port, 10000, 2, 1, 60, 0)
con.close()
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
if con is not None:
con.close()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,90 +0,0 @@
#!/usr/bin/env python3
# Connect a client, check it is restored, clear the client, check it is not there.
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
keepalive = 10
client_id = "persist-client-v5-0"
proto_ver = 5
connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False, properties=connect_props)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
connect_packet_clean = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
mosq_test.do_ping(sock)
sock.close()
# Clear the client
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 3")
mosq_test.do_ping(sock)
sock.close()
# Connect client, it should not have a session
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 4")
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client, it should not have a session
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 5")
mosq_test.do_ping(sock)
sock.close()
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (3)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,91 +0,0 @@
#!/usr/bin/env python3
# Publish a retained messages, check they are restored, with properties attached
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
topic = "test/retainprop"
source_id = "persist-retain-properties-v5-0"
qos = 0
proto_ver = 5
connect_packet = mosq_test.gen_connect(source_id, proto_ver=proto_ver, clean_session=True)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_PAYLOAD_FORMAT_INDICATOR, 1)
props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_CONTENT_TYPE, "plain/text")
props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_RESPONSE_TOPIC, "/dev/null")
#props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_CORRELATION_DATA, "2357289375902345")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
props += mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "name", "value")
publish_packet = mosq_test.gen_publish(topic, qos=qos, payload="retained message 1", retain=True, proto_ver=proto_ver, properties=props)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "test/retainprop", 0, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, qos=0, proto_ver=proto_ver)
mid = 2
unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "test/retainprop", proto_ver=proto_ver)
unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
try:
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Check no retained messages exist
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Ping will fail if a PUBLISH is received
mosq_test.do_ping(sock)
# Unsubscribe, so we don't receive the messages
mosq_test.do_send_receive(sock, unsubscribe_packet, unsuback_packet, "unsuback")
# Send some retained messages
sock.send(publish_packet)
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Subscribe
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Check retained messages exist
mosq_test.expect_packet(sock, "publish", publish_packet)
mosq_test.do_ping(sock)
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,94 +0,0 @@
#!/usr/bin/env python3
# Publish a retained messages, check they are restored
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
keepalive = 10
topic1 = "test/retain1"
topic2 = "test/retain2"
topic3 = "test/retain3"
source_id = "persist-retain-v3-1-1"
qos = 0
payload2 = "retained message 2"
payload3 = "retained message 3"
proto_ver = 4
connect_packet = mosq_test.gen_connect(source_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
publish1_packet = mosq_test.gen_publish(topic1, qos=qos, payload="retained message 1", retain=True, proto_ver=proto_ver)
publish2_packet = mosq_test.gen_publish(topic2, qos=qos, payload=payload2, retain=False, proto_ver=proto_ver)
publish3_packet = mosq_test.gen_publish(topic3, qos=qos, payload=payload3, retain=True, proto_ver=proto_ver)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "#", 0, proto_ver=4)
suback_packet = mosq_test.gen_suback(mid, qos=0, proto_ver=4)
mid = 2
unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "#", proto_ver=4)
unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=4)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
try:
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Check no retained messages exist
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Ping will fail if a PUBLISH is received
mosq_test.do_ping(sock)
# Unsubscribe, so we don't receive the messages
mosq_test.do_send_receive(sock, unsubscribe_packet, unsuback_packet, "unsuback")
# Send some retained messages
sock.send(publish1_packet)
mosq_test.do_ping(sock)
sock.send(publish2_packet) # Not retained
mosq_test.do_ping(sock)
sock.send(publish3_packet)
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Subscribe
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Check retained messages exist
mosq_test.receive_unordered(sock, publish1_packet, publish3_packet, "publish 1 / 3")
mosq_test.do_ping(sock)
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,93 +0,0 @@
#!/usr/bin/env python3
# Publish a retained messages, check they are restored
from mosq_test_helper import *
import sqlite_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
topic1 = "test/retain1"
topic2 = "test/retain2"
topic3 = "test/retain3"
source_id = "persist-retain-v5-0"
qos = 0
payload2 = "retained message 2"
payload3 = "retained message 3"
proto_ver = 5
connect_packet = mosq_test.gen_connect(source_id, proto_ver=proto_ver, clean_session=True)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
publish1_packet = mosq_test.gen_publish(topic1, qos=qos, payload="retained message 1", retain=True, proto_ver=proto_ver)
publish2_packet = mosq_test.gen_publish(topic2, qos=qos, payload=payload2, retain=False, proto_ver=proto_ver)
publish3_packet = mosq_test.gen_publish(topic3, qos=qos, payload=payload3, retain=True, proto_ver=proto_ver)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "#", 0, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid, qos=0, proto_ver=proto_ver)
mid = 2
unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "#", proto_ver=proto_ver)
unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
try:
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Check no retained messages exist
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Ping will fail if a PUBLISH is received
mosq_test.do_ping(sock)
# Unsubscribe, so we don't receive the messages
mosq_test.do_send_receive(sock, unsubscribe_packet, unsuback_packet, "unsuback")
# Send some retained messages
sock.send(publish1_packet)
mosq_test.do_ping(sock)
sock.send(publish2_packet) # Not retained
mosq_test.do_ping(sock)
sock.send(publish3_packet)
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Subscribe
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Check retained messages exist
mosq_test.receive_unordered(sock, publish1_packet, publish3_packet, "publish 1 / 3")
mosq_test.do_ping(sock)
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,135 +0,0 @@
#!/usr/bin/env python3
# Connect a client, add a subscription, disconnect, restore, reconnect, send a
# message with a different client, check it is received.
from mosq_test_helper import *
import sqlite_help
def helper(port, packets):
helper_id = "persist-subscription-v3-1-1-helper"
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=4, clean_session=True)
# Connect helper and publish
helper = mosq_test.do_client_connect(connect_packet_helper, packets["connack1"], timeout=5, port=port)
helper.send(packets["publish0"])
mosq_test.do_send_receive(helper, packets["publish1"], packets["puback1"], "puback helper")
mosq_test.do_send_receive(helper, packets["publish2"], packets["pubrec2"], "pubrec helper")
mosq_test.do_send_receive(helper, packets["pubrel2"], packets["pubcomp2"], "pubcomp helper")
helper.close()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
client_id = "persist-subscription-v3-1-1"
proto_ver = 4
topic0 = "subscription/0"
topic1 = "subscription/1"
topic2 = "subscription/2"
packets = {}
packets["connect"] = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False)
packets["connack1"] = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
packets["connack2"] = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
packets["subscribe0"] = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
packets["suback0"] = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
packets["subscribe1"] = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
packets["suback1"] = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
packets["subscribe2"] = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
packets["suback2"] = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
packets["unsubscribe2"] = mosq_test.gen_unsubscribe(mid, topic2, proto_ver=proto_ver)
packets["unsuback2"] = mosq_test.gen_unsuback(mid=mid, proto_ver=proto_ver)
packets["publish0"] = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
mid = 1
packets["publish1"] = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
packets["puback1"] = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mid = 2
packets["publish2"] = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
packets["pubrec2"] = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
packets["pubrel2"] = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
packets["pubcomp2"] = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client
sock = mosq_test.do_client_connect(packets["connect"], packets["connack1"], timeout=5, port=port)
mosq_test.do_send_receive(sock, packets["subscribe0"], packets["suback0"], "suback 0")
mosq_test.do_send_receive(sock, packets["subscribe1"], packets["suback1"], "suback 1")
mosq_test.do_send_receive(sock, packets["subscribe2"], packets["suback2"], "suback 2")
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(packets["connect"], packets["connack2"], timeout=5, port=port)
mosq_test.do_ping(sock)
helper(port, packets)
# Does the client get the messages
mosq_test.expect_packet(sock, "publish 0", packets["publish0"])
mosq_test.do_receive_send(sock, packets["publish1"], packets["puback1"], "publish 1")
mosq_test.do_receive_send(sock, packets["publish2"], packets["pubrec2"], "publish 2")
mosq_test.do_receive_send(sock, packets["pubrel2"], packets["pubcomp2"], "pubrel 2")
# Unsubscribe
mosq_test.do_send_receive(sock, packets["unsubscribe2"], packets["unsuback2"], "unsuback 2")
sock.close()
# Kill broker
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(packets["connect"], packets["connack2"], timeout=5, port=port)
mosq_test.do_ping(sock)
# Connect helper and publish
helper(port, packets)
# Does the client get the messages
mosq_test.expect_packet(sock, "publish 0", packets["publish0"])
mosq_test.do_receive_send(sock, packets["publish1"], packets["puback1"], "publish 1")
mosq_test.do_ping(sock)
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (3)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -1,146 +0,0 @@
#!/usr/bin/env python3
# Connect a client, add a subscription, disconnect, restore, reconnect, send a
# message with a different client, check it is received.
from mosq_test_helper import *
import sqlite_help
def helper(port, packets):
helper_id = "persist-subscription-v5-0-helper"
connect_packet_helper = mosq_test.gen_connect(helper_id, proto_ver=5, clean_session=True)
connack_packet_helper = mosq_test.gen_connack(rc=0, proto_ver=5)
# Connect helper and publish
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet_helper, timeout=5, port=port, connack_error="helper connack")
helper.send(packets["publish0-helper"])
mosq_test.do_send_receive(helper, packets["publish1-helper"], packets["puback1"], "puback helper")
mosq_test.do_send_receive(helper, packets["publish2-helper"], packets["pubrec2"], "pubrec helper")
mosq_test.do_send_receive(helper, packets["pubrel2"], packets["pubcomp2"], "pubcomp helper")
helper.close()
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
sqlite_help.write_config(conf_file, port)
rc = 1
sqlite_help.init(port)
client_id = "persist-subscription-v5-0"
proto_ver = 5
topic0 = "subscription/0"
topic1 = "subscription/1"
topic2 = "subscription/2"
packets = {}
connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
packets["connect"] = mosq_test.gen_connect(client_id, proto_ver=proto_ver, clean_session=False, properties=connect_props)
packets["connack1"] = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
packets["connack2"] = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
publish_props0 = mqtt5_props.gen_varint_prop(mqtt5_props.PROP_SUBSCRIPTION_IDENTIFIER, 100)
packets["subscribe0"] = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver, properties=publish_props0)
packets["suback0"] = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
publish_props1 = mqtt5_props.gen_varint_prop(mqtt5_props.PROP_SUBSCRIPTION_IDENTIFIER, 101)
packets["subscribe1"] = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver, properties=publish_props1)
packets["suback1"] = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
publish_props2 = mqtt5_props.gen_varint_prop(mqtt5_props.PROP_SUBSCRIPTION_IDENTIFIER, 102)
packets["subscribe2"] = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver, properties=publish_props2)
packets["suback2"] = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
packets["unsubscribe2"] = mosq_test.gen_unsubscribe(mid, topic2, proto_ver=proto_ver)
packets["unsuback2"] = mosq_test.gen_unsuback(mid=mid, proto_ver=proto_ver)
packets["publish0-helper"] = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
packets["publish0"] = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver, properties=publish_props0)
mid = 1
packets["publish1-helper"] = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
packets["publish1"] = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver, properties=publish_props1)
packets["puback1"] = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mid = 2
packets["publish2-helper"] = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
packets["publish2"] = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver, properties=publish_props2)
packets["pubrec2"] = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
packets["pubrel2"] = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
packets["pubcomp2"] = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client
sock = mosq_test.do_client_connect(packets["connect"], packets["connack1"], timeout=5, port=port, connack_error="connack1")
mosq_test.do_send_receive(sock, packets["subscribe0"], packets["suback0"], "suback 0")
mosq_test.do_send_receive(sock, packets["subscribe1"], packets["suback1"], "suback 1")
mosq_test.do_send_receive(sock, packets["subscribe2"], packets["suback2"], "suback 2")
sock.close()
# Kill broker
broker.terminate()
broker_terminate_rc = 0
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(packets["connect"], packets["connack2"], timeout=5, port=port)
mosq_test.do_ping(sock)
helper(port, packets)
# Does the client get the messages
mosq_test.expect_packet(sock, "publish 0", packets["publish0"])
mosq_test.do_receive_send(sock, packets["publish1"], packets["puback1"], "publish 1")
mosq_test.do_receive_send(sock, packets["publish2"], packets["pubrec2"], "publish 2")
mosq_test.do_receive_send(sock, packets["pubrel2"], packets["pubcomp2"], "pubrel 2")
# Unsubscribe
mosq_test.do_send_receive(sock, packets["unsubscribe2"], packets["unsuback2"], "unsuback 2")
sock.close()
# Kill broker
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (2)")
broker_terminate_rc = 1
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(packets["connect"], packets["connack2"], timeout=5, port=port, connack_error="connack2")
mosq_test.do_ping(sock)
# Connect helper and publish
helper(port, packets)
# Does the client get the messages
mosq_test.expect_packet(sock, "publish 0", packets["publish0"])
mosq_test.do_receive_send(sock, packets["publish1"], packets["puback1"], "publish 1")
mosq_test.do_ping(sock)
rc = broker_terminate_rc
finally:
if broker is not None:
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated (3)")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += sqlite_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -2,33 +2,31 @@ add_subdirectory(c)
file(GLOB PY_TEST_FILES [0-9][0-9]-*.py)
file(GLOB PY_PERSIST_TEST_FILES 15-*.py)
list(APPEND PY_TEST_FILES "${CMAKE_CURRENT_SOURCE_DIR}/msg_sequence_test.py")
set(PERSIST_LIST
persist_sqlite
)
set(EXCLUDE_LIST
01-connect-uname-password-success-no-tls
03-publish-qos1-queued-bytes
09-extended-auth-single2
15-persist-client-msg-in-v3-1-1
15-persist-client-msg-in-v5-0
15-persist-client-msg-out-queue-v3-1-1
15-persist-client-msg-out-v3-1-1
15-persist-client-msg-out-v5-0
15-persist-client-v3-1-1
15-persist-client-v5-0
15-persist-publish-properties-v5-0
15-persist-retain-v3-1-1
15-persist-retain-v5-0
15-persist-subscription-v3-1-1
15-persist-subscription-v5-0
# Not a test
06-bridge-clean-session-core
08-ssl-bridge-helper
)
foreach(PY_PERSIST_TEST_FILE ${PY_PERSIST_TEST_FILES})
get_filename_component(PY_PERSIST_TEST_NAME ${PY_PERSIST_TEST_FILE} NAME_WE)
list(APPEND EXCLUDE_LIST ${PY_PERSIST_TEST_NAME})
endforeach()
foreach(PY_TEST_FILE ${PY_TEST_FILES})
get_filename_component(PY_TEST_NAME ${PY_TEST_FILE} NAME_WE)
if(${PY_TEST_NAME} IN_LIST EXCLUDE_LIST)
if(${PY_TEST_NAME} IN_LIST EXCLUDE_LIST OR ${PY_TEST_NAME} IN_LIST SQLITE_LIST)
continue()
endif()
add_test(NAME broker-${PY_TEST_NAME}
@ -39,3 +37,16 @@ foreach(PY_TEST_FILE ${PY_TEST_FILES})
ENVIRONMENT "BUILD_ROOT=${CMAKE_BINARY_DIR}"
)
endforeach()
foreach(PERSIST_TYPE ${PERSIST_LIST})
foreach(PY_TEST_FILE ${PY_PERSIST_TEST_FILES})
get_filename_component(PY_TEST_NAME ${PY_TEST_FILE} NAME_WE)
add_test(NAME broker-${PY_TEST_NAME}-${PERSIST_TYPE}
COMMAND ${PY_TEST_FILE} ${PERSIST_TYPE}
)
set_tests_properties(broker-${PY_TEST_NAME}-${PERSIST_TYPE}
PROPERTIES
ENVIRONMENT "BUILD_ROOT=${CMAKE_BINARY_DIR}"
)
endforeach()
endforeach()

@ -259,33 +259,21 @@ endif
endif
15 :
#./15-persist-client-msg-in-v5-0.py
#./15-persist-client-msg-out-queue-v3-1-1.py
#./15-persist-client-msg-out-v3-1-1.py
#./15-persist-client-msg-out-v5-0.py
#./15-persist-client-v3-1-1.py
#./15-persist-client-v5.0.py
#./15-persist-publish-properties-v5-0.py
#./15-persist-retain-v3-1-1.py
#./15-persist-retain-v5-0.py
#./15-persist-subscription-v3-1-1.py
#./15-persist-subscription-v5-0.py
./15-sqlite-client-message-in-v3-1-1.py
./15-sqlite-client-message-out-dup-v3-1-1.py
./15-sqlite-client-message-out-v3-1-1.py
./15-sqlite-client-msg-in-v5-0.py
./15-sqlite-client-msg-out-queue-v3-1-1.py
./15-sqlite-client-msg-out-v3-1-1.py
./15-sqlite-client-msg-out-v5-0.py
./15-sqlite-client-v3-1-1.py
./15-sqlite-client-v5-0.py
./15-sqlite-client-v5.0.py
./15-sqlite-publish-properties-v5-0.py
./15-sqlite-retain-clear-v5-0.py
./15-sqlite-retain-v3-1-1.py
./15-sqlite-retain-v5-0.py
./15-sqlite-subscription-v3-1-1.py
./15-sqlite-subscription-v5-0.py
./15-persist-client-msg-in-v3-1-1.py persist_sqlite
./15-persist-client-msg-in-v5-0.py persist_sqlite
./15-persist-client-msg-out-dup-v3-1-1.py persist_sqlite
./15-persist-client-msg-out-queue-v3-1-1.py persist_sqlite
./15-persist-client-msg-out-v3-1-1-db.py persist_sqlite
./15-persist-client-msg-out-v3-1-1.py persist_sqlite
./15-persist-client-msg-out-v5-0.py persist_sqlite
./15-persist-client-v3-1-1.py persist_sqlite
./15-persist-client-v5-0.py persist_sqlite
./15-persist-publish-properties-v5-0.py persist_sqlite
./15-persist-retain-clear.py persist_sqlite
./15-persist-retain-v3-1-1.py persist_sqlite
./15-persist-retain-v5-0.py persist_sqlite
./15-persist-subscription-v3-1-1.py persist_sqlite
./15-persist-subscription-v5-0.py persist_sqlite
16 :
./16-cmd-args.py

@ -20,3 +20,12 @@ from pathlib import Path
source_dir = Path(__file__).resolve().parent
ssl_dir = source_dir.parent / "ssl"
import importlib
def persist_module():
if len(sys.argv) > 1:
mod = sys.argv.pop(1)
else:
raise RuntimeError("Not enough command line arguments - need persist module")
return importlib.import_module(mod)

@ -1,13 +0,0 @@
# This can be rewritten to easily support persistence plugins with all of
# the 15-persist-* tests.
def init(port):
pass
def cleanup(port):
return 0
def write_config(filename, port):
with open(filename, 'w') as f:
#f.write("plugin ..\n")
pass

@ -1,5 +1,6 @@
import os
from pathlib import Path
import sqlite3
dir_in = 0
dir_out = 1
@ -47,13 +48,21 @@ def cleanup(port):
# some versions of sqlite3 do not remove the wal file
# thus we make sure that the file is at least empty (no pending db transactions)
rc = 0
os.remove(f"{port}/mosquitto.sqlite3-shm")
os.remove(f"{port}/mosquitto.sqlite3-wal")
try:
os.remove(f"{port}/mosquitto.sqlite3-shm")
except FileNotFoundError:
pass
try:
os.remove(f"{port}/mosquitto.sqlite3-wal")
except FileNotFoundError:
pass
os.rmdir(f"{port}")
return rc
def check_counts(cur, clients, client_msgs, base_msgs, retains, subscriptions):
def check_counts(port, clients, client_msgs, base_msgs, retains, subscriptions):
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
cur = con.cursor()
cur.execute('SELECT COUNT(*) FROM clients')
row = cur.fetchone()
if row[0] != clients:
@ -78,12 +87,15 @@ def check_counts(cur, clients, client_msgs, base_msgs, retains, subscriptions):
row = cur.fetchone()
if row[0] != retains:
raise ValueError("Found %d retains, expected %d" % (row[0], retains))
con.close()
def check_client(cur, client_id, username, will_delay_time, session_expiry_time,
def check_client(port, client_id, username, will_delay_time, session_expiry_time,
listener_port, max_packet_size, max_qos, retain_available,
session_expiry_interval, will_delay_interval):
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
cur = con.cursor()
cur.execute('SELECT client_id, username, will_delay_time, session_expiry_time, ' +
'listener_port, max_packet_size, max_qos, retain_available, ' +
'session_expiry_interval, will_delay_interval ' +
@ -119,9 +131,12 @@ def check_client(cur, client_id, username, will_delay_time, session_expiry_time,
if row[9] != will_delay_interval:
raise ValueError("Invalid will_delay_interval %d / %d" % (row[9], will_delay_interval))
con.close()
def check_subscription(cur, client_id, topic, subscription_options, subscription_identifier):
def check_subscription(port, client_id, topic, subscription_options, subscription_identifier):
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
cur = con.cursor()
cur.execute('SELECT client_id, topic, subscription_options, subscription_identifier ' +
'FROM subscriptions')
row = cur.fetchone()
@ -137,9 +152,12 @@ def check_subscription(cur, client_id, topic, subscription_options, subscription
if row[3] != subscription_identifier:
raise ValueError("Invalid subscription_identifier %d / %d" % (row[3], subscription_identifier))
con.close()
def check_client_msg(cur, client_id, store_id, dup, direction, mid, qos, retain, state):
def check_client_msg(port, client_id, store_id, dup, direction, mid, qos, retain, state):
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
cur = con.cursor()
cur.execute('SELECT client_id,store_id,dup,direction,mid,qos,retain,state ' +
'FROM client_msgs')
row = cur.fetchone()
@ -167,11 +185,14 @@ def check_client_msg(cur, client_id, store_id, dup, direction, mid, qos, retain,
if row[7] != state:
raise ValueError("Invalid state %d / %d" % (row[7], state))
con.close()
def check_store_msg(cur, expiry_time, topic, payload, source_id, source_username,
def check_store_msg(port, expiry_time, topic, payload, source_id, source_username,
payloadlen, source_mid, source_port, qos, retain, idx=0):
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
cur = con.cursor()
cur.execute('SELECT store_id,expiry_time,topic,payload,source_id,source_username, ' +
'payloadlen, source_mid, source_port, qos, retain ' +
'FROM base_msgs')
@ -212,12 +233,16 @@ def check_store_msg(cur, expiry_time, topic, payload, source_id, source_username
if row[10] != retain:
raise ValueError("Invalid retain %d / %d" % (row[10], retain))
con.close()
return row[0]
def check_retain(cur, topic, store_id):
def check_retain(port, topic, store_id):
con = sqlite3.connect(f"{port}/mosquitto.sqlite3")
cur = con.cursor()
cur.execute('SELECT store_id FROM retains WHERE topic=?', (topic,))
row = cur.fetchone()
if row[0] != store_id:
raise ValueError("Invalid store_id %d / %d" % (row[0], store_id))
con.close()

@ -218,41 +218,28 @@ tests = [
(1, './14-dynsec-role-invalid.py'),
(1, './14-dynsec-role.py'),
#(1, './15-persist-client-msg-in-v5-0.py'),
#(1, './15-persist-client-msg-out-queue-v3-1-1.py'),
#(1, './15-persist-client-msg-out-v3-1-1.py'),
#(1, './15-persist-client-msg-out-v5-0.py'),
#(1, './15-persist-client-v3-1-1.py'),
#(1, './15-persist-client-v5.0.py'),
#(1, './15-persist-publish-properties-v5-0.py'),
#(1, './15-persist-retain-v3-1-1.py'),
#(1, './15-persist-retain-v5-0.py'),
#(1, './15-persist-subscription-v3-1-1.py'),
#(1, './15-persist-subscription-v5-0.py'),
(1, './15-sqlite-clean-shutdown.py'),
(1, './15-sqlite-client-message-in-v3-1-1.py'),
(1, './15-sqlite-client-message-out-dup-v3-1-1.py'),
(1, './15-sqlite-client-message-out-v3-1-1.py'),
(1, './15-sqlite-client-msg-in-v5-0.py'),
(1, './15-sqlite-client-msg-out-queue-v3-1-1.py'),
(1, './15-sqlite-client-msg-out-v3-1-1.py'),
(1, './15-sqlite-client-msg-out-v5-0.py'),
(1, './15-sqlite-client-v3-1-1.py'),
(1, './15-sqlite-client-v5-0.py'),
(1, './15-sqlite-client-v5.0.py'),
(1, './15-sqlite-publish-properties-v5-0.py'),
(1, './15-sqlite-retain-clear-v5-0.py'),
(1, './15-sqlite-retain-v3-1-1.py'),
(1, './15-sqlite-retain-v5-0.py'),
(1, './15-sqlite-subscription-v3-1-1.py'),
(1, './15-sqlite-subscription-v5-0.py'),
(1, './15-persist-client-msg-in-v3-1-1.py', 'persist_sqlite'),
(1, './15-persist-client-msg-in-v5-0.py', 'persist_sqlite'),
(1, './15-persist-client-msg-out-dup-v3-1-1.py', 'persist_sqlite'),
(1, './15-persist-client-msg-out-queue-v3-1-1.py', 'persist_sqlite'),
(1, './15-persist-client-msg-out-v3-1-1-db.py', 'persist_sqlite'),
(1, './15-persist-client-msg-out-v3-1-1.py', 'persist_sqlite'),
(1, './15-persist-client-msg-out-v5-0.py', 'persist_sqlite'),
(1, './15-persist-client-v3-1-1.py', 'persist_sqlite'),
(1, './15-persist-client-v5-0.py', 'persist_sqlite'),
(1, './15-persist-publish-properties-v5-0.py', 'persist_sqlite'),
(1, './15-persist-retain-clear.py', 'persist_sqlite'),
(1, './15-persist-retain-v3-1-1.py', 'persist_sqlite'),
(1, './15-persist-retain-v5-0.py', 'persist_sqlite'),
(1, './15-persist-subscription-v3-1-1.py', 'persist_sqlite'),
(1, './15-persist-subscription-v5-0.py', 'persist_sqlite'),
(1, './16-cmd-args.py'),
(1, './16-config-includedir.py'),
(1, './16-config-parse-errors.py'),
(4, './17-control-list-listeners.py'),
(1, './17-control-list-plugins.py'),
]
(4, './17-control-list-listeners.py'),
(1, './17-control-list-plugins.py'),
]
ptest.run_tests(tests)

@ -20,6 +20,11 @@ def next_test(tests, ports):
else:
args = [test[1]]
try:
args.append(test[2])
except IndexError:
pass
for i in range(0, test[0]):
proc_port = ports.pop()
proc_ports = proc_ports + (proc_port,)

Loading…
Cancel
Save