From 0a95c9a3afc88562229d4d2851e4548e148de5c0 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 26 Jan 2016 17:00:08 +0000 Subject: [PATCH] Add mosquitto_subscribe_callback(). --- examples/subscribe_simple/Makefile | 8 +- examples/subscribe_simple/callback.c | 34 +++++++ lib/cpp/mosquittopp.cpp | 45 +++++++++ lib/cpp/mosquittopp.h | 31 ++++++ lib/helpers.c | 135 ++++++++++++++++++--------- lib/linker.version | 1 + lib/mosquitto.h | 52 +++++++++++ man/libmosquitto.3.xml | 16 ++++ 8 files changed, 275 insertions(+), 47 deletions(-) create mode 100644 examples/subscribe_simple/callback.c diff --git a/examples/subscribe_simple/Makefile b/examples/subscribe_simple/Makefile index f42b1364..f5576c53 100644 --- a/examples/subscribe_simple/Makefile +++ b/examples/subscribe_simple/Makefile @@ -2,7 +2,10 @@ include ../../config.mk .PHONY: all -all : sub_single sub_multiple +all : sub_callback sub_single sub_multiple + +sub_callback : callback.o + ${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION} sub_single : single.o ${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION} @@ -10,6 +13,9 @@ sub_single : single.o sub_multiple : multiple.o ${CROSS_COMPILE}${CC} $^ -o $@ ../../lib/libmosquitto.so.${SOVERSION} +callback.o : callback.c ../../lib/libmosquitto.so.${SOVERSION} + ${CROSS_COMPILE}${CC} -c $< -o $@ -I../../lib ${CFLAGS} + single.o : single.c ../../lib/libmosquitto.so.${SOVERSION} ${CROSS_COMPILE}${CC} -c $< -o $@ -I../../lib ${CFLAGS} diff --git a/examples/subscribe_simple/callback.c b/examples/subscribe_simple/callback.c new file mode 100644 index 00000000..647df32c --- /dev/null +++ b/examples/subscribe_simple/callback.c @@ -0,0 +1,34 @@ +#include +#include +#include "mosquitto.h" + +int on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) +{ + printf("%s %s (%d)\n", msg->topic, (const char *)msg->payload, msg->payloadlen); + return 0; +} + + +int main(int argc, char *argv[]) +{ + int rc; + + mosquitto_lib_init(); + + rc = mosquitto_subscribe_callback( + on_message, + "irc/#", 0, NULL, + "test.mosquitto.org", 1883, + NULL, 60, true, + NULL, NULL, + NULL, NULL); + + if(rc){ + printf("Error: %s\n", mosquitto_strerror(rc)); + } + + mosquitto_lib_cleanup(); + + return rc; +} + diff --git a/lib/cpp/mosquittopp.cpp b/lib/cpp/mosquittopp.cpp index 03301812..ce4198c3 100644 --- a/lib/cpp/mosquittopp.cpp +++ b/lib/cpp/mosquittopp.cpp @@ -106,6 +106,51 @@ int topic_matches_sub(const char *sub, const char *topic, bool *result) return mosquitto_topic_matches_sub(sub, topic, result); } +int subscribe_simple( + struct mosquitto_message **messages, + int msg_count, + const char *topic, + int qos, + bool retained, + const char *host, + int port, + const char *client_id, + int keepalive, + bool clean_session, + const char *username, + const char *password, + const struct libmosquitto_will *will, + const struct libmosquitto_tls *tls) +{ + return mosquitto_subscribe_simple(messages, msg_count, topic, qos, retained, + host, port, client_id, keepalive, clean_session, + username, password, + will, tls); +} + +mosqpp_EXPORT int subscribe_callback( + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + const char *topic, + int qos, + void *userdata, + bool retained, + const char *host, + int port, + const char *client_id, + int keepalive, + bool clean_session, + const char *username, + const char *password, + const struct libmosquitto_will *will, + const struct libmosquitto_tls *tls) +{ + return mosquitto_subscribe_callback(callback, topic, qos, userdata, retained, + host, port, client_id, keepalive, clean_session, + username, password, + will, tls); +} + + mosquittopp::mosquittopp(const char *id, bool clean_session) { m_mosq = mosquitto_new(id, clean_session, this); diff --git a/lib/cpp/mosquittopp.h b/lib/cpp/mosquittopp.h index d3d6f13e..9876d370 100644 --- a/lib/cpp/mosquittopp.h +++ b/lib/cpp/mosquittopp.h @@ -41,6 +41,37 @@ mosqpp_EXPORT int lib_version(int *major, int *minor, int *revision); mosqpp_EXPORT int lib_init(); mosqpp_EXPORT int lib_cleanup(); mosqpp_EXPORT int topic_matches_sub(const char *sub, const char *topic, bool *result); +mosqpp_EXPORT int subscribe_simple( + struct mosquitto_message **messages, + int msg_count, + const char *topic, + int qos=0, + bool retained=true, + const char *host="localhost", + int port=1883, + const char *client_id=NULL, + int keepalive=60, + bool clean_session=true, + const char *username=NULL, + const char *password=NULL, + const struct libmosquitto_will *will=NULL, + const struct libmosquitto_tls *tls=NULL); + +mosqpp_EXPORT int subscribe_callback( + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + const char *topic, + int qos=0, + void *userdata=NULL, + bool retained=true, + const char *host="localhost", + int port=1883, + const char *client_id=NULL, + int keepalive=60, + bool clean_session=true, + const char *username=NULL, + const char *password=NULL, + const struct libmosquitto_will *will=NULL, + const struct libmosquitto_tls *tls=NULL); /* * Class: mosquittopp diff --git a/lib/helpers.c b/lib/helpers.c index 5f38e3e1..c36cf35f 100644 --- a/lib/helpers.c +++ b/lib/helpers.c @@ -20,55 +20,69 @@ Contributors: #include "mosquitto.h" #include "mosquitto_internal.h" -struct subscribe__userdata { +struct userdata__callback { const char *topic; + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *); + void *userdata; + int qos; + int rc; +}; + +struct userdata__simple { struct mosquitto_message *messages; int max_msg_count; int message_count; - int qos; - int rc; bool retained; }; -void on_connect(struct mosquitto *mosq, void *obj, int rc) +static void on_connect(struct mosquitto *mosq, void *obj, int rc) { - struct subscribe__userdata *userdata = obj; + struct userdata__callback *userdata = obj; mosquitto_subscribe(mosq, NULL, userdata->topic, userdata->qos); } -void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) +static void on_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { - struct subscribe__userdata *userdata = obj; + int rc; + struct userdata__callback *userdata = obj; + + rc = userdata->callback(mosq, userdata->userdata, message); + if(rc){ + mosquitto_disconnect(mosq); + } +} + +static int on_message_simple(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) +{ + struct userdata__simple *userdata = obj; int rc; if(userdata->max_msg_count == 0){ - return; + return 0; } /* Don't process stale retained messages if 'retained' was false */ if(!userdata->retained && message->retain){ - return; + return 0; } userdata->max_msg_count--; rc = mosquitto_message_copy(&userdata->messages[userdata->message_count], message); if(rc){ - userdata->rc = rc; - mosquitto_disconnect(mosq); - return; + return rc; } userdata->message_count++; if(userdata->max_msg_count == 0){ mosquitto_disconnect(mosq); } + return 0; } - libmosq_EXPORT int mosquitto_subscribe_simple( struct mosquitto_message **messages, int msg_count, @@ -85,8 +99,7 @@ libmosq_EXPORT int mosquitto_subscribe_simple( const struct libmosquitto_will *will, const struct libmosquitto_tls *tls) { - struct mosquitto *mosq; - struct subscribe__userdata userdata; + struct userdata__simple userdata; int rc; int i; @@ -96,29 +109,72 @@ libmosq_EXPORT int mosquitto_subscribe_simple( *messages = NULL; - userdata.topic = topic; - userdata.qos = qos; - userdata.max_msg_count = msg_count; - userdata.retained = retained; userdata.messages = calloc(sizeof(struct mosquitto_message), msg_count); - userdata.rc = 0; if(!userdata.messages){ return MOSQ_ERR_NOMEM; } userdata.message_count = 0; + userdata.max_msg_count = msg_count; + userdata.retained = retained; - mosq = mosquitto_new(client_id, clean_session, &userdata); - if(!mosq){ + rc = mosquitto_subscribe_callback(on_message_simple, + topic, qos, + &userdata, + host, port, + client_id, keepalive, clean_session, + username, password, + will, tls); + + if(!rc && userdata.max_msg_count == 0){ + *messages = userdata.messages; + return MOSQ_ERR_SUCCESS; + }else{ + for(i=0; itopic, will->payloadlen, will->payload, will->qos, will->retain); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } @@ -126,8 +182,6 @@ libmosq_EXPORT int mosquitto_subscribe_simple( if(username){ rc = mosquitto_username_pw_set(mosq, username, password); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } @@ -135,45 +189,34 @@ libmosq_EXPORT int mosquitto_subscribe_simple( if(tls){ rc = mosquitto_tls_set(mosq, tls->cafile, tls->capath, tls->certfile, tls->keyfile, tls->pw_callback); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } rc = mosquitto_tls_opts_set(mosq, tls->cert_reqs, tls->tls_version, tls->ciphers); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } } mosquitto_connect_callback_set(mosq, on_connect); - mosquitto_message_callback_set(mosq, on_message); + mosquitto_message_callback_set(mosq, on_message_callback); rc = mosquitto_connect(mosq, host, port, keepalive); if(rc){ - free(userdata.messages); - userdata.messages = NULL; mosquitto_destroy(mosq); return rc; } rc = mosquitto_loop_forever(mosq, -1, 1); mosquitto_destroy(mosq); - if(userdata.rc){ - rc = userdata.rc; - } - if(!rc && userdata.max_msg_count == 0){ - *messages = userdata.messages; - return MOSQ_ERR_SUCCESS; - }else{ - for(i=0; i0 - on error. + */ +libmosq_EXPORT int mosquitto_subscribe_callback( + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *), + const char *topic, + int qos, + void *userdata, + const char *host, + int port, + const char *client_id, + int keepalive, + bool clean_session, + const char *username, + const char *password, + const struct libmosquitto_will *will, + const struct libmosquitto_tls *tls); #ifdef __cplusplus } #endif diff --git a/man/libmosquitto.3.xml b/man/libmosquitto.3.xml index 9d7bda1d..0961ff96 100644 --- a/man/libmosquitto.3.xml +++ b/man/libmosquitto.3.xml @@ -388,6 +388,22 @@ const struct libmosquitto_will *will const struct libmosquitto_tls *tls + + int mosquitto_subscribe_callback + int (*callback)(struct mosquitto *, void *, const struct mosquitto_message *) + const char *topic + int qos + void *userdata + const char *host + int port + const char *client_id + int keepalive + bool clean_session + const char *username + const char *password + const struct libmosquitto_will *will + const struct libmosquitto_tls *tls +