From fc843408205934c65e69393439c47255ecaaa8b2 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Wed, 7 Dec 2022 14:17:09 +0000 Subject: [PATCH] Add mosquitto_unsubscribe2_v5_callback_set --- include/mosquitto.h | 30 ++++++++++++- lib/callbacks.c | 14 +++++- lib/callbacks.h | 2 +- lib/handle_unsuback.c | 34 +++++++++----- lib/linker.version | 1 + lib/mosquitto_internal.h | 1 + test/lib/02-unsubscribe2-v5.py | 47 ++++++++++++++++++++ test/lib/Makefile | 1 + test/lib/c/02-unsubscribe-v5.c | 5 ++- test/lib/c/02-unsubscribe2-v5.c | 79 +++++++++++++++++++++++++++++++++ test/lib/c/CMakeLists.txt | 1 + test/lib/c/Makefile | 1 + test/lib/c/fuzzish.c | 18 +++++++- test/lib/test.py | 1 + 14 files changed, 217 insertions(+), 18 deletions(-) create mode 100755 test/lib/02-unsubscribe2-v5.py create mode 100644 test/lib/c/02-unsubscribe2-v5.c diff --git a/include/mosquitto.h b/include/mosquitto.h index 4de6ec4a..54d1cabf 100644 --- a/include/mosquitto.h +++ b/include/mosquitto.h @@ -2301,7 +2301,7 @@ libmosq_EXPORT void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, v * Parameters: * mosq - a valid mosquitto instance. * on_unsubscribe - a callback function in the following form: - * void callback(struct mosquitto *mosq, void *obj, int mid) + * void callback(struct mosquitto *mosq, void *obj, int mid, const mosquitto_property *props) * * Callback Parameters: * mosq - the mosquitto instance making the callback. @@ -2311,6 +2311,34 @@ libmosq_EXPORT void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, v */ libmosq_EXPORT void mosquitto_unsubscribe_v5_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int, const mosquitto_property *props)); +/* + * Function: mosquitto_unsubscribe2_v5_callback_set + * + * Set the unsubscribe callback. This is called when the library receives a + * UNSUBACK message in response to an UNSUBSCRIBE. + * + * It is valid to set this callback for all MQTT protocol versions. If it is + * used with MQTT clients that use MQTT v3.1.1 or earlier, then the `props` + * argument will always be NULL. + * + * Parameters: + * mosq - a valid mosquitto instance. + * on_unsubscribe - a callback function in the following form: + * void callback(struct mosquitto *mosq, void *obj, int mid, + * int reason_code_count, const int *reason_codes, const mosquitto_property *props) + * + * Callback Parameters: + * mosq - the mosquitto instance making the callback. + * obj - the user data provided in + * mid - the message id of the unsubscribe message. + * reason_code_count - the count of reason code responses + * reason_codes - an array of integers indicating the reason codes for each of + * the unsubscription requests. + * mid - the message id of the unsubscribe message. + * props - list of MQTT 5 properties, or NULL + */ +libmosq_EXPORT void mosquitto_unsubscribe2_v5_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int, int, const int *, const mosquitto_property *props)); + /* * Function: mosquitto_log_callback_set * diff --git a/lib/callbacks.c b/lib/callbacks.c index 1134e4f1..542dd3b1 100644 --- a/lib/callbacks.c +++ b/lib/callbacks.c @@ -121,6 +121,13 @@ void mosquitto_unsubscribe_v5_callback_set(struct mosquitto *mosq, void (*on_uns pthread_mutex_unlock(&mosq->callback_mutex); } +void mosquitto_unsubscribe2_v5_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int, int, const int *, const mosquitto_property *props)) +{ + pthread_mutex_lock(&mosq->callback_mutex); + mosq->on_unsubscribe2_v5 = on_unsubscribe; + pthread_mutex_unlock(&mosq->callback_mutex); +} + void mosquitto_log_callback_set(struct mosquitto *mosq, void (*on_log)(struct mosquitto *, void *, int, const char *)) { pthread_mutex_lock(&mosq->log_callback_mutex); @@ -234,14 +241,16 @@ void callback__on_subscribe(struct mosquitto *mosq, int mid, int qos_count, cons } -void callback__on_unsubscribe(struct mosquitto *mosq, int mid, const mosquitto_property *properties) +void callback__on_unsubscribe(struct mosquitto *mosq, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *properties) { void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid) = NULL; void (*on_unsubscribe_v5)(struct mosquitto *, void *userdata, int mid, const mosquitto_property *props) = NULL; + void (*on_unsubscribe2_v5)(struct mosquitto *, void *userdata, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *props) = NULL; pthread_mutex_lock(&mosq->callback_mutex); on_unsubscribe = mosq->on_unsubscribe; on_unsubscribe_v5 = mosq->on_unsubscribe_v5; + on_unsubscribe2_v5 = mosq->on_unsubscribe2_v5; pthread_mutex_unlock(&mosq->callback_mutex); mosq->callback_depth++; @@ -251,6 +260,9 @@ void callback__on_unsubscribe(struct mosquitto *mosq, int mid, const mosquitto_p if(on_unsubscribe_v5){ on_unsubscribe_v5(mosq, mosq->userdata, mid, properties); } + if(on_unsubscribe2_v5){ + on_unsubscribe2_v5(mosq, mosq->userdata, mid, reason_code_count, reason_codes, properties); + } mosq->callback_depth--; } diff --git a/lib/callbacks.h b/lib/callbacks.h index e88955bd..fb33d547 100644 --- a/lib/callbacks.h +++ b/lib/callbacks.h @@ -25,7 +25,7 @@ void callback__on_connect(struct mosquitto *mosq, uint8_t reason_code, uint8_t c void callback__on_publish(struct mosquitto *mosq, int mid, int reason_code, const mosquitto_property *properties); void callback__on_message(struct mosquitto *mosq, const struct mosquitto_message *message, const mosquitto_property *properties); void callback__on_subscribe(struct mosquitto *mosq, int mid, int qos_count, const int *granted_qos, const mosquitto_property *props); -void callback__on_unsubscribe(struct mosquitto *mosq, int mid, const mosquitto_property *props); +void callback__on_unsubscribe(struct mosquitto *mosq, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *props); void callback__on_disconnect(struct mosquitto *mosq, int rc, const mosquitto_property *props); #endif diff --git a/lib/handle_unsuback.c b/lib/handle_unsuback.c index ee979a0c..b79916bc 100644 --- a/lib/handle_unsuback.c +++ b/lib/handle_unsuback.c @@ -45,6 +45,8 @@ int handle__unsuback(struct mosquitto *mosq) uint16_t mid; int rc; mosquitto_property *properties = NULL; + int *reason_codes = NULL; + int reason_code_count = 0; assert(mosq); @@ -69,20 +71,30 @@ int handle__unsuback(struct mosquitto *mosq) if(mid == 0) return MOSQ_ERR_PROTOCOL; if(mosq->protocol == mosq_p_mqtt5){ - rc = property__read_all(CMD_UNSUBACK, &mosq->in_packet, &properties); - if(rc) return rc; - } - if(mosq->in_packet.pos < mosq->in_packet.remaining_length){ - return MOSQ_ERR_MALFORMED_PACKET; + uint8_t byte; + reason_code_count = (int)(mosq->in_packet.remaining_length - mosq->in_packet.pos); + reason_codes = mosquitto__malloc((size_t)reason_code_count*sizeof(int)); + if(!reason_codes){ + mosquitto_property_free_all(&properties); + return MOSQ_ERR_NOMEM; + } + for(int i=0; iin_packet, &byte); + if(rc){ + mosquitto__FREE(reason_codes); + mosquitto_property_free_all(&properties); + return rc; + } + reason_codes[i] = (int)byte; + i++; + } } -#ifdef WITH_BROKER - /* Immediately free, we don't do anything with Reason String or User Property at the moment */ - mosquitto_property_free_all(&properties); -#else - callback__on_unsubscribe(mosq, mid, properties); - mosquitto_property_free_all(&properties); +#ifndef WITH_BROKER + callback__on_unsubscribe(mosq, mid, reason_code_count, reason_codes, properties); #endif + mosquitto_property_free_all(&properties); + mosquitto__FREE(reason_codes); return MOSQ_ERR_SUCCESS; } diff --git a/lib/linker.version b/lib/linker.version index 25ade8fc..67084969 100644 --- a/lib/linker.version +++ b/lib/linker.version @@ -148,4 +148,5 @@ MOSQ_2.1 { mosquitto_topic_matches_sub_with_pattern; mosquitto_sub_matches_acl; mosquitto_sub_matches_acl_with_pattern; + mosquitto_unsubscribe2_v5_callback_set; } MOSQ_1.7; diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index cd3d69d2..88c11646 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -396,6 +396,7 @@ struct mosquitto { void (*on_subscribe_v5)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos, const mosquitto_property *props); void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid); void (*on_unsubscribe_v5)(struct mosquitto *, void *userdata, int mid, const mosquitto_property *props); + void (*on_unsubscribe2_v5)(struct mosquitto *, void *userdata, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *props); void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str); /*void (*on_error)();*/ char *host; diff --git a/test/lib/02-unsubscribe2-v5.py b/test/lib/02-unsubscribe2-v5.py new file mode 100755 index 00000000..6248d6e4 --- /dev/null +++ b/test/lib/02-unsubscribe2-v5.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +# Test whether a v5 client sends a correct UNSUBSCRIBE packet, and handles the UNSUBACK. + +from mosq_test_helper import * + +port = mosq_test.get_lib_port() + +keepalive = 60 +connect_packet = mosq_test.gen_connect("unsubscribe-test", keepalive=keepalive, proto_ver=5) +connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + +disconnect_packet = mosq_test.gen_disconnect(proto_ver=5) + +mid = 1 +props = mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "key", "value") +unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "unsubscribe/test", proto_ver=5, properties=props) +unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=5) + +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.settimeout(10) +sock.bind(('', port)) +sock.listen(5) + +client_args = sys.argv[1:] +client = mosq_test.start_client(filename=sys.argv[1].replace('/', '-'), cmd=client_args, port=port) + +try: + (conn, address) = sock.accept() + conn.settimeout(10) + + mosq_test.do_receive_send(conn, connect_packet, connack_packet, "connect") + mosq_test.do_receive_send(conn, unsubscribe_packet, unsuback_packet, "unsubscribe") + mosq_test.expect_packet(conn, "disconnect", disconnect_packet) + rc = 0 + + conn.close() +except mosq_test.TestError: + pass +finally: + if mosq_test.wait_for_subprocess(client): + print("test client not finished") + rc=1 + sock.close() + +exit(rc) diff --git a/test/lib/Makefile b/test/lib/Makefile index a37dbc85..2bdcd128 100644 --- a/test/lib/Makefile +++ b/test/lib/Makefile @@ -47,6 +47,7 @@ c : test-compile ./02-subscribe-helper-qos2.py $@/02-subscribe-helper-qos2.test ./02-unsubscribe-multiple-v5.py $@/02-unsubscribe-multiple-v5.test ./02-unsubscribe-v5.py $@/02-unsubscribe-v5.test + ./02-unsubscribe2-v5.py $@/02-unsubscribe2-v5.test ./02-unsubscribe.py $@/02-unsubscribe.test ./03-publish-b2c-qos1.py $@/03-publish-b2c-qos1.test ./03-publish-b2c-qos1-unexpected-puback.py $@/03-publish-b2c-qos1-unexpected-puback.test diff --git a/test/lib/c/02-unsubscribe-v5.c b/test/lib/c/02-unsubscribe-v5.c index 14f4e1ad..c3c01544 100644 --- a/test/lib/c/02-unsubscribe-v5.c +++ b/test/lib/c/02-unsubscribe-v5.c @@ -30,10 +30,11 @@ static void on_disconnect(struct mosquitto *mosq, void *obj, int rc) run = rc; } -static void on_unsubscribe(struct mosquitto *mosq, void *obj, int mid) +static void on_unsubscribe(struct mosquitto *mosq, void *obj, int mid, const mosquitto_property *props) { (void)obj; (void)mid; + (void)props; mosquitto_disconnect(mosq); } @@ -58,7 +59,7 @@ int main(int argc, char *argv[]) mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5); mosquitto_connect_callback_set(mosq, on_connect); mosquitto_disconnect_callback_set(mosq, on_disconnect); - mosquitto_unsubscribe_callback_set(mosq, on_unsubscribe); + mosquitto_unsubscribe_v5_callback_set(mosq, on_unsubscribe); rc = mosquitto_connect(mosq, "localhost", port, 60); if(rc != MOSQ_ERR_SUCCESS) return rc; diff --git a/test/lib/c/02-unsubscribe2-v5.c b/test/lib/c/02-unsubscribe2-v5.c new file mode 100644 index 00000000..3a96f6c3 --- /dev/null +++ b/test/lib/c/02-unsubscribe2-v5.c @@ -0,0 +1,79 @@ +#include +#include +#include +#include + +static int run = -1; + +static void on_connect(struct mosquitto *mosq, void *obj, int rc) +{ + int rc2; + mosquitto_property *proplist; + (void)obj; + + if(rc){ + exit(1); + }else{ + rc2 = mosquitto_property_add_string_pair(&proplist, MQTT_PROP_USER_PROPERTY, "key", "value"); + if(rc2 != MOSQ_ERR_SUCCESS){ + abort(); + } + mosquitto_unsubscribe_v5(mosq, NULL, "unsubscribe/test", proplist); + } +} + +static void on_disconnect(struct mosquitto *mosq, void *obj, int rc) +{ + (void)mosq; + (void)obj; + + run = rc; +} + +static void on_unsubscribe(struct mosquitto *mosq, void *obj, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *props) +{ + (void)obj; + (void)mid; + (void)props; + + for(int i=0; i