diff --git a/ChangeLog.txt b/ChangeLog.txt index 7ace1882..4dd09469 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -103,6 +103,8 @@ Plugins / plugin interface: client. - The dynamic security plugin now only kicks clients at the start of the next network loop, to give chance for PUBACK/PUBREC to be sent. Closes #2474. +- Add MOSQ_EVT_SUBSCRIBE and MOSQ_EVT_UNSUBSCRIBE events that are called when + subscribe/unsubscribes actually succeed. Client library: - Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair diff --git a/include/mosquitto_broker.h b/include/mosquitto_broker.h index 19f9ab94..acdb37e4 100644 --- a/include/mosquitto_broker.h +++ b/include/mosquitto_broker.h @@ -75,23 +75,25 @@ enum mosquitto_plugin_event { MOSQ_EVT_TICK = 9, MOSQ_EVT_DISCONNECT = 10, MOSQ_EVT_CONNECT = 11, - MOSQ_EVT_PERSIST_RESTORE = 12, - MOSQ_EVT_PERSIST_CONFIG_ADD = 13, - MOSQ_EVT_PERSIST_BASE_MSG_ADD = 14, - MOSQ_EVT_PERSIST_BASE_MSG_DELETE = 15, - MOSQ_EVT_PERSIST_BASE_MSG_LOAD = 16, - MOSQ_EVT_PERSIST_RETAIN_MSG_SET = 17, - MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE = 18, - MOSQ_EVT_PERSIST_CLIENT_ADD = 19, - MOSQ_EVT_PERSIST_CLIENT_DELETE = 20, - MOSQ_EVT_PERSIST_CLIENT_UPDATE = 21, - MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD = 22, - MOSQ_EVT_PERSIST_SUBSCRIPTION_DELETE = 23, - MOSQ_EVT_PERSIST_CLIENT_MSG_ADD = 24, - MOSQ_EVT_PERSIST_CLIENT_MSG_DELETE = 25, - MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE = 26, - MOSQ_EVT_PERSIST_CLIENT_MSG_CLEAR = 27, - MOSQ_EVT_PERSIST_CLIENT_MSG_LOAD = 28, + MOSQ_EVT_SUBSCRIBE = 12, + MOSQ_EVT_UNSUBSCRIBE = 13, + MOSQ_EVT_PERSIST_RESTORE = 14, + MOSQ_EVT_PERSIST_CONFIG_ADD = 15, + MOSQ_EVT_PERSIST_BASE_MSG_ADD = 16, + MOSQ_EVT_PERSIST_BASE_MSG_DELETE = 17, + MOSQ_EVT_PERSIST_BASE_MSG_LOAD = 18, + MOSQ_EVT_PERSIST_RETAIN_MSG_SET = 19, + MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE = 20, + MOSQ_EVT_PERSIST_CLIENT_ADD = 21, + MOSQ_EVT_PERSIST_CLIENT_DELETE = 22, + MOSQ_EVT_PERSIST_CLIENT_UPDATE = 23, + MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD = 24, + MOSQ_EVT_PERSIST_SUBSCRIPTION_DELETE = 25, + MOSQ_EVT_PERSIST_CLIENT_MSG_ADD = 26, + MOSQ_EVT_PERSIST_CLIENT_MSG_DELETE = 27, + MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE = 28, + MOSQ_EVT_PERSIST_CLIENT_MSG_CLEAR = 29, + MOSQ_EVT_PERSIST_CLIENT_MSG_LOAD = 30, }; /* Data for the MOSQ_EVT_RELOAD event */ @@ -204,6 +206,30 @@ struct mosquitto_evt_disconnect { void *future2[4]; }; +/* Data for the MOSQ_EVT_SUBSCRIBE event */ +struct mosquitto_evt_subscribe { + void *future; + struct mosquitto *client; + const char *topic; + const mosquitto_property *properties; + uint32_t subscription_identifier; + uint8_t subscription_options; + uint8_t qos; + uint8_t padding[2]; + void *future2[8]; +}; + + +/* Data for the MOSQ_EVT_UNSUBSCRIBE event */ +struct mosquitto_evt_unsubscribe { + void *future; + struct mosquitto *client; + const char *topic; + const mosquitto_property *properties; + void *future2[8]; +}; + + /* Data for the MOSQ_EVT_PERSIST_RESTORE event */ /* NOTE: The persistence interface is currently marked as unstable, which means * it may change in a future minor release. */ diff --git a/plugins/examples/plugin-event-stats/mosquitto_plugin_event_stats.c b/plugins/examples/plugin-event-stats/mosquitto_plugin_event_stats.c index f511b029..e2d0564c 100644 --- a/plugins/examples/plugin-event-stats/mosquitto_plugin_event_stats.c +++ b/plugins/examples/plugin-event-stats/mosquitto_plugin_event_stats.c @@ -63,6 +63,8 @@ const char evt_topics[][60] = { TOPIC_BASE "tick", /* MOSQ_EVT_TICK */ TOPIC_BASE "disconnect", /* MOSQ_EVT_DISCONNECT */ TOPIC_BASE "connect", /* MOSQ_EVT_CONNECT */ + TOPIC_BASE "subscribe", /* MOSQ_EVT_SUBSCRIBE */ + TOPIC_BASE "unsubscribe", /* MOSQ_EVT_UNSUBSCRIBE */ TOPIC_BASE "persist/restore", /* MOSQ_EVT_PERSIST_RESTORE */ TOPIC_BASE "persist/config/add", /* MOSQ_EVT_PERSIST_CONFIG_ADD */ TOPIC_BASE "persist/message/base/add", /* MOSQ_EVT_PERSIST_MSG_ADD */ diff --git a/src/Makefile b/src/Makefile index bbab747a..3a0ee070 100644 --- a/src/Makefile +++ b/src/Makefile @@ -72,6 +72,8 @@ OBJS= mosquitto.o \ plugin_persist.o \ plugin_psk_key.o \ plugin_public.o \ + plugin_subscribe.o \ + plugin_unsubscribe.o \ plugin_tick.o \ read_handle.o \ retain.o \ @@ -314,9 +316,15 @@ plugin_psk_key.o : plugin_psk_key.c ${R}/include/mosquitto_plugin.h mosquitto_br plugin_public.o : plugin_public.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ +plugin_subscribe.o : plugin_subscribe.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + plugin_tick.o : plugin_tick.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ +plugin_unsubscribe.o : plugin_unsubscribe.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + read_handle.o : read_handle.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index b752d247..70a553a9 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -213,6 +213,7 @@ int handle__subscribe(struct mosquitto *context) log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub); + plugin__handle_subscribe(context, sub, qos, subscription_options, subscription_identifier, properties); plugin_persist__handle_subscription_add(context, sub, qos | subscription_options, subscription_identifier); } mosquitto__FREE(sub); diff --git a/src/handle_unsubscribe.c b/src/handle_unsubscribe.c index a40acbc8..2866c5e2 100644 --- a/src/handle_unsubscribe.c +++ b/src/handle_unsubscribe.c @@ -130,6 +130,7 @@ int handle__unsubscribe(struct mosquitto *context) log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub); if(allowed){ rc = sub__remove(context, sub, db.subs, &reason); + plugin__handle_unsubscribe(context, sub, properties); plugin_persist__handle_subscription_delete(context, sub); }else{ rc = MOSQ_ERR_SUCCESS; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 4d0121d5..3839728d 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -164,6 +164,8 @@ struct plugin__callbacks{ struct mosquitto__callback *message; struct mosquitto__callback *psk_key; struct mosquitto__callback *reload; + struct mosquitto__callback *subscribe; + struct mosquitto__callback *unsubscribe; struct mosquitto__callback *persist_restore; struct mosquitto__callback *persist_client_add; struct mosquitto__callback *persist_client_delete; @@ -867,6 +869,8 @@ int acl__pre_check(struct mosquitto__plugin_config *plugin, struct mosquitto *co void plugin__handle_connect(struct mosquitto *context); void plugin__handle_disconnect(struct mosquitto *context, int reason); int plugin__handle_message(struct mosquitto *context, struct mosquitto_base_msg *base_msg); +int plugin__handle_subscribe(struct mosquitto *context, const char *topic, uint8_t qos, uint8_t subscription_options, uint32_t subscription_identifier, const mosquitto_property *properties); +int plugin__handle_unsubscribe(struct mosquitto *context, const char *topic, const mosquitto_property *properties); void LIB_ERROR(void); void plugin__handle_tick(void); int plugin__callback_unregister_all(mosquitto_plugin_id_t *identifier); diff --git a/src/plugin_callbacks.c b/src/plugin_callbacks.c index c2e6e974..210bb0fa 100644 --- a/src/plugin_callbacks.c +++ b/src/plugin_callbacks.c @@ -47,6 +47,10 @@ static const char *get_event_name(int event) return "disconnect"; case MOSQ_EVT_CONNECT: return "connect"; + case MOSQ_EVT_SUBSCRIBE: + return "subscribe"; + case MOSQ_EVT_UNSUBSCRIBE: + return "unsubscribe"; case MOSQ_EVT_PERSIST_RESTORE: return "persist-restore"; case MOSQ_EVT_PERSIST_BASE_MSG_ADD: @@ -121,6 +125,10 @@ static struct mosquitto__callback **plugin__get_callback_base(struct mosquitto__ return &security_options->plugin_callbacks.disconnect; case MOSQ_EVT_CONNECT: return &security_options->plugin_callbacks.connect; + case MOSQ_EVT_SUBSCRIBE: + return &security_options->plugin_callbacks.subscribe; + case MOSQ_EVT_UNSUBSCRIBE: + return &security_options->plugin_callbacks.unsubscribe; case MOSQ_EVT_PERSIST_RESTORE: return &security_options->plugin_callbacks.persist_restore; case MOSQ_EVT_PERSIST_CLIENT_ADD: diff --git a/src/plugin_subscribe.c b/src/plugin_subscribe.c new file mode 100644 index 00000000..4f7ab840 --- /dev/null +++ b/src/plugin_subscribe.c @@ -0,0 +1,67 @@ +/* +Copyright (c) 2016-2022 Roger Light +Copyright (c) 2022 Cedalo GmbH + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License 2.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + https://www.eclipse.org/legal/epl-2.0/ +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "utlist.h" + + +static int plugin__handle_subscribe_single(struct mosquitto__security_options *opts, struct mosquitto *context, const char *topic, uint8_t qos, uint8_t subscription_options, uint32_t subscription_identifier, const mosquitto_property *properties) +{ + struct mosquitto_evt_subscribe event_data; + struct mosquitto__callback *cb_base; + int rc = MOSQ_ERR_SUCCESS; + + memset(&event_data, 0, sizeof(event_data)); + event_data.client = context; + event_data.topic = topic; + event_data.qos = qos; + event_data.subscription_options = subscription_options; + event_data.subscription_identifier = subscription_identifier; + event_data.properties = properties; + + DL_FOREACH(opts->plugin_callbacks.subscribe, cb_base){ + rc = cb_base->cb(MOSQ_EVT_SUBSCRIBE, &event_data, cb_base->userdata); + if(rc != MOSQ_ERR_SUCCESS){ + break; + } + } + + return rc; +} + + +int plugin__handle_subscribe(struct mosquitto *context, const char *topic, uint8_t qos, uint8_t subscription_options, uint32_t subscription_identifier, const mosquitto_property *properties) +{ + int rc = MOSQ_ERR_SUCCESS; + + /* Global plugins */ + rc = plugin__handle_subscribe_single(&db.config->security_options, + context, topic, qos, subscription_options, subscription_identifier, properties); + if(rc) return rc; + + if(db.config->per_listener_settings && context->listener){ + rc = plugin__handle_subscribe_single(&context->listener->security_options, + context, topic, qos, subscription_options, subscription_identifier, properties); + } + + return rc; +} diff --git a/src/plugin_unsubscribe.c b/src/plugin_unsubscribe.c new file mode 100644 index 00000000..7f8c5d0a --- /dev/null +++ b/src/plugin_unsubscribe.c @@ -0,0 +1,64 @@ +/* +Copyright (c) 2016-2022 Roger Light +Copyright (c) 2022 Cedalo GmbH + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License 2.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + https://www.eclipse.org/legal/epl-2.0/ +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "utlist.h" + + +static int plugin__handle_unsubscribe_single(struct mosquitto__security_options *opts, struct mosquitto *context, const char *topic, const mosquitto_property *properties) +{ + struct mosquitto_evt_unsubscribe event_data; + struct mosquitto__callback *cb_base; + int rc = MOSQ_ERR_SUCCESS; + + memset(&event_data, 0, sizeof(event_data)); + event_data.client = context; + event_data.topic = topic; + event_data.properties = properties; + + DL_FOREACH(opts->plugin_callbacks.unsubscribe, cb_base){ + rc = cb_base->cb(MOSQ_EVT_UNSUBSCRIBE, &event_data, cb_base->userdata); + if(rc != MOSQ_ERR_SUCCESS){ + break; + } + } + + return rc; +} + + +int plugin__handle_unsubscribe(struct mosquitto *context, const char *topic, const mosquitto_property *properties) +{ + int rc = MOSQ_ERR_SUCCESS; + + /* Global plugins */ + rc = plugin__handle_unsubscribe_single(&db.config->security_options, + context, topic, properties); + if(rc) return rc; + + if(db.config->per_listener_settings && context->listener){ + rc = plugin__handle_unsubscribe_single(&context->listener->security_options, + context, topic, properties); + } + + return rc; +}