diff --git a/client/client_shared.c b/client/client_shared.c index b8c8bdfc..bd2cd68e 100644 --- a/client/client_shared.c +++ b/client/client_shared.c @@ -440,6 +440,22 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c } i++; } + }else if(!strcmp(argv[i], "-W")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + }else{ + if(i==argc-1){ + fprintf(stderr, "Error: -W argument given but no timeout specified.\n\n"); + return 1; + }else{ + cfg->timeout = atoi(argv[i+1]); + if(cfg->timeout < 1){ + fprintf(stderr, "Error: Invalid timeout \"%d\".\n\n", cfg->msg_count); + return 1; + } + } + i++; + } }else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){ cfg->debug = true; }else if(!strcmp(argv[i], "-f") || !strcmp(argv[i], "--file")){ diff --git a/client/client_shared.h b/client/client_shared.h index 88fdba13..53eee667 100644 --- a/client/client_shared.h +++ b/client/client_shared.h @@ -84,6 +84,7 @@ struct mosq_config { bool eol; /* sub */ int msg_count; /* sub */ char *format; /* sub */ + int timeout; /* sub */ #ifdef WITH_SOCKS char *socks5_host; int socks5_port; diff --git a/client/sub_client.c b/client/sub_client.c index 1fb6fb75..cfdbf29a 100644 --- a/client/sub_client.c +++ b/client/sub_client.c @@ -22,6 +22,7 @@ Contributors: #include #ifndef WIN32 #include +#include #else #include #include @@ -33,6 +34,17 @@ Contributors: bool process_messages = true; int msg_count = 0; +struct mosquitto *mosq = NULL; + +#ifndef WIN32 +void my_signal_handler(int signum) +{ + if(signum == SIGALRM){ + process_messages = false; + mosquitto_disconnect(mosq); + } +} +#endif static int get_time(struct tm **ti, long *ns) { @@ -401,6 +413,9 @@ void print_usage(void) printf(" [-c] [-k keepalive] [-q qos]\n"); printf(" [-C msg_count] [-R] [--retained-only] [-T filter_out] [-U topic ...]\n"); printf(" [-F format]\n"); +#ifndef WIN32 + printf(" [-W timeout_secs]\n"); +#endif #ifdef WITH_SRV printf(" [-A bind_address] [-S]\n"); #else @@ -448,6 +463,9 @@ void print_usage(void) printf(" -v : print published messages verbosely.\n"); printf(" -V : specify the version of the MQTT protocol to use when connecting.\n"); printf(" Can be mqttv31 or mqttv311. Defaults to mqttv311.\n"); +#ifndef WIN32 + printf(" -W : Specifies a timeout in seconds how long to process incoming MQTT messages.\n"); +#endif printf(" --help : display this message.\n"); printf(" --quiet : don't print error messages.\n"); printf(" --retained-only : only handle messages with the retained flag set, and exit when the\n"); @@ -488,10 +506,13 @@ void print_usage(void) int main(int argc, char *argv[]) { struct mosq_config cfg; - struct mosquitto *mosq = NULL; int rc; +#ifndef WIN32 + struct sigaction sigact; +#endif memset(&cfg, 0, sizeof(struct mosq_config)); + rc = client_config_load(&cfg, CLIENT_SUB, argc, argv); if(rc){ client_config_cleanup(&cfg); @@ -541,6 +562,20 @@ int main(int argc, char *argv[]) rc = client_connect(mosq, &cfg); if(rc) return rc; +#ifndef WIN32 + sigact.sa_handler = my_signal_handler; + sigemptyset(&sigact.sa_mask); + sigact.sa_flags = 0; + + if(sigaction(SIGALRM, &sigact, NULL) == -1){ + perror("sigaction"); + return 1; + } + + if(cfg.timeout){ + alarm(cfg.timeout); + } +#endif rc = mosquitto_loop_forever(mosq, -1, 1);