Handle mismatched handshakes properly.

For example, a QoS1 PUBLISH with QoS2 reply.
pull/1132/head
Roger Light 7 years ago committed by Roger A. Light
parent d085216d71
commit 9dd8d1e054

@ -4,6 +4,7 @@
Broker:
- Fixed comment handling for config options that have optional arguments.
- Improved documentation around bridge topic remapping.
- Handle mismatched handshakes (e.g. QoS1 PUBLISH with QoS2 reply) properly.
Library:
- Fix TLS connections not working over SOCKS.

@ -44,15 +44,17 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
{
uint16_t mid;
int rc;
int qos;
assert(mosq);
rc = packet__read_uint16(&mosq->in_packet, &mid);
if(rc) return rc;
qos = type[3] == 'A'?1:2; /* pubAck or pubComp */
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid);
if(mid){
rc = db__message_delete(db, mosq, mid, mosq_md_out);
rc = db__message_delete(db, mosq, mid, mosq_md_out, qos);
if(rc == MOSQ_ERR_NOT_FOUND){
log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received %s from %s for an unknown packet identifier %d.", type, mosq->id, mid);
return MOSQ_ERR_SUCCESS;
@ -63,7 +65,10 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d)", mosq->id, type, mid);
if(!message__delete(mosq, mid, mosq_md_out)){
rc = message__delete(mosq, mid, mosq_md_out, qos);
if(rc){
return rc;
}else{
/* Only inform the client the message has been sent once. */
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish){

@ -46,11 +46,11 @@ int handle__pubrec(struct mosquitto *mosq)
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid);
rc = db__message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp);
rc = db__message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp, 2);
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREC (Mid: %d)", mosq->id, mid);
rc = message__out_update(mosq, mid, mosq_ms_wait_for_pubcomp);
rc = message__out_update(mosq, mid, mosq_ms_wait_for_pubcomp, 2);
#endif
if(rc == MOSQ_ERR_NOT_FOUND){
log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received PUBREC from %s for an unknown packet identifier %d.", mosq->id, mid);

@ -55,7 +55,10 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREL from %s (Mid: %d)", mosq->id, mid);
if(db__message_release(db, mosq, mid, mosq_md_in)){
rc = db__message_release(db, mosq, mid, mosq_md_in);
if(rc == MOSQ_ERR_PROTOCOL){
return rc;
}else if(rc != MOSQ_ERR_SUCCESS){
/* Message not found. Still send a PUBCOMP anyway because this could be
* due to a repeated PUBREL after a client has reconnected. */
log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received PUBREL from %s for an unknown packet identifier %d.", mosq->id, mid);
@ -63,7 +66,10 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREL (Mid: %d)", mosq->id, mid);
if(!message__remove(mosq, mid, mosq_md_in, &message)){
rc = message__remove(mosq, mid, mosq_md_in, &message, 2);
if(rc){
return rc;
}else{
/* Only pass the message on if we have removed it from the queue - this
* prevents multiple callbacks for the same message. */
pthread_mutex_lock(&mosq->callback_mutex);

@ -82,13 +82,13 @@ int mosquitto_message_copy(struct mosquitto_message *dst, const struct mosquitto
return MOSQ_ERR_SUCCESS;
}
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir)
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos)
{
struct mosquitto_message_all *message;
int rc;
assert(mosq);
rc = message__remove(mosq, mid, dir, &message);
rc = message__remove(mosq, mid, dir, &message, qos);
if(rc == MOSQ_ERR_SUCCESS){
message__cleanup(&message);
}
@ -218,7 +218,7 @@ void message__reconnect_reset(struct mosquitto *mosq)
pthread_mutex_unlock(&mosq->out_message_mutex);
}
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message)
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos)
{
struct mosquitto_message_all *cur, *prev = NULL;
bool found = false;
@ -231,6 +231,9 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
cur = mosq->out_messages;
while(cur){
if(cur->msg.mid == mid){
if(cur->msg.qos != qos){
return MOSQ_ERR_PROTOCOL;
}
if(prev){
prev->next = cur->next;
}else{
@ -287,6 +290,9 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
cur = mosq->in_messages;
while(cur){
if(cur->msg.mid == mid){
if(cur->msg.qos != qos){
return MOSQ_ERR_PROTOCOL;
}
if(prev){
prev->next = cur->next;
}else{
@ -370,7 +376,7 @@ void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_re
{
}
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state)
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state, int qos)
{
struct mosquitto_message_all *message;
assert(mosq);
@ -379,6 +385,9 @@ int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg
message = mosq->out_messages;
while(message){
if(message->msg.mid == mid){
if(message->msg.qos != qos){
return MOSQ_ERR_PROTOCOL;
}
message->state = state;
message->timestamp = mosquitto_time();
pthread_mutex_unlock(&mosq->out_message_mutex);

@ -21,11 +21,11 @@ Contributors:
void message__cleanup_all(struct mosquitto *mosq);
void message__cleanup(struct mosquitto_message_all **message);
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir);
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos);
int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir);
void message__reconnect_reset(struct mosquitto *mosq);
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message);
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos);
void message__retry_check(struct mosquitto *mosq);
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state);
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state, int qos);
#endif

@ -288,7 +288,7 @@ void db__message_dequeue_first(struct mosquitto *context)
msg->next = NULL;
}
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos)
{
struct mosquitto_client_msg *tail, *last = NULL;
int msg_index = 0;
@ -299,6 +299,11 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
while(tail){
msg_index++;
if(tail->mid == mid && tail->direction == dir){
if(tail->qos != qos){
return MOSQ_ERR_PROTOCOL;
}else if(qos == 2 && tail->state != mosq_ms_wait_for_pubcomp){
return MOSQ_ERR_PROTOCOL;
}
msg_index--;
db__message_remove(db, context, &tail, last);
}else{
@ -509,13 +514,16 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
#endif
}
int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state)
int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state, int qos)
{
struct mosquitto_client_msg *tail;
tail = context->inflight_msgs;
while(tail){
if(tail->mid == mid && tail->direction == dir){
if(tail->qos != qos){
return MOSQ_ERR_PROTOCOL;
}
tail->state = state;
tail->timestamp = mosquitto_time();
return MOSQ_ERR_SUCCESS;
@ -780,7 +788,6 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
{
struct mosquitto_client_msg *tail, *last = NULL;
int qos;
int retain;
char *topic;
char *source_id;
@ -793,7 +800,9 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
while(tail){
msg_index++;
if(tail->mid == mid && tail->direction == dir){
qos = tail->store->qos;
if(tail->store->qos != 2){
return MOSQ_ERR_PROTOCOL;
}
topic = tail->store->topic;
retain = tail->retain;
source_id = tail->store->source_id;
@ -802,7 +811,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
* denied/dropped and is being processed so the client doesn't
* keep resending it. That means we don't send it to other
* clients. */
if(!topic || !sub__messages_queue(db, source_id, topic, qos, retain, &tail->store)){
if(!topic || !sub__messages_queue(db, source_id, topic, 2, retain, &tail->store)){
db__message_remove(db, context, &tail, last);
deleted = true;
}else{

@ -545,10 +545,10 @@ int persist__restore(struct mosquitto_db *db);
void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes);
/* Return the number of in-flight messages in count. */
int db__message_count(int *count);
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos);
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored);
int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state);
int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state, int qos);
int db__message_write(struct mosquitto_db *db, struct mosquitto *context);
void db__message_dequeue_first(struct mosquitto *context);
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context);

@ -0,0 +1,69 @@
#!/usr/bin/env python
# Test what the broker does if receiving a PUBCOMP in response to a QoS 1 PUBLISH.
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
import time
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("subpub-qos1-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1)
suback_packet = mosq_test.gen_suback(mid, 1)
mid = 1
publish_packet2 = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")
helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive)
helper_connack = mosq_test.gen_connack(rc=0)
mid = 1
publish1s_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")
puback1s_packet = mosq_test.gen_puback(mid)
mid = 1
publish1r_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")
pubcomp1r_packet = mosq_test.gen_pubcomp(mid)
pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port)
mosq_test.do_send_receive(helper, publish1s_packet, puback1s_packet, "puback 1s")
helper.close()
if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet):
sock.send(pubcomp1r_packet)
sock.send(pingreq_packet)
p = sock.recv(len(pingresp_packet))
if len(p) == 0:
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)

