allow message modification on output

Signed-off-by: Abilio Marques <abiliojr@gmail.com>
pull/2735/head
Abilio Marques 3 years ago committed by Roger A. Light
parent 9a6dcc0232
commit 1677d1aed2

@ -92,7 +92,7 @@ Plugins / plugin interface:
- Plugins on non-Windows platforms now no longer make their symbols globally
available, which means they are self contained.
- Add support for delayed basic authentication in plugins.
- Plugins using the MOSQ_EVT_MESSAGE callback can now return
- Plugins using the MOSQ_EVT_MESSAGE_WRITE callback can now return
MOSQ_ERR_QUOTA_EXCEEDED to have the message be rejected. MQTT v5 clients
using QoS 1 or 2 will receive the quota-exceeded reason code in the
corresponding PUBACK/PUBREC.

@ -72,7 +72,8 @@ enum mosquitto_plugin_event {
MOSQ_EVT_EXT_AUTH_START = 4,
MOSQ_EVT_EXT_AUTH_CONTINUE = 5,
MOSQ_EVT_CONTROL = 6,
MOSQ_EVT_MESSAGE = 7,
MOSQ_EVT_MESSAGE = 7, // deprecated name
MOSQ_EVT_MESSAGE_WRITE = 7,
MOSQ_EVT_PSK_KEY = 8,
MOSQ_EVT_TICK = 9,
MOSQ_EVT_DISCONNECT = 10,
@ -95,6 +96,7 @@ enum mosquitto_plugin_event {
MOSQ_EVT_PERSIST_CLIENT_MSG_DELETE = 27,
MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE = 28,
MOSQ_EVT_PERSIST_CLIENT_MSG_LOAD = 29,
MOSQ_EVT_MESSAGE_READ = 30,
};
/* Data for the MOSQ_EVT_RELOAD event */
@ -166,7 +168,7 @@ struct mosquitto_evt_control {
void *future2[4];
};
/* Data for the MOSQ_EVT_MESSAGE event */
/* Data for the MOSQ_EVT_MESSAGE_WRITE and MOSQ_EVT_MESSAGE_READ events */
struct mosquitto_evt_message {
void *future;
struct mosquitto *client;
@ -394,10 +396,13 @@ mosq_EXPORT int mosquitto_plugin_set_info(
* * MOSQ_EVT_CONTROL
* Called on receipt of a $CONTROL message that the plugin has
* registered for.
* * MOSQ_EVT_MESSAGE
* Called for each PUBLISH message after it has been received and
* authorised, but before it is sent to subscribing clients. The
* contents of the message can be modified.
* * MOSQ_EVT_MESSAGE_WRITE
* Called for each incoming PUBLISH message after it has been received
* and authorised. The contents of the message can be modified.
* * MOSQ_EVT_MESSAGE_READ
* Called for each outgoing PUBLISH message after it has been authorised,
* but before it is sent to each subscribing client. The contents of the
* message can be modified.
* * MOSQ_EVT_PSK_KEY
* Called when a client connects with TLS-PSK and the broker needs
* the PSK information.
@ -441,7 +446,8 @@ mosq_EXPORT int mosquitto_callback_register(
* * MOSQ_EVT_EXT_AUTH_START
* * MOSQ_EVT_EXT_AUTH_CONTINUE
* * MOSQ_EVT_CONTROL
* * MOSQ_EVT_MESSAGE
* * MOSQ_EVT_MESSAGE_WRITE
* * MOSQ_EVT_MESSAGE_READ
* * MOSQ_EVT_PSK_KEY
* * MOSQ_EVT_TICK
* * MOSQ_EVT_DISCONNECT

@ -56,6 +56,35 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN;
#ifdef WITH_BROKER
bool payload_changed = false;
bool topic_changed = false;
bool properties_changed = false;
{
struct mosquitto_base_msg tmp_msg;
tmp_msg.topic = (char *) topic;
tmp_msg.payloadlen = payloadlen;
tmp_msg.payload = (void *) payload;
tmp_msg.qos = qos;
tmp_msg.retain = retain;
tmp_msg.properties = (mosquitto_property *) store_props;
plugin__handle_message_read(mosq, &tmp_msg);
if(tmp_msg.payload != payload) payload_changed = true;
if(tmp_msg.topic != topic) topic_changed = true;
if(tmp_msg.properties != store_props) properties_changed = true;
topic = tmp_msg.topic;
payloadlen = tmp_msg.payloadlen;
payload = tmp_msg.payload;
qos = tmp_msg.qos;
retain = tmp_msg.retain;
store_props = tmp_msg.properties;
}
#endif
if(!mosq->retain_available){
retain = false;
}
@ -125,7 +154,15 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen);
#endif
#ifdef WITH_BROKER
rc = send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
if(payload_changed) mosquitto__free((void *) payload);
if(topic_changed) mosquitto__free((char *) topic);
if(properties_changed) mosquitto_property_free_all((mosquitto_property **) &store_props);
return rc;
#else
return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
#endif
}

@ -99,7 +99,7 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosq_pid = identifier;
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE_WRITE, callback_message, NULL, NULL);
}
/* mosquitto_plugin_cleanup() is optional in 2.1 and later. Use it only if you have your own cleanup to do */

