Add mosquitto_unsubscribe2_v5_callback_set

pull/2709/head
Roger A. Light 3 years ago
parent 7419aa7530
commit fc84340820

@ -2301,7 +2301,7 @@ libmosq_EXPORT void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, v
* Parameters:
* mosq - a valid mosquitto instance.
* on_unsubscribe - a callback function in the following form:
* void callback(struct mosquitto *mosq, void *obj, int mid)
* void callback(struct mosquitto *mosq, void *obj, int mid, const mosquitto_property *props)
*
* Callback Parameters:
* mosq - the mosquitto instance making the callback.
@ -2311,6 +2311,34 @@ libmosq_EXPORT void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, v
*/
libmosq_EXPORT void mosquitto_unsubscribe_v5_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int, const mosquitto_property *props));
/*
* Function: mosquitto_unsubscribe2_v5_callback_set
*
* Set the unsubscribe callback. This is called when the library receives a
* UNSUBACK message in response to an UNSUBSCRIBE.
*
* It is valid to set this callback for all MQTT protocol versions. If it is
* used with MQTT clients that use MQTT v3.1.1 or earlier, then the `props`
* argument will always be NULL.
*
* Parameters:
* mosq - a valid mosquitto instance.
* on_unsubscribe - a callback function in the following form:
* void callback(struct mosquitto *mosq, void *obj, int mid,
* int reason_code_count, const int *reason_codes, const mosquitto_property *props)
*
* Callback Parameters:
* mosq - the mosquitto instance making the callback.
* obj - the user data provided in <mosquitto_new>
* mid - the message id of the unsubscribe message.
* reason_code_count - the count of reason code responses
* reason_codes - an array of integers indicating the reason codes for each of
* the unsubscription requests.
* mid - the message id of the unsubscribe message.
* props - list of MQTT 5 properties, or NULL
*/
libmosq_EXPORT void mosquitto_unsubscribe2_v5_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int, int, const int *, const mosquitto_property *props));
/*
* Function: mosquitto_log_callback_set
*

@ -121,6 +121,13 @@ void mosquitto_unsubscribe_v5_callback_set(struct mosquitto *mosq, void (*on_uns
pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_unsubscribe2_v5_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int, int, const int *, const mosquitto_property *props))
{
pthread_mutex_lock(&mosq->callback_mutex);
mosq->on_unsubscribe2_v5 = on_unsubscribe;
pthread_mutex_unlock(&mosq->callback_mutex);
}
void mosquitto_log_callback_set(struct mosquitto *mosq, void (*on_log)(struct mosquitto *, void *, int, const char *))
{
pthread_mutex_lock(&mosq->log_callback_mutex);
@ -234,14 +241,16 @@ void callback__on_subscribe(struct mosquitto *mosq, int mid, int qos_count, cons
}
void callback__on_unsubscribe(struct mosquitto *mosq, int mid, const mosquitto_property *properties)
void callback__on_unsubscribe(struct mosquitto *mosq, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *properties)
{
void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid) = NULL;
void (*on_unsubscribe_v5)(struct mosquitto *, void *userdata, int mid, const mosquitto_property *props) = NULL;
void (*on_unsubscribe2_v5)(struct mosquitto *, void *userdata, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *props) = NULL;
pthread_mutex_lock(&mosq->callback_mutex);
on_unsubscribe = mosq->on_unsubscribe;
on_unsubscribe_v5 = mosq->on_unsubscribe_v5;
on_unsubscribe2_v5 = mosq->on_unsubscribe2_v5;
pthread_mutex_unlock(&mosq->callback_mutex);
mosq->callback_depth++;
@ -251,6 +260,9 @@ void callback__on_unsubscribe(struct mosquitto *mosq, int mid, const mosquitto_p
if(on_unsubscribe_v5){
on_unsubscribe_v5(mosq, mosq->userdata, mid, properties);
}
if(on_unsubscribe2_v5){
on_unsubscribe2_v5(mosq, mosq->userdata, mid, reason_code_count, reason_codes, properties);
}
mosq->callback_depth--;
}

@ -25,7 +25,7 @@ void callback__on_connect(struct mosquitto *mosq, uint8_t reason_code, uint8_t c
void callback__on_publish(struct mosquitto *mosq, int mid, int reason_code, const mosquitto_property *properties);
void callback__on_message(struct mosquitto *mosq, const struct mosquitto_message *message, const mosquitto_property *properties);
void callback__on_subscribe(struct mosquitto *mosq, int mid, int qos_count, const int *granted_qos, const mosquitto_property *props);
void callback__on_unsubscribe(struct mosquitto *mosq, int mid, const mosquitto_property *props);
void callback__on_unsubscribe(struct mosquitto *mosq, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *props);
void callback__on_disconnect(struct mosquitto *mosq, int rc, const mosquitto_property *props);
#endif

@ -45,6 +45,8 @@ int handle__unsuback(struct mosquitto *mosq)
uint16_t mid;
int rc;
mosquitto_property *properties = NULL;
int *reason_codes = NULL;
int reason_code_count = 0;
assert(mosq);
@ -69,20 +71,30 @@ int handle__unsuback(struct mosquitto *mosq)
if(mid == 0) return MOSQ_ERR_PROTOCOL;
if(mosq->protocol == mosq_p_mqtt5){
rc = property__read_all(CMD_UNSUBACK, &mosq->in_packet, &properties);
if(rc) return rc;
}
if(mosq->in_packet.pos < mosq->in_packet.remaining_length){
return MOSQ_ERR_MALFORMED_PACKET;
uint8_t byte;
reason_code_count = (int)(mosq->in_packet.remaining_length - mosq->in_packet.pos);
reason_codes = mosquitto__malloc((size_t)reason_code_count*sizeof(int));
if(!reason_codes){
mosquitto_property_free_all(&properties);
return MOSQ_ERR_NOMEM;
}
for(int i=0; i<reason_code_count; i++){
rc = packet__read_byte(&mosq->in_packet, &byte);
if(rc){
mosquitto__FREE(reason_codes);
mosquitto_property_free_all(&properties);
return rc;
}
reason_codes[i] = (int)byte;
i++;
}
}
#ifdef WITH_BROKER
/* Immediately free, we don't do anything with Reason String or User Property at the moment */
mosquitto_property_free_all(&properties);
#else
callback__on_unsubscribe(mosq, mid, properties);
mosquitto_property_free_all(&properties);
#ifndef WITH_BROKER
callback__on_unsubscribe(mosq, mid, reason_code_count, reason_codes, properties);
#endif
mosquitto_property_free_all(&properties);
mosquitto__FREE(reason_codes);
return MOSQ_ERR_SUCCESS;
}

