From 1ce1bce941ffce6f029b34d5e3db6b4d3ce5c360 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Sat, 2 Mar 2019 22:14:54 +0000 Subject: [PATCH] Add --remove-retained to mosquitto_sub This can be used to clear retained messages on a broker. --- ChangeLog.txt | 2 ++ client/client_shared.c | 5 +++++ client/client_shared.h | 1 + client/sub_client.c | 25 ++++++++++++++++++++++--- man/mosquitto_sub.1.xml | 31 +++++++++++++++++++++++++++++++ 5 files changed, 61 insertions(+), 3 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index e850c2c5..8ed8ad2b 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -28,6 +28,8 @@ Client features: - Add -E to mosquitto_sub, which causes it to exit immediately after having its subscriptions acknowledged. Use with -c to create a durable client session without requiring a message to be received. +- Add --remove-retained to mosquitto_sub, which can be used to clear retained + messages on a broker. - -V now accepts `5, `311`, `31`, as well as `mqttv5` etc. - Add TLS Engine support. - Add explicit support for TLS v1.3. diff --git a/client/client_shared.c b/client/client_shared.c index 1cee5aa5..585b0996 100644 --- a/client/client_shared.c +++ b/client/client_shared.c @@ -803,6 +803,11 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c i++; }else if(!strcmp(argv[i], "--quiet")){ cfg->quiet = true; + }else if(!strcmp(argv[i], "--remove-retained")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + } + cfg->remove_retained = true; }else if(!strcmp(argv[i], "-r") || !strcmp(argv[i], "--retain")){ if(pub_or_sub == CLIENT_SUB){ goto unknown_option; diff --git a/client/client_shared.h b/client/client_shared.h index f44097cd..766b94d3 100644 --- a/client/client_shared.h +++ b/client/client_shared.h @@ -80,6 +80,7 @@ struct mosq_config { bool exit_after_sub; /* sub */ bool no_retain; /* sub */ bool retained_only; /* sub */ + bool remove_retained; /* sub */ char **filter_outs; /* sub */ int filter_out_count; /* sub */ char **unsub_topics; /* sub */ diff --git a/client/sub_client.c b/client/sub_client.c index f11f0844..5c2002da 100644 --- a/client/sub_client.c +++ b/client/sub_client.c @@ -39,6 +39,7 @@ static struct mosq_config cfg; bool process_messages = true; int msg_count = 0; struct mosquitto *mosq = NULL; +int last_mid = 0; #ifndef WIN32 void my_signal_handler(int signum) @@ -53,6 +54,14 @@ void my_signal_handler(int signum) void print_message(struct mosq_config *cfg, const struct mosquitto_message *message); +void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties) +{ + if(process_messages == false && (mid == last_mid || last_mid == 0)){ + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + } +} + + void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message, const mosquitto_property *properties) { int i; @@ -60,9 +69,15 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit if(process_messages == false) return; + if(cfg.remove_retained && message->retain){ + mosquitto_publish(mosq, &last_mid, message->topic, 0, NULL, 1, true); + } + if(cfg.retained_only && !message->retain && process_messages){ process_messages = false; - mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + if(last_mid == 0){ + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + } return; } @@ -80,7 +95,9 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit msg_count++; if(cfg.msg_count == msg_count){ process_messages = false; - mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + if(last_mid == 0){ + mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props); + } } } } @@ -136,7 +153,7 @@ void print_usage(void) printf("mosquitto_sub version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision); printf("Usage: mosquitto_sub {[-h host] [-p port] [-u username [-P password]] -t topic | -L URL [-t topic]}\n"); printf(" [-c] [-k keepalive] [-q qos]\n"); - printf(" [-C msg_count] [-E] [-R] [--retained-only] [-T filter_out] [-U topic ...]\n"); + printf(" [-C msg_count] [-E] [-R] [--retained-only] [--remove-retained] [-T filter_out] [-U topic ...]\n"); printf(" [-F format]\n"); #ifndef WIN32 printf(" [-W timeout_secs]\n"); @@ -199,6 +216,8 @@ void print_usage(void) printf(" --quiet : don't print error messages.\n"); printf(" --retained-only : only handle messages with the retained flag set, and exit when the\n"); printf(" first non-retained message is received.\n"); + printf(" --remove-retained : send a message to the server to clear any received retained messages\n"); + printf(" Use -T to filter out messages you do not want to be cleared.\n"); printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n"); printf(" unexpected disconnection. If not given and will-topic is set, a zero\n"); printf(" length message will be sent.\n"); diff --git a/man/mosquitto_sub.1.xml b/man/mosquitto_sub.1.xml index 824fe9c4..162fc09c 100644 --- a/man/mosquitto_sub.1.xml +++ b/man/mosquitto_sub.1.xml @@ -43,6 +43,7 @@ keepalive-time message-QoS + @@ -471,6 +472,36 @@ their display. + + + + If this argument is given, the when mosquitto_sub + receives a message with the retained bit set, it will + send a message to the broker to clear that retained + message. This applies to all received messages except + those that are filtered out by the + option. This option still takes effect even if + is used. See also the + and + options. + + + Remove all retained messages on the server, + assuming we have access to do so, and then exit: + + +mosquitto_sub -t '#' --remove-retained --retained-only + + + + Remove a whole tree, with the exception of a + single topic: + + +mosquitto_sub -t 'bbc/#' -T bbc/bbc1 --remove-retained + + +