Allow plugins to send messages to specific clients.

pull/1522/merge
Roger A. Light 5 years ago
parent c94e111d73
commit 2dc8d2a19a

@ -3,7 +3,7 @@ include ../config.mk
.PHONY: all install uninstall clean reallyclean
ifeq ($(WITH_TLS),yes)
all : mosquitto mosquitto_passwd
all : mosquitto
else
all : mosquitto
endif

@ -10,4 +10,12 @@ _mosquitto_client_protocol_version
_mosquitto_client_sub_count
_mosquitto_client_username
_mosquitto_log_printf
_mosquitto_property_add_binary
_mosquitto_property_add_byte
_mosquitto_property_add_int16
_mosquitto_property_add_int32
_mosquitto_property_add_string
_mosquitto_property_add_string_pair
_mosquitto_property_add_varint
_mosquitto_property_free_all
_mosquitto_set_username

@ -12,5 +12,13 @@
mosquitto_client_username;
mosquitto_log_printf;
mosquitto_plugin_publish;
mosquitto_property_add_binary;
mosquitto_property_add_byte;
mosquitto_property_add_int16;
mosquitto_property_add_int32;
mosquitto_property_add_string;
mosquitto_property_add_string_pair;
mosquitto_property_add_varint;
mosquitto_property_free_all;
mosquitto_set_username;
};

@ -99,17 +99,59 @@ void lws__sul_callback(struct lws_sorted_usec_list *l)
static struct lws_sorted_usec_list sul;
#endif
static int single_publish(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_message_v5 *msg)
{
struct mosquitto_msg_store *stored;
int mid;
stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
if(stored == NULL) return MOSQ_ERR_NOMEM;
stored->topic = msg->topic;
msg->topic = NULL;
stored->retain = 0;
stored->payloadlen = msg->payloadlen;
if(UHPA_ALLOC(stored->payload, stored->payloadlen) == 0){
db__msg_store_free(stored);
return MOSQ_ERR_NOMEM;
}
memcpy(UHPA_ACCESS(stored->payload, stored->payloadlen), msg->payload, stored->payloadlen);
if(msg->properties){
stored->properties = msg->properties;
msg->properties = NULL;
}
if(db__message_store(db, context, stored, 0, 0, mosq_mo_broker)) return 1;
if(msg->qos){
mid = mosquitto__mid_generate(context);
}else{
mid = 0;
}
return db__message_insert(db, context, mid, mosq_md_out, msg->qos, 0, stored, msg->properties);
}
void queue_plugin_msgs(struct mosquitto_db *db)
{
struct mosquitto_message_v5 *msg, *tmp;
struct mosquitto *context;
DL_FOREACH_SAFE(db->plugin_msgs, msg, tmp){
DL_DELETE(db->plugin_msgs, msg);
db__messages_easy_queue(db, NULL, msg->topic, msg->qos, msg->payloadlen, msg->payload, msg->retain, 0, &msg->properties);
if(msg->clientid){
HASH_FIND(hh_id, db->contexts_by_id, msg->clientid, strlen(msg->clientid), context);
if(context){
single_publish(db, context, msg);
}
}else{
db__messages_easy_queue(db, NULL, msg->topic, msg->qos, msg->payloadlen, msg->payload, msg->retain, 0, &msg->properties);
}
mosquitto__free(msg->topic);
mosquitto__free(msg->payload);
mosquitto_property_free_all(&msg->properties);
mosquitto__free(msg->clientid);
mosquitto__free(msg);
}
}

