diff --git a/src/database.c b/src/database.c index 1d168f09..361e605f 100644 --- a/src/database.c +++ b/src/database.c @@ -346,7 +346,7 @@ int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *conte return MOSQ_ERR_SUCCESS; } -int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties) +int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update) { struct mosquitto_client_msg *msg; struct mosquitto_msg_data *msg_data; @@ -522,15 +522,15 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 if(dir == mosq_md_out && msg->qos > 0){ util__decrement_send_quota(context); } -#ifdef WITH_WEBSOCKETS - if(context->wsi && rc == 0){ - return db__message_write(db, context); - }else{ - return rc; + + if(dir == mosq_md_out && update && context->current_out_packet == NULL){ + rc = db__message_write_inflight_out(db, context); + if(rc) return rc; + rc = db__message_write_queued_out(db, context); + if(rc) return rc; } -#else + return rc; -#endif } int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos) @@ -926,33 +926,17 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont } } -int db__message_write(struct mosquitto_db *db, struct mosquitto *context) +int db__message_write_inflight_in(struct mosquitto_db *db, struct mosquitto *context) { - int rc; struct mosquitto_client_msg *tail, *tmp; - uint16_t mid; - int retries; - int retain; - const char *topic; - int qos; - uint32_t payloadlen; - const void *payload; - int msg_count = 0; - mosquitto_property *cmsg_props = NULL, *store_props = NULL; + int rc; time_t now = 0; - uint32_t expiry_interval; - - if(!context || context->sock == INVALID_SOCKET - || (context->state == mosq_cs_active && !context->id)){ - return MOSQ_ERR_INVAL; - } if(context->state != mosq_cs_active){ return MOSQ_ERR_SUCCESS; } DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){ - msg_count++; if(tail->store->message_expiry_time){ if(now == 0){ now = time(NULL); @@ -963,11 +947,10 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) continue; } } - mid = tail->mid; switch(tail->state){ case mosq_ms_send_pubrec: - rc = send__pubrec(context, mid, 0, NULL); + rc = send__pubrec(context, tail->mid, 0, NULL); if(!rc){ tail->state = mosq_ms_wait_for_pubrel; }else{ @@ -976,7 +959,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) break; case mosq_ms_resend_pubcomp: - rc = send__pubcomp(context, mid, NULL); + rc = send__pubcomp(context, tail->mid, NULL); if(!rc){ tail->state = mosq_ms_wait_for_pubrel; }else{ @@ -997,9 +980,31 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) break; } } + return MOSQ_ERR_SUCCESS; +} + + +int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *context) +{ + struct mosquitto_client_msg *tail, *tmp; + mosquitto_property *cmsg_props = NULL, *store_props = NULL; + int rc; + uint16_t mid; + int retries; + int retain; + const char *topic; + int qos; + uint32_t payloadlen; + const void *payload; + time_t now = 0; + uint32_t expiry_interval; + + if(context->state != mosq_cs_active || context->sock == INVALID_SOCKET){ + return MOSQ_ERR_SUCCESS; + } DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){ - msg_count++; + expiry_interval = 0; if(tail->store->message_expiry_time){ if(now == 0){ now = time(NULL); @@ -1080,14 +1085,24 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) break; } } + return MOSQ_ERR_SUCCESS; +} + + +int db__message_write_queued_in(struct mosquitto_db *db, struct mosquitto *context) +{ + struct mosquitto_client_msg *tail, *tmp; + int rc; + + if(context->state != mosq_cs_active){ + return MOSQ_ERR_SUCCESS; + } DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){ if(context->msgs_out.inflight_maximum != 0 && context->msgs_in.inflight_quota == 0){ break; } - msg_count++; - if(tail->qos == 2){ tail->state = mosq_ms_send_pubrec; db__message_dequeue_first(context, &context->msgs_in); @@ -1099,14 +1114,23 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) } } } + return MOSQ_ERR_SUCCESS; +} + + +int db__message_write_queued_out(struct mosquitto_db *db, struct mosquitto *context) +{ + struct mosquitto_client_msg *tail, *tmp; + + if(context->state != mosq_cs_active){ + return MOSQ_ERR_SUCCESS; + } DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){ if(context->msgs_out.inflight_maximum != 0 && context->msgs_out.inflight_quota == 0){ break; } - msg_count++; - switch(tail->qos){ case 0: tail->state = mosq_ms_publish_qos0; @@ -1120,10 +1144,10 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) } db__message_dequeue_first(context, &context->msgs_out); } - return MOSQ_ERR_SUCCESS; } + void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes) { max_inflight_bytes = inflight_bytes; diff --git a/src/handle_connect.c b/src/handle_connect.c index 39ba66d7..bec9eb06 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -272,6 +272,10 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v mosquitto__set_state(context, mosq_cs_active); rc = send__connack(db, context, connect_ack, CONNACK_ACCEPTED, connack_props); mosquitto_property_free_all(&connack_props); + if(rc) return rc; + rc = db__message_write_inflight_out(db, context); + if(rc) return rc; + rc = db__message_write_queued_out(db, context); return rc; error: free(auth_data_out); diff --git a/src/handle_publish.c b/src/handle_publish.c index f5d98df3..96de85e7 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -290,7 +290,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) break; case 2: if(dup == 0){ - res = db__message_insert(db, context, stored->source_mid, mosq_md_in, stored->qos, stored->retain, stored, NULL); + res = db__message_insert(db, context, stored->source_mid, mosq_md_in, stored->qos, stored->retain, stored, NULL, false); }else{ res = 0; } @@ -304,6 +304,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) break; } + db__message_write_queued_in(db, context); return rc; process_bad_message: rc = 1; diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index b253a0af..302ae6c6 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -220,6 +220,13 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) db->persistence_changes++; #endif + if(context->current_out_packet == NULL){ + rc = db__message_write_inflight_out(db, context); + if(rc) return rc; + rc = db__message_write_queued_out(db, context); + if(rc) return rc; + } + return rc; } diff --git a/src/loop.c b/src/loop.c index aadd16b3..4a192da5 100644 --- a/src/loop.c +++ b/src/loop.c @@ -166,14 +166,12 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li time_t last_backup = mosquitto_time(); #endif time_t now = 0; - int time_count; + time_t last_keepalive_check = mosquitto_time(); struct mosquitto *context, *ctxt_tmp; -#ifdef WITH_BRIDGE - int rc; -#endif #ifdef WITH_WEBSOCKETS int i; #endif + int rc; #if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000 @@ -197,41 +195,28 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li } #endif - time_count = 0; - HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ - if(time_count > 0){ - time_count--; - }else{ - time_count = 1000; - now = mosquitto_time(); - } + now = mosquitto_time(); + if(last_keepalive_check != now){ + last_keepalive_check = now; - if(context->sock != INVALID_SOCKET){ - /* Local bridges never time out in this fashion. */ - if(!(context->keepalive) - || context->bridge - || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ + if(context->sock != INVALID_SOCKET){ + /* Local bridges never time out in this fashion. */ + if(!(context->keepalive) + || context->bridge + || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ - if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){ - if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){ - mux__add_out(db, context); - context->ws_want_write = false; - } - else{ - mux__remove_out(db, context); - } }else{ - do_disconnect(db, context, MOSQ_ERR_CONN_LOST); + /* Client has exceeded keepalive*1.5 */ + do_disconnect(db, context, MOSQ_ERR_KEEPALIVE); } - }else{ - /* Client has exceeded keepalive*1.5 */ - do_disconnect(db, context, MOSQ_ERR_KEEPALIVE); } } } - +#ifdef WITH_BRIDGE bridge_check(db); +#endif rc = mux__handle(db, listensock, listensock_count); if(rc) return rc; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 19e936e7..6a2d0fa9 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -653,10 +653,9 @@ void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queu /* Return the number of in-flight messages in count. */ int db__message_count(int *count); int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos); -int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties); +int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update); int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid); int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos); -int db__message_write(struct mosquitto_db *db, struct mosquitto *context); void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data); int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context, bool force_free); int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties); @@ -672,6 +671,9 @@ void db__msg_store_free(struct mosquitto_msg_store *store); int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context); void sys_tree__init(struct mosquitto_db *db); void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time); +int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *context); +int db__message_write_queued_out(struct mosquitto_db *db, struct mosquitto *context); +int db__message_write_queued_in(struct mosquitto_db *db, struct mosquitto *context); /* ============================================================ * Subscription functions diff --git a/src/retain.c b/src/retain.c index 238b63b7..1b4664cf 100644 --- a/src/retain.c +++ b/src/retain.c @@ -184,7 +184,7 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier if(subscription_identifier > 0){ mosquitto_property_add_varint(&properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, subscription_identifier); } - return db__message_insert(db, context, mid, mosq_md_out, qos, true, retained, properties); + return db__message_insert(db, context, mid, mosq_md_out, qos, true, retained, properties, false); } diff --git a/src/subs.c b/src/subs.c index 11603527..5062455b 100644 --- a/src/subs.c +++ b/src/subs.c @@ -95,7 +95,7 @@ static int subs__send(struct mosquitto_db *db, struct mosquitto__subleaf *leaf, if(leaf->identifier){ mosquitto_property_add_varint(&properties, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, leaf->identifier); } - if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored, properties) == 1){ + if(db__message_insert(db, leaf->context, mid, mosq_md_out, msg_qos, client_retain, stored, properties, true) == 1){ return 1; } }else{ diff --git a/src/websockets.c b/src/websockets.c index 9ca03416..6b0a5c6f 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -242,6 +242,7 @@ static int callback_mqtt(struct libwebsocket_context *context, } mosq->sock = libwebsocket_get_socket_fd(wsi); HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(mosq->sock), mosq); + mux__add_in(db, mosq); break; case LWS_CALLBACK_CLOSED: @@ -273,7 +274,7 @@ static int callback_mqtt(struct libwebsocket_context *context, return -1; } - db__message_write(db, mosq); + db__message_write_inflight_out(db, mosq); if(mosq->out_packet && !mosq->current_out_packet){ mosq->current_out_packet = mosq->out_packet; @@ -672,8 +673,13 @@ static int callback_http(struct libwebsocket_context *context, case LWS_CALLBACK_DEL_POLL_FD: case LWS_CALLBACK_CHANGE_MODE_POLL_FD: HASH_FIND(hh_sock, db->contexts_by_sock, &pollargs->fd, sizeof(pollargs->fd), mosq); - if(mosq && (pollargs->events & POLLOUT)){ - mosq->ws_want_write = true; + if(mosq){ + if(pollargs->events & POLLOUT){ + mux__add_out(db, mosq); + mosq->ws_want_write = true; + }else{ + mux__remove_out(db, mosq); + } } break; diff --git a/test/broker/02-subpub-qos1-nolocal.py b/test/broker/02-subpub-qos1-nolocal.py index d2397879..90f4d332 100755 --- a/test/broker/02-subpub-qos1-nolocal.py +++ b/test/broker/02-subpub-qos1-nolocal.py @@ -42,9 +42,9 @@ def do_test(): mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback2") mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback") - mosq_test.do_send_receive(sock, publish2_packet, puback2_packet, "puback2") + sock.send(publish2_packet) - mosq_test.expect_packet(sock, "publish3", publish3_packet) + mosq_test.receive_unordered(sock, puback2_packet, publish3_packet, "puback2/publish3") rc = 0 sock.close() diff --git a/test/broker/02-subpub-qos1.py b/test/broker/02-subpub-qos1.py index e45abe7a..ac1bf789 100755 --- a/test/broker/02-subpub-qos1.py +++ b/test/broker/02-subpub-qos1.py @@ -30,9 +30,8 @@ def do_test(proto_ver): mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") - mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback") - - mosq_test.expect_packet(sock, "publish2", publish_packet2) + sock.send(publish_packet) + mosq_test.receive_unordered(sock, puback_packet, publish_packet2, "puback/publish2") rc = 0 sock.close() diff --git a/test/broker/02-subpub-qos2-receive-maximum-1.py b/test/broker/02-subpub-qos2-receive-maximum-1.py index c2bc0c58..9f54edbe 100755 --- a/test/broker/02-subpub-qos2-receive-maximum-1.py +++ b/test/broker/02-subpub-qos2-receive-maximum-1.py @@ -70,7 +70,6 @@ def do_test(): (stdo, stde) = broker.communicate() if rc: print(stde.decode('utf-8')) - print("proto_ver=%d" % (proto_ver)) exit(rc) diff --git a/test/broker/02-subpub-qos2.py b/test/broker/02-subpub-qos2.py index 75cee903..f1a7c20f 100755 --- a/test/broker/02-subpub-qos2.py +++ b/test/broker/02-subpub-qos2.py @@ -35,9 +35,10 @@ def do_test(proto_ver): mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "pubrec") - mosq_test.do_send_receive(sock, pubrel_packet, pubcomp_packet, "pubcomp") + sock.send(pubrel_packet) + + mosq_test.receive_unordered(sock, pubcomp_packet, publish_packet2, "pubcomp/publish2") - mosq_test.expect_packet(sock, "publish2", publish_packet2) mosq_test.do_send_receive(sock, pubrec_packet2, pubrel_packet2, "pubrel2") sock.send(pubcomp_packet2) # Broker side of flow complete so can quit here. diff --git a/test/broker/09-acl-change.py b/test/broker/09-acl-change.py index e86ba634..c9d2b720 100755 --- a/test/broker/09-acl-change.py +++ b/test/broker/09-acl-change.py @@ -98,8 +98,8 @@ try: sock.settimeout(10) mosq_test.expect_packet(sock, "publish1r", publish1r_packet) # We don't expect messages to topic/two any more, so we don't expect the queued one - mosq_test.do_send_receive(sock, publish3s_packet, puback3s_packet, "puback3") - mosq_test.expect_packet(sock, "publish3r", publish3r_packet) + sock.send(publish3s_packet) + mosq_test.receive_unordered(sock, puback3s_packet, publish3r_packet, "puback3/publish3r") # Send this, don't expect it to succeed mosq_test.do_send_receive(sock, publish4s_packet, puback4s_packet, "puback4") diff --git a/test/broker/09-extended-auth-change-username.py b/test/broker/09-extended-auth-change-username.py index 361bbdf9..313ece9b 100755 --- a/test/broker/09-extended-auth-change-username.py +++ b/test/broker/09-extended-auth-change-username.py @@ -61,8 +61,8 @@ def do_test(per_listener): sock = mosq_test.do_client_connect(connect2_packet, connack2_packet, timeout=20, port=port) mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback2") - mosq_test.do_send_receive(sock, publish2s_packet, puback2s_packet, "puback2") - mosq_test.expect_packet(sock, "publish2", publish2r_packet) + sock.send(publish2s_packet) + mosq_test.receive_unordered(sock, puback2s_packet, publish2r_packet, "puback2/publish2") mosq_test.do_ping(sock) rc = 0 diff --git a/test/broker/09-plugin-auth-msg-params.py b/test/broker/09-plugin-auth-msg-params.py index 1d4eb28d..c3ea31cf 100755 --- a/test/broker/09-plugin-auth-msg-params.py +++ b/test/broker/09-plugin-auth-msg-params.py @@ -36,9 +36,9 @@ broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=Tr try: sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port) mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") - mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback") + sock.send(publish_packet) + mosq_test.receive_unordered(sock, puback_packet, publish_packet_recv, "puback/publish_receive") - mosq_test.expect_packet(sock, "publish receive", publish_packet_recv) rc = 0 sock.close() diff --git a/test/broker/11-persistent-subscription-no-local.py b/test/broker/11-persistent-subscription-no-local.py index 420de680..ccc6132e 100755 --- a/test/broker/11-persistent-subscription-no-local.py +++ b/test/broker/11-persistent-subscription-no-local.py @@ -59,9 +59,9 @@ try: mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback2") mosq_test.do_send_receive(sock, publish1_packet, puback1_packet, "puback1a") - mosq_test.do_send_receive(sock, publish2s_packet, puback2s_packet, "puback2a") + sock.send(publish2s_packet) + mosq_test.receive_unordered(sock, puback2s_packet, publish2a_packet, "puback2a/publish2a") - mosq_test.expect_packet(sock, "publish2a", publish2a_packet) sock.send(puback2a_packet) broker.terminate() @@ -74,9 +74,9 @@ try: sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=20, port=port) mosq_test.do_send_receive(sock, publish1_packet, puback1_packet, "puback1b") - mosq_test.do_send_receive(sock, publish2s_packet, puback2s_packet, "puback2b") + sock.send(publish2s_packet) + mosq_test.receive_unordered(sock, puback2s_packet, publish2b_packet, "puback2b/publish2b") - mosq_test.expect_packet(sock, "publish2b", publish2b_packet) rc = 0 sock.close() diff --git a/test/broker/11-persistent-subscription-v5.py b/test/broker/11-persistent-subscription-v5.py index 02d8ef1e..9d883d77 100755 --- a/test/broker/11-persistent-subscription-v5.py +++ b/test/broker/11-persistent-subscription-v5.py @@ -52,9 +52,8 @@ try: sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=20, port=port) - mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback") - - mosq_test.expect_packet(sock, "publish2", publish_packet2) + sock.send(publish_packet) + mosq_test.receive_unordered(sock, puback_packet, publish_packet2, "puback/publish2") rc = 0 sock.close() diff --git a/test/broker/11-persistent-subscription.py b/test/broker/11-persistent-subscription.py index eefbcc73..f77ffb51 100755 --- a/test/broker/11-persistent-subscription.py +++ b/test/broker/11-persistent-subscription.py @@ -52,9 +52,8 @@ def do_test(proto_ver): sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=20, port=port) - mosq_test.do_send_receive(sock, publish_packet, puback_packet, "puback") - - mosq_test.expect_packet(sock, "publish2", publish_packet2) + sock.send(publish_packet) + mosq_test.receive_unordered(sock, puback_packet, publish_packet2, "puback/publish2") rc = 0 sock.close() diff --git a/test/broker/12-prop-maximum-packet-size-publish-qos1.py b/test/broker/12-prop-maximum-packet-size-publish-qos1.py index 88550369..f2686026 100755 --- a/test/broker/12-prop-maximum-packet-size-publish-qos1.py +++ b/test/broker/12-prop-maximum-packet-size-publish-qos1.py @@ -36,9 +36,8 @@ try: # We shouldn't receive the publish here because it is > MAXIMUM_PACKET_SIZE mosq_test.do_ping(sock) - mosq_test.do_send_receive(sock, publish2_packet, puback2_packet, "puback 2") - - mosq_test.expect_packet(sock, "publish2", publish2_packet) + sock.send(publish2_packet) + mosq_test.receive_unordered(sock, puback2_packet, publish2_packet, "puback 2/publish2") rc = 0 except mosq_test.TestError: pass diff --git a/test/broker/12-prop-maximum-packet-size-publish-qos2.py b/test/broker/12-prop-maximum-packet-size-publish-qos2.py index 4cfe237e..5fcee737 100755 --- a/test/broker/12-prop-maximum-packet-size-publish-qos2.py +++ b/test/broker/12-prop-maximum-packet-size-publish-qos2.py @@ -42,9 +42,8 @@ try: mosq_test.do_ping(sock) mosq_test.do_send_receive(sock, publish2_packet, pubrec2_packet, "pubrec 2") - mosq_test.do_send_receive(sock, pubrel2_packet, pubcomp2_packet, "pubcomp 2") - - mosq_test.expect_packet(sock, "publish2", publish2_packet) + sock.send(pubrel2_packet) + mosq_test.receive_unordered(sock, pubcomp2_packet, publish2_packet, "pubcomp 2/publish2") rc = 0 except mosq_test.TestError: pass diff --git a/test/mosq_test.py b/test/mosq_test.py index 56693fb9..0dcb82e2 100644 --- a/test/mosq_test.py +++ b/test/mosq_test.py @@ -114,6 +114,20 @@ def packet_matches(name, recvd, expected): return True +def receive_unordered(sock, recv1_packet, recv2_packet, error_string): + expected1 = recv1_packet + recv2_packet + expected2 = recv2_packet + recv1_packet + recvd = b'' + while len(recvd) < len(expected1): + recvd += sock.recv(1) + + if recvd == expected1 or recvd == expected2: + return + else: + packet_matches(error_string, recvd, expected2) + raise ValueError(error_string) + + def do_send_receive(sock, send_packet, receive_packet, error_string="send receive error"): size = len(send_packet) total_sent = 0 diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index e7d7c264..93b93102 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -135,7 +135,7 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub return MOSQ_ERR_SUCCESS; } -int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties) +int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update) { return MOSQ_ERR_SUCCESS; }