diff --git a/ChangeLog.txt b/ChangeLog.txt index e747689a..79e23ab6 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -91,6 +91,8 @@ Plugins / plugin interface: - Add MOSQ_EVT_CONNECT, to allow plugins to know when a client has successfully authenticated to the broker. - Add connection-state example plugin to demonstrate MOSQ_EVT_CONNECT. +- Add MOSQ_EVT_CLIENT_OFFLINE, to allow plugins to know when a client with a + non-zero session expiry interval has gone offline. - Plugins on non-Windows platforms now no longer make their symbols globally available, which means they are self contained. - Add support for delayed basic authentication in plugins. diff --git a/include/mosquitto_broker.h b/include/mosquitto_broker.h index dceaca1f..302111fd 100644 --- a/include/mosquitto_broker.h +++ b/include/mosquitto_broker.h @@ -156,6 +156,7 @@ enum mosquitto_plugin_event { MOSQ_EVT_PERSIST_CLIENT_MSG_DELETE = 25, MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE = 26, MOSQ_EVT_MESSAGE_OUT = 27, + MOSQ_EVT_CLIENT_OFFLINE = 28, }; /* Data for the MOSQ_EVT_RELOAD event */ @@ -270,6 +271,14 @@ struct mosquitto_evt_disconnect { void *future2[4]; }; +/* Data for the MOSQ_EVT_CLIENT_OFFLINE event */ +struct mosquitto_evt_client_offline { + void *future; + struct mosquitto *client; + int reason; + void *future2[4]; +}; + /* Data for the MOSQ_EVT_SUBSCRIBE event */ struct mosquitto_evt_subscribe { void *future; diff --git a/lib/send_disconnect.c b/lib/send_disconnect.c index f41d415d..6f291316 100644 --- a/lib/send_disconnect.c +++ b/lib/send_disconnect.c @@ -78,9 +78,6 @@ int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitt } } -#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) - metrics__int_inc(mosq_counter_mqtt_disconnect_sent, 1); -#endif return packet__queue(mosq, packet); } diff --git a/lib/send_mosq.c b/lib/send_mosq.c index 6bb6bf2d..9eeac172 100644 --- a/lib/send_mosq.c +++ b/lib/send_mosq.c @@ -59,9 +59,6 @@ int send__pingresp(struct mosquitto *mosq) { #ifdef WITH_BROKER log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PINGRESP to %s", SAFE_PRINT(mosq->id)); -# ifdef WITH_SYS_TREE - metrics__int_inc(mosq_counter_mqtt_pingresp_sent, 1); -# endif #else log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PINGRESP", SAFE_PRINT(mosq->id)); #endif @@ -72,9 +69,6 @@ int send__puback(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, cons { #ifdef WITH_BROKER log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBACK to %s (m%d, rc%d)", SAFE_PRINT(mosq->id), mid, reason_code); -# ifdef WITH_SYS_TREE - metrics__int_inc(mosq_counter_mqtt_puback_sent, 1); -# endif #else log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBACK (m%d, rc%d)", SAFE_PRINT(mosq->id), mid, reason_code); #endif @@ -87,9 +81,6 @@ int send__pubcomp(struct mosquitto *mosq, uint16_t mid, const mosquitto_property { #ifdef WITH_BROKER log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBCOMP to %s (m%d)", SAFE_PRINT(mosq->id), mid); -# ifdef WITH_SYS_TREE - metrics__int_inc(mosq_counter_mqtt_pubcomp_sent, 1); -# endif #else log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBCOMP (m%d)", SAFE_PRINT(mosq->id), mid); #endif @@ -103,9 +94,6 @@ int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, cons { #ifdef WITH_BROKER log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREC to %s (m%d, rc%d)", SAFE_PRINT(mosq->id), mid, reason_code); -# ifdef WITH_SYS_TREE - metrics__int_inc(mosq_counter_mqtt_pubrec_sent, 1); -# endif #else log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (m%d, rc%d)", SAFE_PRINT(mosq->id), mid, reason_code); #endif @@ -120,9 +108,6 @@ int send__pubrel(struct mosquitto *mosq, uint16_t mid, const mosquitto_property { #ifdef WITH_BROKER log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREL to %s (m%d)", SAFE_PRINT(mosq->id), mid); -# ifdef WITH_SYS_TREE - metrics__int_inc(mosq_counter_mqtt_pubrel_sent, 1); -# endif #else log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREL (m%d)", SAFE_PRINT(mosq->id), mid); #endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f3a8bc05..56fc7ea0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -47,6 +47,7 @@ set (MOSQ_SRCS plugin_callbacks.c plugin_v5.c plugin_v4.c plugin_v3.c plugin_v2.c plugin_init.c plugin_cleanup.c plugin_persist.c plugin_acl_check.c plugin_basic_auth.c plugin_connect.c plugin_disconnect.c + plugin_client_offline.c plugin_extended_auth.c plugin_message.c plugin_psk_key.c plugin_public.c plugin_subscribe.c plugin_tick.c diff --git a/src/Makefile b/src/Makefile index e81bdc86..f7936cbb 100644 --- a/src/Makefile +++ b/src/Makefile @@ -74,6 +74,7 @@ OBJS= mosquitto.o \ plugin_acl_check.o \ plugin_basic_auth.o \ plugin_cleanup.o \ + plugin_client_offline.o \ plugin_connect.o \ plugin_disconnect.o \ plugin_extended_auth.o \ @@ -308,6 +309,9 @@ plugin_basic_auth.o : plugin_basic_auth.c ${R}/include/mosquitto_plugin.h mosqui plugin_cleanup.o : plugin_cleanup.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ +plugin_client_offline.o : plugin_client_offline.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + plugin_connect.o : plugin_connect.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/context.c b/src/context.c index 5d002431..1fca44b1 100644 --- a/src/context.c +++ b/src/context.c @@ -231,7 +231,12 @@ void context__disconnect(struct mosquitto *context, int reason) if(send(context->sock, buf, 4, 0)){}; } #endif - plugin__handle_disconnect(context, reason); + + if(context->session_expiry_interval == 0){ + plugin__handle_disconnect(context, reason); + }else{ + plugin__handle_client_offline(context, reason); + } context__send_will(context); net__socket_close(context); diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index acb2c05f..377cac42 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -163,6 +163,7 @@ struct plugin__callbacks{ struct mosquitto__callback *connect; struct mosquitto__callback *control; struct mosquitto__callback *disconnect; + struct mosquitto__callback *client_offline; struct mosquitto__callback *ext_auth_continue; struct mosquitto__callback *ext_auth_start; struct mosquitto__callback *message_in; @@ -848,6 +849,7 @@ int acl__pre_check(mosquitto_plugin_id_t *plugin, struct mosquitto *context, int void plugin__handle_connect(struct mosquitto *context); void plugin__handle_disconnect(struct mosquitto *context, int reason); +void plugin__handle_client_offline(struct mosquitto *context, int reason); int plugin__handle_message_in(struct mosquitto *context, struct mosquitto_base_msg *base_msg); int plugin__handle_message_out(struct mosquitto *context, struct mosquitto_base_msg *base_msg); int plugin__handle_subscribe(struct mosquitto *context, struct mosquitto_subscription *sub); diff --git a/src/plugin_callbacks.c b/src/plugin_callbacks.c index 4f3da85e..a08302d5 100644 --- a/src/plugin_callbacks.c +++ b/src/plugin_callbacks.c @@ -49,6 +49,8 @@ static const char *get_event_name(int event) return "disconnect"; case MOSQ_EVT_CONNECT: return "connect"; + case MOSQ_EVT_CLIENT_OFFLINE: + return "connect"; case MOSQ_EVT_SUBSCRIBE: return "subscribe"; case MOSQ_EVT_UNSUBSCRIBE: @@ -121,6 +123,8 @@ static struct mosquitto__callback **plugin__get_callback_base(struct mosquitto__ return &security_options->plugin_callbacks.disconnect; case MOSQ_EVT_CONNECT: return &security_options->plugin_callbacks.connect; + case MOSQ_EVT_CLIENT_OFFLINE: + return &security_options->plugin_callbacks.client_offline; case MOSQ_EVT_SUBSCRIBE: return &security_options->plugin_callbacks.subscribe; case MOSQ_EVT_UNSUBSCRIBE: diff --git a/src/plugin_client_offline.c b/src/plugin_client_offline.c new file mode 100644 index 00000000..6a87527f --- /dev/null +++ b/src/plugin_client_offline.c @@ -0,0 +1,51 @@ +/* +Copyright (c) 2023 Roger Light +Copyright (c) 2023 Cedalo Gmbh + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License 2.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + https://www.eclipse.org/legal/epl-2.0/ +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include "mosquitto_broker_internal.h" +#include "utlist.h" + + +static void plugin__handle_client_offline_single(struct mosquitto__security_options *opts, struct mosquitto *context, int reason) +{ + struct mosquitto_evt_client_offline event_data; + struct mosquitto__callback *cb_base, *cb_next; + + if(context->id == NULL) return; + + memset(&event_data, 0, sizeof(event_data)); + event_data.client = context; + event_data.reason = reason; + DL_FOREACH_SAFE(opts->plugin_callbacks.client_offline, cb_base, cb_next){ + cb_base->cb(MOSQ_EVT_CLIENT_OFFLINE, &event_data, cb_base->userdata); + } +} + + +void plugin__handle_client_offline(struct mosquitto *context, int reason) +{ + /* Global plugins */ + plugin__handle_client_offline_single(&db.config->security_options, context, reason); + + /* Per listener plugins */ + if(db.config->per_listener_settings && context->listener){ + plugin__handle_client_offline_single(context->listener->security_options, context, reason); + } +} diff --git a/src/read_handle.c b/src/read_handle.c index 5485fca8..82280c57 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -40,18 +40,15 @@ int handle__packet(struct mosquitto *context) switch((context->in_packet.command)&0xF0){ case CMD_PINGREQ: - metrics__int_inc(mosq_counter_mqtt_pingreq_received, 1); rc = handle__pingreq(context); break; case CMD_PINGRESP: rc = handle__pingresp(context); break; case CMD_PUBACK: - metrics__int_inc(mosq_counter_mqtt_puback_received, 1); rc = handle__pubackcomp(context, "PUBACK"); break; case CMD_PUBCOMP: - metrics__int_inc(mosq_counter_mqtt_pubcomp_received, 1); rc = handle__pubackcomp(context, "PUBCOMP"); break; case CMD_PUBLISH: @@ -59,26 +56,21 @@ int handle__packet(struct mosquitto *context) rc = handle__publish(context); break; case CMD_PUBREC: - metrics__int_inc(mosq_counter_mqtt_pubrec_received, 1); rc = handle__pubrec(context); break; case CMD_PUBREL: - metrics__int_inc(mosq_counter_mqtt_pubrel_received, 1); rc = handle__pubrel(context); break; case CMD_CONNECT: metrics__int_inc(mosq_counter_mqtt_connect_received, 1); return handle__connect(context); case CMD_DISCONNECT: - metrics__int_inc(mosq_counter_mqtt_disconnect_received, 1); rc = handle__disconnect(context); break; case CMD_SUBSCRIBE: - metrics__int_inc(mosq_counter_mqtt_subscribe_received, 1); rc = handle__subscribe(context); break; case CMD_UNSUBSCRIBE: - metrics__int_inc(mosq_counter_mqtt_unsubscribe_received, 1); rc = handle__unsubscribe(context); break; #ifdef WITH_BRIDGE @@ -93,7 +85,6 @@ int handle__packet(struct mosquitto *context) break; #endif case CMD_AUTH: - metrics__int_inc(mosq_counter_mqtt_auth_received, 1); rc = handle__auth(context); break; default: diff --git a/src/send_auth.c b/src/send_auth.c index 6443c742..bec5814a 100644 --- a/src/send_auth.c +++ b/src/send_auth.c @@ -60,8 +60,6 @@ int send__auth(struct mosquitto *context, uint8_t reason_code, const void *auth_ property__write_all(packet, properties, true); mosquitto_property_free_all(&properties); - metrics__int_inc(mosq_counter_mqtt_auth_sent, 1); - return packet__queue(context, packet); error: mosquitto_property_free_all(&properties); diff --git a/src/send_connack.c b/src/send_connack.c index 367ed7e5..d3a48a16 100644 --- a/src/send_connack.c +++ b/src/send_connack.c @@ -96,8 +96,6 @@ int send__connack(struct mosquitto *context, uint8_t ack, uint8_t reason_code, c } mosquitto_property_free_all(&connack_props); - metrics__int_inc(mosq_counter_mqtt_connack_sent, 1); - return packet__queue(context, packet); } diff --git a/src/send_suback.c b/src/send_suback.c index 8b8d1163..213a901d 100644 --- a/src/send_suback.c +++ b/src/send_suback.c @@ -55,7 +55,5 @@ int send__suback(struct mosquitto *context, uint16_t mid, uint32_t payloadlen, c packet__write_bytes(packet, payload, payloadlen); } - metrics__int_inc(mosq_counter_mqtt_suback_sent, 1); - return packet__queue(context, packet); } diff --git a/src/send_unsuback.c b/src/send_unsuback.c index 8cfd6257..c8365eca 100644 --- a/src/send_unsuback.c +++ b/src/send_unsuback.c @@ -54,7 +54,5 @@ int send__unsuback(struct mosquitto *mosq, uint16_t mid, int reason_code_count, packet__write_bytes(packet, reason_codes, (uint32_t)reason_code_count); } - metrics__int_inc(mosq_counter_mqtt_unsuback_sent, 1); - return packet__queue(mosq, packet); } diff --git a/test/broker/09-plugin-evt-client-offline.py b/test/broker/09-plugin-evt-client-offline.py new file mode 100755 index 00000000..cccbbc4f --- /dev/null +++ b/test/broker/09-plugin-evt-client-offline.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 + +from mosq_test_helper import * + +def write_config(filename, port): + with open(filename, 'w') as f: + f.write("listener %d\n" % (port)) + f.write("plugin c/plugin_evt_client_offline.so\n") + f.write("allow_anonymous true\n") + + +def do_test(): + rc = 1 + connect_packet = mosq_test.gen_connect("plugin-evt-subscribe", proto_ver=4, clean_session=False) + connack_packet = mosq_test.gen_connack(rc=0, proto_ver=4) + + publish_packet = mosq_test.gen_publish("evt/client/offline", qos=0, payload="plugin-evt-subscribe") + + port = mosq_test.get_port() + conf_file = os.path.basename(__file__).replace('.py', '.conf') + write_config(conf_file, port) + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port, use_conf=True) + + try: + sub_sock = mosq_test.sub_helper(port, '#') + + sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) + sock.close() + + mosq_test.expect_packet(sub_sock, "publish", publish_packet) + rc = 0 + + sub_sock.close() + except mosq_test.TestError: + pass + finally: + os.remove(conf_file) + broker.terminate() + if mosq_test.wait_for_subprocess(broker): + print("broker not terminated") + if rc == 0: rc=1 + (stdo, stde) = broker.communicate() + if rc: + print(stde.decode('utf-8')) + exit(rc) + + +do_test() diff --git a/test/broker/Makefile b/test/broker/Makefile index faaadccf..a4b258a4 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -206,6 +206,7 @@ endif ./09-plugin-bad.py ./09-plugin-change-id.py ./09-plugin-delayed-auth.py + ./09-plugin-evt-client-offline.py ./09-plugin-evt-message-in.py ./09-plugin-evt-message-out.py ./09-plugin-evt-psk-key.py diff --git a/test/broker/c/Makefile b/test/broker/c/Makefile index 28a30feb..9afe317e 100644 --- a/test/broker/c/Makefile +++ b/test/broker/c/Makefile @@ -45,6 +45,7 @@ PLUGIN_SRC = \ bad_v5_1.c \ bad_v6.c \ plugin_control.c \ + plugin_evt_client_offline.c \ plugin_evt_message_in.c \ plugin_evt_message_out.c \ plugin_evt_psk_key.c \ diff --git a/test/broker/c/plugin_evt_client_offline.c b/test/broker/c/plugin_evt_client_offline.c new file mode 100644 index 00000000..2a6f2b4c --- /dev/null +++ b/test/broker/c/plugin_evt_client_offline.c @@ -0,0 +1,50 @@ +#include +#include +#include +#include +#include +#include + +MOSQUITTO_PLUGIN_DECLARE_VERSION(5); + +static mosquitto_plugin_id_t *plg_id; + +int callback_client_offline(int event, void *event_data, void *user_data) +{ + struct mosquitto_evt_client_offline *ed = event_data; + const char *clientid; + + (void)user_data; + + if(event != MOSQ_EVT_CLIENT_OFFLINE){ + abort(); + } + clientid = mosquitto_client_id(ed->client); + mosquitto_broker_publish_copy(NULL, "evt/client/offline", strlen(clientid), clientid, 0, false, NULL); + + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, struct mosquitto_opt *opts, int opt_count) +{ + (void)user_data; + (void)opts; + (void)opt_count; + + plg_id = identifier; + + mosquitto_callback_register(plg_id, MOSQ_EVT_CLIENT_OFFLINE, callback_client_offline, NULL, NULL); + + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count) +{ + (void)user_data; + (void)opts; + (void)opt_count; + + mosquitto_callback_unregister(plg_id, MOSQ_EVT_CLIENT_OFFLINE, callback_client_offline, NULL); + + return MOSQ_ERR_SUCCESS; +} diff --git a/test/broker/test.py b/test/broker/test.py index 1e30ba5c..9d23a283 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -173,6 +173,7 @@ tests = [ (1, './09-plugin-auth-v5-unpwd-success.py'), (1, './09-plugin-bad.py'), (1, './09-plugin-change-id.py'), + (1, './09-plugin-evt-client-offline.py'), (1, './09-plugin-evt-message-in.py'), (1, './09-plugin-evt-message-out.py'), (1, './09-plugin-evt-psk-key.py'),