@ -189,10 +189,15 @@ int mosquitto_set_username(struct mosquitto *client, const char *username);
* will be enforced as normal for individual clients when they are due to
* receive the message.
*
* payload must be allocated on the heap, and will be freed by mosquitto after
* use.
* It can be used to send messages to all clients that have a matching
* subscription, or to a single client whether or not it has a matching
* subscription.
*
* Parameters:
* clientid - optional string. If set to NULL, the message is delivered to all
* clients. If non-NULL, the message is delivered only to the
* client with the corresponding client id. If the client id
* specified is not connected, the message will be dropped.
* topic - message topic
* payloadlen - payload length in bytes. Can be 0 for an empty payload.
* payload - payload bytes. If payloadlen > 0 this must not be NULL. Must
@ -212,6 +217,7 @@ int mosquitto_set_username(struct mosquitto *client, const char *username);
* MOSQ_ERR_NOMEM - on out of memory
*/
int mosquitto_broker_publish(
const char *clientid,
const char *topic,
int payloadlen,
void *payload,
@ -228,6 +234,10 @@ int mosquitto_broker_publish(
* of `payload` is taken.
*
* Parameters:
* clientid - optional string. If set to NULL, the message is delivered to all
* clients. If non-NULL, the message is delivered only to the
* client with the corresponding client id. If the client id
* specified is not connected, the message will be dropped.
* topic - message topic
* payloadlen - payload length in bytes. Can be 0 for an empty payload.
* payload - payload bytes. If payloadlen > 0 this must not be NULL.
@ -246,6 +256,7 @@ int mosquitto_broker_publish(
* MOSQ_ERR_NOMEM - on out of memory
*/
int mosquitto_broker_publish_copy(
const char *clientid,
const char *topic,
int payloadlen,
const void *payload,

@ -446,6 +446,8 @@ struct mosquitto_message_v5{
char *topic;
void *payload;
mosquitto_property *properties;
char *clientid; /* Used only by mosquitto_broker_publish*() to indicate
this message is for a specific client. */
int payloadlen;
int qos;
bool retain;

@ -112,6 +112,7 @@ const char *mosquitto_client_username(const struct mosquitto *context)
int mosquitto_broker_publish(
const char *clientid,
const char *topic,
int payloadlen,
void *payload,
@ -135,8 +136,18 @@ int mosquitto_broker_publish(
msg->next = NULL;
msg->prev = NULL;
if(clientid){
msg->clientid = mosquitto__strdup(clientid);
if(msg->clientid == NULL){
mosquitto__free(msg);
return MOSQ_ERR_NOMEM;
}
}else{
msg->clientid = NULL;
}
msg->topic = mosquitto__strdup(topic);
if(msg->topic == NULL){
mosquitto__free(msg->clientid);
mosquitto__free(msg);
return MOSQ_ERR_NOMEM;
}
@ -155,6 +166,7 @@ int mosquitto_broker_publish(
int mosquitto_broker_publish_copy(
const char *clientid,
const char *topic,
int payloadlen,
const void *payload,
@ -179,6 +191,7 @@ int mosquitto_broker_publish_copy(
memcpy(payload_out, payload, payloadlen);
return mosquitto_broker_publish(
clientid,
topic,
payloadlen,
payload_out,

@ -0,0 +1,90 @@
#!/usr/bin/env python3
from mosq_test_helper import *
def write_config(filename, port):
with open(filename, 'w') as f:
f.write("port %d\n" % (port))
f.write("auth_plugin c/auth_plugin_publish.so\n")
f.write("allow_anonymous true\n")
proto_ver = 5
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
write_config(conf_file, port)
rc = 1
keepalive = 10
connect1_packet = mosq_test.gen_connect("test-client", keepalive=keepalive, username="readwrite", clean_session=False, proto_ver=proto_ver)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
publish_packet = mosq_test.gen_publish("init", qos=0, proto_ver=proto_ver)
publish0_packet = mosq_test.gen_publish("topic/0", qos=0, payload="test-message-0", proto_ver=proto_ver)
mid = 1
publish1_packet = mosq_test.gen_publish("topic/1", qos=1, mid=mid, payload="test-message-1", proto_ver=proto_ver)
puback1_packet = mosq_test.gen_puback(mid, proto_ver=proto_ver)
mid = 2
publish2_packet = mosq_test.gen_publish("topic/2", qos=2, mid=mid, payload="test-message-2", proto_ver=proto_ver)
pubrec2_packet = mosq_test.gen_pubrec(mid, proto_ver=proto_ver)
pubrel2_packet = mosq_test.gen_pubrel(mid, proto_ver=proto_ver)
pubcomp2_packet = mosq_test.gen_pubcomp(mid, proto_ver=proto_ver)
props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_PAYLOAD_FORMAT_INDICATOR, 1)
publish0p_packet = mosq_test.gen_publish("topic/0", qos=0, payload="test-message-0", proto_ver=proto_ver, properties=props)
mid = 3
publish1p_packet = mosq_test.gen_publish("topic/1", qos=1, mid=mid, payload="test-message-1", proto_ver=proto_ver, properties=props)
puback1p_packet = mosq_test.gen_puback(mid, proto_ver=proto_ver)
mid = 4
publish2p_packet = mosq_test.gen_publish("topic/2", qos=2, mid=mid, payload="test-message-2", proto_ver=proto_ver, properties=props)
pubrec2p_packet = mosq_test.gen_pubrec(mid, proto_ver=proto_ver)
pubrel2p_packet = mosq_test.gen_pubrel(mid, proto_ver=proto_ver)
pubcomp2p_packet = mosq_test.gen_pubcomp(mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
try:
sock = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=20, port=port)
# Trigger the plugin to send us some messages
sock.send(publish_packet)
mosq_test.expect_packet(sock, "publish0", publish0_packet)
mosq_test.expect_packet(sock, "publish1", publish1_packet)
sock.send(puback1_packet)
mosq_test.expect_packet(sock, "publish2", publish2_packet)
mosq_test.do_send_receive(sock, pubrec2_packet, pubrel2_packet, "pubrel1")
sock.send(pubcomp2_packet)
# And trigger the second set of messages, with properties
sock.send(publish_packet)
mosq_test.expect_packet(sock, "publish0p", publish0p_packet)
mosq_test.expect_packet(sock, "publish1p", publish1p_packet)
sock.send(puback1_packet)
mosq_test.expect_packet(sock, "publish2p", publish2p_packet)
mosq_test.do_send_receive(sock, pubrec2p_packet, pubrel2p_packet, "pubrel1p")
sock.send(pubcomp2p_packet)
mosq_test.do_ping(sock)
rc = 0
sock.close()
except mosq_test.TestError:
pass
finally:
os.remove(conf_file)
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -189,6 +189,7 @@ endif
./09-plugin-auth-unpwd-success.py
./09-plugin-auth-v2-unpwd-fail.py
./09-plugin-auth-v2-unpwd-success.py
./09-plugin-publish.py
./09-pwfile-parse-invalid.py
10 :

@ -12,7 +12,8 @@ PLUGIN_SRC = \
auth_plugin_msg_params.c \
auth_plugin_extended_multiple.c \
auth_plugin_extended_single.c \
auth_plugin_extended_single2.c
auth_plugin_extended_single2.c \
auth_plugin_publish.c
PLUGINS = ${PLUGIN_SRC:.c=.so}

@ -0,0 +1,72 @@
#include <stdio.h>
#include <string.h>
#include <mqtt_protocol.h>
#include <mosquitto.h>
#include <mosquitto_broker.h>
#include <mosquitto_plugin.h>
int mosquitto_auth_plugin_version(void)
{
return MOSQ_AUTH_PLUGIN_VERSION;
}
int mosquitto_auth_plugin_init(void **user_data, struct mosquitto_opt *auth_opts, int auth_opt_count)
{
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_plugin_cleanup(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count)
{
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_security_init(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count, bool reload)
{
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_security_cleanup(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count, bool reload)
{
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_acl_check(void *user_data, int access, struct mosquitto *client, const struct mosquitto_acl_msg *msg)
{
static int count = 0;
mosquitto_property *props = NULL;
if(access == MOSQ_ACL_WRITE){
if(count == 0){
/* "missing-client" isn't connected, so we can check memory usage properly. */
mosquitto_broker_publish_copy("missing-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, NULL);
mosquitto_broker_publish_copy("test-client", "topic/0", strlen("test-message-0"), "test-message-0", 0, true, NULL);
mosquitto_broker_publish_copy("missing-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, NULL);
mosquitto_broker_publish_copy("test-client", "topic/1", strlen("test-message-1"), "test-message-1", 1, true, NULL);
mosquitto_broker_publish_copy("missing-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, NULL);
mosquitto_broker_publish_copy("test-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, NULL);
count = 1;
}else{
mosquitto_property_add_byte(&props, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, 1);
mosquitto_broker_publish_copy("test-client", "topic/0", strlen("test-message-0"), "test-message-0", 0, true, props);
props = NULL;
mosquitto_property_add_byte(&props, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, 1);
mosquitto_broker_publish_copy("test-client", "topic/1", strlen("test-message-1"), "test-message-1", 1, true, props);
props = NULL;
mosquitto_property_add_byte(&props, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, 1);
mosquitto_broker_publish_copy("test-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, props);
}
}
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_unpwd_check(void *user_data, struct mosquitto *client, const char *username, const char *password)
{
return MOSQ_ERR_SUCCESS;
}
int mosquitto_auth_psk_key_get(void *user_data, struct mosquitto *client, const char *hint, const char *identity, char *key, int max_key_len)
{
return MOSQ_ERR_AUTH;
}

@ -157,6 +157,7 @@ tests = [
(1, './09-plugin-auth-unpwd-success.py'),
(1, './09-plugin-auth-v2-unpwd-fail.py'),
(1, './09-plugin-auth-v2-unpwd-success.py'),
(1, './09-plugin-publish.py'),
(1, './09-pwfile-parse-invalid.py'),
(2, './10-listener-mount-point.py'),

Loading…
Cancel
Save