From b83c58763da1a7207b94ce0b4654bfa42a021663 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Mon, 25 Jan 2016 22:55:31 +0000 Subject: [PATCH 01/10] Add mosquitto_subscribe_single()/multiple(). --- ChangeLog.txt | 3 ++ lib/Makefile | 4 ++ lib/linker.version | 6 +++ lib/messages_mosq.c | 10 ++++- lib/mosquitto.h | 92 +++++++++++++++++++++++++++++++++++++++++- man/libmosquitto.3.xml | 21 ++++++++++ src/bridge.c | 2 +- 7 files changed, 135 insertions(+), 3 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index a3cf20b2..600e6194 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -30,6 +30,9 @@ Client library: - Outgoing messages with QoS>1 are no longer retried after a timeout period. Messages will be retried when a client reconnects. - DNS-SRV support is now disabled by default. +- Add mosquitto_subscribe_single() and mosquitto_subscribe_multiple(). These + are two helper functions to make retrieving messages from a broker very + straightforward. Client: - Add -x to mosquitto_sub for printing the payload in hexadecimal format. diff --git a/lib/Makefile b/lib/Makefile index 9d3722eb..cd599fdd 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -3,6 +3,7 @@ include ../config.mk .PHONY : really clean install MOSQ_OBJS=mosquitto.o \ + helpers.o \ logging_mosq.o \ memory_mosq.o \ messages_mosq.o \ @@ -51,6 +52,9 @@ libmosquitto.a : ${MOSQ_OBJS} mosquitto.o : mosquitto.c mosquitto.h ${CROSS_COMPILE}$(CC) $(LIB_CFLAGS) -c $< -o $@ +helpers.o : helpers.c + ${CROSS_COMPILE}$(CC) $(LIB_CFLAGS) -c $< -o $@ + logging_mosq.o : logging_mosq.c logging_mosq.h ${CROSS_COMPILE}$(CC) $(LIB_CFLAGS) -c $< -o $@ diff --git a/lib/linker.version b/lib/linker.version index 59eed794..d04984d6 100644 --- a/lib/linker.version +++ b/lib/linker.version @@ -78,3 +78,9 @@ MOSQ_1.4 { mosquitto_sub_topic_check; mosquitto_socks5_set; } MOSQ_1.3; + +MOSQ_1.5 { + global: + mosquitto_subscribe_simple; + mosquitto_subscribe_multiple; +} MOSQ_1.4; diff --git a/lib/messages_mosq.c b/lib/messages_mosq.c index 38bdaea9..208e73b7 100644 --- a/lib/messages_mosq.c +++ b/lib/messages_mosq.c @@ -66,7 +66,7 @@ int mosquitto_message_copy(struct mosquitto_message *dst, const struct mosquitto dst->qos = src->qos; dst->retain = src->retain; if(src->payloadlen){ - dst->payload = mosquitto__malloc(src->payloadlen); + dst->payload = mosquitto__calloc(src->payloadlen+1, sizeof(uint8_t)); if(!dst->payload){ mosquitto__free(dst->topic); return MOSQ_ERR_NOMEM; @@ -106,6 +106,14 @@ void mosquitto_message_free(struct mosquitto_message **message) mosquitto__free(msg); } +void mosquitto_message_free_contents(struct mosquitto_message *message) +{ + if(!message) return; + + mosquitto__free(message->topic); + mosquitto__free(message->payload); +} + int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir) { int rc = 0; diff --git a/lib/mosquitto.h b/lib/mosquitto.h index 4be9d5f5..e7798861 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -672,10 +672,23 @@ libmosq_EXPORT int mosquitto_message_copy(struct mosquitto_message *dst, const s * message - pointer to a mosquitto_message pointer to free. * * See Also: - * + * , */ libmosq_EXPORT void mosquitto_message_free(struct mosquitto_message **message); +/* + * Function: mosquitto_message_free_contents + * + * Free a mosquitto_message struct contents, leaving the struct unaffected. + * + * Parameters: + * message - pointer to a mosquitto_message struct to free its contents. + * + * See Also: + * , + */ +libmosq_EXPORT void mosquitto_message_free_contents(struct mosquitto_message *message); + /* * Function: mosquitto_loop * @@ -1523,6 +1536,83 @@ libmosq_EXPORT int mosquitto_pub_topic_check(const char *topic); */ libmosq_EXPORT int mosquitto_sub_topic_check(const char *topic); + +struct libmosquitto_will { + char *topic; + void *payload; + int payloadlen; + int qos; + bool retain; +}; + +struct libmosquitto_auth { + char *username; + char *password; +}; + +struct libmosquitto_tls { + char *cafile; + char *capath; + char *certfile; + char *keyfile; + char *ciphers; + char *tls_version; + int (*pw_callback)(char *buf, int size, int rwflag, void *userdata); + int cert_reqs; +}; + +/* + * Function: mosquitto_subscribe_simple + * + * Helper function to make subscribing to a topic and retrieving some messages + * very straightforward. + * + * This connects to a broker, subscribes to a topic, waits for msg_count + * messages to be received, then returns after disconnecting cleanly. + * + * Parameters: + * messages - pointer to a "struct mosquitto_message *". The received + * messages will be returned here. On error, this will be set to + * NULL. + * msg_count - the number of messages to retrieve. + * topic - the subscription topic to use (wildcards are allowed). + * qos - the qos to use for the subscription. + * retained - if set to true, stale retained messages will be treated as + * normal messages with regards to msg_count. If set to false, + * they will be ignored. + * host - the broker to connect to. + * port - the network port the broker is listening on. + * client_id - the client id to use, or NULL if a random client id should be + * generated. + * keepalive - the MQTT keepalive value. + * clean_session - the MQTT clean session flag. + * username - the username string, or NULL for no username authentication. + * password - the password string, or NULL for an empty password. + * will - a libmosquitto_will struct containing will information, or NULL for + * no will. + * tls - a libmosquitto_tls struct containing TLS related parameters, or NULL + * for no use of TLS. + * + * + * Returns: + * MOSQ_ERR_SUCCESS - on success + * >0 - on error. + */ +libmosq_EXPORT int mosquitto_subscribe_simple( + struct mosquitto_message **messages, + int msg_count, + const char *topic, + int qos, + bool retained, + const char *host, + int port, + const char *client_id, + int keepalive, + bool clean_session, + const char *username, + const char *password, + const struct libmosquitto_will *will, + const struct libmosquitto_tls *tls); #ifdef __cplusplus } #endif diff --git a/man/libmosquitto.3.xml b/man/libmosquitto.3.xml index ed122283..e051069c 100644 --- a/man/libmosquitto.3.xml +++ b/man/libmosquitto.3.xml @@ -368,6 +368,27 @@ bool *result + + + Helper functions + + int mosquitto_subscribe_simple + struct mosquitto_message **message + int msg_count + const char *topic + const char *qos + bool retained + const char *host + int port + const char *client_id + int keepalive + bool clean_session + const char *username + const char *password + const struct libmosquitto_will *will + const struct libmosquitto_tls *tls + + diff --git a/src/bridge.c b/src/bridge.c index c3787e6e..4cb92ce5 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -180,7 +180,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) rc = net__socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false); if(rc > 0 ){ if(rc == MOSQ_ERR_TLS){ - _mosquitto_socket_close(db, context); + net__socket_close(db, context); return rc; /* Error already printed */ }else if(rc == MOSQ_ERR_ERRNO){ log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno)); From 896b4563fb5076b4a22afc115e7303e9a6d77b02 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 26 Jan 2016 09:26:17 +0000 Subject: [PATCH 02/10] Add missing helpers code. --- lib/CMakeLists.txt | 1 + lib/helpers.c | 179 +++++++++++++++++++++++++++++++++++++++++ man/libmosquitto.3.xml | 2 +- 3 files changed, 181 insertions(+), 1 deletion(-) create mode 100644 lib/helpers.c diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 05304654..aabb27ad 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -25,6 +25,7 @@ include_directories(${mosquitto_SOURCE_DIR} ${mosquitto_SOURCE_DIR}/lib link_directories(${mosquitto_SOURCE_DIR}/lib) add_library(libmosquitto SHARED + helpers.c logging_mosq.c logging_mosq.h memory_mosq.c memory_mosq.h messages_mosq.c messages_mosq.h diff --git a/lib/helpers.c b/lib/helpers.c new file mode 100644 index 00000000..30db83cf --- /dev/null +++ b/lib/helpers.c @@ -0,0 +1,179 @@ +/* +Copyright (c) 2016 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include +#include + +#include "mosquitto.h" +#include "mosquitto_internal.h" + +#if 0 + struct mosquitto_message *msg; + msg = mosquitto_subscribe_single("#", 0, 1, true, NULL, 1883, NULL, 60, NULL, NULL, NULL); +#endif + +struct subscribe__userdata { + const char *topic; + struct mosquitto_message *messages; + int max_msg_count; + int message_count; + int qos; + bool retained; +}; + + +void on_connect(struct mosquitto *mosq, void *obj, int rc) +{ + struct subscribe__userdata *userdata = obj; + + mosquitto_subscribe(mosq, NULL, userdata->topic, userdata->qos); +} + + +void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) +{ + struct subscribe__userdata *userdata = obj; + int rc; + + if(userdata->max_msg_count == 0){ + return; + } + + /* Don't process stale retained messages if 'retained' was false */ + if(!userdata->retained && message->retain){ + return; + } + + userdata->max_msg_count--; + + rc = mosquitto_message_copy(&userdata->messages[userdata->message_count], message); + userdata->message_count++; + if(userdata->max_msg_count == 0){ + mosquitto_disconnect(mosq); + } +} + + +void on_log(struct mosquitto *mosq, void *obj, int level, const char *str) +{ + printf("LOG %s\n", str); +} + + +libmosq_EXPORT int mosquitto_subscribe_simple( + struct mosquitto_message **messages, + int msg_count, + const char *topic, + int qos, + bool retained, + const char *host, + int port, + const char *client_id, + int keepalive, + bool clean_session, + const char *username, + const char *password, + const struct libmosquitto_will *will, + const struct libmosquitto_tls *tls) +{ + struct mosquitto *mosq; + struct subscribe__userdata userdata; + int rc; + + if(!topic || msg_count < 1 || !messages){ + return MOSQ_ERR_INVAL; + } + + *messages = NULL; + + userdata.topic = topic; + userdata.qos = qos; + userdata.max_msg_count = msg_count; + userdata.retained = retained; + userdata.messages = calloc(sizeof(struct mosquitto_message), msg_count); + if(!userdata.messages){ + return MOSQ_ERR_NOMEM; + } + userdata.message_count = 0; + + mosq = mosquitto_new(client_id, clean_session, &userdata); + if(!mosq){ + free(userdata.messages); + userdata.messages = NULL; + return MOSQ_ERR_NOMEM; + } + + if(will){ + rc = mosquitto_will_set(mosq, will->topic, will->payloadlen, will->payload, will->qos, will->retain); + if(rc){ + free(userdata.messages); + userdata.messages = NULL; + mosquitto_destroy(mosq); + return rc; + } + } + if(username){ + rc = mosquitto_username_pw_set(mosq, username, password); + if(rc){ + free(userdata.messages); + userdata.messages = NULL; + mosquitto_destroy(mosq); + return rc; + } + } + if(tls){ + rc = mosquitto_tls_set(mosq, tls->cafile, tls->capath, tls->certfile, tls->keyfile, tls->pw_callback); + if(rc){ + free(userdata.messages); + userdata.messages = NULL; + mosquitto_destroy(mosq); + return rc; + } + rc = mosquitto_tls_opts_set(mosq, tls->cert_reqs, tls->tls_version, tls->ciphers); + if(rc){ + free(userdata.messages); + userdata.messages = NULL; + mosquitto_destroy(mosq); + return rc; + } + } + + mosquitto_log_callback_set(mosq, on_log); + mosquitto_connect_callback_set(mosq, on_connect); + mosquitto_message_callback_set(mosq, on_message); + + rc = mosquitto_connect(mosq, host, port, keepalive); + if(rc){ + free(userdata.messages); + userdata.messages = NULL; + mosquitto_destroy(mosq); + return rc; + } + rc = mosquitto_loop_forever(mosq, -1, 1); + printf("lp:%d\n", rc); + mosquitto_destroy(mosq); + if(!rc && userdata.max_msg_count == 0){ + printf("*messages: %p\n", userdata.messages); + *messages = userdata.messages; + return MOSQ_ERR_SUCCESS; + }else{ + // FIXME - free messages + free(userdata.messages); + userdata.messages = NULL; + return rc; + } +} + diff --git a/man/libmosquitto.3.xml b/man/libmosquitto.3.xml index e051069c..9d7bda1d 100644 --- a/man/libmosquitto.3.xml +++ b/man/libmosquitto.3.xml @@ -376,7 +376,7 @@ struct mosquitto_message **message int msg_count const char *topic - const char *qos + intqos bool retained const char *host int port From d157e8c60335dbea38ca8a60addc8e5c11d7bbb1 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 26 Jan 2016 09:55:17 +0000 Subject: [PATCH 03/10] Add examples for subscribe_simple. --- ChangeLog.txt | 6 ++--- examples/subscribe_simple/Makefile | 23 ++++++++++++++++ examples/subscribe_simple/multiple.c | 39 ++++++++++++++++++++++++++++ examples/subscribe_simple/single.c | 33 +++++++++++++++++++++++ 4 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 examples/subscribe_simple/Makefile create mode 100644 examples/subscribe_simple/multiple.c create mode 100644 examples/subscribe_simple/single.c diff --git a/ChangeLog.txt b/ChangeLog.txt index 600e6194..9e2735be 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -30,9 +30,9 @@ Client library: - Outgoing messages with QoS>1 are no longer retried after a timeout period. Messages will be retried when a client reconnects. - DNS-SRV support is now disabled by default. -- Add mosquitto_subscribe_single() and mosquitto_subscribe_multiple(). These - are two helper functions to make retrieving messages from a broker very - straightforward. +- Add mosquitto_subscribe_simple() This is a helper function to make + retrieving messages from a broker very straightforward. Examples of its user + are in examples/subscribe_simple. Client: - Add -x to mosquitto_sub for printing the payload in hexadecimal format. diff --git a/examples/subscribe_simple/Makefile b/examples/subscribe_simple/Makefile new file mode 100644 index 00000000..f42b1364 --- /dev/null +++ b/examples/subscribe_simple/Makefile @@ -0,0 +1,23 @@ +include ../../config.mk + +.PHONY: all + +all : sub_single sub_multiple + +sub_single : single.o + ${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION} + +sub_multiple : multiple.o + ${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION} + +single.o : single.c ../../lib/libmosquitto.so.${SOVERSION} + ${CROSS_COMPILE}${CC} -c $< -o $@ -I../../lib ${CFLAGS} + +multiple.o : multiple.c ../../lib/libmosquitto.so.${SOVERSION} + ${CROSS_COMPILE}${CC} -c $< -o $@ -I../../lib ${CFLAGS} + +../../lib/libmosquitto.so.${SOVERSION} : + $(MAKE) -C ../../lib + +clean : + -rm -f *.o sub_single sub_multiple diff --git a/examples/subscribe_simple/multiple.c b/examples/subscribe_simple/multiple.c new file mode 100644 index 00000000..4938a06e --- /dev/null +++ b/examples/subscribe_simple/multiple.c @@ -0,0 +1,39 @@ +#include +#include +#include "mosquitto.h" + +#define COUNT 3 + +int main(int argc, char *argv[]) +{ + int rc; + int i; + struct mosquitto_message *msg; + + mosquitto_lib_init(); + + rc = mosquitto_subscribe_simple( + &msg, COUNT, + "irc/#", 0, true, + "test.mosquitto.org", 1883, + NULL, 60, true, + NULL, NULL, + NULL, NULL); + + if(rc){ + printf("Error: %s\n", mosquitto_strerror(rc)); + mosquitto_lib_cleanup(); + return rc; + } + + for(i=0; i +#include +#include "mosquitto.h" + +int main(int argc, char *argv[]) +{ + int rc; + struct mosquitto_message *msg; + + mosquitto_lib_init(); + + rc = mosquitto_subscribe_simple( + &msg, 1, + "irc/#", 0, true, + "test.mosquitto.org", 1883, + NULL, 60, true, + NULL, NULL, + NULL, NULL); + + if(rc){ + printf("Error: %s\n", mosquitto_strerror(rc)); + mosquitto_lib_cleanup(); + return rc; + } + + printf("%s %s\n", msg->topic, (char *)msg->payload); + mosquitto_message_free(&msg); + + mosquitto_lib_cleanup(); + + return 0; +} + From 1288b14dcb7ad7806fdf7cdde653330a049c729d Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 26 Jan 2016 09:55:32 +0000 Subject: [PATCH 04/10] Fixes and cleanup for subscribe_simple. --- lib/helpers.c | 28 ++++++++++++++-------------- lib/linker.version | 2 +- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/helpers.c b/lib/helpers.c index 30db83cf..5f38e3e1 100644 --- a/lib/helpers.c +++ b/lib/helpers.c @@ -20,17 +20,13 @@ Contributors: #include "mosquitto.h" #include "mosquitto_internal.h" -#if 0 - struct mosquitto_message *msg; - msg = mosquitto_subscribe_single("#", 0, 1, true, NULL, 1883, NULL, 60, NULL, NULL, NULL); -#endif - struct subscribe__userdata { const char *topic; struct mosquitto_message *messages; int max_msg_count; int message_count; int qos; + int rc; bool retained; }; @@ -60,6 +56,11 @@ void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_messag userdata->max_msg_count--; rc = mosquitto_message_copy(&userdata->messages[userdata->message_count], message); + if(rc){ + userdata->rc = rc; + mosquitto_disconnect(mosq); + return; + } userdata->message_count++; if(userdata->max_msg_count == 0){ mosquitto_disconnect(mosq); @@ -67,11 +68,6 @@ void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_messag } -void on_log(struct mosquitto *mosq, void *obj, int level, const char *str) -{ - printf("LOG %s\n", str); -} - libmosq_EXPORT int mosquitto_subscribe_simple( struct mosquitto_message **messages, @@ -92,6 +88,7 @@ libmosq_EXPORT int mosquitto_subscribe_simple( struct mosquitto *mosq; struct subscribe__userdata userdata; int rc; + int i; if(!topic || msg_count < 1 || !messages){ return MOSQ_ERR_INVAL; @@ -104,6 +101,7 @@ libmosq_EXPORT int mosquitto_subscribe_simple( userdata.max_msg_count = msg_count; userdata.retained = retained; userdata.messages = calloc(sizeof(struct mosquitto_message), msg_count); + userdata.rc = 0; if(!userdata.messages){ return MOSQ_ERR_NOMEM; } @@ -151,7 +149,6 @@ libmosq_EXPORT int mosquitto_subscribe_simple( } } - mosquitto_log_callback_set(mosq, on_log); mosquitto_connect_callback_set(mosq, on_connect); mosquitto_message_callback_set(mosq, on_message); @@ -163,14 +160,17 @@ libmosq_EXPORT int mosquitto_subscribe_simple( return rc; } rc = mosquitto_loop_forever(mosq, -1, 1); - printf("lp:%d\n", rc); mosquitto_destroy(mosq); + if(userdata.rc){ + rc = userdata.rc; + } if(!rc && userdata.max_msg_count == 0){ - printf("*messages: %p\n", userdata.messages); *messages = userdata.messages; return MOSQ_ERR_SUCCESS; }else{ - // FIXME - free messages + for(i=0; i Date: Tue, 26 Jan 2016 15:55:17 +0000 Subject: [PATCH 05/10] Fix merge error. --- src/mosquitto_broker.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 7dbeecea..a8e6202f 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -580,9 +580,9 @@ void service_run(void); * ============================================================ */ #ifdef WITH_WEBSOCKETS # if defined(LWS_LIBRARY_VERSION_NUMBER) -struct lws_context *mosq_websockets_init(struct _mqtt3_listener *listener, int log_level); +struct lws_context *mosq_websockets_init(struct mosquitto__listener *listener, int log_level); # else -struct libwebsocket_context *mosq_websockets_init(struct _mqtt3_listener *listener, int log_level); +struct libwebsocket_context *mosq_websockets_init(struct mosquitto__listener *listener, int log_level); # endif #endif void do_disconnect(struct mosquitto_db *db, struct mosquitto *context); From 0a95c9a3afc88562229d4d2851e4548e148de5c0 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 26 Jan 2016 17:00:08 +0000 Subject: [PATCH 06/10] Add mosquitto_subscribe_callback(). --- examples/subscribe_simple/Makefile | 8 +- examples/subscribe_simple/callback.c | 34 +++++++ lib/cpp/mosquittopp.cpp | 45 +++++++++ lib/cpp/mosquittopp.h | 31 ++++++ lib/helpers.c | 135 ++++++++++++++++++--------- lib/linker.version | 1 + lib/mosquitto.h | 52 +++++++++++ man/libmosquitto.3.xml | 16 ++++ 8 files changed, 275 insertions(+), 47 deletions(-) create mode 100644 examples/subscribe_simple/callback.c diff --git a/examples/subscribe_simple/Makefile b/examples/subscribe_simple/Makefile index f42b1364..f5576c53 100644 --- a/examples/subscribe_simple/Makefile +++ b/examples/subscribe_simple/Makefile @@ -2,7 +2,10 @@ include ../../config.mk .PHONY: all -all : sub_single sub_multiple +all : sub_callback sub_single sub_multiple + +sub_callback : callback.o + ${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION} sub_single : single.o ${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION} @@ -10,6 +13,9 @@ sub_single : single.o sub_multiple : multiple.o ${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION} +callback.o : callback.c ../../lib/libmosquitto.so.${SOVERSION} + ${CROSS_COMPILE}${CC} -c $< -o $@ -I../../lib ${CFLAGS} + single.o : single.c ../../lib/libmosquitto.so.${SOVERSION} ${CROSS_COMPILE}${CC} -c $< -o $@ -I../../lib ${CFLAGS} diff --git a/examples/subscribe_simple/callback.c b/examples/subscribe_simple/callback.c new file mode 100644 index 00000000..647df32c --- /dev/null +++ b/examples/subscribe_simple/callback.c @@ -0,0 +1,34 @@ +#include +#include +#include "mosquitto.h" + +int on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) +{ + printf("%s %s (%d)\n", msg->topic, (const char *)msg->payload, msg->payloadlen); + return 0; +} + + +int main(int argc, char *argv[]) +{ + int rc; + + mosquitto_lib_init(); + + rc = mosquitto_subscribe_callback( + on_message, + "irc/#", 0, NULL, + "test.mosquitto.org", 1883, + NULL, 60, true, + NULL, NULL, + NULL, NULL); + + if(rc){ + printf("Error: %s\n", mosquitto_strerror(rc)); + } + + mosquitto_lib_cleanup(); + + return rc; +} + diff --git a/lib/cpp/mosquittopp.cpp b/lib/cpp/mosquittopp.cpp index 03301812..ce4198c3 100644 --- a/lib/cpp/mosquittopp.cpp +++ b/lib/cpp/mosquittopp.cpp @@ -106,6 +106,51 @@ int topic_matches_sub(const char *sub, const char *topic, bool *result) return mosquitto_topic_matches_sub(sub, topic, result); } +int subscribe_simple( + struct mosquitto_message **messages, + int msg_count, + const char *topic, + int qos, + bool retained, + const char *host, + int port, + const char *client_id, + int keepalive, + bool clean_session, + const char *username, + const char *password, + const struct libmosquitto_will *will, + const struct libmosquitto_tls *tls) +{ + return mosquitto_subscribe_simple(messages, msg_count, topic, qos, retained, + host, port, client_id, keepalive, clean_session, + username, password, + will, tls); +} + +mosqpp_EXPORT int subscribe_callback( + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + const char *topic, + int qos, + void *userdata, + bool retained, + const char *host, + int port, + const char *client_id, + int keepalive, + bool clean_session, + const char *username, + const char *password, + const struct libmosquitto_will *will, + const struct libmosquitto_tls *tls) +{ + return mosquitto_subscribe_callback(callback, topic, qos, userdata, retained, + host, port, client_id, keepalive, clean_session, + username, password, + will, tls); +} + + mosquittopp::mosquittopp(const char *id, bool clean_session) { m_mosq = mosquitto_new(id, clean_session, this); diff --git a/lib/cpp/mosquittopp.h b/lib/cpp/mosquittopp.h index d3d6f13e..9876d370 100644 --- a/lib/cpp/mosquittopp.h +++ b/lib/cpp/mosquittopp.h @@ -41,6 +41,37 @@ mosqpp_EXPORT int lib_version(int *major, int *minor, int *revision); mosqpp_EXPORT int lib_init(); mosqpp_EXPORT int lib_cleanup(); mosqpp_EXPORT int topic_matches_sub(const char *sub, const char *topic, bool *result); +mosqpp_EXPORT int subscribe_simple( + struct mosquitto_message **messages, + int msg_count, + const char *topic, + int qos=0, + bool retained=true, + const char *host="localhost", + int port=1883, + const char *client_id=NULL, + int keepalive=60, + bool clean_session=true, + const char *username=NULL, + const char *password=NULL, + const struct libmosquitto_will *will=NULL, + const struct libmosquitto_tls *tls=NULL); + +mosqpp_EXPORT int subscribe_callback( + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + const char *topic, + int qos=0, + void *userdata=NULL, + bool retained=true, + const char *host="localhost", + int port=1883, + const char *client_id=NULL, + int keepalive=60, + bool clean_session=true, + const char *username=NULL, + const char *password=NULL, + const struct libmosquitto_will *will=NULL, + const struct libmosquitto_tls *tls=NULL); /* * Class: mosquittopp diff --git a/lib/helpers.c b/lib/helpers.c index 5f38e3e1..c36cf35f 100644 --- a/lib/helpers.c +++ b/lib/helpers.c @@ -20,55 +20,69 @@ Contributors: #include "mosquitto.h" #include "mosquitto_internal.h" -struct subscribe__userdata { +struct userdata__callback { const char *topic; + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *); + void *userdata; + int qos; + int rc; +}; + +struct userdata__simple { struct mosquitto_message *messages; int max_msg_count; int message_count; - int qos; - int rc; bool retained; }; -void on_connect(struct mosquitto *mosq, void *obj, int rc) +static void on_connect(struct mosquitto *mosq, void *obj, int rc) { - struct subscribe__userdata *userdata = obj; + struct userdata__callback *userdata = obj; mosquitto_subscribe(mosq, NULL, userdata->topic, userdata->qos); } -void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) +static void on_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { - struct subscribe__userdata *userdata = obj; + int rc; + struct userdata__callback *userdata = obj; + + rc = userdata->callback(mosq, userdata->userdata, message); + if(rc){ + mosquitto_disconnect(mosq); + } +} + +static int on_message_simple(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) +{ + struct userdata__simple *userdata = obj; int rc; if(userdata->max_msg_count == 0){ - return; + return 0; } /* Don't process stale retained messages if 'retained' was false */ if(!userdata->retained && message->retain){ - return; + return 0; } userdata->max_msg_count--; rc = mosquitto_message_copy(&userdata->messages[userdata->message_count], message); if(rc){ - userdata->rc = rc; - mosquitto_disconnect(mosq); - return; + return rc; } userdata->message_count++; if(userdata->max_msg_count == 0){ mosquitto_disconnect(mosq); } + return 0; } - libmosq_EXPORT int mosquitto_subscribe_simple( struct mosquitto_message **messages, int msg_count, @@ -85,8 +99,7 @@ libmosq_EXPORT int mosquitto_subscribe_simple( const struct libmosquitto_will *will, const struct libmosquitto_tls *tls) { - struct mosquitto *mosq; - struct subscribe__userdata userdata; + struct userdata__simple userdata; int rc; int i; @@ -96,29 +109,72 @@ libmosq_EXPORT int mosquitto_subscribe_simple( *messages = NULL; - userdata.topic = topic; - userdata.qos = qos; - userdata.max_msg_count = msg_count; - userdata.retained = retained; userdata.messages = calloc(sizeof(struct mosquitto_message), msg_count); - userdata.rc = 0; if(!userdata.messages){ return MOSQ_ERR_NOMEM; } userdata.message_count = 0; + userdata.max_msg_count = msg_count; + userdata.retained = retained; - mosq = mosquitto_new(client_id, clean_session, &userdata); - if(!mosq){ + rc = mosquitto_subscribe_callback(on_message_simple, + topic, qos, + &userdata, + host, port, + client_id, keepalive, clean_session, + username, password, + will, tls); + + if(!rc && userdata.max_msg_count == 0){ + *messages = userdata.messages; + return MOSQ_ERR_SUCCESS; + }else{ + for(i=0; itopic, will->payloadlen, will->payload, will->qos, will->retain); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } @@ -126,8 +182,6 @@ libmosq_EXPORT int mosquitto_subscribe_simple( if(username){ rc = mosquitto_username_pw_set(mosq, username, password); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } @@ -135,45 +189,34 @@ libmosq_EXPORT int mosquitto_subscribe_simple( if(tls){ rc = mosquitto_tls_set(mosq, tls->cafile, tls->capath, tls->certfile, tls->keyfile, tls->pw_callback); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } rc = mosquitto_tls_opts_set(mosq, tls->cert_reqs, tls->tls_version, tls->ciphers); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } } mosquitto_connect_callback_set(mosq, on_connect); - mosquitto_message_callback_set(mosq, on_message); + mosquitto_message_callback_set(mosq, on_message_callback); rc = mosquitto_connect(mosq, host, port, keepalive); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } rc = mosquitto_loop_forever(mosq, -1, 1); mosquitto_destroy(mosq); - if(userdata.rc){ - rc = userdata.rc; - } - if(!rc && userdata.max_msg_count == 0){ - *messages = userdata.messages; - return MOSQ_ERR_SUCCESS; - }else{ - for(i=0; i0 - on error. + */ +libmosq_EXPORT int mosquitto_subscribe_callback( + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + const char *topic, + int qos, + void *userdata, + const char *host, + int port, + const char *client_id, + int keepalive, + bool clean_session, + const char *username, + const char *password, + const struct libmosquitto_will *will, + const struct libmosquitto_tls *tls); #ifdef __cplusplus } #endif diff --git a/man/libmosquitto.3.xml b/man/libmosquitto.3.xml index 9d7bda1d..0961ff96 100644 --- a/man/libmosquitto.3.xml +++ b/man/libmosquitto.3.xml @@ -388,6 +388,22 @@ const struct libmosquitto_will *will const struct libmosquitto_tls *tls + + int mosquitto_subscribe_callback + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *) + const char *topic + int qos + void *userdata + const char *host + int port + const char *client_id + int keepalive + bool clean_session + const char *username + const char *password + const struct libmosquitto_will *will + const struct libmosquitto_tls *tls + From 7709621911c11c2bd1334ced463af8437df01c55 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 26 Jan 2016 17:06:32 +0000 Subject: [PATCH 07/10] Reorder helper function arguments for consistency. --- ChangeLog.txt | 5 ++++- examples/subscribe_simple/callback.c | 4 ++-- examples/subscribe_simple/multiple.c | 4 ++-- examples/subscribe_simple/single.c | 4 ++-- lib/cpp/mosquittopp.cpp | 13 ++++++++----- lib/cpp/mosquittopp.h | 4 ++-- lib/helpers.c | 9 +++++---- lib/mosquitto.h | 8 ++++---- man/libmosquitto.3.xml | 4 ++-- 9 files changed, 31 insertions(+), 24 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 9e2735be..bb090f17 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -31,8 +31,11 @@ Client library: Messages will be retried when a client reconnects. - DNS-SRV support is now disabled by default. - Add mosquitto_subscribe_simple() This is a helper function to make - retrieving messages from a broker very straightforward. Examples of its user + retrieving messages from a broker very straightforward. Examples of its use are in examples/subscribe_simple. +- Add mosquitto_subscribe_callback() This is a helper function to make + processing messages from a broker very straightforward. An example of its use + is in examples/subscribe_simple. Client: - Add -x to mosquitto_sub for printing the payload in hexadecimal format. diff --git a/examples/subscribe_simple/callback.c b/examples/subscribe_simple/callback.c index 647df32c..b66afdba 100644 --- a/examples/subscribe_simple/callback.c +++ b/examples/subscribe_simple/callback.c @@ -16,8 +16,8 @@ int main(int argc, char *argv[]) mosquitto_lib_init(); rc = mosquitto_subscribe_callback( - on_message, - "irc/#", 0, NULL, + on_message, NULL, + "irc/#", 0, "test.mosquitto.org", 1883, NULL, 60, true, NULL, NULL, diff --git a/examples/subscribe_simple/multiple.c b/examples/subscribe_simple/multiple.c index 4938a06e..67dc760d 100644 --- a/examples/subscribe_simple/multiple.c +++ b/examples/subscribe_simple/multiple.c @@ -13,8 +13,8 @@ int main(int argc, char *argv[]) mosquitto_lib_init(); rc = mosquitto_subscribe_simple( - &msg, COUNT, - "irc/#", 0, true, + &msg, COUNT, true, + "irc/#", 0, "test.mosquitto.org", 1883, NULL, 60, true, NULL, NULL, diff --git a/examples/subscribe_simple/single.c b/examples/subscribe_simple/single.c index 44eb440f..b23ebd37 100644 --- a/examples/subscribe_simple/single.c +++ b/examples/subscribe_simple/single.c @@ -10,8 +10,8 @@ int main(int argc, char *argv[]) mosquitto_lib_init(); rc = mosquitto_subscribe_simple( - &msg, 1, - "irc/#", 0, true, + &msg, 1, true, + "irc/#", 0, "test.mosquitto.org", 1883, NULL, 60, true, NULL, NULL, diff --git a/lib/cpp/mosquittopp.cpp b/lib/cpp/mosquittopp.cpp index ce4198c3..541d5760 100644 --- a/lib/cpp/mosquittopp.cpp +++ b/lib/cpp/mosquittopp.cpp @@ -109,9 +109,9 @@ int topic_matches_sub(const char *sub, const char *topic, bool *result) int subscribe_simple( struct mosquitto_message **messages, int msg_count, + bool retained, const char *topic, int qos, - bool retained, const char *host, int port, const char *client_id, @@ -122,7 +122,9 @@ int subscribe_simple( const struct libmosquitto_will *will, const struct libmosquitto_tls *tls) { - return mosquitto_subscribe_simple(messages, msg_count, topic, qos, retained, + return mosquitto_subscribe_simple( + messages, msg_count, retained, + topic, qos, host, port, client_id, keepalive, clean_session, username, password, will, tls); @@ -130,10 +132,9 @@ int subscribe_simple( mosqpp_EXPORT int subscribe_callback( int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + void *userdata, const char *topic, int qos, - void *userdata, - bool retained, const char *host, int port, const char *client_id, @@ -144,7 +145,9 @@ mosqpp_EXPORT int subscribe_callback( const struct libmosquitto_will *will, const struct libmosquitto_tls *tls) { - return mosquitto_subscribe_callback(callback, topic, qos, userdata, retained, + return mosquitto_subscribe_callback( + callback, userdata, + topic, qos, host, port, client_id, keepalive, clean_session, username, password, will, tls); diff --git a/lib/cpp/mosquittopp.h b/lib/cpp/mosquittopp.h index 9876d370..36b05ec4 100644 --- a/lib/cpp/mosquittopp.h +++ b/lib/cpp/mosquittopp.h @@ -44,9 +44,9 @@ mosqpp_EXPORT int topic_matches_sub(const char *sub, const char *topic, bool *re mosqpp_EXPORT int subscribe_simple( struct mosquitto_message **messages, int msg_count, + bool retained, const char *topic, int qos=0, - bool retained=true, const char *host="localhost", int port=1883, const char *client_id=NULL, @@ -59,9 +59,9 @@ mosqpp_EXPORT int subscribe_simple( mosqpp_EXPORT int subscribe_callback( int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + void *userdata, const char *topic, int qos=0, - void *userdata=NULL, bool retained=true, const char *host="localhost", int port=1883, diff --git a/lib/helpers.c b/lib/helpers.c index c36cf35f..7071ccf2 100644 --- a/lib/helpers.c +++ b/lib/helpers.c @@ -86,9 +86,9 @@ static int on_message_simple(struct mosquitto *mosq, void *obj, const struct mos libmosq_EXPORT int mosquitto_subscribe_simple( struct mosquitto_message **messages, int msg_count, + bool retained, const char *topic, int qos, - bool retained, const char *host, int port, const char *client_id, @@ -117,9 +117,9 @@ libmosq_EXPORT int mosquitto_subscribe_simple( userdata.max_msg_count = msg_count; userdata.retained = retained; - rc = mosquitto_subscribe_callback(on_message_simple, + rc = mosquitto_subscribe_callback( + on_message_simple, &userdata, topic, qos, - &userdata, host, port, client_id, keepalive, clean_session, username, password, @@ -138,11 +138,12 @@ libmosq_EXPORT int mosquitto_subscribe_simple( } } + libmosq_EXPORT int mosquitto_subscribe_callback( int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + void *userdata, const char *topic, int qos, - void *userdata, const char *host, int port, const char *client_id, diff --git a/lib/mosquitto.h b/lib/mosquitto.h index 0d8ca988..1babfedc 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -1575,9 +1575,9 @@ struct libmosquitto_tls { * messages will be returned here. On error, this will be set to * NULL. * msg_count - the number of messages to retrieve. + * retained - if set to true, stale retained messages will be treated as * topic - the subscription topic to use (wildcards are allowed). * qos - the qos to use for the subscription. - * retained - if set to true, stale retained messages will be treated as * normal messages with regards to msg_count. If set to false, * they will be ignored. * host - the broker to connect to. @@ -1601,9 +1601,9 @@ struct libmosquitto_tls { libmosq_EXPORT int mosquitto_subscribe_simple( struct mosquitto_message **messages, int msg_count, + bool retained, const char *topic, int qos, - bool retained, const char *host, int port, const char *client_id, @@ -1630,9 +1630,9 @@ libmosq_EXPORT int mosquitto_subscribe_simple( * int callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) * Note that this is the same as the normal on_message callback, * except that it returns an int. + * userdata - user provided pointer that will be passed to the callback. * topic - the subscription topic to use (wildcards are allowed). * qos - the qos to use for the subscription. - * userdata - user provided pointer that will be passed to the callback. * host - the broker to connect to. * port - the network port the broker is listening on. * client_id - the client id to use, or NULL if a random client id should be @@ -1653,9 +1653,9 @@ libmosq_EXPORT int mosquitto_subscribe_simple( */ libmosq_EXPORT int mosquitto_subscribe_callback( int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + void *userdata, const char *topic, int qos, - void *userdata, const char *host, int port, const char *client_id, diff --git a/man/libmosquitto.3.xml b/man/libmosquitto.3.xml index 0961ff96..70e44cb3 100644 --- a/man/libmosquitto.3.xml +++ b/man/libmosquitto.3.xml @@ -375,9 +375,9 @@ int mosquitto_subscribe_simple struct mosquitto_message **message int msg_count + bool retained const char *topic intqos - bool retained const char *host int port const char *client_id @@ -391,9 +391,9 @@ int mosquitto_subscribe_callback int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *) + void *userdata const char *topic int qos - void *userdata const char *host int port const char *client_id From 4e4c08aaf02a92bab82ace769b117b21b6f92984 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 26 Jan 2016 17:10:54 +0000 Subject: [PATCH 08/10] "retained" -> "want_retained". --- lib/helpers.c | 10 +++++----- lib/mosquitto.h | 8 ++++---- man/libmosquitto.3.xml | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/helpers.c b/lib/helpers.c index 7071ccf2..c2693a5b 100644 --- a/lib/helpers.c +++ b/lib/helpers.c @@ -32,7 +32,7 @@ struct userdata__simple { struct mosquitto_message *messages; int max_msg_count; int message_count; - bool retained; + bool want_retained; }; @@ -64,8 +64,8 @@ static int on_message_simple(struct mosquitto *mosq, void *obj, const struct mos return 0; } - /* Don't process stale retained messages if 'retained' was false */ - if(!userdata->retained && message->retain){ + /* Don't process stale retained messages if 'want_retained' was false */ + if(!userdata->want_retained && message->retain){ return 0; } @@ -86,7 +86,7 @@ static int on_message_simple(struct mosquitto *mosq, void *obj, const struct mos libmosq_EXPORT int mosquitto_subscribe_simple( struct mosquitto_message **messages, int msg_count, - bool retained, + bool want_retained, const char *topic, int qos, const char *host, @@ -115,7 +115,7 @@ libmosq_EXPORT int mosquitto_subscribe_simple( } userdata.message_count = 0; userdata.max_msg_count = msg_count; - userdata.retained = retained; + userdata.want_retained = want_retained; rc = mosquitto_subscribe_callback( on_message_simple, &userdata, diff --git a/lib/mosquitto.h b/lib/mosquitto.h index 1babfedc..014c29e1 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -1575,11 +1575,11 @@ struct libmosquitto_tls { * messages will be returned here. On error, this will be set to * NULL. * msg_count - the number of messages to retrieve. - * retained - if set to true, stale retained messages will be treated as + * want_retained - if set to true, stale retained messages will be treated as + * normal messages with regards to msg_count. If set to + * false, they will be ignored. * topic - the subscription topic to use (wildcards are allowed). * qos - the qos to use for the subscription. - * normal messages with regards to msg_count. If set to false, - * they will be ignored. * host - the broker to connect to. * port - the network port the broker is listening on. * client_id - the client id to use, or NULL if a random client id should be @@ -1601,7 +1601,7 @@ struct libmosquitto_tls { libmosq_EXPORT int mosquitto_subscribe_simple( struct mosquitto_message **messages, int msg_count, - bool retained, + bool want_retained, const char *topic, int qos, const char *host, diff --git a/man/libmosquitto.3.xml b/man/libmosquitto.3.xml index 70e44cb3..0048ee4a 100644 --- a/man/libmosquitto.3.xml +++ b/man/libmosquitto.3.xml @@ -375,7 +375,7 @@ int mosquitto_subscribe_simple struct mosquitto_message **message int msg_count - bool retained + bool want_retained const char *topic intqos const char *host From 1961404ec90fd7c93fde1c3694e1f08346e2060b Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 2 Feb 2016 20:57:37 +0000 Subject: [PATCH 09/10] Add --retained-only to mosquitto_sub. --- ChangeLog.txt | 2 ++ client/client_shared.c | 5 +++++ client/client_shared.h | 1 + client/sub_client.c | 15 ++++++++++++++- man/mosquitto_sub.1.xml | 16 +++++++++++++++- 5 files changed, 37 insertions(+), 2 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index bb090f17..9ac4da30 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -40,6 +40,8 @@ Client library: Client: - Add -x to mosquitto_sub for printing the payload in hexadecimal format. - Add -U to mosquitto_sub for unsubscribing from topics. +- Add --retained-only to mosquitto_sub to exit after receiving all retained + messages. 1.4.7 - 20151221 diff --git a/client/client_shared.c b/client/client_shared.c index 82017a5b..212b7301 100644 --- a/client/client_shared.c +++ b/client/client_shared.c @@ -501,6 +501,11 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c goto unknown_option; } cfg->retain = 1; + }else if(!strcmp(argv[i], "--retained-only")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + cfg->retained_only = true; }else if(!strcmp(argv[i], "-s") || !strcmp(argv[i], "--stdin-file")){ if(pub_or_sub == CLIENT_SUB){ goto unknown_option; diff --git a/client/client_shared.h b/client/client_shared.h index 04d42346..acf37325 100644 --- a/client/client_shared.h +++ b/client/client_shared.h @@ -75,6 +75,7 @@ struct mosq_config { char **topics; /* sub */ int topic_count; /* sub */ bool no_retain; /* sub */ + bool retained_only; /* sub */ char **filter_outs; /* sub */ int filter_out_count; /* sub */ char **unsub_topics; /* sub */ diff --git a/client/sub_client.c b/client/sub_client.c index e4868449..a1a3bf04 100644 --- a/client/sub_client.c +++ b/client/sub_client.c @@ -57,6 +57,12 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit assert(obj); cfg = (struct mosq_config *)obj; + if(cfg->retained_only && !message->retain && process_messages){ + process_messages = false; + mosquitto_disconnect(mosq); + return; + } + if(message->retain && cfg->no_retain) return; if(cfg->filter_outs){ for(i=0; ifilter_out_count; i++){ @@ -146,7 +152,7 @@ void print_usage(void) printf("mosquitto_sub is a simple mqtt client that will subscribe to a single topic and print all messages it receives.\n"); printf("mosquitto_sub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision); printf("Usage: mosquitto_sub [-c] [-h host] [-k keepalive] [-p port] [-q qos] [-R] {-t topic ... | -U topic ...}\n"); - printf(" [-C msg_count] [-T filter_out]\n"); + printf(" [-C msg_count] [--retained-only] [-T filter_out]\n"); #ifdef WITH_SRV printf(" [-A bind_address] [-S]\n"); #else @@ -195,6 +201,8 @@ void print_usage(void) printf(" -x : print published message payloads as hexadecimal data.\n"); printf(" --help : display this message.\n"); printf(" --quiet : don't print error messages.\n"); + printf(" --retained-only : only handle messages with the retained flag set, and exit when the\n"); + printf(" first non-retained message is received.\n"); printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n"); printf(" unexpected disconnection. If not given and will-topic is set, a zero\n"); printf(" length message will be sent.\n"); @@ -247,6 +255,11 @@ int main(int argc, char *argv[]) return 1; } + if(cfg.no_retain && cfg.retained_only){ + fprintf(stderr, "\nError: Combining '-R' and '--retained-only' makes no sense.\n"); + return 1; + } + mosquitto_lib_init(); if(client_id_generate(&cfg, "mosqsub")){ diff --git a/man/mosquitto_sub.1.xml b/man/mosquitto_sub.1.xml index d31d1046..68c716ec 100644 --- a/man/mosquitto_sub.1.xml +++ b/man/mosquitto_sub.1.xml @@ -27,7 +27,10 @@ keepalive time port number message QoS - + + + + @@ -354,6 +357,17 @@ their display. + + + + If this argument is given, only messages that are + received that have the retain bit set will be printed. + Messages with retain set are "stale", in that it is not + known when they were originally published. With this + argument in use, the receipt of the first non-stale + message will cause the client to exit. + + From 26b015908e5c4ff8a2a365eba176f0c27779d1a7 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Fri, 5 Feb 2016 21:22:42 +0000 Subject: [PATCH 10/10] Default to using MQTT v3.1.1. --- ChangeLog.txt | 3 +++ lib/mosquitto.c | 2 +- man/mosquitto_pub.1.xml | 4 ++-- man/mosquitto_sub.1.xml | 4 ++-- mosquitto.conf | 4 ++-- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 9ac4da30..dc8adc89 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -25,6 +25,7 @@ Broker: - Minimum supported libwebsockets version is now 1.3. - Support for Windows XP has been dropped. - Miscellaneous fixes on Windows. +- Bridge connections now default to using MQTT v3.1.1. Client library: - Outgoing messages with QoS>1 are no longer retried after a timeout period. @@ -36,12 +37,14 @@ Client library: - Add mosquitto_subscribe_callback() This is a helper function to make processing messages from a broker very straightforward. An example of its use is in examples/subscribe_simple. +- Connections now default to using MQTT v3.1.1. Client: - Add -x to mosquitto_sub for printing the payload in hexadecimal format. - Add -U to mosquitto_sub for unsubscribing from topics. - Add --retained-only to mosquitto_sub to exit after receiving all retained messages. +- Connections now default to using MQTT v3.1.1. 1.4.7 - 20151221 diff --git a/lib/mosquitto.c b/lib/mosquitto.c index fc5d736a..e90346c7 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -140,7 +140,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_se }else{ mosq->userdata = mosq; } - mosq->protocol = mosq_p_mqtt31; + mosq->protocol = mosq_p_mqtt311; mosq->sock = INVALID_SOCKET; mosq->sockpairR = INVALID_SOCKET; mosq->sockpairW = INVALID_SOCKET; diff --git a/man/mosquitto_pub.1.xml b/man/mosquitto_pub.1.xml index c896dc33..567111d5 100644 --- a/man/mosquitto_pub.1.xml +++ b/man/mosquitto_pub.1.xml @@ -383,8 +383,8 @@ Specify which version of the MQTT protocol should be used when connecting to the rmeote broker. Can be - or . - Defaults to . + or . + Defaults to . diff --git a/man/mosquitto_sub.1.xml b/man/mosquitto_sub.1.xml index 68c716ec..7b1e5d2e 100644 --- a/man/mosquitto_sub.1.xml +++ b/man/mosquitto_sub.1.xml @@ -479,8 +479,8 @@ Specify which version of the MQTT protocol should be used when connecting to the rmeote broker. Can be - or . - Defaults to . + or . + Defaults to . diff --git a/mosquitto.conf b/mosquitto.conf index 7eed801c..4d9cdef2 100644 --- a/mosquitto.conf +++ b/mosquitto.conf @@ -650,8 +650,8 @@ #topic [[[out | in | both] qos-level] local-prefix remote-prefix] # Set the version of the MQTT protocol to use with for this bridge. Can be one -# of mqttv31 or mqttv311. Defaults to mqttv31. -#bridge_protocol_version mqttv31 +# of mqttv311 or mqttv11. Defaults to mqttv311. +#bridge_protocol_version mqttv311 # If a bridge has topics that have "out" direction, the default behaviour is to # send an unsubscribe request to the remote broker on that topic. This means