@ -148,4 +148,5 @@ MOSQ_2.1 {
mosquitto_topic_matches_sub_with_pattern;
mosquitto_sub_matches_acl;
mosquitto_sub_matches_acl_with_pattern;
mosquitto_unsubscribe2_v5_callback_set;
} MOSQ_1.7;

@ -396,6 +396,7 @@ struct mosquitto {
void (*on_subscribe_v5)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos, const mosquitto_property *props);
void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
void (*on_unsubscribe_v5)(struct mosquitto *, void *userdata, int mid, const mosquitto_property *props);
void (*on_unsubscribe2_v5)(struct mosquitto *, void *userdata, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *props);
void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
/*void (*on_error)();*/
char *host;

@ -0,0 +1,47 @@
#!/usr/bin/env python3
# Test whether a v5 client sends a correct UNSUBSCRIBE packet, and handles the UNSUBACK.
from mosq_test_helper import *
port = mosq_test.get_lib_port()
keepalive = 60
connect_packet = mosq_test.gen_connect("unsubscribe-test", keepalive=keepalive, proto_ver=5)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
disconnect_packet = mosq_test.gen_disconnect(proto_ver=5)
mid = 1
props = mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "key", "value")
unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "unsubscribe/test", proto_ver=5, properties=props)
unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=5)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', port))
sock.listen(5)
client_args = sys.argv[1:]
client = mosq_test.start_client(filename=sys.argv[1].replace('/', '-'), cmd=client_args, port=port)
try:
(conn, address) = sock.accept()
conn.settimeout(10)
mosq_test.do_receive_send(conn, connect_packet, connack_packet, "connect")
mosq_test.do_receive_send(conn, unsubscribe_packet, unsuback_packet, "unsubscribe")
mosq_test.expect_packet(conn, "disconnect", disconnect_packet)
rc = 0
conn.close()
except mosq_test.TestError:
pass
finally:
if mosq_test.wait_for_subprocess(client):
print("test client not finished")
rc=1
sock.close()
exit(rc)

@ -47,6 +47,7 @@ c : test-compile
./02-subscribe-helper-qos2.py $@/02-subscribe-helper-qos2.test
./02-unsubscribe-multiple-v5.py $@/02-unsubscribe-multiple-v5.test
./02-unsubscribe-v5.py $@/02-unsubscribe-v5.test
./02-unsubscribe2-v5.py $@/02-unsubscribe2-v5.test
./02-unsubscribe.py $@/02-unsubscribe.test
./03-publish-b2c-qos1.py $@/03-publish-b2c-qos1.test
./03-publish-b2c-qos1-unexpected-puback.py $@/03-publish-b2c-qos1-unexpected-puback.test