@ -51,7 +51,7 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosq_pid = identifier;
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE_WRITE, callback_message, NULL, NULL);
}
/* mosquitto_plugin_cleanup() is optional in 2.1 and later. Use it only if you have your own cleanup to do */

@ -66,7 +66,7 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosq_pid = identifier;
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE_WRITE, callback_message, NULL, NULL);
}
/* mosquitto_plugin_cleanup() is optional in 2.1 and later. Use it only if you have your own cleanup to do */

@ -70,7 +70,7 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosq_pid = identifier;
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE_WRITE, callback_message, NULL, NULL);
}
/* mosquitto_plugin_cleanup() is optional in 2.1 and later. Use it only if you have your own cleanup to do */

@ -93,7 +93,7 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosq_pid = identifier;
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE_WRITE, callback_message, NULL, NULL);
}
/* mosquitto_plugin_cleanup() is optional in 2.1 and later. Use it only if you have your own cleanup to do */

@ -134,7 +134,7 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
memset(last_size_counts, 0, sizeof(last_size_counts));
mosq_pid = identifier;
mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE_WRITE, callback_message, NULL, NULL);
mosquitto_callback_register(mosq_pid, MOSQ_EVT_TICK, callback_tick, NULL, NULL);
return MOSQ_ERR_SUCCESS;

@ -60,11 +60,12 @@ const char evt_topics[][60] = {
TOPIC_BASE "auth/ext/start", /* MOSQ_EVT_EXT_AUTH_START */
TOPIC_BASE "auth/ext/continue", /* MOSQ_EVT_EXT_AUTH_CONTINUE */
TOPIC_BASE "control", /* MOSQ_EVT_CONTROL */
TOPIC_BASE "message", /* MOSQ_EVT_MESSAGE */
TOPIC_BASE "message", /* MOSQ_EVT_MESSAGE_WRITE */
TOPIC_BASE "psk_key", /* MOSQ_EVT_PSK_KEY */
TOPIC_BASE "tick", /* MOSQ_EVT_TICK */
TOPIC_BASE "disconnect", /* MOSQ_EVT_DISCONNECT */
TOPIC_BASE "connect", /* MOSQ_EVT_CONNECT */
TOPIC_BASE "message_read", /* MOSQ_EVT_MESSAGE_READ */
TOPIC_BASE "subscribe", /* MOSQ_EVT_SUBSCRIBE */
TOPIC_BASE "unsubscribe", /* MOSQ_EVT_UNSUBSCRIBE */
TOPIC_BASE "persist/restore", /* MOSQ_EVT_PERSIST_RESTORE */

@ -40,7 +40,7 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosq_pid = identifier;
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, message_callback, NULL, NULL);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE_WRITE, message_callback, NULL, NULL);
}
/* mosquitto_plugin_cleanup() is optional in 2.1 and later. Use it only if you have your own cleanup to do */

@ -76,7 +76,7 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
UNUSED(opt_count);
mosq_pid = identifier;
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE_WRITE, callback_message, NULL, NULL);
}
/* mosquitto_plugin_cleanup() is optional in 2.1 and later. Use it only if you have your own cleanup to do */

