diff --git a/client/client_shared.c b/client/client_shared.c index 3851f37c..f14ebb1e 100644 --- a/client/client_shared.c +++ b/client/client_shared.c @@ -442,6 +442,22 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c } i++; } + }else if(!strcmp(argv[i], "-W")){ + if(pub_or_sub == CLIENT_PUB){ + goto unknown_option; + }else{ + if(i==argc-1){ + fprintf(stderr, "Error: -W argument given but no timeout specified.\n\n"); + return 1; + }else{ + cfg->timeout = atoi(argv[i+1]); + if(cfg->timeout < 1){ + fprintf(stderr, "Error: Invalid timeout \"%d\".\n\n", cfg->msg_count); + return 1; + } + } + i++; + } }else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){ cfg->debug = true; }else if(!strcmp(argv[i], "-f") || !strcmp(argv[i], "--file")){ diff --git a/client/client_shared.h b/client/client_shared.h index 09d0add6..f1ce6f31 100644 --- a/client/client_shared.h +++ b/client/client_shared.h @@ -84,6 +84,7 @@ struct mosq_config { bool eol; /* sub */ int msg_count; /* sub */ char *format; /* sub */ + int timeout; /* sub */ #ifdef WITH_SOCKS char *socks5_host; int socks5_port; diff --git a/client/sub_client.c b/client/sub_client.c index 5011259e..1d836846 100644 --- a/client/sub_client.c +++ b/client/sub_client.c @@ -14,6 +14,8 @@ Contributors: Roger Light - initial implementation and documentation. */ +#define _POSIX_C_SOURCE 200809L + #include #include #include @@ -22,6 +24,7 @@ Contributors: #include #ifndef WIN32 #include +#include #else #include #include @@ -33,6 +36,17 @@ Contributors: bool process_messages = true; int msg_count = 0; +struct mosquitto *mosq = NULL; + +#ifndef WIN32 +void my_signal_handler(int signum) +{ + if(signum == SIGALRM){ + process_messages = false; + mosquitto_disconnect(mosq); + } +} +#endif void print_message(struct mosq_config *cfg, const struct mosquitto_message *message); @@ -127,6 +141,9 @@ void print_usage(void) printf(" [-c] [-k keepalive] [-q qos]\n"); printf(" [-C msg_count] [-R] [--retained-only] [-T filter_out] [-U topic ...]\n"); printf(" [-F format]\n"); +#ifndef WIN32 + printf(" [-W timeout_secs]\n"); +#endif #ifdef WITH_SRV printf(" [-A bind_address] [-S]\n"); #else @@ -174,6 +191,9 @@ void print_usage(void) printf(" -v : print published messages verbosely.\n"); printf(" -V : specify the version of the MQTT protocol to use when connecting.\n"); printf(" Can be mqttv31 or mqttv311. Defaults to mqttv311.\n"); +#ifndef WIN32 + printf(" -W : Specifies a timeout in seconds how long to process incoming MQTT messages.\n"); +#endif printf(" --help : display this message.\n"); printf(" --quiet : don't print error messages.\n"); printf(" --retained-only : only handle messages with the retained flag set, and exit when the\n"); @@ -214,10 +234,13 @@ void print_usage(void) int main(int argc, char *argv[]) { struct mosq_config cfg; - struct mosquitto *mosq = NULL; int rc; +#ifndef WIN32 + struct sigaction sigact; +#endif memset(&cfg, 0, sizeof(struct mosq_config)); + rc = client_config_load(&cfg, CLIENT_SUB, argc, argv); if(rc){ client_config_cleanup(&cfg); @@ -267,6 +290,20 @@ int main(int argc, char *argv[]) rc = client_connect(mosq, &cfg); if(rc) return rc; +#ifndef WIN32 + sigact.sa_handler = my_signal_handler; + sigemptyset(&sigact.sa_mask); + sigact.sa_flags = 0; + + if(sigaction(SIGALRM, &sigact, NULL) == -1){ + perror("sigaction"); + return 1; + } + + if(cfg.timeout){ + alarm(cfg.timeout); + } +#endif rc = mosquitto_loop_forever(mosq, -1, 1); diff --git a/lib/loop.c b/lib/loop.c index 902e35ce..d2f9f8e1 100644 --- a/lib/loop.c +++ b/lib/loop.c @@ -18,6 +18,9 @@ Contributors: #include #include +#ifndef WIN32 +#include +#endif #include "mosquitto.h" #include "mosquitto_internal.h" @@ -203,6 +206,9 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) int rc; unsigned int reconnects = 0; unsigned long reconnect_delay; +#ifndef WIN32 + struct timespec req, rem; +#endif if(!mosq) return MOSQ_ERR_INVAL; @@ -262,7 +268,11 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) #ifdef WIN32 Sleep(reconnect_delay*1000); #else - sleep(reconnect_delay); + req.tv_sec = reconnect_delay; + req.tv_nsec = 0; + while(nanosleep(&req, &rem) == -1 && errno == EINTR){ + req = rem; + } #endif pthread_mutex_lock(&mosq->state_mutex);