Merge branch 'sub_client_timeout_upstream' of git://github.com/I2SE/mosquitto into I2SE-sub_client_timeout_upstream

pull/568/merge
Roger A. Light 8 years ago
commit 9852f94ee0

@ -442,6 +442,22 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
}
i++;
}
}else if(!strcmp(argv[i], "-W")){
if(pub_or_sub == CLIENT_PUB){
goto unknown_option;
}else{
if(i==argc-1){
fprintf(stderr, "Error: -W argument given but no timeout specified.\n\n");
return 1;
}else{
cfg->timeout = atoi(argv[i+1]);
if(cfg->timeout < 1){
fprintf(stderr, "Error: Invalid timeout \"%d\".\n\n", cfg->msg_count);
return 1;
}
}
i++;
}
}else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug")){
cfg->debug = true;
}else if(!strcmp(argv[i], "-f") || !strcmp(argv[i], "--file")){

@ -84,6 +84,7 @@ struct mosq_config {
bool eol; /* sub */
int msg_count; /* sub */
char *format; /* sub */
int timeout; /* sub */
#ifdef WITH_SOCKS
char *socks5_host;
int socks5_port;

@ -14,6 +14,8 @@ Contributors:
Roger Light - initial implementation and documentation.
*/
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdio.h>
@ -22,6 +24,7 @@ Contributors:
#include <time.h>
#ifndef WIN32
#include <unistd.h>
#include <signal.h>
#else
#include <process.h>
#include <winsock2.h>
@ -33,6 +36,17 @@ Contributors:
bool process_messages = true;
int msg_count = 0;
struct mosquitto *mosq = NULL;
#ifndef WIN32
void my_signal_handler(int signum)
{
if(signum == SIGALRM){
process_messages = false;
mosquitto_disconnect(mosq);
}
}
#endif
void print_message(struct mosq_config *cfg, const struct mosquitto_message *message);
@ -127,6 +141,9 @@ void print_usage(void)
printf(" [-c] [-k keepalive] [-q qos]\n");
printf(" [-C msg_count] [-R] [--retained-only] [-T filter_out] [-U topic ...]\n");
printf(" [-F format]\n");
#ifndef WIN32
printf(" [-W timeout_secs]\n");
#endif
#ifdef WITH_SRV
printf(" [-A bind_address] [-S]\n");
#else
@ -174,6 +191,9 @@ void print_usage(void)
printf(" -v : print published messages verbosely.\n");
printf(" -V : specify the version of the MQTT protocol to use when connecting.\n");
printf(" Can be mqttv31 or mqttv311. Defaults to mqttv311.\n");
#ifndef WIN32
printf(" -W : Specifies a timeout in seconds how long to process incoming MQTT messages.\n");
#endif
printf(" --help : display this message.\n");
printf(" --quiet : don't print error messages.\n");
printf(" --retained-only : only handle messages with the retained flag set, and exit when the\n");
@ -214,10 +234,13 @@ void print_usage(void)
int main(int argc, char *argv[])
{
struct mosq_config cfg;
struct mosquitto *mosq = NULL;
int rc;
#ifndef WIN32
struct sigaction sigact;
#endif
memset(&cfg, 0, sizeof(struct mosq_config));
rc = client_config_load(&cfg, CLIENT_SUB, argc, argv);
if(rc){
client_config_cleanup(&cfg);
@ -267,6 +290,20 @@ int main(int argc, char *argv[])
rc = client_connect(mosq, &cfg);
if(rc) return rc;
#ifndef WIN32
sigact.sa_handler = my_signal_handler;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = 0;
if(sigaction(SIGALRM, &sigact, NULL) == -1){
perror("sigaction");
return 1;
}
if(cfg.timeout){
alarm(cfg.timeout);
}
#endif
rc = mosquitto_loop_forever(mosq, -1, 1);

@ -18,6 +18,9 @@ Contributors:
#include <errno.h>
#include <sys/select.h>
#ifndef WIN32
#include <time.h>
#endif
#include "mosquitto.h"
#include "mosquitto_internal.h"
@ -203,6 +206,9 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
int rc;
unsigned int reconnects = 0;
unsigned long reconnect_delay;
#ifndef WIN32
struct timespec req, rem;
#endif
if(!mosq) return MOSQ_ERR_INVAL;
@ -262,7 +268,11 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
#ifdef WIN32
Sleep(reconnect_delay*1000);
#else
sleep(reconnect_delay);
req.tv_sec = reconnect_delay;
req.tv_nsec = 0;
while(nanosleep(&req, &rem) == -1 && errno == EINTR){
req = rem;
}
#endif
pthread_mutex_lock(&mosq->state_mutex);

Loading…
Cancel
Save