diff --git a/src/Makefile b/src/Makefile index 393b1dc5..207c8c9f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -3,7 +3,7 @@ include ../config.mk .PHONY: all install uninstall clean reallyclean ifeq ($(WITH_TLS),yes) -all : mosquitto mosquitto_passwd +all : mosquitto else all : mosquitto endif diff --git a/src/linker-macosx.syms b/src/linker-macosx.syms index 63a9877d..f05a8721 100644 --- a/src/linker-macosx.syms +++ b/src/linker-macosx.syms @@ -10,4 +10,12 @@ _mosquitto_client_protocol_version _mosquitto_client_sub_count _mosquitto_client_username _mosquitto_log_printf +_mosquitto_property_add_binary +_mosquitto_property_add_byte +_mosquitto_property_add_int16 +_mosquitto_property_add_int32 +_mosquitto_property_add_string +_mosquitto_property_add_string_pair +_mosquitto_property_add_varint +_mosquitto_property_free_all _mosquitto_set_username diff --git a/src/linker.syms b/src/linker.syms index a77c97c1..6fd45b43 100644 --- a/src/linker.syms +++ b/src/linker.syms @@ -12,5 +12,13 @@ mosquitto_client_username; mosquitto_log_printf; mosquitto_plugin_publish; + mosquitto_property_add_binary; + mosquitto_property_add_byte; + mosquitto_property_add_int16; + mosquitto_property_add_int32; + mosquitto_property_add_string; + mosquitto_property_add_string_pair; + mosquitto_property_add_varint; + mosquitto_property_free_all; mosquitto_set_username; }; diff --git a/src/loop.c b/src/loop.c index 32b49b8e..aadd16b3 100644 --- a/src/loop.c +++ b/src/loop.c @@ -99,17 +99,59 @@ void lws__sul_callback(struct lws_sorted_usec_list *l) static struct lws_sorted_usec_list sul; #endif +static int single_publish(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_message_v5 *msg) +{ + struct mosquitto_msg_store *stored; + int mid; + + stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store)); + if(stored == NULL) return MOSQ_ERR_NOMEM; + + stored->topic = msg->topic; + msg->topic = NULL; + stored->retain = 0; + stored->payloadlen = msg->payloadlen; + if(UHPA_ALLOC(stored->payload, stored->payloadlen) == 0){ + db__msg_store_free(stored); + return MOSQ_ERR_NOMEM; + } + memcpy(UHPA_ACCESS(stored->payload, stored->payloadlen), msg->payload, stored->payloadlen); + + if(msg->properties){ + stored->properties = msg->properties; + msg->properties = NULL; + } + + if(db__message_store(db, context, stored, 0, 0, mosq_mo_broker)) return 1; + + if(msg->qos){ + mid = mosquitto__mid_generate(context); + }else{ + mid = 0; + } + return db__message_insert(db, context, mid, mosq_md_out, msg->qos, 0, stored, msg->properties); +} + void queue_plugin_msgs(struct mosquitto_db *db) { struct mosquitto_message_v5 *msg, *tmp; + struct mosquitto *context; DL_FOREACH_SAFE(db->plugin_msgs, msg, tmp){ DL_DELETE(db->plugin_msgs, msg); - db__messages_easy_queue(db, NULL, msg->topic, msg->qos, msg->payloadlen, msg->payload, msg->retain, 0, &msg->properties); + if(msg->clientid){ + HASH_FIND(hh_id, db->contexts_by_id, msg->clientid, strlen(msg->clientid), context); + if(context){ + single_publish(db, context, msg); + } + }else{ + db__messages_easy_queue(db, NULL, msg->topic, msg->qos, msg->payloadlen, msg->payload, msg->retain, 0, &msg->properties); + } mosquitto__free(msg->topic); mosquitto__free(msg->payload); mosquitto_property_free_all(&msg->properties); + mosquitto__free(msg->clientid); mosquitto__free(msg); } } diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 74a57c18..b9236563 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -189,10 +189,15 @@ int mosquitto_set_username(struct mosquitto *client, const char *username); * will be enforced as normal for individual clients when they are due to * receive the message. * - * payload must be allocated on the heap, and will be freed by mosquitto after - * use. + * It can be used to send messages to all clients that have a matching + * subscription, or to a single client whether or not it has a matching + * subscription. * * Parameters: + * clientid - optional string. If set to NULL, the message is delivered to all + * clients. If non-NULL, the message is delivered only to the + * client with the corresponding client id. If the client id + * specified is not connected, the message will be dropped. * topic - message topic * payloadlen - payload length in bytes. Can be 0 for an empty payload. * payload - payload bytes. If payloadlen > 0 this must not be NULL. Must @@ -212,6 +217,7 @@ int mosquitto_set_username(struct mosquitto *client, const char *username); * MOSQ_ERR_NOMEM - on out of memory */ int mosquitto_broker_publish( + const char *clientid, const char *topic, int payloadlen, void *payload, @@ -228,6 +234,10 @@ int mosquitto_broker_publish( * of `payload` is taken. * * Parameters: + * clientid - optional string. If set to NULL, the message is delivered to all + * clients. If non-NULL, the message is delivered only to the + * client with the corresponding client id. If the client id + * specified is not connected, the message will be dropped. * topic - message topic * payloadlen - payload length in bytes. Can be 0 for an empty payload. * payload - payload bytes. If payloadlen > 0 this must not be NULL. @@ -246,6 +256,7 @@ int mosquitto_broker_publish( * MOSQ_ERR_NOMEM - on out of memory */ int mosquitto_broker_publish_copy( + const char *clientid, const char *topic, int payloadlen, const void *payload, diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index a6344a51..19e936e7 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -446,6 +446,8 @@ struct mosquitto_message_v5{ char *topic; void *payload; mosquitto_property *properties; + char *clientid; /* Used only by mosquitto_broker_publish*() to indicate + this message is for a specific client. */ int payloadlen; int qos; bool retain; diff --git a/src/plugin.c b/src/plugin.c index 990f43bd..6487d773 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -112,6 +112,7 @@ const char *mosquitto_client_username(const struct mosquitto *context) int mosquitto_broker_publish( + const char *clientid, const char *topic, int payloadlen, void *payload, @@ -135,8 +136,18 @@ int mosquitto_broker_publish( msg->next = NULL; msg->prev = NULL; + if(clientid){ + msg->clientid = mosquitto__strdup(clientid); + if(msg->clientid == NULL){ + mosquitto__free(msg); + return MOSQ_ERR_NOMEM; + } + }else{ + msg->clientid = NULL; + } msg->topic = mosquitto__strdup(topic); if(msg->topic == NULL){ + mosquitto__free(msg->clientid); mosquitto__free(msg); return MOSQ_ERR_NOMEM; } @@ -155,6 +166,7 @@ int mosquitto_broker_publish( int mosquitto_broker_publish_copy( + const char *clientid, const char *topic, int payloadlen, const void *payload, @@ -179,6 +191,7 @@ int mosquitto_broker_publish_copy( memcpy(payload_out, payload, payloadlen); return mosquitto_broker_publish( + clientid, topic, payloadlen, payload_out, diff --git a/test/broker/09-plugin-publish.py b/test/broker/09-plugin-publish.py new file mode 100755 index 00000000..83d84c83 --- /dev/null +++ b/test/broker/09-plugin-publish.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 + +from mosq_test_helper import * + +def write_config(filename, port): + with open(filename, 'w') as f: + f.write("port %d\n" % (port)) + f.write("auth_plugin c/auth_plugin_publish.so\n") + f.write("allow_anonymous true\n") + +proto_ver = 5 +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +write_config(conf_file, port) + +rc = 1 +keepalive = 10 +connect1_packet = mosq_test.gen_connect("test-client", keepalive=keepalive, username="readwrite", clean_session=False, proto_ver=proto_ver) +connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) + +publish_packet = mosq_test.gen_publish("init", qos=0, proto_ver=proto_ver) + +publish0_packet = mosq_test.gen_publish("topic/0", qos=0, payload="test-message-0", proto_ver=proto_ver) + +mid = 1 +publish1_packet = mosq_test.gen_publish("topic/1", qos=1, mid=mid, payload="test-message-1", proto_ver=proto_ver) +puback1_packet = mosq_test.gen_puback(mid, proto_ver=proto_ver) + +mid = 2 +publish2_packet = mosq_test.gen_publish("topic/2", qos=2, mid=mid, payload="test-message-2", proto_ver=proto_ver) +pubrec2_packet = mosq_test.gen_pubrec(mid, proto_ver=proto_ver) +pubrel2_packet = mosq_test.gen_pubrel(mid, proto_ver=proto_ver) +pubcomp2_packet = mosq_test.gen_pubcomp(mid, proto_ver=proto_ver) + + +props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_PAYLOAD_FORMAT_INDICATOR, 1) +publish0p_packet = mosq_test.gen_publish("topic/0", qos=0, payload="test-message-0", proto_ver=proto_ver, properties=props) + +mid = 3 +publish1p_packet = mosq_test.gen_publish("topic/1", qos=1, mid=mid, payload="test-message-1", proto_ver=proto_ver, properties=props) +puback1p_packet = mosq_test.gen_puback(mid, proto_ver=proto_ver) + +mid = 4 +publish2p_packet = mosq_test.gen_publish("topic/2", qos=2, mid=mid, payload="test-message-2", proto_ver=proto_ver, properties=props) +pubrec2p_packet = mosq_test.gen_pubrec(mid, proto_ver=proto_ver) +pubrel2p_packet = mosq_test.gen_pubrel(mid, proto_ver=proto_ver) +pubcomp2p_packet = mosq_test.gen_pubcomp(mid, proto_ver=proto_ver) + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +try: + sock = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=20, port=port) + + # Trigger the plugin to send us some messages + sock.send(publish_packet) + + mosq_test.expect_packet(sock, "publish0", publish0_packet) + mosq_test.expect_packet(sock, "publish1", publish1_packet) + sock.send(puback1_packet) + + mosq_test.expect_packet(sock, "publish2", publish2_packet) + mosq_test.do_send_receive(sock, pubrec2_packet, pubrel2_packet, "pubrel1") + sock.send(pubcomp2_packet) + + # And trigger the second set of messages, with properties + sock.send(publish_packet) + mosq_test.expect_packet(sock, "publish0p", publish0p_packet) + mosq_test.expect_packet(sock, "publish1p", publish1p_packet) + sock.send(puback1_packet) + + mosq_test.expect_packet(sock, "publish2p", publish2p_packet) + mosq_test.do_send_receive(sock, pubrec2p_packet, pubrel2p_packet, "pubrel1p") + sock.send(pubcomp2p_packet) + + mosq_test.do_ping(sock) + + rc = 0 + sock.close() +except mosq_test.TestError: + pass +finally: + os.remove(conf_file) + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde.decode('utf-8')) + + +exit(rc) diff --git a/test/broker/Makefile b/test/broker/Makefile index 673c640a..6731bb7d 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -189,6 +189,7 @@ endif ./09-plugin-auth-unpwd-success.py ./09-plugin-auth-v2-unpwd-fail.py ./09-plugin-auth-v2-unpwd-success.py + ./09-plugin-publish.py ./09-pwfile-parse-invalid.py 10 : diff --git a/test/broker/c/Makefile b/test/broker/c/Makefile index cdef589d..ffcde831 100644 --- a/test/broker/c/Makefile +++ b/test/broker/c/Makefile @@ -12,7 +12,8 @@ PLUGIN_SRC = \ auth_plugin_msg_params.c \ auth_plugin_extended_multiple.c \ auth_plugin_extended_single.c \ - auth_plugin_extended_single2.c + auth_plugin_extended_single2.c \ + auth_plugin_publish.c PLUGINS = ${PLUGIN_SRC:.c=.so} diff --git a/test/broker/c/auth_plugin_publish.c b/test/broker/c/auth_plugin_publish.c new file mode 100644 index 00000000..a1f07e0b --- /dev/null +++ b/test/broker/c/auth_plugin_publish.c @@ -0,0 +1,72 @@ +#include +#include +#include +#include +#include +#include + +int mosquitto_auth_plugin_version(void) +{ + return MOSQ_AUTH_PLUGIN_VERSION; +} + +int mosquitto_auth_plugin_init(void **user_data, struct mosquitto_opt *auth_opts, int auth_opt_count) +{ + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_plugin_cleanup(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count) +{ + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_security_init(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count, bool reload) +{ + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_security_cleanup(void *user_data, struct mosquitto_opt *auth_opts, int auth_opt_count, bool reload) +{ + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_acl_check(void *user_data, int access, struct mosquitto *client, const struct mosquitto_acl_msg *msg) +{ + static int count = 0; + mosquitto_property *props = NULL; + + if(access == MOSQ_ACL_WRITE){ + if(count == 0){ + /* "missing-client" isn't connected, so we can check memory usage properly. */ + mosquitto_broker_publish_copy("missing-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, NULL); + mosquitto_broker_publish_copy("test-client", "topic/0", strlen("test-message-0"), "test-message-0", 0, true, NULL); + mosquitto_broker_publish_copy("missing-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, NULL); + mosquitto_broker_publish_copy("test-client", "topic/1", strlen("test-message-1"), "test-message-1", 1, true, NULL); + mosquitto_broker_publish_copy("missing-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, NULL); + mosquitto_broker_publish_copy("test-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, NULL); + count = 1; + }else{ + mosquitto_property_add_byte(&props, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, 1); + mosquitto_broker_publish_copy("test-client", "topic/0", strlen("test-message-0"), "test-message-0", 0, true, props); + props = NULL; + mosquitto_property_add_byte(&props, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, 1); + mosquitto_broker_publish_copy("test-client", "topic/1", strlen("test-message-1"), "test-message-1", 1, true, props); + props = NULL; + mosquitto_property_add_byte(&props, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, 1); + mosquitto_broker_publish_copy("test-client", "topic/2", strlen("test-message-2"), "test-message-2", 2, true, props); + } + } + + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_unpwd_check(void *user_data, struct mosquitto *client, const char *username, const char *password) +{ + return MOSQ_ERR_SUCCESS; +} + +int mosquitto_auth_psk_key_get(void *user_data, struct mosquitto *client, const char *hint, const char *identity, char *key, int max_key_len) +{ + return MOSQ_ERR_AUTH; +} + diff --git a/test/broker/test.py b/test/broker/test.py index 324b89ee..74232ae7 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -157,6 +157,7 @@ tests = [ (1, './09-plugin-auth-unpwd-success.py'), (1, './09-plugin-auth-v2-unpwd-fail.py'), (1, './09-plugin-auth-v2-unpwd-success.py'), + (1, './09-plugin-publish.py'), (1, './09-pwfile-parse-invalid.py'), (2, './10-listener-mount-point.py'),