@ -30,10 +30,11 @@ static void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
run = rc;
}
static void on_unsubscribe(struct mosquitto *mosq, void *obj, int mid)
static void on_unsubscribe(struct mosquitto *mosq, void *obj, int mid, const mosquitto_property *props)
{
(void)obj;
(void)mid;
(void)props;
mosquitto_disconnect(mosq);
}
@ -58,7 +59,7 @@ int main(int argc, char *argv[])
mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_unsubscribe_callback_set(mosq, on_unsubscribe);
mosquitto_unsubscribe_v5_callback_set(mosq, on_unsubscribe);
rc = mosquitto_connect(mosq, "localhost", port, 60);
if(rc != MOSQ_ERR_SUCCESS) return rc;

@ -0,0 +1,79 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
static int run = -1;
static void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
int rc2;
mosquitto_property *proplist;
(void)obj;
if(rc){
exit(1);
}else{
rc2 = mosquitto_property_add_string_pair(&proplist, MQTT_PROP_USER_PROPERTY, "key", "value");
if(rc2 != MOSQ_ERR_SUCCESS){
abort();
}
mosquitto_unsubscribe_v5(mosq, NULL, "unsubscribe/test", proplist);
}
}
static void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
(void)mosq;
(void)obj;
run = rc;
}
static void on_unsubscribe(struct mosquitto *mosq, void *obj, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *props)
{
(void)obj;
(void)mid;
(void)props;
for(int i=0; i<reason_code_count; i++){
if(reason_codes[i] != 0){
exit(1);
}
}
mosquitto_disconnect(mosq);
}
int main(int argc, char *argv[])
{
int rc;
struct mosquitto *mosq;
int port;
if(argc < 2){
return 1;
}
port = atoi(argv[1]);
mosquitto_lib_init();
mosq = mosquitto_new("unsubscribe-test", true, NULL);
if(mosq == NULL){
return 1;
}
mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_unsubscribe2_v5_callback_set(mosq, on_unsubscribe);
rc = mosquitto_connect(mosq, "localhost", port, 60);
if(rc != MOSQ_ERR_SUCCESS) return rc;
while(run == -1){
mosquitto_loop(mosq, -1, 1);
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return run;
}

@ -19,6 +19,7 @@ set(BINARIES
02-subscribe-qos2
02-unsubscribe-multiple-v5
02-unsubscribe-v5
02-unsubscribe2-v5
02-unsubscribe
03-publish-b2c-qos1-unexpected-puback
03-publish-b2c-qos1

@ -27,6 +27,7 @@ SRC = \
02-subscribe-qos2.c \
02-unsubscribe-multiple-v5.c \
02-unsubscribe-v5.c \
02-unsubscribe2-v5.c \
02-unsubscribe.c \
03-publish-b2c-qos1-unexpected-puback.c \
03-publish-b2c-qos1.c \

@ -236,15 +236,28 @@ static void on_subscribe_v5(struct mosquitto *mosq, void *obj, int mid, int qos_
prop_test(props);
}
static void on_unsubscribe(struct mosquitto *mosq, void *obj, int reason_code)
static void on_unsubscribe(struct mosquitto *mosq, void *obj, int mid)
{
}
static void on_unsubscribe_v5(struct mosquitto *mosq, void *obj, int reason_code, const mosquitto_property *props)
static void on_unsubscribe_v5(struct mosquitto *mosq, void *obj, int mid, const mosquitto_property *props)
{
prop_test(props);
}
static void on_unsubscribe2_v5(struct mosquitto *mosq, void *obj, int mid, int reason_code_count, const int *reason_codes, const mosquitto_property *props)
{
int sum = 0;
prop_test(props);
for(int i=0; i<reason_code_count; i++){
sum += reason_codes[i];
}
if(sum < 0){
/* This is a "fake" condition to stop the above check being optimised out */
exit(1);
}
}
static void on_log(struct mosquitto *mosq, void *obj, int level, const char *str)
{
if(str == NULL){
@ -313,6 +326,7 @@ int main(int argc, char *argv[])
mosquitto_unsubscribe_callback_set(mosq, on_unsubscribe);
mosquitto_unsubscribe_v5_callback_set(mosq, on_unsubscribe_v5);
mosquitto_unsubscribe2_v5_callback_set(mosq, on_unsubscribe2_v5);
mosquitto_log_callback_set(mosq, on_log);

@ -23,6 +23,7 @@ tests = [
(1, ['./02-subscribe-qos2.py', 'c/02-subscribe-qos2.test']),
(1, ['./02-unsubscribe-multiple-v5.py', 'c/02-unsubscribe-multiple-v5.test']),
(1, ['./02-unsubscribe-v5.py', 'c/02-unsubscribe-v5.test']),
(1, ['./02-unsubscribe2-v5.py', 'c/02-unsubscribe2-v5.test']),
(1, ['./02-unsubscribe.py', 'c/02-unsubscribe.test']),
(1, ['./03-publish-b2c-qos1.py', 'c/03-publish-b2c-qos1.test']),

Loading…
Cancel
Save