Add support for $CONTROL/ topics in plugins.

pull/1639/head
Roger A. Light 5 years ago
parent cd33670f6a
commit 5daa5ee162

@ -20,6 +20,7 @@ Broker:
SUBSCRIBE, and UNSUBSCRIBE packets.
- Add `mosquitto_kick_client_by_clientid()` and `mosquitto_kick_client_by_username()`
functions, which can be used by plugins to disconnect clients.
- Add support for handling $CONTROL/ topics in plugins.
Client library:
- Client no longer generates random client ids for v3.1.1 clients, these are

@ -108,6 +108,9 @@ WITH_UNIX_SOCKETS:=yes
# Build mosquitto_sub with cJSON support
WITH_CJSON:=yes
# Build mosquitto with support for the $CONTROL topics.
WITH_CONTROL:=yes
# =============================================================================
# End of user configuration
# =============================================================================
@ -281,6 +284,10 @@ ifeq ($(WITH_ADNS),yes)
BROKER_CPPFLAGS:=$(BROKER_CPPFLAGS) -DWITH_ADNS
endif
ifeq ($(WITH_CONTROL),yes)
BROKER_CPPFLAGS:=$(BROKER_CPPFLAGS) -DWITH_CONTROL
endif
MAKE_ALL:=mosquitto
ifeq ($(WITH_DOCS),yes)
MAKE_ALL:=$(MAKE_ALL) docs

@ -102,6 +102,7 @@ enum mosq_err_t {
MOSQ_ERR_RETAIN_NOT_SUPPORTED = 28,
MOSQ_ERR_TOPIC_ALIAS_INVALID = 29,
MOSQ_ERR_ADMINISTRATIVE_ACTION = 30,
MOSQ_ERR_ALREADY_EXISTS = 31,
};
/* Option values */

@ -20,10 +20,105 @@ Contributors:
#include "mqtt_protocol.h"
#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "send_mosq.h"
#ifdef WITH_CONTROL
/* Process messages coming in on $CONTROL/<feature>. These messages aren't
* passed on to other clients. */
int control__process(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_msg_store *stored)
{
struct mosquitto__control_callback *control_callback;
int rc = MOSQ_ERR_SUCCESS;
HASH_FIND(hh, db->control_callbacks, stored->topic, strlen(stored->topic), control_callback);
if(control_callback){
rc = control_callback->function(control_callback->data, context, stored->topic, stored->payloadlen, UHPA_ACCESS(stored->payload, stored->payloadlen));
}
if(stored->qos == 1){
if(send__puback(context, stored->source_mid, 0, NULL)) rc = 1;
}else if(stored->qos == 2){
if(send__pubrec(context, stored->source_mid, 0, NULL)) rc = 1;
}
return rc;
}
#endif
int mosquitto_control_topic_register(const char *topic, MOSQ_FUNC_control_callback callback, void *data)
{
#ifdef WITH_CONTROL
struct mosquitto_db *db = mosquitto__get_db();
struct mosquitto__control_callback *control_callback;
if(topic == NULL || callback == NULL){
return MOSQ_ERR_INVAL;
}
if(strncmp(topic, "$CONTROL/", strlen("$CONTROL/")) || strlen(topic) < strlen("$CONTROL/A/v1")){
return MOSQ_ERR_INVAL;
}
HASH_FIND(hh, db->control_callbacks, topic, strlen(topic), control_callback);
if(control_callback){
return MOSQ_ERR_ALREADY_EXISTS;
}
control_callback = mosquitto__calloc(1, sizeof(struct mosquitto__control_callback));
if(control_callback == NULL){
return MOSQ_ERR_NOMEM;
}
control_callback->topic = mosquitto__strdup(topic);
if(control_callback->topic == NULL){
mosquitto__free(control_callback);
return MOSQ_ERR_NOMEM;
}
control_callback->function = callback;
control_callback->data = data;
HASH_ADD_KEYPTR(hh, db->control_callbacks, control_callback->topic, strlen(control_callback->topic), control_callback);
return MOSQ_ERR_SUCCESS;
#else
return MOSQ_ERR_NOT_SUPPORTED;
#endif
}
int mosquitto_control_topic_unregister(const char *topic)
{
#ifdef WITH_CONTROL
struct mosquitto_db *db = mosquitto__get_db();
struct mosquitto__control_callback *control_callback;
if(topic == NULL || strncmp(topic, "$CONTROL/", strlen("$CONTROL/"))){
return MOSQ_ERR_INVAL;
}
HASH_FIND(hh, db->control_callbacks, topic, strlen(topic), control_callback);
if(control_callback){
HASH_DELETE(hh, db->control_callbacks, control_callback);
mosquitto__free(control_callback->topic);
mosquitto__free(control_callback);
}
return MOSQ_ERR_SUCCESS;
#else
return MOSQ_ERR_NOT_SUPPORTED;
#endif
}
#ifdef WITH_CONTROL
void control__cleanup(struct mosquitto_db *db)
{
struct mosquitto__control_callback *control_callback, *cc_tmp;
HASH_ITER(hh, db->control_callbacks, control_callback, cc_tmp){
HASH_DELETE(hh, db->control_callbacks, control_callback);
mosquitto__free(control_callback->topic);
mosquitto__free(control_callback);
}
}
#endif

