From fb8a2baaad514fd22ce1b8a1f9708874f5e31888 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Wed, 3 Apr 2019 14:13:12 +0100 Subject: [PATCH] Refactor client msgs to use utlist to reduce code complexity. --- lib/mosquitto_internal.h | 2 - src/context.c | 4 - src/database.c | 231 +++++++++++--------------------- src/handle_connect.c | 29 ++-- src/mosquitto_broker_internal.h | 1 + src/persist_read.c | 14 +- test/broker/09-acl-change.py | 127 ++++++++++++++++++ test/broker/Makefile | 1 + test/broker/test.py | 1 + 9 files changed, 222 insertions(+), 188 deletions(-) create mode 100755 test/broker/09-acl-change.py diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 0fe676ea..0ae0cdb0 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -248,9 +248,7 @@ struct mosquitto { bool is_bridge; struct mosquitto__bridge *bridge; struct mosquitto_client_msg *inflight_msgs; - struct mosquitto_client_msg *last_inflight_msg; struct mosquitto_client_msg *queued_msgs; - struct mosquitto_client_msg *last_queued_msg; unsigned long msg_bytes; unsigned long msg_bytes12; int msg_count; diff --git a/src/context.c b/src/context.c index 8e526d71..e556c0a5 100644 --- a/src/context.c +++ b/src/context.c @@ -72,10 +72,6 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) } } context->bridge = NULL; - context->inflight_msgs = NULL; - context->last_inflight_msg = NULL; - context->queued_msgs = NULL; - context->last_queued_msg = NULL; context->receive_maximum = db->config->max_inflight_messages; context->send_maximum = db->config->max_inflight_messages; context->maximum_qos = 2; diff --git a/src/database.c b/src/database.c index c380d8ef..9269216a 100644 --- a/src/database.c +++ b/src/database.c @@ -18,6 +18,7 @@ Contributors: #include #include +#include #include "mosquitto_broker_internal.h" #include "memory_mosq.h" @@ -248,39 +249,25 @@ void db__msg_store_compact(struct mosquitto_db *db) } -static void db__message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last) +static void db__message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **head, struct mosquitto_client_msg *item) { - if(!context || !msg || !(*msg)){ + if(!context || !head || !item){ return; } - if((*msg)->store){ + DL_DELETE(*head, item); + + if(item->store){ context->msg_count--; - context->msg_bytes -= (*msg)->store->payloadlen; - if((*msg)->qos > 0){ + context->msg_bytes -= item->store->payloadlen; + if(item->qos > 0){ context->msg_count12--; - context->msg_bytes12 -= (*msg)->store->payloadlen; - } - db__msg_store_deref(db, &(*msg)->store); - } - if(last){ - last->next = (*msg)->next; - if(!last->next){ - context->last_inflight_msg = last; + context->msg_bytes12 -= item->store->payloadlen; } - }else{ - context->inflight_msgs = (*msg)->next; - if(!context->inflight_msgs){ - context->last_inflight_msg = NULL; - } - } - mosquitto_property_free_all(&(*msg)->properties); - mosquitto__free(*msg); - if(last){ - *msg = last->next; - }else{ - *msg = context->inflight_msgs; + db__msg_store_deref(db, &item->store); } + mosquitto_property_free_all(&item->properties); + mosquitto__free(item); } void db__message_dequeue_first(struct mosquitto *context) @@ -288,30 +275,18 @@ void db__message_dequeue_first(struct mosquitto *context) struct mosquitto_client_msg *msg; msg = context->queued_msgs; - context->queued_msgs = msg->next; - if (context->last_queued_msg == msg){ - context->last_queued_msg = NULL; - } - - if (context->last_inflight_msg){ - context->last_inflight_msg->next = msg; - context->last_inflight_msg = msg; - }else{ - context->inflight_msgs = msg; - context->last_inflight_msg = msg; - } - msg->next = NULL; + DL_DELETE(context->queued_msgs, msg); + DL_APPEND(context->inflight_msgs, msg); } int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state expect_state, int qos) { - struct mosquitto_client_msg *tail, *last = NULL; + struct mosquitto_client_msg *tail, *tmp; int msg_index = 0; if(!context) return MOSQ_ERR_INVAL; - tail = context->inflight_msgs; - while(tail){ + DL_FOREACH_SAFE(context->inflight_msgs, tail, tmp){ msg_index++; if(tail->mid == mid && tail->direction == dir){ if(tail->qos != qos){ @@ -320,15 +295,16 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1 return MOSQ_ERR_PROTOCOL; } msg_index--; - db__message_remove(db, context, &tail, last); - }else{ - last = tail; - tail = tail->next; + db__message_remove(db, context, &context->inflight_msgs, tail); } } - while (context->queued_msgs && (context->send_maximum == 0 || msg_index < context->send_maximum)){ + + DL_FOREACH_SAFE(context->queued_msgs, tail, tmp){ + if(context->send_maximum != 0 && msg_index >= context->send_maximum){ + break; + } + msg_index++; - tail = context->queued_msgs; tail->timestamp = mosquitto_time(); if(tail->direction == mosq_md_out){ switch(tail->qos){ @@ -358,7 +334,6 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1 int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties) { struct mosquitto_client_msg *msg; - struct mosquitto_client_msg **msgs, **last_msg; enum mosquitto_msg_state state = mosq_ms_invalid; int rc = 0; int i; @@ -463,6 +438,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 msg = mosquitto__malloc(sizeof(struct mosquitto_client_msg)); if(!msg) return MOSQ_ERR_NOMEM; + msg->prev = NULL; msg->next = NULL; msg->store = stored; msg->store->ref_count++; @@ -480,18 +456,9 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 msg->properties = properties; if (state == mosq_ms_queued){ - msgs = &(context->queued_msgs); - last_msg = &(context->last_queued_msg); - }else{ - msgs = &(context->inflight_msgs); - last_msg = &(context->last_inflight_msg); - } - if(*last_msg){ - (*last_msg)->next = msg; - (*last_msg) = msg; + DL_APPEND(context->queued_msgs, msg); }else{ - *msgs = msg; - *last_msg = msg; + DL_APPEND(context->inflight_msgs, msg); } context->msg_count++; context->msg_bytes += msg->store->payloadlen; @@ -544,8 +511,7 @@ int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_m { struct mosquitto_client_msg *tail; - tail = context->inflight_msgs; - while(tail){ + DL_FOREACH(context->inflight_msgs, tail){ if(tail->mid == mid && tail->direction == dir){ if(tail->qos != qos){ return MOSQ_ERR_PROTOCOL; @@ -561,31 +527,25 @@ int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_m int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context) { - struct mosquitto_client_msg *tail, *next; + struct mosquitto_client_msg *tail, *tmp; if(!context) return MOSQ_ERR_INVAL; - tail = context->inflight_msgs; - while(tail){ + DL_FOREACH_SAFE(context->inflight_msgs, tail, tmp){ + DL_DELETE(context->inflight_msgs, tail); db__msg_store_deref(db, &tail->store); - next = tail->next; mosquitto_property_free_all(&tail->properties); mosquitto__free(tail); - tail = next; } context->inflight_msgs = NULL; - context->last_inflight_msg = NULL; - tail = context->queued_msgs; - while(tail){ + DL_FOREACH_SAFE(context->queued_msgs, tail, tmp){ + DL_DELETE(context->queued_msgs, tail); db__msg_store_deref(db, &tail->store); - next = tail->next; mosquitto_property_free_all(&tail->properties); mosquitto__free(tail); - tail = next; } context->queued_msgs = NULL; - context->last_queued_msg = NULL; context->msg_bytes = 0; context->msg_bytes12 = 0; context->msg_count = 0; @@ -729,22 +689,18 @@ int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosqu if(!context) return MOSQ_ERR_INVAL; *stored = NULL; - tail = context->inflight_msgs; - while(tail){ + DL_FOREACH(context->inflight_msgs, tail){ if(tail->store->source_mid == mid && tail->direction == mosq_md_in){ *stored = tail->store; return MOSQ_ERR_SUCCESS; } - tail = tail->next; } - tail = context->queued_msgs; - while(tail){ + DL_FOREACH(context->queued_msgs, tail){ if(tail->store->source_mid == mid && tail->direction == mosq_md_in){ *stored = tail->store; return MOSQ_ERR_SUCCESS; } - tail = tail->next; } return 1; @@ -754,17 +710,14 @@ int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosqu * retry, and to set incoming messages to expect an appropriate retry. */ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context) { - struct mosquitto_client_msg *msg; - struct mosquitto_client_msg *prev = NULL; + struct mosquitto_client_msg *msg, *tmp; msg = context->inflight_msgs; context->msg_bytes = 0; context->msg_bytes12 = 0; context->msg_count = 0; context->msg_count12 = 0; - while(msg){ - context->last_inflight_msg = msg; - + DL_FOREACH_SAFE(context->inflight_msgs, msg, tmp){ context->msg_count++; context->msg_bytes += msg->store->payloadlen; if(msg->qos > 0){ @@ -792,14 +745,12 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte if(msg->qos != 2){ /* Anything inflight_msgs, msg); }else{ /* Message state can be preserved here because it should match * whatever the client has got. */ } } - prev = msg; - if(msg) msg = msg->next; } /* Messages received when the client was disconnected are put * in the mosq_ms_queued state. If we don't change them to the @@ -807,34 +758,26 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte * get sent until the client next receives a message - and they * will be sent out of order. */ - if(context->queued_msgs){ - msg = context->queued_msgs; - while(msg){ - context->last_queued_msg = msg; - - context->msg_count++; - context->msg_bytes += msg->store->payloadlen; - if(msg->qos > 0){ - context->msg_count12++; - context->msg_bytes12 += msg->store->payloadlen; - } - if (db__ready_for_flight(context, msg->qos)) { - switch(msg->qos){ - case 0: - msg->state = mosq_ms_publish_qos0; - break; - case 1: - msg->state = mosq_ms_publish_qos1; - break; - case 2: - msg->state = mosq_ms_publish_qos2; - break; - } - db__message_dequeue_first(context); - msg = context->queued_msgs; - } else { - msg = msg->next; + DL_FOREACH_SAFE(context->queued_msgs, msg, tmp){ + context->msg_count++; + context->msg_bytes += msg->store->payloadlen; + if(msg->qos > 0){ + context->msg_count12++; + context->msg_bytes12 += msg->store->payloadlen; + } + if(db__ready_for_flight(context, msg->qos)) { + switch(msg->qos){ + case 0: + msg->state = mosq_ms_publish_qos0; + break; + case 1: + msg->state = mosq_ms_publish_qos1; + break; + case 2: + msg->state = mosq_ms_publish_qos2; + break; } + db__message_dequeue_first(context); } } @@ -844,7 +787,7 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir) { - struct mosquitto_client_msg *tail, *last = NULL; + struct mosquitto_client_msg *tail, *tmp; int retain; char *topic; char *source_id; @@ -853,8 +796,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint if(!context) return MOSQ_ERR_INVAL; - tail = context->inflight_msgs; - while(tail){ + DL_FOREACH_SAFE(context->inflight_msgs, tail, tmp){ msg_index++; if(tail->mid == mid && tail->direction == dir){ if(tail->store->qos != 2){ @@ -869,20 +811,20 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint * keep resending it. That means we don't send it to other * clients. */ if(!topic || !sub__messages_queue(db, source_id, topic, 2, retain, &tail->store)){ - db__message_remove(db, context, &tail, last); + db__message_remove(db, context, &context->inflight_msgs, tail); deleted = true; }else{ return 1; } - }else{ - last = tail; - tail = tail->next; } } - while(context->queued_msgs && (context->send_maximum == 0 || msg_index < context->send_maximum)){ + DL_FOREACH_SAFE(context->queued_msgs, tail, tmp){ + if(context->send_maximum != 0 && msg_index >= context->send_maximum){ + break; + } + msg_index++; - tail = context->queued_msgs; tail->timestamp = mosquitto_time(); if(tail->direction == mosq_md_out){ switch(tail->qos){ @@ -915,7 +857,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint int db__message_write(struct mosquitto_db *db, struct mosquitto *context) { int rc; - struct mosquitto_client_msg *tail, *last = NULL, *tmp; + struct mosquitto_client_msg *tail, *tmp; uint16_t mid; int retries; int retain; @@ -937,8 +879,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_SUCCESS; } - tail = context->inflight_msgs; - while(tail){ + DL_FOREACH_SAFE(context->inflight_msgs, tail, tmp){ msg_count++; expiry_interval = 0; if(tail->store->message_expiry_time){ @@ -947,7 +888,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) } if(now > tail->store->message_expiry_time){ /* Message is expired, must not send. */ - db__message_remove(db, context, &tail, last); + db__message_remove(db, context, &context->inflight_msgs, tail); continue; }else{ expiry_interval = tail->store->message_expiry_time - now; @@ -967,7 +908,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) case mosq_ms_publish_qos0: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove(db, context, &tail, last); + db__message_remove(db, context, &context->inflight_msgs, tail); }else{ return rc; } @@ -979,13 +920,8 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) tail->timestamp = mosquitto_time(); tail->dup = 1; /* Any retry attempts are a duplicate. */ tail->state = mosq_ms_wait_for_puback; - - last = tail; - tail = tail->next; }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ - tmp = tail->next; - db__message_remove(db, context, &tail, last); - tail = tmp; + db__message_remove(db, context, &context->inflight_msgs, tail); }else{ return rc; } @@ -997,13 +933,8 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) tail->timestamp = mosquitto_time(); tail->dup = 1; /* Any retry attempts are a duplicate. */ tail->state = mosq_ms_wait_for_pubrec; - - last = tail; - tail = tail->next; }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ - tmp = tail->next; - db__message_remove(db, context, &tail, last); - tail = tmp; + db__message_remove(db, context, &context->inflight_msgs, tail); }else{ return rc; } @@ -1016,8 +947,6 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) }else{ return rc; } - last = tail; - tail = tail->next; break; case mosq_ms_resend_pubrel: @@ -1027,8 +956,6 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) }else{ return rc; } - last = tail; - tail = tail->next; break; case mosq_ms_resend_pubcomp: @@ -1038,20 +965,24 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) }else{ return rc; } - last = tail; - tail = tail->next; break; - default: - last = tail; - tail = tail->next; + case mosq_ms_invalid: + case mosq_ms_wait_for_puback: + case mosq_ms_wait_for_pubrec: + case mosq_ms_wait_for_pubrel: + case mosq_ms_wait_for_pubcomp: + case mosq_ms_queued: break; } } - while(context->queued_msgs && (context->send_maximum == 0 || msg_count < context->send_maximum)){ + DL_FOREACH_SAFE(context->queued_msgs, tail, tmp){ + if(context->send_maximum != 0 && msg_count >= context->send_maximum){ + break; + } + msg_count++; - tail = context->queued_msgs; if(tail->direction == mosq_md_out){ switch(tail->qos){ case 0: diff --git a/src/handle_connect.c b/src/handle_connect.c index 043729d7..a2a1f739 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -18,6 +18,7 @@ Contributors: #include #include +#include #include "mosquitto_broker_internal.h" #include "mqtt_protocol.h" @@ -79,35 +80,21 @@ static char *client_id_gen(int *idlen, const char *auto_id_prefix, int auto_id_p /* Remove any queued messages that are no longer allowed through ACL, * assuming a possible change of username. */ -void connection_check_acl(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msgs) +void connection_check_acl(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **head) { - struct mosquitto_client_msg *msg_tail, *msg_prev; + struct mosquitto_client_msg *msg_tail, *tmp; - msg_tail = *msgs; - msg_prev = NULL; - while(msg_tail){ + DL_FOREACH_SAFE((*head), msg_tail, tmp){ if(msg_tail->direction == mosq_md_out){ if(mosquitto_acl_check(db, context, msg_tail->store->topic, msg_tail->store->payloadlen, UHPA_ACCESS(msg_tail->store->payload, msg_tail->store->payloadlen), msg_tail->store->qos, msg_tail->store->retain, MOSQ_ACL_READ) != MOSQ_ERR_SUCCESS){ + + DL_DELETE((*head), msg_tail); db__msg_store_deref(db, &msg_tail->store); - if(msg_prev){ - msg_prev->next = msg_tail->next; - mosquitto__free(msg_tail); - msg_tail = msg_prev->next; - }else{ - *msgs = (*msgs)->next; - mosquitto__free(msg_tail); - msg_tail = (*msgs); - } - // XXX: why it does not update last_msg if msg_tail was the last message ? - }else{ - msg_prev = msg_tail; - msg_tail = msg_tail->next; + mosquitto_property_free_all(&msg_tail->properties); + mosquitto__free(msg_tail); } - }else{ - msg_prev = msg_tail; - msg_tail = msg_tail->next; } } } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 5574463b..2e61a502 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -350,6 +350,7 @@ struct mosquitto_msg_store{ }; struct mosquitto_client_msg{ + struct mosquitto_client_msg *prev; struct mosquitto_client_msg *next; struct mosquitto_msg_store *store; mosquitto_property *properties; diff --git a/src/persist_read.c b/src/persist_read.c index 8752d86e..298cd331 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -28,6 +28,7 @@ Contributors: #include #include #include +#include #include "mosquitto_broker_internal.h" #include "memory_mosq.h" @@ -109,7 +110,6 @@ int persist__read_string(FILE *db_fptr, char **str) static int persist__client_msg_restore(struct mosquitto_db *db, struct P_client_msg *chunk) { struct mosquitto_client_msg *cmsg; - struct mosquitto_client_msg **msgs, **last_msg; struct mosquitto_msg_store_load *load; struct mosquitto *context; @@ -147,18 +147,10 @@ static int persist__client_msg_restore(struct mosquitto_db *db, struct P_client_ } if(chunk->F.state == mosq_ms_queued){ - msgs = &(context->queued_msgs); - last_msg = &(context->last_queued_msg); + DL_APPEND(context->queued_msgs, cmsg); }else{ - msgs = &(context->inflight_msgs); - last_msg = &(context->last_inflight_msg); + DL_APPEND(context->inflight_msgs, cmsg); } - if(*msgs){ - (*last_msg)->next = cmsg; - }else{ - *msgs = cmsg; - } - *last_msg = cmsg; return MOSQ_ERR_SUCCESS; } diff --git a/test/broker/09-acl-change.py b/test/broker/09-acl-change.py new file mode 100755 index 00000000..435dc705 --- /dev/null +++ b/test/broker/09-acl-change.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 + +# Check whether messages deliver or not after some access is revoked. + +from mosq_test_helper import * +import signal + +def write_config(filename, port, per_listener): + with open(filename, 'w') as f: + f.write("per_listener_settings %s\n" % (per_listener)) + f.write("port %d\n" % (port)) + f.write("acl_file %s\n" % (filename.replace('.conf', '.acl'))) + +def write_acl(filename, en): + with open(filename, 'w') as f: + f.write('user username\n') + f.write('topic readwrite topic/one\n') + if en: + f.write('topic readwrite topic/two\n') + +keepalive = 60 +username = "username" + +connect1_packet = mosq_test.gen_connect("acl-check", keepalive=keepalive, username=username, clean_session=False) +connack1a_packet = mosq_test.gen_connack(rc=0) +connack1b_packet = mosq_test.gen_connack(rc=0, flags=1) + +mid = 1 +subscribe1_packet = mosq_test.gen_subscribe(mid=mid, topic="topic/one", qos=1) +suback1_packet = mosq_test.gen_suback(mid=mid, qos=1) + +mid = 2 +subscribe2_packet = mosq_test.gen_subscribe(mid=mid, topic="topic/two", qos=1) +suback2_packet = mosq_test.gen_suback(mid=mid, qos=1) + +disconnect_packet = mosq_test.gen_disconnect() + +connect2_packet = mosq_test.gen_connect("helper", keepalive=keepalive, username=username) +connack2_packet = mosq_test.gen_connack(rc=0) + +mid = 1 +publish1s_packet = mosq_test.gen_publish(topic="topic/one", mid=mid, qos=1, payload="message1") +puback1s_packet = mosq_test.gen_puback(mid) + +mid = 2 +publish2s_packet = mosq_test.gen_publish(topic="topic/two", mid=mid, qos=1, payload="message2") +puback2s_packet = mosq_test.gen_puback(mid) + +mid = 1 +publish1r_packet = mosq_test.gen_publish(topic="topic/one", mid=mid, qos=1, payload="message1") +puback1r_packet = mosq_test.gen_puback(mid) + +mid = 2 +publish3s_packet = mosq_test.gen_publish(topic="topic/one", mid=mid, qos=1, payload="message3") +puback3s_packet = mosq_test.gen_puback(mid) + +mid = 3 +publish3r_packet = mosq_test.gen_publish(topic="topic/one", mid=mid, qos=1, payload="message3") +puback3r_packet = mosq_test.gen_puback(mid) + +mid = 3 +publish4s_packet = mosq_test.gen_publish(topic="topic/two", mid=mid, qos=1, payload="message4") +puback4s_packet = mosq_test.gen_puback(mid) + +pingreq_packet = mosq_test.gen_pingreq() +pingresp_packet = mosq_test.gen_pingresp() + +rc = 1 + +port = mosq_test.get_port() + +conf_file = os.path.basename(__file__).replace('.py', '.conf') +write_config(conf_file, port, "false") + +acl_file = os.path.basename(__file__).replace('.py', '.acl') +write_acl(acl_file, True) + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +try: + keepalive = 60 + + # Connect, subscribe, then disconnect + sock = mosq_test.do_client_connect(connect1_packet, connack1a_packet, port=port) + mosq_test.do_send_receive(sock, subscribe1_packet, suback1_packet, "suback1") + mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback2") + sock.send(disconnect_packet) + sock.close() + + # Helper publish to topic/one and topic/two, will be queued for other client + sock = mosq_test.do_client_connect(connect2_packet, connack2_packet, port=port) + mosq_test.do_send_receive(sock, publish1s_packet, puback1s_packet, "puback1") + mosq_test.do_send_receive(sock, publish2s_packet, puback2s_packet, "puback2") + sock.close() + + # Reload ACLs with topic/two now disabled + write_acl(acl_file, False) + broker.send_signal(signal.SIGHUP) + + sock = mosq_test.do_client_connect(connect1_packet, connack1b_packet, port=port) + sock.settimeout(10) + mosq_test.expect_packet(sock, "publish1r", publish1r_packet) + # We don't expect messages to topic/two any more, so we don't expect the queued one + mosq_test.do_send_receive(sock, publish3s_packet, puback3s_packet, "puback3") + mosq_test.expect_packet(sock, "publish3r", publish3r_packet) + + # Send this, don't expect it to succeed + mosq_test.do_send_receive(sock, publish4s_packet, puback4s_packet, "puback4") + + # Check for non delivery with a ping + mosq_test.do_send_receive(sock, pingreq_packet, pingresp_packet, "pingresp") + + sock.close() + rc = 0 + +finally: + os.remove(conf_file) + os.remove(acl_file) + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde.decode('utf-8')) + exit(rc) + +port = mosq_test.get_port() + diff --git a/test/broker/Makefile b/test/broker/Makefile index afced3f0..57408a2f 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -161,6 +161,7 @@ endif 09 : ./09-acl-access-variants.py + ./09-acl-change.py ./09-acl-empty-file.py ./09-auth-bad-method.py ./09-extended-auth-unsupported.py diff --git a/test/broker/test.py b/test/broker/test.py index 87f94d19..c4915b10 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -131,6 +131,7 @@ tests = [ (3, './08-tls-psk-bridge.py'), (1, './09-acl-access-variants.py'), + (1, './09-acl-change.py'), (1, './09-acl-empty-file.py'), (1, './09-auth-bad-method.py'), (1, './09-extended-auth-unsupported.py'),