Add MOSQ_EVT_SUBSCRIBE and MOSQ_EVT_UNSUBSCRIBE events.

These are called when subscribe/unsubscribes actually succeed.
pull/2505/head
Roger A. Light 4 years ago
parent fc4a59b288
commit 40d8015837

@ -103,6 +103,8 @@ Plugins / plugin interface:
client. client.
- The dynamic security plugin now only kicks clients at the start of the next - The dynamic security plugin now only kicks clients at the start of the next
network loop, to give chance for PUBACK/PUBREC to be sent. Closes #2474. network loop, to give chance for PUBACK/PUBREC to be sent. Closes #2474.
- Add MOSQ_EVT_SUBSCRIBE and MOSQ_EVT_UNSUBSCRIBE events that are called when
subscribe/unsubscribes actually succeed.
Client library: Client library:
- Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair - Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair

@ -75,23 +75,25 @@ enum mosquitto_plugin_event {
MOSQ_EVT_TICK = 9, MOSQ_EVT_TICK = 9,
MOSQ_EVT_DISCONNECT = 10, MOSQ_EVT_DISCONNECT = 10,
MOSQ_EVT_CONNECT = 11, MOSQ_EVT_CONNECT = 11,
MOSQ_EVT_PERSIST_RESTORE = 12, MOSQ_EVT_SUBSCRIBE = 12,
MOSQ_EVT_PERSIST_CONFIG_ADD = 13, MOSQ_EVT_UNSUBSCRIBE = 13,
MOSQ_EVT_PERSIST_BASE_MSG_ADD = 14, MOSQ_EVT_PERSIST_RESTORE = 14,
MOSQ_EVT_PERSIST_BASE_MSG_DELETE = 15, MOSQ_EVT_PERSIST_CONFIG_ADD = 15,
MOSQ_EVT_PERSIST_BASE_MSG_LOAD = 16, MOSQ_EVT_PERSIST_BASE_MSG_ADD = 16,
MOSQ_EVT_PERSIST_RETAIN_MSG_SET = 17, MOSQ_EVT_PERSIST_BASE_MSG_DELETE = 17,
MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE = 18, MOSQ_EVT_PERSIST_BASE_MSG_LOAD = 18,
MOSQ_EVT_PERSIST_CLIENT_ADD = 19, MOSQ_EVT_PERSIST_RETAIN_MSG_SET = 19,
MOSQ_EVT_PERSIST_CLIENT_DELETE = 20, MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE = 20,
MOSQ_EVT_PERSIST_CLIENT_UPDATE = 21, MOSQ_EVT_PERSIST_CLIENT_ADD = 21,
MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD = 22, MOSQ_EVT_PERSIST_CLIENT_DELETE = 22,
MOSQ_EVT_PERSIST_SUBSCRIPTION_DELETE = 23, MOSQ_EVT_PERSIST_CLIENT_UPDATE = 23,
MOSQ_EVT_PERSIST_CLIENT_MSG_ADD = 24, MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD = 24,
MOSQ_EVT_PERSIST_CLIENT_MSG_DELETE = 25, MOSQ_EVT_PERSIST_SUBSCRIPTION_DELETE = 25,
MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE = 26, MOSQ_EVT_PERSIST_CLIENT_MSG_ADD = 26,
MOSQ_EVT_PERSIST_CLIENT_MSG_CLEAR = 27, MOSQ_EVT_PERSIST_CLIENT_MSG_DELETE = 27,
MOSQ_EVT_PERSIST_CLIENT_MSG_LOAD = 28, MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE = 28,
MOSQ_EVT_PERSIST_CLIENT_MSG_CLEAR = 29,
MOSQ_EVT_PERSIST_CLIENT_MSG_LOAD = 30,
}; };
/* Data for the MOSQ_EVT_RELOAD event */ /* Data for the MOSQ_EVT_RELOAD event */
@ -204,6 +206,30 @@ struct mosquitto_evt_disconnect {
void *future2[4]; void *future2[4];
}; };
/* Data for the MOSQ_EVT_SUBSCRIBE event */
struct mosquitto_evt_subscribe {
void *future;
struct mosquitto *client;
const char *topic;
const mosquitto_property *properties;
uint32_t subscription_identifier;
uint8_t subscription_options;
uint8_t qos;
uint8_t padding[2];
void *future2[8];
};
/* Data for the MOSQ_EVT_UNSUBSCRIBE event */
struct mosquitto_evt_unsubscribe {
void *future;
struct mosquitto *client;
const char *topic;
const mosquitto_property *properties;
void *future2[8];
};
/* Data for the MOSQ_EVT_PERSIST_RESTORE event */ /* Data for the MOSQ_EVT_PERSIST_RESTORE event */
/* NOTE: The persistence interface is currently marked as unstable, which means /* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */ * it may change in a future minor release. */