@ -250,9 +250,14 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
if(!strncmp(msg->topic, "$CONTROL/", 9)){
#ifdef WITH_CONTROL
rc = control__process(db, context, msg);
db__msg_store_free(msg);
return rc;
#else
db__msg_store_free(msg);
return MOSQ_ERR_SUCCESS;
#endif
}
if(msg->qos > 0){

@ -9,6 +9,8 @@ _mosquitto_client_protocol
_mosquitto_client_protocol_version
_mosquitto_client_sub_count
_mosquitto_client_username
_mosquitto_control_topic_register
_mosquitto_control_topic_unregister
_mosquitto_kick_client_by_clientid
_mosquitto_kick_client_by_username
_mosquitto_log_printf

@ -10,6 +10,8 @@
mosquitto_client_protocol_version;
mosquitto_client_sub_count;
mosquitto_client_username;
mosquitto_control_topic_register;
mosquitto_control_topic_unregister;
mosquitto_kick_client_by_clientid;
mosquitto_kick_client_by_username;
mosquitto_log_printf;

@ -468,6 +468,9 @@ int main(int argc, char *argv[])
listeners__stop(&int_db, listensock, listensock_count);
mosquitto_security_module_cleanup(&int_db);
#ifdef WITH_CONTROL
control__cleanup(&int_db);
#endif
if(config.pid_file){
remove(config.pid_file);

@ -179,6 +179,35 @@ const char *mosquitto_client_username(const struct mosquitto *client);
int mosquitto_set_username(struct mosquitto *client, const char *username);
/* =========================================================================
*
* Feature control
*
* ========================================================================= */
typedef int (*MOSQ_FUNC_control_callback)(void *, struct mosquitto *, const char *, int, const void *);
/*
* Function: mosquitto_control_topic_register
*
* Register a callback function to handle processing of a topic in the $CONTROL
* topic hierarchy, in the form $CONTROL/<feature>/<version>, e.g.
* $CONTROL/user-management/v1
*
* Messages sent to a $CONTROL topic are not passed on to clients.
*
* This allows plugins to provide an API to control behaviour, e.g. implement
* adding/removing users in a security plugin.
*/
int mosquitto_control_topic_register(const char *topic, MOSQ_FUNC_control_callback callback, void *data);
/*
* Function: mosquitto_control_topic_unregister
*
* Unregister a callback function previously registered with mosquitto_control_topic_register().
*/
int mosquitto_control_topic_unregister(const char *topic);
/* =========================================================================
*
* Client control

@ -454,6 +454,13 @@ struct mosquitto_message_v5{
};
struct mosquitto__control_callback{
UT_hash_handle hh;
char *topic;
MOSQ_FUNC_control_callback function;
void *data;
};
struct mosquitto_db{
dbid_t last_db_id;
struct mosquitto__subhier *subs;
@ -486,6 +493,7 @@ struct mosquitto_db{
#ifdef WITH_EPOLL
int epollfd;
#endif
struct mosquitto__control_callback *control_callbacks;
struct mosquitto_message_v5 *plugin_msgs;
};
@ -704,7 +712,10 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v
/* ============================================================
* Control functions
* ============================================================ */
#ifdef WITH_CONTROL
int control__process(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_msg_store *stored);
void control__cleanup(struct mosquitto_db *db);
#endif
/* ============================================================

@ -0,0 +1,55 @@
#!/usr/bin/env python3
from mosq_test_helper import *
def write_config(filename, port):
with open(filename, 'w') as f:
f.write("port %d\n" % (port))
f.write("auth_plugin c/plugin_control.so\n")
f.write("allow_anonymous true\n")
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
write_config(conf_file, port)
rc = 1
keepalive = 10
connect_packet = mosq_test.gen_connect("ctrl-test", keepalive=keepalive)
connack_packet = mosq_test.gen_connack(rc=0)
mid = 2
subscribe_packet = mosq_test.gen_subscribe(mid, "$CONTROL/user-management/v1", 1)
suback_packet = mosq_test.gen_suback(mid, 1)
mid = 3
publish_packet = mosq_test.gen_publish(topic="$CONTROL/user-management/v1", qos=1, payload="payload contents", retain=1, mid=mid)
puback_packet = mosq_test.gen_puback(mid)
mid = 1
publish_packet_recv = mosq_test.gen_publish(topic="$CONTROL/user-management/v1", qos=0, payload="payload contents", retain=0)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
sock.send(publish_packet)
mosq_test.receive_unordered(sock, puback_packet, publish_packet_recv, "puback/publish_receive")
rc = 0
sock.close()
except mosq_test.TestError:
pass
finally:
os.remove(conf_file)
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -17,7 +17,7 @@ test-compile :
ptest : test-compile
./test.py
test : test-compile 01 02 03 04 05 06 07 08 09 10 11 12 13
test : test-compile 01 02 03 04 05 06 07 08 09 10 11 12 13 14
01 :
./01-connect-anon-denied.py
@ -222,3 +222,6 @@ endif
./13-malformed-publish-v5.py
./13-malformed-subscribe-v5.py
./13-malformed-unsubscribe-v5.py
14 :
./14-plugin-register-control.py

@ -13,7 +13,8 @@ PLUGIN_SRC = \
auth_plugin_extended_multiple.c \
auth_plugin_extended_single.c \
auth_plugin_extended_single2.c \
auth_plugin_publish.c
auth_plugin_publish.c \
plugin_control.c
PLUGINS = ${PLUGIN_SRC:.c=.so}

@ -0,0 +1,58 @@
#include <stdio.h>
#include <string.h>
#include <mqtt_protocol.h>
#include <mosquitto.h>
#include <mosquitto_broker.h>
#include <mosquitto_plugin.h>
int control_callback(void *data, struct mosquitto *context, const char *topic, int payloadlen, const void *payload)
{
mosquitto_broker_publish_copy(NULL, topic, payloadlen, payload, 0, 0, NULL);
return 0;
}
int mosquitto_auth_plugin_version(void)
{
return MOSQ_AUTH_PLUGIN_VERSION;
}
int mosquitto_auth_plugin_init(void **user_data, struct mosquitto_opt *auth_opts, int auth_opt_count)
{
int i;
char buf[100];
for(i=0; i<100; i++){
snprintf(buf, sizeof(buf), "$CONTROL/user-management/v%d", i);
mosquitto_control_topic_register("$CONTROL/user-management/v1", control_callback, NULL);
}
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_plugin_cleanup(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count)
{
int i;
char buf[100];
for(i=0; i<100; i++){
snprintf(buf, sizeof(buf), "$CONTROL/user-management/v%d", i);
mosquitto_control_topic_unregister("$CONTROL/user-management/v1");
}
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_security_init(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count, bool reload)
{
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_security_cleanup(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count, bool reload)
{
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_acl_check(void *user_data, int access, struct mosquitto *client, const struct mosquitto_acl_msg *msg)
{
return MOSQ_ERR_SUCCESS;
}

@ -186,6 +186,8 @@ tests = [
(1, './13-malformed-publish-v5.py'),
(1, './13-malformed-subscribe-v5.py'),
(1, './13-malformed-unsubscribe-v5.py'),
(1, './14-plugin-register-control.py'),
]
ptest.run_tests(tests)

Loading…
Cancel
Save