@ -0,0 +1,65 @@
#!/usr/bin/env python
# Test what the broker does if receiving a PUBREC in response to a QoS 1 PUBLISH.
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
import time
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("subpub-qos1-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1)
suback_packet = mosq_test.gen_suback(mid, 1)
helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive)
helper_connack = mosq_test.gen_connack(rc=0)
mid = 1
publish1s_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")
puback1s_packet = mosq_test.gen_puback(mid)
mid = 1
publish1r_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message")
pubrec1r_packet = mosq_test.gen_pubrec(mid)
pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port)
mosq_test.do_send_receive(helper, publish1s_packet, puback1s_packet, "puback 1s")
helper.close()
if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet):
sock.send(pubrec1r_packet)
sock.send(pingreq_packet)
p = sock.recv(len(pingresp_packet))
if len(p) == 0:
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)

@ -0,0 +1,68 @@
#!/usr/bin/env python
# Test what the broker does if receiving a PUBACK in response to a QoS 2 PUBLISH.
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
import time
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("subpub-qos2-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos2", 2)
suback_packet = mosq_test.gen_suback(mid, 2)
helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive)
helper_connack = mosq_test.gen_connack(rc=0)
mid = 1
publish1s_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message")
pubrec1s_packet = mosq_test.gen_pubrec(mid)
pubrel1s_packet = mosq_test.gen_pubrel(mid)
pubcomp1s_packet = mosq_test.gen_pubcomp(mid)
mid = 1
publish1r_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message")
puback1r_packet = mosq_test.gen_puback(mid)
pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port)
mosq_test.do_send_receive(helper, publish1s_packet, pubrec1s_packet, "pubrec 1s")
mosq_test.do_send_receive(helper, pubrel1s_packet, pubcomp1s_packet, "pubcomp 1s")
helper.close()
if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet):
sock.send(puback1r_packet)
sock.send(pingreq_packet)
p = sock.recv(len(pingresp_packet))
if len(p) == 0:
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)

