From 4995436b5a3e5d7245fbb3c3f53f5989b85e5be5 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Thu, 11 Apr 2019 19:13:59 +0100 Subject: [PATCH] Add --repeat and --repeat-delay to mosquitto_pub. --- ChangeLog.txt | 2 ++ client/client_shared.c | 37 ++++++++++++++++++++++++ client/client_shared.h | 3 ++ client/pub_client.c | 63 +++++++++++++++++++++++++++++++++++++++-- man/mosquitto_pub.1.xml | 29 +++++++++++++++++++ 5 files changed, 132 insertions(+), 2 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 3980e2e8..61245eaf 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -39,6 +39,8 @@ Client features: session without requiring a message to be received. - Add --remove-retained to mosquitto_sub, which can be used to clear retained messages on a broker. +- Add --repeat and --repeat-delay to mosquitto_pub, which can be used to + repeat single message publishes at a regular interval. - -V now accepts `5, `311`, `31`, as well as `mqttv5` etc. - Add TLS Engine support. - Add support for ALPN on TLS connections. Closes #924. diff --git a/client/client_shared.c b/client/client_shared.c index 1dea07ec..cc85d37e 100644 --- a/client/client_shared.c +++ b/client/client_shared.c @@ -129,6 +129,9 @@ void init_config(struct mosq_config *cfg, int pub_or_sub) cfg->keepalive = 60; cfg->clean_session = true; cfg->eol = true; + cfg->repeat_count = 1; + cfg->repeat_delay.tv_sec = 0; + cfg->repeat_delay.tv_usec = 0; if(pub_or_sub == CLIENT_RR){ cfg->protocol_version = MQTT_PROTOCOL_V5; cfg->msg_count = 1; @@ -445,6 +448,7 @@ int cfg_add_topic(struct mosq_config *cfg, int type, char *topic, const char *ar int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[]) { int i; + float f; for(i=1; iremove_retained = true; + }else if(!strcmp(argv[i], "--repeat")){ + if(pub_or_sub != CLIENT_PUB){ + goto unknown_option; + } + if(i==argc-1){ + fprintf(stderr, "Error: --repeat argument given but no count specified.\n\n"); + return 1; + }else{ + cfg->repeat_count = atoi(argv[i+1]); + if(cfg->repeat_count < 1){ + fprintf(stderr, "Error: --repeat argument must be >0.\n\n"); + return 1; + } + } + i++; + }else if(!strcmp(argv[i], "--repeat-delay")){ + if(pub_or_sub != CLIENT_PUB){ + goto unknown_option; + } + if(i==argc-1){ + fprintf(stderr, "Error: --repeat-delay argument given but no time specified.\n\n"); + return 1; + }else{ + f = atof(argv[i+1]); + if(f < 0.0f){ + fprintf(stderr, "Error: --repeat-delay argument must be >=0.0.\n\n"); + return 1; + } + f *= 1.0e6; + cfg->repeat_delay.tv_sec = (int)f/1e6; + cfg->repeat_delay.tv_usec = (int)f%1000000; + } + i++; }else if(!strcmp(argv[i], "--retain-as-published")){ if(pub_or_sub == CLIENT_PUB){ goto unknown_option; diff --git a/client/client_shared.h b/client/client_shared.h index 46592a23..ca8ae640 100644 --- a/client/client_shared.h +++ b/client/client_shared.h @@ -18,6 +18,7 @@ Contributors: #define CLIENT_CONFIG_H #include +#include /* pub_client.c modes */ #define MSGMODE_NONE 0 @@ -47,6 +48,8 @@ struct mosq_config { long msglen; /* pub, rr */ char *topic; /* pub, rr */ char *bind_address; + int repeat_count; /* pub */ + struct timeval repeat_delay; /* pub */ #ifdef WITH_SRV bool use_srv; #endif diff --git a/client/pub_client.c b/client/pub_client.c index e1ee2cbe..3f8020cc 100644 --- a/client/pub_client.c +++ b/client/pub_client.c @@ -43,7 +43,35 @@ static char *line_buf = NULL; static int line_buf_len = 1024; static bool connected = true; static bool disconnect_sent = false; +static int publish_count = 0; +static bool ready_for_repeat = false; +static struct timeval next_publish_tv; +static void set_repeat_time(void) +{ + gettimeofday(&next_publish_tv, NULL); + next_publish_tv.tv_sec += cfg.repeat_delay.tv_sec; + next_publish_tv.tv_usec += cfg.repeat_delay.tv_usec; + + next_publish_tv.tv_sec += next_publish_tv.tv_usec/1e6; + next_publish_tv.tv_usec = next_publish_tv.tv_usec%1000000; +} + +static int check_repeat_time(void) +{ + struct timeval tv; + + gettimeofday(&tv, NULL); + + if(tv.tv_sec > next_publish_tv.tv_sec){ + return 1; + }else if(tv.tv_sec == next_publish_tv.tv_sec + && tv.tv_usec > next_publish_tv.tv_usec){ + + return 1; + } + return 0; +} void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties) { @@ -57,6 +85,7 @@ void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mos int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain) { + ready_for_repeat = false; if(cfg.protocol_version == MQTT_PROTOCOL_V5 && cfg.have_topic_alias && first_publish == false){ return mosquitto_publish_v5(mosq, mid, NULL, payloadlen, payload, qos, retain, cfg.publish_props); }else{ @@ -134,11 +163,16 @@ void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_ if(reason_code > 127){ if(!cfg.quiet) fprintf(stderr, "Warning: Publish %d failed: %s.\n", mid, mosquitto_reason_string(reason_code)); } + publish_count++; + if(cfg.pub_mode == MSGMODE_STDIN_LINE){ if(mid == last_mid){ mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); disconnect_sent = true; } + }else if(publish_count < cfg.repeat_count){ + ready_for_repeat = true; + set_repeat_time(); }else if(disconnect_sent == false){ mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); disconnect_sent = true; @@ -165,6 +199,11 @@ int pub_shared_loop(struct mosquitto *mosq) char *buf2; int buf_len_actual; int mode; + int loop_delay = 1000; + + if(cfg.repeat_count > 1 && (cfg.repeat_delay.tv_sec == 0 || cfg.repeat_delay.tv_usec != 0)){ + loop_delay = cfg.repeat_delay.tv_usec / 2000; + } mode = cfg.pub_mode; @@ -226,7 +265,25 @@ int pub_shared_loop(struct mosquitto *mosq) } rc = MOSQ_ERR_SUCCESS; }else{ - rc = mosquitto_loop(mosq, -1, 1); + rc = mosquitto_loop(mosq, loop_delay, 1); + if(ready_for_repeat && check_repeat_time()){ + rc = 0; + switch(cfg.pub_mode){ + case MSGMODE_CMD: + case MSGMODE_FILE: + case MSGMODE_STDIN_FILE: + rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain); + break; + case MSGMODE_NULL: + rc = my_publish(mosq, &mid_sent, cfg.topic, 0, NULL, cfg.qos, cfg.retain); + break; + case MSGMODE_STDIN_LINE: + break; + } + if(rc){ + fprintf(stderr, "Error sending repeat publish: %s", mosquitto_strerror(rc)); + } + } } }while(rc == MOSQ_ERR_SUCCESS && connected); @@ -252,7 +309,7 @@ void print_usage(void) printf("mosquitto_pub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision); printf("Usage: mosquitto_pub {[-h host] [-p port] [-u username [-P password]] -t topic | -L URL}\n"); printf(" {-f file | -l | -n | -m message}\n"); - printf(" [-c] [-k keepalive] [-q qos] [-r]\n"); + printf(" [-c] [-k keepalive] [-q qos] [-r] [--repeat N] [--repeat-delay time]\n"); #ifdef WITH_SRV printf(" [-A bind_address] [-S]\n"); #else @@ -307,6 +364,8 @@ void print_usage(void) printf(" -V : specify the version of the MQTT protocol to use when connecting.\n"); printf(" Can be mqttv5, mqttv311 or mqttv31. Defaults to mqttv311.\n"); printf(" --help : display this message.\n"); + printf(" --repeat : if publish mode is -f, -m, or -s, then repeat the publish N times.\n"); + printf(" --repeat-delay : if using --repeat, wait time seconds between publishes. Defaults to 0.\n"); printf(" --quiet : don't print error messages.\n"); printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n"); printf(" unexpected disconnection. If not given and will-topic is set, a zero\n"); diff --git a/man/mosquitto_pub.1.xml b/man/mosquitto_pub.1.xml index 0e2fdf1d..46bf09ba 100644 --- a/man/mosquitto_pub.1.xml +++ b/man/mosquitto_pub.1.xml @@ -39,6 +39,8 @@ message-QoS + count + seconds file @@ -431,6 +433,33 @@ If retain is given, the message will be retained as a "last known good" value on the broker. See mqtt7 for more information. + + + + If the publish mode is, + , or (i.e. the modes + where only a single message is sent), then + can be used to specify that the + message will be published multiple times. + See also . + + + + + + If using , then the default + behaviour is to publish repeated messages as soon as the + previous message is delivered. Use + to specify the number of + seconds to wait after the previous message was delivered + before publishing the next. Does not need to be an integer + number of seconds. + Note that there is no guarantee as to the actual interval + between messages, this option simply defines the minimum + time from delivery of one message to the start of the + publish of the next. + +