@ -63,6 +63,8 @@ const char evt_topics[][60] = {
TOPIC_BASE "tick", /* MOSQ_EVT_TICK */ TOPIC_BASE "tick", /* MOSQ_EVT_TICK */
TOPIC_BASE "disconnect", /* MOSQ_EVT_DISCONNECT */ TOPIC_BASE "disconnect", /* MOSQ_EVT_DISCONNECT */
TOPIC_BASE "connect", /* MOSQ_EVT_CONNECT */ TOPIC_BASE "connect", /* MOSQ_EVT_CONNECT */
TOPIC_BASE "subscribe", /* MOSQ_EVT_SUBSCRIBE */
TOPIC_BASE "unsubscribe", /* MOSQ_EVT_UNSUBSCRIBE */
TOPIC_BASE "persist/restore", /* MOSQ_EVT_PERSIST_RESTORE */ TOPIC_BASE "persist/restore", /* MOSQ_EVT_PERSIST_RESTORE */
TOPIC_BASE "persist/config/add", /* MOSQ_EVT_PERSIST_CONFIG_ADD */ TOPIC_BASE "persist/config/add", /* MOSQ_EVT_PERSIST_CONFIG_ADD */
TOPIC_BASE "persist/message/base/add", /* MOSQ_EVT_PERSIST_MSG_ADD */ TOPIC_BASE "persist/message/base/add", /* MOSQ_EVT_PERSIST_MSG_ADD */

@ -72,6 +72,8 @@ OBJS= mosquitto.o \
plugin_persist.o \ plugin_persist.o \
plugin_psk_key.o \ plugin_psk_key.o \
plugin_public.o \ plugin_public.o \
plugin_subscribe.o \
plugin_unsubscribe.o \
plugin_tick.o \ plugin_tick.o \
read_handle.o \ read_handle.o \
retain.o \ retain.o \
@ -314,9 +316,15 @@ plugin_psk_key.o : plugin_psk_key.c ${R}/include/mosquitto_plugin.h mosquitto_br
plugin_public.o : plugin_public.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h plugin_public.o : plugin_public.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
plugin_subscribe.o : plugin_subscribe.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
plugin_tick.o : plugin_tick.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h plugin_tick.o : plugin_tick.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
plugin_unsubscribe.o : plugin_unsubscribe.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
read_handle.o : read_handle.c mosquitto_broker_internal.h read_handle.o : read_handle.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@

@ -213,6 +213,7 @@ int handle__subscribe(struct mosquitto *context)
log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub); log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub);
plugin__handle_subscribe(context, sub, qos, subscription_options, subscription_identifier, properties);
plugin_persist__handle_subscription_add(context, sub, qos | subscription_options, subscription_identifier); plugin_persist__handle_subscription_add(context, sub, qos | subscription_options, subscription_identifier);
} }
mosquitto__FREE(sub); mosquitto__FREE(sub);