@ -0,0 +1,71 @@
#!/usr/bin/env python
# Test what the broker does if receiving a PUBACK in response to a QoS 2 PUBREL.
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
import time
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("subpub-qos2-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos2", 2)
suback_packet = mosq_test.gen_suback(mid, 2)
helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive)
helper_connack = mosq_test.gen_connack(rc=0)
mid = 1
publish1s_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message")
pubrec1s_packet = mosq_test.gen_pubrec(mid)
pubrel1s_packet = mosq_test.gen_pubrel(mid)
pubcomp1s_packet = mosq_test.gen_pubcomp(mid)
mid = 1
publish1r_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message")
pubrec1r_packet = mosq_test.gen_pubrec(mid)
pubrel1r_packet = mosq_test.gen_pubrel(mid)
puback1r_packet = mosq_test.gen_puback(mid)
pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port)
mosq_test.do_send_receive(helper, publish1s_packet, pubrec1s_packet, "pubrec 1s")
mosq_test.do_send_receive(helper, pubrel1s_packet, pubcomp1s_packet, "pubcomp 1s")
helper.close()
if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet):
mosq_test.do_send_receive(sock, pubrec1s_packet, pubrel1s_packet, "pubrel 1r")
sock.send(puback1r_packet)
sock.send(pingreq_packet)
p = sock.recv(len(pingresp_packet))
if len(p) == 0:
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)

@ -0,0 +1,68 @@
#!/usr/bin/env python
# Test what the broker does if receiving a PUBCOMP in response to a QoS 2 PUBLISH.
import inspect, os, sys
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import mosq_test
import time
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("subpub-qos2-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos2", 2)
suback_packet = mosq_test.gen_suback(mid, 2)
helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive)
helper_connack = mosq_test.gen_connack(rc=0)
mid = 1
publish1s_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message")
pubrec1s_packet = mosq_test.gen_pubrec(mid)
pubrel1s_packet = mosq_test.gen_pubrel(mid)
pubcomp1s_packet = mosq_test.gen_pubcomp(mid)
mid = 1
publish1r_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message")
pubcomp1r_packet = mosq_test.gen_pubcomp(mid)
pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port)
mosq_test.do_send_receive(helper, publish1s_packet, pubrec1s_packet, "pubrec 1s")
mosq_test.do_send_receive(helper, pubrel1s_packet, pubcomp1s_packet, "pubcomp 1s")
helper.close()
if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet):
sock.send(pubcomp1r_packet)
sock.send(pingreq_packet)
p = sock.recv(len(pingresp_packet))
if len(p) == 0:
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)

@ -45,6 +45,11 @@ endif
./02-subpub-qos0.py
./02-subpub-qos1.py
./02-subpub-qos2.py
./02-subpub-qos1-bad-pubrec.py
./02-subpub-qos1-bad-pubcomp.py
./02-subpub-qos2-bad-puback-1.py
./02-subpub-qos2-bad-puback-2.py
./02-subpub-qos2-bad-pubcomp.py
./02-unsubscribe-qos0.py
./02-unsubscribe-qos1.py
./02-unsubscribe-qos2.py

@ -28,6 +28,11 @@ tests = [
(1, './02-subpub-qos0.py'),
(1, './02-subpub-qos1.py'),
(1, './02-subpub-qos2.py'),
(1, './02-subpub-qos1-bad-pubrec.py'),
(1, './02-subpub-qos1-bad-pubcomp.py'),
(1, './02-subpub-qos2-bad-puback-1.py'),
(1, './02-subpub-qos2-bad-puback-2.py'),
(1, './02-subpub-qos2-bad-pubcomp.py'),
(1, './02-unsubscribe-qos0.py'),
(1, './02-unsubscribe-qos1.py'),
(1, './02-unsubscribe-qos2.py'),

Loading…
Cancel
Save