Add MOSQ_EVT_CLIENT_OFFLINE.

This allows plugins to know when a client with a non-zero session expiry
interval has gone offline.
pull/2756/merge
Roger Light 2 years ago
parent 086a361bcc
commit 92c1899278

@ -91,6 +91,8 @@ Plugins / plugin interface:
- Add MOSQ_EVT_CONNECT, to allow plugins to know when a client has
successfully authenticated to the broker.
- Add connection-state example plugin to demonstrate MOSQ_EVT_CONNECT.
- Add MOSQ_EVT_CLIENT_OFFLINE, to allow plugins to know when a client with a
non-zero session expiry interval has gone offline.
- Plugins on non-Windows platforms now no longer make their symbols globally
available, which means they are self contained.
- Add support for delayed basic authentication in plugins.

@ -156,6 +156,7 @@ enum mosquitto_plugin_event {
MOSQ_EVT_PERSIST_CLIENT_MSG_DELETE = 25,
MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE = 26,
MOSQ_EVT_MESSAGE_OUT = 27,
MOSQ_EVT_CLIENT_OFFLINE = 28,
};
/* Data for the MOSQ_EVT_RELOAD event */
@ -270,6 +271,14 @@ struct mosquitto_evt_disconnect {
void *future2[4];
};
/* Data for the MOSQ_EVT_CLIENT_OFFLINE event */
struct mosquitto_evt_client_offline {
void *future;
struct mosquitto *client;
int reason;
void *future2[4];
};
/* Data for the MOSQ_EVT_SUBSCRIBE event */
struct mosquitto_evt_subscribe {
void *future;

@ -78,9 +78,6 @@ int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitt
}
}
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
metrics__int_inc(mosq_counter_mqtt_disconnect_sent, 1);
#endif
return packet__queue(mosq, packet);
}