@ -278,7 +278,7 @@ int handle__publish(struct mosquitto *context)
}
{
rc = plugin__handle_message(context, base_msg);
rc = plugin__handle_message_write(context, &base_msg->data);
if(rc == MOSQ_ERR_ACL_DENIED){
log__printf(NULL, MOSQ_LOG_DEBUG,
"Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",

@ -165,7 +165,8 @@ struct plugin__callbacks{
struct mosquitto__callback *disconnect;
struct mosquitto__callback *ext_auth_continue;
struct mosquitto__callback *ext_auth_start;
struct mosquitto__callback *message;
struct mosquitto__callback *message_write;
struct mosquitto__callback *message_read;
struct mosquitto__callback *psk_key;
struct mosquitto__callback *reload;
struct mosquitto__callback *subscribe;
@ -850,7 +851,8 @@ int acl__pre_check(mosquitto_plugin_id_t *plugin, struct mosquitto *context, int
void plugin__handle_connect(struct mosquitto *context);
void plugin__handle_disconnect(struct mosquitto *context, int reason);
int plugin__handle_message(struct mosquitto *context, struct mosquitto__base_msg *base_msg);
int plugin__handle_message_write(struct mosquitto *context, struct mosquitto_base_msg *base_msg);
int plugin__handle_message_read(struct mosquitto *context, struct mosquitto_base_msg *base_msg);
int plugin__handle_subscribe(struct mosquitto *context, const struct mosquitto_subscription *sub);
int plugin__handle_unsubscribe(struct mosquitto *context, const struct mosquitto_subscription *sub);
void LIB_ERROR(void);

@ -39,8 +39,10 @@ static const char *get_event_name(int event)
return "auth-start";
case MOSQ_EVT_EXT_AUTH_CONTINUE:
return "auth-continue";
case MOSQ_EVT_MESSAGE:
return "message";
case MOSQ_EVT_MESSAGE_WRITE:
return "message-write";
case MOSQ_EVT_MESSAGE_READ:
return "message-read";
case MOSQ_EVT_TICK:
return "tick";
case MOSQ_EVT_DISCONNECT:
@ -115,14 +117,16 @@ static struct mosquitto__callback **plugin__get_callback_base(struct mosquitto__
return &security_options->plugin_callbacks.ext_auth_continue;
case MOSQ_EVT_CONTROL:
return NULL;
case MOSQ_EVT_MESSAGE:
return &security_options->plugin_callbacks.message;
case MOSQ_EVT_MESSAGE_WRITE: /* same as MOSQ_EVT_MESSAGE */
return &security_options->plugin_callbacks.message_write;
case MOSQ_EVT_TICK:
return &security_options->plugin_callbacks.tick;
case MOSQ_EVT_DISCONNECT:
return &security_options->plugin_callbacks.disconnect;
case MOSQ_EVT_CONNECT:
return &security_options->plugin_callbacks.connect;
case MOSQ_EVT_MESSAGE_READ:
return &security_options->plugin_callbacks.message_read;
case MOSQ_EVT_SUBSCRIBE:
return &security_options->plugin_callbacks.subscribe;
case MOSQ_EVT_UNSUBSCRIBE:

@ -22,8 +22,13 @@ Contributors:
#include "memory_mosq.h"
#include "utlist.h"
struct should_free {
bool topic;
bool payload;
bool properties;
};
static int plugin__handle_message_single(struct mosquitto__security_options *opts, struct mosquitto *context, struct mosquitto__base_msg *stored)
static int plugin__handle_message_single(struct mosquitto__callback *callbacks, enum mosquitto_plugin_event ev_type, struct should_free *to_free, struct mosquitto *context, struct mosquitto_base_msg *stored)
{
struct mosquitto_evt_message event_data;
struct mosquitto__callback *cb_base;
@ -31,44 +36,78 @@ static int plugin__handle_message_single(struct mosquitto__security_options *opt
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.topic = stored->data.topic;
event_data.payloadlen = stored->data.payloadlen;
event_data.payload = stored->data.payload;
event_data.qos = stored->data.qos;
event_data.retain = stored->data.retain;
event_data.properties = stored->data.properties;
DL_FOREACH(opts->plugin_callbacks.message, cb_base){
rc = cb_base->cb(MOSQ_EVT_MESSAGE, &event_data, cb_base->userdata);
event_data.topic = stored->topic;
event_data.payloadlen = stored->payloadlen;
event_data.payload = stored->payload;
event_data.qos = stored->qos;
event_data.retain = stored->retain;
event_data.properties = stored->properties;
DL_FOREACH(callbacks, cb_base){
rc = cb_base->cb(ev_type, &event_data, cb_base->userdata);
if(rc != MOSQ_ERR_SUCCESS){
break;
}
if(stored->topic != event_data.topic){
if(to_free->topic) mosquitto__FREE(stored->topic);
stored->topic = event_data.topic;
to_free->topic = true;
}
if(stored->payload != event_data.payload){
if(to_free->payload) mosquitto__FREE(stored->payload);
stored->payload = event_data.payload;
stored->payloadlen = event_data.payloadlen;
to_free->payload = true;
}
if(stored->properties != event_data.properties){
if(to_free->properties) mosquitto_property_free_all(&stored->properties);
stored->properties = event_data.properties;
to_free->properties = true;
}
}
stored->retain = event_data.retain;
if(ev_type == MOSQ_EVT_MESSAGE_READ){
stored->qos = event_data.qos;
}
stored->data.topic = event_data.topic;
if(stored->data.payload != event_data.payload){
mosquitto__FREE(stored->data.payload);
stored->data.payload = event_data.payload;
stored->data.payloadlen = event_data.payloadlen;
return rc;
}
int plugin__handle_message_read(struct mosquitto *context, struct mosquitto_base_msg *stored)
{
int rc = MOSQ_ERR_SUCCESS;
struct should_free to_free = {false, false, false}; /* in msg_read, original data will be freed later */
/* Global plugins */
rc = plugin__handle_message_single(db.config->security_options.plugin_callbacks.message_read,
MOSQ_EVT_MESSAGE_READ, &to_free, context, stored);
if(rc) return rc;
if(db.config->per_listener_settings && context->listener){
rc = plugin__handle_message_single(context->listener->security_options->plugin_callbacks.message_read,
MOSQ_EVT_MESSAGE_READ, &to_free, context, stored);
}
stored->data.retain = event_data.retain;
stored->data.properties = event_data.properties;
return rc;
}
int plugin__handle_message(struct mosquitto *context, struct mosquitto__base_msg *stored)
int plugin__handle_message_write(struct mosquitto *context, struct mosquitto_base_msg *stored)
{
int rc = MOSQ_ERR_SUCCESS;
struct should_free to_free = {true, true, true}; /* in msg_write, original data should be freed */
/* Global plugins */
rc = plugin__handle_message_single(&db.config->security_options,
context, stored);
rc = plugin__handle_message_single(db.config->security_options.plugin_callbacks.message_write,
MOSQ_EVT_MESSAGE_WRITE, &to_free, context, stored);
if(rc) return rc;
if(db.config->per_listener_settings && context->listener){
rc = plugin__handle_message_single(context->listener->security_options,
context, stored);
rc = plugin__handle_message_single(context->listener->security_options->plugin_callbacks.message_write,
MOSQ_EVT_MESSAGE_WRITE, &to_free, context, stored);
}
return rc;

@ -19,7 +19,7 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
plg_id = identifier;
mosquitto_callback_register(plg_id, MOSQ_EVT_MESSAGE, handle_publish, NULL, NULL);
mosquitto_callback_register(plg_id, MOSQ_EVT_MESSAGE_WRITE, handle_publish, NULL, NULL);
return MOSQ_ERR_SUCCESS;
}
@ -30,7 +30,7 @@ int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *auth_opts, i
(void)auth_opts;
(void)auth_opt_count;
mosquitto_callback_unregister(plg_id, MOSQ_EVT_MESSAGE, handle_publish, NULL);
mosquitto_callback_unregister(plg_id, MOSQ_EVT_MESSAGE_WRITE, handle_publish, NULL);
return MOSQ_ERR_SUCCESS;
}
@ -41,7 +41,7 @@ int handle_publish(int event, void *event_data, void *user_data)
(void)user_data;
if(event != MOSQ_EVT_MESSAGE){
if(event != MOSQ_EVT_MESSAGE_WRITE){
abort();
}
mosquitto_free(ed->topic);

Loading…
Cancel
Save