diff --git a/ChangeLog.txt b/ChangeLog.txt index de5cedd3..5f732cc2 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -20,6 +20,7 @@ Broker: SUBSCRIBE, and UNSUBSCRIBE packets. - Add `mosquitto_kick_client_by_clientid()` and `mosquitto_kick_client_by_username()` functions, which can be used by plugins to disconnect clients. +- Add support for handling $CONTROL/ topics in plugins. Client library: - Client no longer generates random client ids for v3.1.1 clients, these are diff --git a/config.mk b/config.mk index 49145ba7..e998ea2f 100644 --- a/config.mk +++ b/config.mk @@ -108,6 +108,9 @@ WITH_UNIX_SOCKETS:=yes # Build mosquitto_sub with cJSON support WITH_CJSON:=yes +# Build mosquitto with support for the $CONTROL topics. +WITH_CONTROL:=yes + # ============================================================================= # End of user configuration # ============================================================================= @@ -281,6 +284,10 @@ ifeq ($(WITH_ADNS),yes) BROKER_CPPFLAGS:=$(BROKER_CPPFLAGS) -DWITH_ADNS endif +ifeq ($(WITH_CONTROL),yes) + BROKER_CPPFLAGS:=$(BROKER_CPPFLAGS) -DWITH_CONTROL +endif + MAKE_ALL:=mosquitto ifeq ($(WITH_DOCS),yes) MAKE_ALL:=$(MAKE_ALL) docs diff --git a/lib/mosquitto.h b/lib/mosquitto.h index 6b2d27ae..2d770046 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -102,6 +102,7 @@ enum mosq_err_t { MOSQ_ERR_RETAIN_NOT_SUPPORTED = 28, MOSQ_ERR_TOPIC_ALIAS_INVALID = 29, MOSQ_ERR_ADMINISTRATIVE_ACTION = 30, + MOSQ_ERR_ALREADY_EXISTS = 31, }; /* Option values */ diff --git a/src/control.c b/src/control.c index 069ad653..13fbbf37 100644 --- a/src/control.c +++ b/src/control.c @@ -20,10 +20,105 @@ Contributors: #include "mqtt_protocol.h" #include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "send_mosq.h" +#ifdef WITH_CONTROL /* Process messages coming in on $CONTROL/. These messages aren't * passed on to other clients. */ int control__process(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_msg_store *stored) { + struct mosquitto__control_callback *control_callback; + int rc = MOSQ_ERR_SUCCESS; + + HASH_FIND(hh, db->control_callbacks, stored->topic, strlen(stored->topic), control_callback); + if(control_callback){ + rc = control_callback->function(control_callback->data, context, stored->topic, stored->payloadlen, UHPA_ACCESS(stored->payload, stored->payloadlen)); + } + + if(stored->qos == 1){ + if(send__puback(context, stored->source_mid, 0, NULL)) rc = 1; + }else if(stored->qos == 2){ + if(send__pubrec(context, stored->source_mid, 0, NULL)) rc = 1; + } + + return rc; +} +#endif + +int mosquitto_control_topic_register(const char *topic, MOSQ_FUNC_control_callback callback, void *data) +{ +#ifdef WITH_CONTROL + struct mosquitto_db *db = mosquitto__get_db(); + struct mosquitto__control_callback *control_callback; + + if(topic == NULL || callback == NULL){ + return MOSQ_ERR_INVAL; + } + if(strncmp(topic, "$CONTROL/", strlen("$CONTROL/")) || strlen(topic) < strlen("$CONTROL/A/v1")){ + return MOSQ_ERR_INVAL; + } + + HASH_FIND(hh, db->control_callbacks, topic, strlen(topic), control_callback); + if(control_callback){ + return MOSQ_ERR_ALREADY_EXISTS; + } + + control_callback = mosquitto__calloc(1, sizeof(struct mosquitto__control_callback)); + if(control_callback == NULL){ + return MOSQ_ERR_NOMEM; + } + + control_callback->topic = mosquitto__strdup(topic); + if(control_callback->topic == NULL){ + mosquitto__free(control_callback); + return MOSQ_ERR_NOMEM; + } + control_callback->function = callback; + control_callback->data = data; + + HASH_ADD_KEYPTR(hh, db->control_callbacks, control_callback->topic, strlen(control_callback->topic), control_callback); + return MOSQ_ERR_SUCCESS; +#else + return MOSQ_ERR_NOT_SUPPORTED; +#endif +} + + +int mosquitto_control_topic_unregister(const char *topic) +{ +#ifdef WITH_CONTROL + struct mosquitto_db *db = mosquitto__get_db(); + struct mosquitto__control_callback *control_callback; + + if(topic == NULL || strncmp(topic, "$CONTROL/", strlen("$CONTROL/"))){ + return MOSQ_ERR_INVAL; + } + + HASH_FIND(hh, db->control_callbacks, topic, strlen(topic), control_callback); + if(control_callback){ + HASH_DELETE(hh, db->control_callbacks, control_callback); + mosquitto__free(control_callback->topic); + mosquitto__free(control_callback); + } + + return MOSQ_ERR_SUCCESS; +#else + return MOSQ_ERR_NOT_SUPPORTED; +#endif +} + + +#ifdef WITH_CONTROL +void control__cleanup(struct mosquitto_db *db) +{ + struct mosquitto__control_callback *control_callback, *cc_tmp; + + HASH_ITER(hh, db->control_callbacks, control_callback, cc_tmp){ + HASH_DELETE(hh, db->control_callbacks, control_callback); + mosquitto__free(control_callback->topic); + mosquitto__free(control_callback); + } } +#endif diff --git a/src/handle_publish.c b/src/handle_publish.c index 96de85e7..d7873efc 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -250,9 +250,14 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen); if(!strncmp(msg->topic, "$CONTROL/", 9)){ +#ifdef WITH_CONTROL rc = control__process(db, context, msg); db__msg_store_free(msg); return rc; +#else + db__msg_store_free(msg); + return MOSQ_ERR_SUCCESS; +#endif } if(msg->qos > 0){ diff --git a/src/linker-macosx.syms b/src/linker-macosx.syms index 269cc699..259fe835 100644 --- a/src/linker-macosx.syms +++ b/src/linker-macosx.syms @@ -9,6 +9,8 @@ _mosquitto_client_protocol _mosquitto_client_protocol_version _mosquitto_client_sub_count _mosquitto_client_username +_mosquitto_control_topic_register +_mosquitto_control_topic_unregister _mosquitto_kick_client_by_clientid _mosquitto_kick_client_by_username _mosquitto_log_printf diff --git a/src/linker.syms b/src/linker.syms index 9e7e6647..7febda30 100644 --- a/src/linker.syms +++ b/src/linker.syms @@ -10,6 +10,8 @@ mosquitto_client_protocol_version; mosquitto_client_sub_count; mosquitto_client_username; + mosquitto_control_topic_register; + mosquitto_control_topic_unregister; mosquitto_kick_client_by_clientid; mosquitto_kick_client_by_username; mosquitto_log_printf; diff --git a/src/mosquitto.c b/src/mosquitto.c index 285ce3f1..3c0a5ad5 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -468,6 +468,9 @@ int main(int argc, char *argv[]) listeners__stop(&int_db, listensock, listensock_count); mosquitto_security_module_cleanup(&int_db); +#ifdef WITH_CONTROL + control__cleanup(&int_db); +#endif if(config.pid_file){ remove(config.pid_file); diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 36e40b63..262892ec 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -179,6 +179,35 @@ const char *mosquitto_client_username(const struct mosquitto *client); int mosquitto_set_username(struct mosquitto *client, const char *username); +/* ========================================================================= + * + * Feature control + * + * ========================================================================= */ + +typedef int (*MOSQ_FUNC_control_callback)(void *, struct mosquitto *, const char *, int, const void *); +/* + * Function: mosquitto_control_topic_register + * + * Register a callback function to handle processing of a topic in the $CONTROL + * topic hierarchy, in the form $CONTROL//, e.g. + * $CONTROL/user-management/v1 + * + * Messages sent to a $CONTROL topic are not passed on to clients. + * + * This allows plugins to provide an API to control behaviour, e.g. implement + * adding/removing users in a security plugin. + */ +int mosquitto_control_topic_register(const char *topic, MOSQ_FUNC_control_callback callback, void *data); + +/* + * Function: mosquitto_control_topic_unregister + * + * Unregister a callback function previously registered with mosquitto_control_topic_register(). + */ +int mosquitto_control_topic_unregister(const char *topic); + + /* ========================================================================= * * Client control diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index e3d7892f..75bc7fe2 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -454,6 +454,13 @@ struct mosquitto_message_v5{ }; +struct mosquitto__control_callback{ + UT_hash_handle hh; + char *topic; + MOSQ_FUNC_control_callback function; + void *data; +}; + struct mosquitto_db{ dbid_t last_db_id; struct mosquitto__subhier *subs; @@ -486,6 +493,7 @@ struct mosquitto_db{ #ifdef WITH_EPOLL int epollfd; #endif + struct mosquitto__control_callback *control_callbacks; struct mosquitto_message_v5 *plugin_msgs; }; @@ -704,7 +712,10 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v /* ============================================================ * Control functions * ============================================================ */ +#ifdef WITH_CONTROL int control__process(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_msg_store *stored); +void control__cleanup(struct mosquitto_db *db); +#endif /* ============================================================ diff --git a/test/broker/14-plugin-register-control.py b/test/broker/14-plugin-register-control.py new file mode 100755 index 00000000..42abd895 --- /dev/null +++ b/test/broker/14-plugin-register-control.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 + +from mosq_test_helper import * + +def write_config(filename, port): + with open(filename, 'w') as f: + f.write("port %d\n" % (port)) + f.write("auth_plugin c/plugin_control.so\n") + f.write("allow_anonymous true\n") + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +write_config(conf_file, port) + +rc = 1 +keepalive = 10 +connect_packet = mosq_test.gen_connect("ctrl-test", keepalive=keepalive) +connack_packet = mosq_test.gen_connack(rc=0) + +mid = 2 +subscribe_packet = mosq_test.gen_subscribe(mid, "$CONTROL/user-management/v1", 1) +suback_packet = mosq_test.gen_suback(mid, 1) + +mid = 3 +publish_packet = mosq_test.gen_publish(topic="$CONTROL/user-management/v1", qos=1, payload="payload contents", retain=1, mid=mid) +puback_packet = mosq_test.gen_puback(mid) + +mid = 1 +publish_packet_recv = mosq_test.gen_publish(topic="$CONTROL/user-management/v1", qos=0, payload="payload contents", retain=0) + + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + sock.send(publish_packet) + mosq_test.receive_unordered(sock, puback_packet, publish_packet_recv, "puback/publish_receive") + + rc = 0 + + sock.close() +except mosq_test.TestError: + pass +finally: + os.remove(conf_file) + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde.decode('utf-8')) + + +exit(rc) + diff --git a/test/broker/Makefile b/test/broker/Makefile index 6731bb7d..55de882e 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -17,7 +17,7 @@ test-compile : ptest : test-compile ./test.py -test : test-compile 01 02 03 04 05 06 07 08 09 10 11 12 13 +test : test-compile 01 02 03 04 05 06 07 08 09 10 11 12 13 14 01 : ./01-connect-anon-denied.py @@ -222,3 +222,6 @@ endif ./13-malformed-publish-v5.py ./13-malformed-subscribe-v5.py ./13-malformed-unsubscribe-v5.py + +14 : + ./14-plugin-register-control.py diff --git a/test/broker/c/Makefile b/test/broker/c/Makefile index ffcde831..dbd73d12 100644 --- a/test/broker/c/Makefile +++ b/test/broker/c/Makefile @@ -13,7 +13,8 @@ PLUGIN_SRC = \ auth_plugin_extended_multiple.c \ auth_plugin_extended_single.c \ auth_plugin_extended_single2.c \ - auth_plugin_publish.c + auth_plugin_publish.c \ + plugin_control.c PLUGINS = ${PLUGIN_SRC:.c=.so} diff --git a/test/broker/c/plugin_control.c b/test/broker/c/plugin_control.c new file mode 100644 index 00000000..6d533b9e --- /dev/null +++ b/test/broker/c/plugin_control.c @@ -0,0 +1,58 @@ +#include +#include +#include +#include +#include +#include + +int control_callback(void *data, struct mosquitto *context, const char *topic, int payloadlen, const void *payload) +{ + mosquitto_broker_publish_copy(NULL, topic, payloadlen, payload, 0, 0, NULL); + + return 0; +} + + +int mosquitto_auth_plugin_version(void) +{ + return MOSQ_AUTH_PLUGIN_VERSION; +} + +int mosquitto_auth_plugin_init(void **user_data, struct mosquitto_opt *auth_opts, int auth_opt_count) +{ + int i; + char buf[100]; + + for(i=0; i<100; i++){ + snprintf(buf, sizeof(buf), "$CONTROL/user-management/v%d", i); + mosquitto_control_topic_register("$CONTROL/user-management/v1", control_callback, NULL); + } + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_plugin_cleanup(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count) +{ + int i; + char buf[100]; + + for(i=0; i<100; i++){ + snprintf(buf, sizeof(buf), "$CONTROL/user-management/v%d", i); + mosquitto_control_topic_unregister("$CONTROL/user-management/v1"); + } + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_security_init(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count, bool reload) +{ + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_security_cleanup(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count, bool reload) +{ + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_acl_check(void *user_data, int access, struct mosquitto *client, const struct mosquitto_acl_msg *msg) +{ + return MOSQ_ERR_SUCCESS; +} diff --git a/test/broker/test.py b/test/broker/test.py index 74232ae7..22b8e6ce 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -186,6 +186,8 @@ tests = [ (1, './13-malformed-publish-v5.py'), (1, './13-malformed-subscribe-v5.py'), (1, './13-malformed-unsubscribe-v5.py'), + + (1, './14-plugin-register-control.py'), ] ptest.run_tests(tests)