@ -59,9 +59,6 @@ int send__pingresp(struct mosquitto *mosq)
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PINGRESP to %s", SAFE_PRINT(mosq->id));
# ifdef WITH_SYS_TREE
metrics__int_inc(mosq_counter_mqtt_pingresp_sent, 1);
# endif
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PINGRESP", SAFE_PRINT(mosq->id));
#endif
@ -72,9 +69,6 @@ int send__puback(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, cons
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBACK to %s (m%d, rc%d)", SAFE_PRINT(mosq->id), mid, reason_code);
# ifdef WITH_SYS_TREE
metrics__int_inc(mosq_counter_mqtt_puback_sent, 1);
# endif
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBACK (m%d, rc%d)", SAFE_PRINT(mosq->id), mid, reason_code);
#endif
@ -87,9 +81,6 @@ int send__pubcomp(struct mosquitto *mosq, uint16_t mid, const mosquitto_property
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBCOMP to %s (m%d)", SAFE_PRINT(mosq->id), mid);
# ifdef WITH_SYS_TREE
metrics__int_inc(mosq_counter_mqtt_pubcomp_sent, 1);
# endif
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBCOMP (m%d)", SAFE_PRINT(mosq->id), mid);
#endif
@ -103,9 +94,6 @@ int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code, cons
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREC to %s (m%d, rc%d)", SAFE_PRINT(mosq->id), mid, reason_code);
# ifdef WITH_SYS_TREE
metrics__int_inc(mosq_counter_mqtt_pubrec_sent, 1);
# endif
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (m%d, rc%d)", SAFE_PRINT(mosq->id), mid, reason_code);
#endif
@ -120,9 +108,6 @@ int send__pubrel(struct mosquitto *mosq, uint16_t mid, const mosquitto_property
{
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREL to %s (m%d)", SAFE_PRINT(mosq->id), mid);
# ifdef WITH_SYS_TREE
metrics__int_inc(mosq_counter_mqtt_pubrel_sent, 1);
# endif
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREL (m%d)", SAFE_PRINT(mosq->id), mid);
#endif

@ -47,6 +47,7 @@ set (MOSQ_SRCS
plugin_callbacks.c plugin_v5.c plugin_v4.c plugin_v3.c plugin_v2.c
plugin_init.c plugin_cleanup.c plugin_persist.c
plugin_acl_check.c plugin_basic_auth.c plugin_connect.c plugin_disconnect.c
plugin_client_offline.c
plugin_extended_auth.c plugin_message.c plugin_psk_key.c plugin_public.c
plugin_subscribe.c
plugin_tick.c

@ -74,6 +74,7 @@ OBJS= mosquitto.o \
plugin_acl_check.o \
plugin_basic_auth.o \
plugin_cleanup.o \
plugin_client_offline.o \
plugin_connect.o \
plugin_disconnect.o \
plugin_extended_auth.o \
@ -308,6 +309,9 @@ plugin_basic_auth.o : plugin_basic_auth.c ${R}/include/mosquitto_plugin.h mosqui
plugin_cleanup.o : plugin_cleanup.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
plugin_client_offline.o : plugin_client_offline.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
plugin_connect.o : plugin_connect.c ${R}/include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@

@ -231,7 +231,12 @@ void context__disconnect(struct mosquitto *context, int reason)
if(send(context->sock, buf, 4, 0)){};
}
#endif
plugin__handle_disconnect(context, reason);
if(context->session_expiry_interval == 0){
plugin__handle_disconnect(context, reason);
}else{
plugin__handle_client_offline(context, reason);
}
context__send_will(context);
net__socket_close(context);

@ -163,6 +163,7 @@ struct plugin__callbacks{
struct mosquitto__callback *connect;
struct mosquitto__callback *control;
struct mosquitto__callback *disconnect;
struct mosquitto__callback *client_offline;
struct mosquitto__callback *ext_auth_continue;
struct mosquitto__callback *ext_auth_start;
struct mosquitto__callback *message_in;
@ -848,6 +849,7 @@ int acl__pre_check(mosquitto_plugin_id_t *plugin, struct mosquitto *context, int
void plugin__handle_connect(struct mosquitto *context);
void plugin__handle_disconnect(struct mosquitto *context, int reason);
void plugin__handle_client_offline(struct mosquitto *context, int reason);
int plugin__handle_message_in(struct mosquitto *context, struct mosquitto_base_msg *base_msg);
int plugin__handle_message_out(struct mosquitto *context, struct mosquitto_base_msg *base_msg);
int plugin__handle_subscribe(struct mosquitto *context, struct mosquitto_subscription *sub);

@ -49,6 +49,8 @@ static const char *get_event_name(int event)
return "disconnect";
case MOSQ_EVT_CONNECT:
return "connect";
case MOSQ_EVT_CLIENT_OFFLINE:
return "connect";
case MOSQ_EVT_SUBSCRIBE:
return "subscribe";
case MOSQ_EVT_UNSUBSCRIBE:
@ -121,6 +123,8 @@ static struct mosquitto__callback **plugin__get_callback_base(struct mosquitto__
return &security_options->plugin_callbacks.disconnect;
case MOSQ_EVT_CONNECT:
return &security_options->plugin_callbacks.connect;
case MOSQ_EVT_CLIENT_OFFLINE:
return &security_options->plugin_callbacks.client_offline;
case MOSQ_EVT_SUBSCRIBE:
return &security_options->plugin_callbacks.subscribe;
case MOSQ_EVT_UNSUBSCRIBE:

@ -0,0 +1,51 @@
/*
Copyright (c) 2023 Roger Light <roger@atchoo.org>
Copyright (c) 2023 Cedalo Gmbh
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
#include "mosquitto_broker_internal.h"
#include "utlist.h"
static void plugin__handle_client_offline_single(struct mosquitto__security_options *opts, struct mosquitto *context, int reason)
{
struct mosquitto_evt_client_offline event_data;
struct mosquitto__callback *cb_base, *cb_next;
if(context->id == NULL) return;
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.reason = reason;
DL_FOREACH_SAFE(opts->plugin_callbacks.client_offline, cb_base, cb_next){
cb_base->cb(MOSQ_EVT_CLIENT_OFFLINE, &event_data, cb_base->userdata);
}
}
void plugin__handle_client_offline(struct mosquitto *context, int reason)
{
/* Global plugins */
plugin__handle_client_offline_single(&db.config->security_options, context, reason);
/* Per listener plugins */
if(db.config->per_listener_settings && context->listener){
plugin__handle_client_offline_single(context->listener->security_options, context, reason);
}
}

@ -40,18 +40,15 @@ int handle__packet(struct mosquitto *context)
switch((context->in_packet.command)&0xF0){
case CMD_PINGREQ:
metrics__int_inc(mosq_counter_mqtt_pingreq_received, 1);
rc = handle__pingreq(context);
break;
case CMD_PINGRESP:
rc = handle__pingresp(context);
break;
case CMD_PUBACK:
metrics__int_inc(mosq_counter_mqtt_puback_received, 1);
rc = handle__pubackcomp(context, "PUBACK");
break;
case CMD_PUBCOMP:
metrics__int_inc(mosq_counter_mqtt_pubcomp_received, 1);
rc = handle__pubackcomp(context, "PUBCOMP");
break;
case CMD_PUBLISH:
@ -59,26 +56,21 @@ int handle__packet(struct mosquitto *context)
rc = handle__publish(context);
break;
case CMD_PUBREC:
metrics__int_inc(mosq_counter_mqtt_pubrec_received, 1);
rc = handle__pubrec(context);
break;
case CMD_PUBREL:
metrics__int_inc(mosq_counter_mqtt_pubrel_received, 1);
rc = handle__pubrel(context);
break;
case CMD_CONNECT:
metrics__int_inc(mosq_counter_mqtt_connect_received, 1);
return handle__connect(context);
case CMD_DISCONNECT:
metrics__int_inc(mosq_counter_mqtt_disconnect_received, 1);
rc = handle__disconnect(context);
break;
case CMD_SUBSCRIBE:
metrics__int_inc(mosq_counter_mqtt_subscribe_received, 1);
rc = handle__subscribe(context);
break;
case CMD_UNSUBSCRIBE:
metrics__int_inc(mosq_counter_mqtt_unsubscribe_received, 1);
rc = handle__unsubscribe(context);
break;
#ifdef WITH_BRIDGE
@ -93,7 +85,6 @@ int handle__packet(struct mosquitto *context)
break;
#endif
case CMD_AUTH:
metrics__int_inc(mosq_counter_mqtt_auth_received, 1);
rc = handle__auth(context);
break;
default:

@ -60,8 +60,6 @@ int send__auth(struct mosquitto *context, uint8_t reason_code, const void *auth_
property__write_all(packet, properties, true);
mosquitto_property_free_all(&properties);
metrics__int_inc(mosq_counter_mqtt_auth_sent, 1);
return packet__queue(context, packet);
error:
mosquitto_property_free_all(&properties);

@ -96,8 +96,6 @@ int send__connack(struct mosquitto *context, uint8_t ack, uint8_t reason_code, c
}
mosquitto_property_free_all(&connack_props);
metrics__int_inc(mosq_counter_mqtt_connack_sent, 1);
return packet__queue(context, packet);
}

@ -55,7 +55,5 @@ int send__suback(struct mosquitto *context, uint16_t mid, uint32_t payloadlen, c
packet__write_bytes(packet, payload, payloadlen);
}
metrics__int_inc(mosq_counter_mqtt_suback_sent, 1);
return packet__queue(context, packet);
}

@ -54,7 +54,5 @@ int send__unsuback(struct mosquitto *mosq, uint16_t mid, int reason_code_count,
packet__write_bytes(packet, reason_codes, (uint32_t)reason_code_count);
}
metrics__int_inc(mosq_counter_mqtt_unsuback_sent, 1);
return packet__queue(mosq, packet);
}

@ -0,0 +1,48 @@
#!/usr/bin/env python3
from mosq_test_helper import *
def write_config(filename, port):
with open(filename, 'w') as f:
f.write("listener %d\n" % (port))
f.write("plugin c/plugin_evt_client_offline.so\n")
f.write("allow_anonymous true\n")
def do_test():
rc = 1
connect_packet = mosq_test.gen_connect("plugin-evt-subscribe", proto_ver=4, clean_session=False)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=4)
publish_packet = mosq_test.gen_publish("evt/client/offline", qos=0, payload="plugin-evt-subscribe")
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
write_config(conf_file, port)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port, use_conf=True)
try:
sub_sock = mosq_test.sub_helper(port, '#')
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
sock.close()
mosq_test.expect_packet(sub_sock, "publish", publish_packet)
rc = 0
sub_sock.close()
except mosq_test.TestError:
pass
finally:
os.remove(conf_file)
broker.terminate()
if mosq_test.wait_for_subprocess(broker):
print("broker not terminated")
if rc == 0: rc=1
(stdo, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))
exit(rc)
do_test()

@ -206,6 +206,7 @@ endif
./09-plugin-bad.py
./09-plugin-change-id.py
./09-plugin-delayed-auth.py
./09-plugin-evt-client-offline.py
./09-plugin-evt-message-in.py
./09-plugin-evt-message-out.py
./09-plugin-evt-psk-key.py

@ -45,6 +45,7 @@ PLUGIN_SRC = \
bad_v5_1.c \
bad_v6.c \
plugin_control.c \
plugin_evt_client_offline.c \
plugin_evt_message_in.c \
plugin_evt_message_out.c \
plugin_evt_psk_key.c \

@ -0,0 +1,50 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>
#include <mosquitto_broker.h>
#include <mosquitto_plugin.h>
MOSQUITTO_PLUGIN_DECLARE_VERSION(5);
static mosquitto_plugin_id_t *plg_id;
int callback_client_offline(int event, void *event_data, void *user_data)
{
struct mosquitto_evt_client_offline *ed = event_data;
const char *clientid;
(void)user_data;
if(event != MOSQ_EVT_CLIENT_OFFLINE){
abort();
}
clientid = mosquitto_client_id(ed->client);
mosquitto_broker_publish_copy(NULL, "evt/client/offline", strlen(clientid), clientid, 0, false, NULL);
return MOSQ_ERR_SUCCESS;
}
int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, struct mosquitto_opt *opts, int opt_count)
{
(void)user_data;
(void)opts;
(void)opt_count;
plg_id = identifier;
mosquitto_callback_register(plg_id, MOSQ_EVT_CLIENT_OFFLINE, callback_client_offline, NULL, NULL);
return MOSQ_ERR_SUCCESS;
}
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
(void)user_data;
(void)opts;
(void)opt_count;
mosquitto_callback_unregister(plg_id, MOSQ_EVT_CLIENT_OFFLINE, callback_client_offline, NULL);
return MOSQ_ERR_SUCCESS;
}

@ -173,6 +173,7 @@ tests = [
(1, './09-plugin-auth-v5-unpwd-success.py'),
(1, './09-plugin-bad.py'),
(1, './09-plugin-change-id.py'),
(1, './09-plugin-evt-client-offline.py'),
(1, './09-plugin-evt-message-in.py'),
(1, './09-plugin-evt-message-out.py'),
(1, './09-plugin-evt-psk-key.py'),

Loading…
Cancel
Save