@ -130,6 +130,7 @@ int handle__unsubscribe(struct mosquitto *context)
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub); log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub);
if(allowed){ if(allowed){
rc = sub__remove(context, sub, db.subs, &reason); rc = sub__remove(context, sub, db.subs, &reason);
plugin__handle_unsubscribe(context, sub, properties);
plugin_persist__handle_subscription_delete(context, sub); plugin_persist__handle_subscription_delete(context, sub);
}else{ }else{
rc = MOSQ_ERR_SUCCESS; rc = MOSQ_ERR_SUCCESS;

@ -164,6 +164,8 @@ struct plugin__callbacks{
struct mosquitto__callback *message; struct mosquitto__callback *message;
struct mosquitto__callback *psk_key; struct mosquitto__callback *psk_key;
struct mosquitto__callback *reload; struct mosquitto__callback *reload;
struct mosquitto__callback *subscribe;
struct mosquitto__callback *unsubscribe;
struct mosquitto__callback *persist_restore; struct mosquitto__callback *persist_restore;
struct mosquitto__callback *persist_client_add; struct mosquitto__callback *persist_client_add;
struct mosquitto__callback *persist_client_delete; struct mosquitto__callback *persist_client_delete;
@ -867,6 +869,8 @@ int acl__pre_check(struct mosquitto__plugin_config *plugin, struct mosquitto *co
void plugin__handle_connect(struct mosquitto *context); void plugin__handle_connect(struct mosquitto *context);
void plugin__handle_disconnect(struct mosquitto *context, int reason); void plugin__handle_disconnect(struct mosquitto *context, int reason);
int plugin__handle_message(struct mosquitto *context, struct mosquitto_base_msg *base_msg); int plugin__handle_message(struct mosquitto *context, struct mosquitto_base_msg *base_msg);
int plugin__handle_subscribe(struct mosquitto *context, const char *topic, uint8_t qos, uint8_t subscription_options, uint32_t subscription_identifier, const mosquitto_property *properties);
int plugin__handle_unsubscribe(struct mosquitto *context, const char *topic, const mosquitto_property *properties);
void LIB_ERROR(void); void LIB_ERROR(void);
void plugin__handle_tick(void); void plugin__handle_tick(void);
int plugin__callback_unregister_all(mosquitto_plugin_id_t *identifier); int plugin__callback_unregister_all(mosquitto_plugin_id_t *identifier);

@ -47,6 +47,10 @@ static const char *get_event_name(int event)
return "disconnect"; return "disconnect";
case MOSQ_EVT_CONNECT: case MOSQ_EVT_CONNECT:
return "connect"; return "connect";
case MOSQ_EVT_SUBSCRIBE:
return "subscribe";
case MOSQ_EVT_UNSUBSCRIBE:
return "unsubscribe";
case MOSQ_EVT_PERSIST_RESTORE: case MOSQ_EVT_PERSIST_RESTORE:
return "persist-restore"; return "persist-restore";
case MOSQ_EVT_PERSIST_BASE_MSG_ADD: case MOSQ_EVT_PERSIST_BASE_MSG_ADD:
@ -121,6 +125,10 @@ static struct mosquitto__callback **plugin__get_callback_base(struct mosquitto__
return &security_options->plugin_callbacks.disconnect; return &security_options->plugin_callbacks.disconnect;
case MOSQ_EVT_CONNECT: case MOSQ_EVT_CONNECT:
return &security_options->plugin_callbacks.connect; return &security_options->plugin_callbacks.connect;
case MOSQ_EVT_SUBSCRIBE:
return &security_options->plugin_callbacks.subscribe;
case MOSQ_EVT_UNSUBSCRIBE:
return &security_options->plugin_callbacks.unsubscribe;
case MOSQ_EVT_PERSIST_RESTORE: case MOSQ_EVT_PERSIST_RESTORE:
return &security_options->plugin_callbacks.persist_restore; return &security_options->plugin_callbacks.persist_restore;
case MOSQ_EVT_PERSIST_CLIENT_ADD: case MOSQ_EVT_PERSIST_CLIENT_ADD:

@ -0,0 +1,67 @@
/*
Copyright (c) 2016-2022 Roger Light <roger@atchoo.org>
Copyright (c) 2022 Cedalo GmbH
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "utlist.h"
static int plugin__handle_subscribe_single(struct mosquitto__security_options *opts, struct mosquitto *context, const char *topic, uint8_t qos, uint8_t subscription_options, uint32_t subscription_identifier, const mosquitto_property *properties)
{
struct mosquitto_evt_subscribe event_data;
struct mosquitto__callback *cb_base;
int rc = MOSQ_ERR_SUCCESS;
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.topic = topic;
event_data.qos = qos;
event_data.subscription_options = subscription_options;
event_data.subscription_identifier = subscription_identifier;
event_data.properties = properties;
DL_FOREACH(opts->plugin_callbacks.subscribe, cb_base){
rc = cb_base->cb(MOSQ_EVT_SUBSCRIBE, &event_data, cb_base->userdata);
if(rc != MOSQ_ERR_SUCCESS){
break;
}
}
return rc;
}
int plugin__handle_subscribe(struct mosquitto *context, const char *topic, uint8_t qos, uint8_t subscription_options, uint32_t subscription_identifier, const mosquitto_property *properties)
{
int rc = MOSQ_ERR_SUCCESS;
/* Global plugins */
rc = plugin__handle_subscribe_single(&db.config->security_options,
context, topic, qos, subscription_options, subscription_identifier, properties);
if(rc) return rc;
if(db.config->per_listener_settings && context->listener){
rc = plugin__handle_subscribe_single(&context->listener->security_options,
context, topic, qos, subscription_options, subscription_identifier, properties);
}
return rc;
}

@ -0,0 +1,64 @@
/*
Copyright (c) 2016-2022 Roger Light <roger@atchoo.org>
Copyright (c) 2022 Cedalo GmbH
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "utlist.h"
static int plugin__handle_unsubscribe_single(struct mosquitto__security_options *opts, struct mosquitto *context, const char *topic, const mosquitto_property *properties)
{
struct mosquitto_evt_unsubscribe event_data;
struct mosquitto__callback *cb_base;
int rc = MOSQ_ERR_SUCCESS;
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.topic = topic;
event_data.properties = properties;
DL_FOREACH(opts->plugin_callbacks.unsubscribe, cb_base){
rc = cb_base->cb(MOSQ_EVT_UNSUBSCRIBE, &event_data, cb_base->userdata);
if(rc != MOSQ_ERR_SUCCESS){
break;
}
}
return rc;
}
int plugin__handle_unsubscribe(struct mosquitto *context, const char *topic, const mosquitto_property *properties)
{
int rc = MOSQ_ERR_SUCCESS;
/* Global plugins */
rc = plugin__handle_unsubscribe_single(&db.config->security_options,
context, topic, properties);
if(rc) return rc;
if(db.config->per_listener_settings && context->listener){
rc = plugin__handle_unsubscribe_single(&context->listener->security_options,
context, topic, properties);
}
return rc;
}
Loading…
Cancel
Save