diff --git a/src/persist.h b/src/persist.h index 3e62046c..9fedc3ea 100644 --- a/src/persist.h +++ b/src/persist.h @@ -104,9 +104,11 @@ struct P_msg_store{ struct PF_sub{ + uint32_t identifier; uint16_t id_len; uint16_t topic_len; uint8_t qos; + uint8_t options; }; struct P_sub{ struct PF_sub F; diff --git a/src/persist_read.c b/src/persist_read.c index e1fe1aa5..a8514691 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -39,7 +39,7 @@ static uint32_t db_version; const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'}; -static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos); +static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos, uint32_t identifier, int options); static struct mosquitto *persist__find_or_add_context(struct mosquitto_db *db, const char *client_id, uint16_t last_mid) { @@ -113,7 +113,7 @@ static int persist__client_msg_restore(struct mosquitto_db *db, const char *clie struct mosquitto_msg_store_load *load; struct mosquitto *context; - cmsg = mosquitto__malloc(sizeof(struct mosquitto_client_msg)); + cmsg = mosquitto__calloc(1, sizeof(struct mosquitto_client_msg)); if(!cmsg){ log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return MOSQ_ERR_NOMEM; @@ -249,7 +249,7 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp } } } - load = mosquitto__malloc(sizeof(struct mosquitto_msg_store_load)); + load = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store_load)); if(!load){ fclose(db_fptr); mosquitto__free(chunk.source.id); @@ -327,7 +327,7 @@ static int persist__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) return rc; } - rc = persist__restore_sub(db, chunk.client_id, chunk.topic, chunk.F.qos); + rc = persist__restore_sub(db, chunk.client_id, chunk.topic, chunk.F.qos, chunk.F.identifier, chunk.F.options); mosquitto__free(chunk.client_id); mosquitto__free(chunk.topic); @@ -466,7 +466,7 @@ error: return 1; } -static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos) +static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, const char *sub, int qos, uint32_t identifier, int options) { struct mosquitto *context; @@ -476,8 +476,7 @@ static int persist__restore_sub(struct mosquitto_db *db, const char *client_id, context = persist__find_or_add_context(db, client_id, 0); if(!context) return 1; - /* FIXME - identifer, options need saving */ - return sub__add(db, context, sub, qos, 0, 0, &db->subs); + return sub__add(db, context, sub, qos, identifier, options, &db->subs); } #endif diff --git a/src/persist_read_v5.c b/src/persist_read_v5.c index ef55d9b2..32441915 100644 --- a/src/persist_read_v5.c +++ b/src/persist_read_v5.c @@ -170,6 +170,7 @@ int persist__chunk_sub_read_v5(FILE *db_fptr, struct P_sub *chunk) int rc; read_e(db_fptr, &chunk->F, sizeof(struct PF_sub)); + chunk->F.identifier = ntohl(chunk->F.identifier); chunk->F.id_len = ntohs(chunk->F.id_len); chunk->F.topic_len = ntohs(chunk->F.topic_len); diff --git a/src/persist_write.c b/src/persist_write.c index 483ab3be..8e83ed51 100644 --- a/src/persist_write.c +++ b/src/persist_write.c @@ -213,9 +213,11 @@ static int persist__subs_retain_save(struct mosquitto_db *db, FILE *db_fptr, str sub = node->subs; while(sub){ if(sub->context->clean_start == false && sub->context->id){ + sub_chunk.F.identifier = sub->identifier; sub_chunk.F.id_len = strlen(sub->context->id); sub_chunk.F.topic_len = strlen(thistopic); sub_chunk.F.qos = (uint8_t)sub->qos; + sub_chunk.F.options = sub->no_local<<2 | sub->retain_as_published<<3; sub_chunk.client_id = sub->context->id; sub_chunk.topic = thistopic; diff --git a/src/persist_write_v5.c b/src/persist_write_v5.c index 4bc6346c..90fc3a1f 100644 --- a/src/persist_write_v5.c +++ b/src/persist_write_v5.c @@ -160,6 +160,7 @@ int persist__chunk_sub_write_v5(FILE *db_fptr, struct P_sub *chunk) int id_len = chunk->F.id_len; int topic_len = chunk->F.topic_len; + chunk->F.identifier = htonl(chunk->F.identifier); chunk->F.id_len = htons(chunk->F.id_len); chunk->F.topic_len = htons(chunk->F.topic_len); diff --git a/test/broker/11-persistent-subscription-no-local.py b/test/broker/11-persistent-subscription-no-local.py new file mode 100755 index 00000000..d6c8edad --- /dev/null +++ b/test/broker/11-persistent-subscription-no-local.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python + +# Test whether a client subscribed to a topic receives its own message sent to that topic. +# And whether the no-local option is persisted. + +from mosq_test_helper import * + +def write_config(filename, port): + with open(filename, 'w') as f: + f.write("port %d\n" % (port)) + f.write("persistence true\n") + f.write("persistence_file mosquitto-%d.db\n" % (port)) + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +write_config(conf_file, port) + +rc = 1 +keepalive = 60 +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 100) +connect_packet = mosq_test.gen_connect( + "persistent-subscription-test", keepalive=keepalive, clean_session=False, proto_ver=5, properties=props +) +connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5) +connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=5) # session present + +mid = 1 +subscribe1_packet = mosq_test.gen_subscribe(mid, "subpub/nolocal", 5, proto_ver=5) +suback1_packet = mosq_test.gen_suback(mid, 1, proto_ver=5) + +mid = 2 +subscribe2_packet = mosq_test.gen_subscribe(mid, "subpub/local", 1, proto_ver=5) +suback2_packet = mosq_test.gen_suback(mid, 1, proto_ver=5) + +mid = 1 +publish1_packet = mosq_test.gen_publish("subpub/nolocal", qos=1, mid=mid, payload="message", proto_ver=5) +puback1_packet = mosq_test.gen_puback(mid, proto_ver=5) + +mid = 2 +publish2s_packet = mosq_test.gen_publish("subpub/local", qos=1, mid=mid, payload="message", proto_ver=5) +puback2s_packet = mosq_test.gen_puback(mid, proto_ver=5) + +mid = 1 +publish2a_packet = mosq_test.gen_publish("subpub/local", qos=1, mid=mid, payload="message", proto_ver=5) +puback2a_packet = mosq_test.gen_puback(mid, proto_ver=5) + +mid = 2 +publish2b_packet = mosq_test.gen_publish("subpub/local", qos=1, mid=mid, payload="message", proto_ver=5) +puback2b_packet = mosq_test.gen_puback(mid, proto_ver=5) + +if os.path.exists('mosquitto-%d.db' % (port)): + os.unlink('mosquitto-%d.db' % (port)) + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +(stdo1, stde1) = ("", "") +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe1_packet, suback1_packet, "suback1") + mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback2") + + mosq_test.do_send_receive(sock, publish1_packet, puback1_packet, "puback1a") + mosq_test.do_send_receive(sock, publish2s_packet, puback2s_packet, "puback2a") + + if mosq_test.expect_packet(sock, "publish2a", publish2a_packet): + sock.send(puback2a_packet) + + broker.terminate() + broker.wait() + (stdo1, stde1) = broker.communicate() + sock.close() + + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=20, port=port) + + mosq_test.do_send_receive(sock, publish1_packet, puback1_packet, "puback1b") + mosq_test.do_send_receive(sock, publish2s_packet, puback2s_packet, "puback2b") + + if mosq_test.expect_packet(sock, "publish2b", publish2b_packet): + rc = 0 + + sock.close() +finally: + os.remove(conf_file) + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde1 + stde) + if os.path.exists('mosquitto-%d.db' % (port)): + os.unlink('mosquitto-%d.db' % (port)) + + +exit(rc) + diff --git a/test/broker/11-persistent-subscription-v5.py b/test/broker/11-persistent-subscription-v5.py new file mode 100755 index 00000000..f3d825f5 --- /dev/null +++ b/test/broker/11-persistent-subscription-v5.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python + +# Test whether a client subscribed to a topic receives its own message sent to that topic. + +from mosq_test_helper import * + +def write_config(filename, port): + with open(filename, 'w') as f: + f.write("port %d\n" % (port)) + f.write("persistence true\n") + f.write("persistence_file mosquitto-%d.db\n" % (port)) + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +write_config(conf_file, port) + +rc = 1 +mid = 530 +keepalive = 60 + +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 100) +connect_packet = mosq_test.gen_connect( + "persistent-subscription-test", keepalive=keepalive, clean_session=False, proto_ver=5, properties=props +) +connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5) +connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=5) # session present + +subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1, proto_ver=5) +suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=5) + +mid = 300 +publish_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message", proto_ver=5) +puback_packet = mosq_test.gen_puback(mid, proto_ver=5) + +mid = 1 +publish_packet2 = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message", proto_ver=5) + +if os.path.exists('mosquitto-%d.db' % (port)): + os.unlink('mosquitto-%d.db' % (port)) + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +(stdo1, stde1) = ("", "") +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + + broker.terminate() + broker.wait() + (stdo1, stde1) = broker.communicate() + sock.close() + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=20, port=port) + + mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback") + + if mosq_test.expect_packet(sock, "publish2", publish_packet2): + rc = 0 + + sock.close() +finally: + os.remove(conf_file) + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde1 + stde) + if os.path.exists('mosquitto-%d.db' % (port)): + os.unlink('mosquitto-%d.db' % (port)) + + +exit(rc) + diff --git a/test/broker/11-persistent-subscription.py b/test/broker/11-persistent-subscription.py index e64cf2f8..5ac367c4 100755 --- a/test/broker/11-persistent-subscription.py +++ b/test/broker/11-persistent-subscription.py @@ -18,7 +18,7 @@ rc = 1 mid = 530 keepalive = 60 connect_packet = mosq_test.gen_connect( - "persitent-subscription-test", keepalive=keepalive, clean_session=False, + "persistent-subscription-test", keepalive=keepalive, clean_session=False, ) connack_packet = mosq_test.gen_connack(rc=0) connack_packet2 = mosq_test.gen_connack(rc=0, flags=1) # session present diff --git a/test/broker/Makefile b/test/broker/Makefile index 545a8b59..5d9a7a5d 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -179,6 +179,8 @@ endif 11 : ./11-persistent-subscription.py + ./11-persistent-subscription-v5.py + ./11-persistent-subscription-no-local.py 12 : ./12-prop-assigned-client-identifier.py diff --git a/test/broker/test.py b/test/broker/test.py index 1cf1325c..3f0f5664 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -147,6 +147,8 @@ tests = [ (2, './10-listener-mount-point.py'), (1, './11-persistent-subscription.py'), + (1, './11-persistent-subscription-v5.py'), + (1, './11-persistent-subscription-no-local.py'), (1, './12-prop-assigned-client-identifier.py'), (1, './12-prop-maximum-packet-size-broker.py'), diff --git a/test/unit/files/persist_read/v5-sub.test-db b/test/unit/files/persist_read/v5-sub.test-db index 56f707b2..acd14379 100644 Binary files a/test/unit/files/persist_read/v5-sub.test-db and b/test/unit/files/persist_read/v5-sub.test-db differ diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index 267cdb12..697ffe59 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -12,6 +12,7 @@ extern uint64_t last_retained; extern char *last_sub; extern int last_qos; +extern uint32_t last_identifier; struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) { @@ -119,6 +120,7 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub { last_sub = strdup(sub); last_qos = qos; + last_identifier = identifier; return MOSQ_ERR_SUCCESS; } diff --git a/test/unit/persist_read_test.c b/test/unit/persist_read_test.c index 0742c49a..cccee5ab 100644 --- a/test/unit/persist_read_test.c +++ b/test/unit/persist_read_test.c @@ -15,6 +15,7 @@ uint64_t last_retained; char *last_sub = NULL; int last_qos; +uint32_t last_identifier; static void TEST_persistence_disabled(void) { @@ -622,6 +623,7 @@ static void TEST_v5_sub(void) free(last_sub); } CU_ASSERT_EQUAL(last_qos, 1); + CU_ASSERT_EQUAL(last_identifier, 0x7623); } } diff --git a/test/unit/persist_write_test.c b/test/unit/persist_write_test.c index 3f0aad5e..a4e41592 100644 --- a/test/unit/persist_write_test.c +++ b/test/unit/persist_write_test.c @@ -211,6 +211,37 @@ static void TEST_v5_client_message(void) } +static void TEST_v5_sub(void) +{ + struct mosquitto_db db; + struct mosquitto__config config; + struct mosquitto__listener listener; + int rc; + + memset(&db, 0, sizeof(struct mosquitto_db)); + memset(&config, 0, sizeof(struct mosquitto__config)); + memset(&listener, 0, sizeof(struct mosquitto__listener)); + db.config = &config; + listener.port = 1883; + config.listeners = &listener; + config.listener_count = 1; + + db__open(&config, &db); + + config.persistence = true; + config.persistence_filepath = "files/persist_read/v5-sub.test-db"; + rc = persist__restore(&db); + CU_ASSERT_EQUAL(rc, MOSQ_ERR_SUCCESS); + + config.persistence_filepath = "v5-sub.db"; + rc = persist__backup(&db, true); + CU_ASSERT_EQUAL(rc, MOSQ_ERR_SUCCESS); + + CU_ASSERT_EQUAL(0, file_diff("files/persist_read/v5-sub.test-db", "v5-sub.db")); + //unlink("v5-sub.db"); +} + + #if 0 NOT WORKING static void TEST_v5_full(void) @@ -269,6 +300,7 @@ int main(int argc, char *argv[]) || !CU_add_test(test_suite, "v5 message store (message has no refs)", TEST_v5_message_store_no_ref) || !CU_add_test(test_suite, "v5 client", TEST_v5_client) || !CU_add_test(test_suite, "v5 client message", TEST_v5_client_message) + || !CU_add_test(test_suite, "v5 sub", TEST_v5_sub) //|| !CU_add_test(test_suite, "v5 full", TEST_v5_full) ){