From 43c159b9a50de430348219d27910a218aeddfce5 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Mon, 25 Mar 2019 08:09:24 +0000 Subject: [PATCH] Support and tests for saving message expiry interval. --- src/database.c | 1 + src/persist.h | 1 + src/persist_read.c | 9 +- src/persist_write.c | 1 + test/broker/11-message-expiry.py | 102 ++++++++++++++++++ test/broker/Makefile | 1 + test/broker/test.py | 1 + .../v5-client-message-props.test-db | Bin 177 -> 185 bytes .../persist_read/v5-client-message.test-db | Bin 174 -> 182 bytes .../v5-message-store-props.test-db | Bin 177 -> 185 bytes .../persist_read/v5-message-store.test-db | Bin 108 -> 116 bytes .../unit/files/persist_read/v5-retain.test-db | Bin 124 -> 132 bytes test/unit/persist_write_test.c | 10 +- 13 files changed, 122 insertions(+), 4 deletions(-) create mode 100755 test/broker/11-message-expiry.py diff --git a/src/database.c b/src/database.c index ed4bc70d..ea7fd078 100644 --- a/src/database.c +++ b/src/database.c @@ -938,6 +938,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) tail = context->inflight_msgs; while(tail){ msg_count++; + expiry_interval = 0; if(tail->store->message_expiry_time){ if(now == 0){ now = time(NULL); diff --git a/src/persist.h b/src/persist.h index 03c05fc9..66eff87c 100644 --- a/src/persist.h +++ b/src/persist.h @@ -87,6 +87,7 @@ struct P_client_msg{ struct PF_msg_store{ dbid_t store_id; + uint64_t expiry_time; uint32_t payloadlen; uint16_t source_mid; uint16_t source_id_len; diff --git a/src/persist_read.c b/src/persist_read.c index d0722c6a..4bfe0273 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -226,6 +226,7 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp struct P_msg_store chunk; struct mosquitto_msg_store *stored = NULL; struct mosquitto_msg_store_load *load; + uint32_t message_expiry_interval; int rc = 0; int i; @@ -260,9 +261,15 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp return MOSQ_ERR_NOMEM; } + if(chunk.F.expiry_time > 0){ + message_expiry_interval = chunk.F.expiry_time - time(NULL); + }else{ + message_expiry_interval = 0; + } + rc = db__message_store(db, &chunk.source, chunk.F.source_mid, chunk.topic, chunk.F.qos, chunk.F.payloadlen, - &chunk.payload, chunk.F.retain, &stored, 0, chunk.properties, chunk.F.store_id); + &chunk.payload, chunk.F.retain, &stored, message_expiry_interval, chunk.properties, chunk.F.store_id); if(stored){ stored->source_listener = chunk.source.listener; diff --git a/src/persist_write.c b/src/persist_write.c index 1cf4f646..aac3554c 100644 --- a/src/persist_write.c +++ b/src/persist_write.c @@ -115,6 +115,7 @@ static int persist__message_store_save(struct mosquitto_db *db, FILE *db_fptr) } chunk.F.store_id = stored->db_id; + chunk.F.expiry_time = stored->message_expiry_time; chunk.F.payloadlen = stored->payloadlen; chunk.F.source_mid = stored->source_mid; if(stored->source_id){ diff --git a/test/broker/11-message-expiry.py b/test/broker/11-message-expiry.py new file mode 100755 index 00000000..a184a31b --- /dev/null +++ b/test/broker/11-message-expiry.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python + +# Test whether the broker reduces the message expiry interval when republishing. +# MQTT v5, with a broker restart and persistence. + +# Client connects with clean session set false, subscribes with qos=1, then disconnects +# Helper publishes two messages, one with a short expiry and one with a long expiry +# We wait until the short expiry will have expired but the long one not. +# Client reconnects, expects delivery of the long expiry message with a reduced +# expiry interval property. + +from mosq_test_helper import * + +def write_config(filename, port): + with open(filename, 'w') as f: + f.write("port %d\n" % (port)) + f.write("persistence true\n") + f.write("persistence_file mosquitto-%d.db\n" % (port)) + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +write_config(conf_file, port) + + +rc = 1 +keepalive = 60 +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60) +connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False, properties=props) +connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5) +connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5, flags=1) + +mid = 53 +subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1, proto_ver=5) +suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=5) + + + +helper_connect = mosq_test.gen_connect("helper", proto_ver=5) +helper_connack = mosq_test.gen_connack(rc=0, proto_ver=5) + +mid=1 +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 1) +publish1s_packet = mosq_test.gen_publish("subpub/qos1", mid=mid, qos=1, payload="message1", proto_ver=5, properties=props) +puback1s_packet = mosq_test.gen_puback(mid) + +mid=2 +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 10) +publish2s_packet = mosq_test.gen_publish("subpub/qos1", mid=mid, qos=1, payload="message2", proto_ver=5, properties=props) +puback2s_packet = mosq_test.gen_puback(mid) + +mid=3 +publish3_packet = mosq_test.gen_publish("subpub/qos1", mid=mid, qos=1, payload="message3", proto_ver=5) +puback3_packet = mosq_test.gen_puback(mid) + + +if os.path.exists('mosquitto-%d.db' % (port)): + os.unlink('mosquitto-%d.db' % (port)) + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack1_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + sock.close() + + helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port) + mosq_test.do_send_receive(helper, publish1s_packet, puback1s_packet, "puback 1") + mosq_test.do_send_receive(helper, publish2s_packet, puback2s_packet, "puback 2") + mosq_test.do_send_receive(helper, publish3_packet, puback3_packet, "puback 3") + + broker.terminate() + broker.wait() + (stdo1, stde1) = broker.communicate() + sock.close() + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + time.sleep(2) + + sock = mosq_test.do_client_connect(connect_packet, connack2_packet, timeout=20, port=port) + packet = sock.recv(len(publish2s_packet)) + for i in range(9, 1, -1): + props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, i) + publish2r_packet = mosq_test.gen_publish("subpub/qos1", mid=2, qos=1, payload="message2", proto_ver=5, properties=props) + if packet == publish2r_packet: + mosq_test.expect_packet(sock, "publish3", publish3_packet) + rc = 0 + break + + sock.close() +finally: + os.remove(conf_file) + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + if os.path.exists('mosquitto-%d.db' % (port)): + os.unlink('mosquitto-%d.db' % (port)) + +exit(rc) + diff --git a/test/broker/Makefile b/test/broker/Makefile index 3b15a867..1fd1df8e 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -178,6 +178,7 @@ endif ./10-listener-mount-point.py 11 : + ./11-message-expiry.py ./11-persistent-subscription.py ./11-persistent-subscription-v5.py ./11-persistent-subscription-no-local.py diff --git a/test/broker/test.py b/test/broker/test.py index 211237e0..0c86329b 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -146,6 +146,7 @@ tests = [ (2, './10-listener-mount-point.py'), + (1, './11-message-expiry.py'), (1, './11-persistent-subscription.py'), (1, './11-persistent-subscription-v5.py'), (1, './11-persistent-subscription-no-local.py'), diff --git a/test/unit/files/persist_read/v5-client-message-props.test-db b/test/unit/files/persist_read/v5-client-message-props.test-db index 26db31814294f83fec4d6a7941104b76c7f656d7..b34f6db8dd02e26ef52404fd5833b19476ba7690 100644 GIT binary patch delta 20 WcmdnUxRY^$8JBGc0~AcOs008pX#@}e delta 12 TcmdnVxRG&!8Ix(qM2AWM8o>kz diff --git a/test/unit/files/persist_read/v5-client-message.test-db b/test/unit/files/persist_read/v5-client-message.test-db index 3e8e58656e5d9e7875bde806f9b4e3ea33e54c0e..1aac12c4e9a07f003fedcb2f503b3f38dc331506 100644 GIT binary patch delta 20 WcmZ3-xQ%gw8JBGc0~AcOC<6d6^aKY0 delta 12 TcmdnSxQ=mx8Ix(qM29i}8hiu) diff --git a/test/unit/files/persist_read/v5-message-store-props.test-db b/test/unit/files/persist_read/v5-message-store-props.test-db index 683544e200e0226628b298ffd47e05114c44276a..a5c296a817bf249effa9b5d10ecefd6d161bc6d9 100644 GIT binary patch delta 20 WcmdnUxRY^$8J7bi0~AcOs008ltOLLR delta 12 TcmdnVxRG&!8IuL$M2AWM8H5A4 diff --git a/test/unit/files/persist_read/v5-message-store.test-db b/test/unit/files/persist_read/v5-message-store.test-db index dc9b7f77f0cc0b89976ca4262a6559865a5ba584..f16b3907d03aa5034a77558fd4c212b95818ac23 100644 GIT binary patch delta 18 Tcmc~PnPA3c%g6u)6D>>tA&>&> delta 10 RcmXTPnPA3b$~e)%1OO4(0^|Sy diff --git a/test/unit/files/persist_read/v5-retain.test-db b/test/unit/files/persist_read/v5-retain.test-db index fa83928892e0fdfd8d0c8519b22d3629267326cd..f9d8a4f58badbd1ed38c13807dff0bfa488d7ff8 100644 GIT binary patch delta 19 Ucmbsource_listener); - if(db.msg_store->source_listener){ - CU_ASSERT_EQUAL(db.msg_store->source_listener->port, 1883); + + CU_ASSERT_PTR_NOT_NULL(db.msg_store); + if(db.msg_store){ + CU_ASSERT_PTR_NOT_NULL(db.msg_store->source_listener); + if(db.msg_store->source_listener){ + CU_ASSERT_EQUAL(db.msg_store->source_listener->port, 1883); + } } config.persistence_filepath = "v5-client-message-props.db";