diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt index 39a12d1d..d53772ed 100644 --- a/client/CMakeLists.txt +++ b/client/CMakeLists.txt @@ -8,7 +8,7 @@ if (${WITH_SRV} STREQUAL ON) add_definitions("-DWITH_SRV") endif (${WITH_SRV} STREQUAL ON) -add_executable(mosquitto_pub pub_client.c ${shared_src}) +add_executable(mosquitto_pub pub_client.c pub_shared.c ${shared_src}) add_executable(mosquitto_sub sub_client.c sub_client_output.c ${shared_src}) target_link_libraries(mosquitto_pub libmosquitto) diff --git a/client/Makefile b/client/Makefile index 4ffc46b1..39310b97 100644 --- a/client/Makefile +++ b/client/Makefile @@ -8,13 +8,13 @@ static : static_pub static_sub # This makes mosquitto_pub/sub versions that are statically linked with # libmosquitto only. -static_pub : pub_client.o client_shared.o ../lib/libmosquitto.a +static_pub : pub_client.o pub_shared.o client_shared.o ../lib/libmosquitto.a ${CROSS_COMPILE}${CC} $^ -o mosquitto_pub ${CLIENT_LDFLAGS} -lssl -lcrypto -lpthread static_sub : sub_client.o sub_client_output.o client_shared.o ../lib/libmosquitto.a ${CROSS_COMPILE}${CC} $^ -o mosquitto_sub ${CLIENT_LDFLAGS} -lssl -lcrypto -lpthread -mosquitto_pub : pub_client.o client_shared.o client_props.o +mosquitto_pub : pub_client.o pub_shared.o client_shared.o client_props.o ${CROSS_COMPILE}${CC} $^ -o $@ ${CLIENT_LDFLAGS} mosquitto_sub : sub_client.o sub_client_output.o client_shared.o client_props.o @@ -23,6 +23,9 @@ mosquitto_sub : sub_client.o sub_client_output.o client_shared.o client_props.o pub_client.o : pub_client.c ../lib/libmosquitto.so.${SOVERSION} ${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS} +pub_shared.o : pub_shared.c ../lib/libmosquitto.so.${SOVERSION} + ${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS} + sub_client.o : sub_client.c ../lib/libmosquitto.so.${SOVERSION} ${CROSS_COMPILE}${CC} -c $< -o $@ ${CLIENT_CFLAGS} diff --git a/client/pub_client.c b/client/pub_client.c index 9b5291db..3f95b742 100644 --- a/client/pub_client.c +++ b/client/pub_client.c @@ -31,29 +31,10 @@ Contributors: #include #include "client_shared.h" - -#define STATUS_CONNECTING 0 -#define STATUS_CONNACK_RECVD 1 -#define STATUS_WAITING 2 -#define STATUS_DISCONNECTING 3 +#include "pub_shared.h" /* Global variables for use in callbacks. See sub_client.c for an example of * using a struct to hold variables for use in callbacks. */ -static char *topic = NULL; -static char *message = NULL; -static long msglen = 0; -static int qos = 0; -static int retain = 0; -static int mode = MSGMODE_NONE; -static int status = STATUS_CONNECTING; -static int mid_sent = 0; -static int last_mid = -1; -static int last_mid_sent = -1; -static bool connected = true; -static char *username = NULL; -static char *password = NULL; -static bool disconnect_sent = false; -static bool quiet = false; static struct mosq_config cfg; static bool first_publish = true; @@ -73,21 +54,21 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result) int rc = MOSQ_ERR_SUCCESS; if(!result){ - switch(mode){ + switch(cfg.pub_mode){ case MSGMODE_CMD: case MSGMODE_FILE: case MSGMODE_STDIN_FILE: - rc = my_publish(mosq, &mid_sent, topic, msglen, message, qos, retain); + rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain); break; case MSGMODE_NULL: - rc = my_publish(mosq, &mid_sent, topic, 0, NULL, qos, retain); + rc = my_publish(mosq, &mid_sent, cfg.topic, 0, NULL, cfg.qos, cfg.retain); break; case MSGMODE_STDIN_LINE: status = STATUS_CONNACK_RECVD; break; } if(rc){ - if(!quiet){ + if(!cfg.quiet){ switch(rc){ case MOSQ_ERR_INVAL: fprintf(stderr, "Error: Invalid input. Does your topic contain '+' or '#'?\n"); @@ -109,109 +90,12 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result) mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); } }else{ - if(result && !quiet){ + if(result && !cfg.quiet){ fprintf(stderr, "%s\n", mosquitto_connack_string(result)); } } } -void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) -{ - connected = false; -} - -void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) -{ - last_mid_sent = mid; - if(mode == MSGMODE_STDIN_LINE){ - if(mid == last_mid){ - mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); - disconnect_sent = true; - } - }else if(disconnect_sent == false){ - mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); - disconnect_sent = true; - } -} - -void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) -{ - printf("%s\n", str); -} - -int load_stdin(void) -{ - long pos = 0, rlen; - char buf[1024]; - char *aux_message = NULL; - - mode = MSGMODE_STDIN_FILE; - - while(!feof(stdin)){ - rlen = fread(buf, 1, 1024, stdin); - aux_message = realloc(message, pos+rlen); - if(!aux_message){ - if(!quiet) fprintf(stderr, "Error: Out of memory.\n"); - free(message); - return 1; - } else - { - message = aux_message; - } - memcpy(&(message[pos]), buf, rlen); - pos += rlen; - } - msglen = pos; - - if(!msglen){ - if(!quiet) fprintf(stderr, "Error: Zero length input.\n"); - return 1; - } - - return 0; -} - -int load_file(const char *filename) -{ - long pos, rlen; - FILE *fptr = NULL; - - fptr = fopen(filename, "rb"); - if(!fptr){ - if(!quiet) fprintf(stderr, "Error: Unable to open file \"%s\".\n", filename); - return 1; - } - mode = MSGMODE_FILE; - fseek(fptr, 0, SEEK_END); - msglen = ftell(fptr); - if(msglen > 268435455){ - fclose(fptr); - if(!quiet) fprintf(stderr, "Error: File \"%s\" is too large (>268,435,455 bytes).\n", filename); - return 1; - }else if(msglen == 0){ - fclose(fptr); - if(!quiet) fprintf(stderr, "Error: File \"%s\" is empty.\n", filename); - return 1; - }else if(msglen < 0){ - fclose(fptr); - if(!quiet) fprintf(stderr, "Error: Unable to determine size of file \"%s\".\n", filename); - return 1; - } - fseek(fptr, 0, SEEK_SET); - message = malloc(msglen); - if(!message){ - fclose(fptr); - if(!quiet) fprintf(stderr, "Error: Out of memory.\n"); - return 1; - } - pos = 0; - while(pos < msglen){ - rlen = fread(&(message[pos]), sizeof(char), msglen-pos, fptr); - pos += rlen; - } - fclose(fptr); - return 0; -} void print_usage(void) { @@ -313,21 +197,11 @@ int main(int argc, char *argv[]) { struct mosquitto *mosq = NULL; int rc; - int rc2; - char *buf, *buf2; - int buf_len = 1024; - int buf_len_actual; - int read_len; - int pos; - - buf = malloc(buf_len); - if(!buf){ - fprintf(stderr, "Error: Out of memory.\n"); - return 1; - } mosquitto_lib_init(); + if(pub_shared_init()) return 1; + memset(&cfg, 0, sizeof(struct mosq_config)); rc = client_config_load(&cfg, CLIENT_PUB, argc, argv); if(rc){ @@ -340,16 +214,6 @@ int main(int argc, char *argv[]) goto cleanup; } - topic = cfg.topic; - message = cfg.message; - msglen = cfg.msglen; - qos = cfg.qos; - retain = cfg.retain; - mode = cfg.pub_mode; - username = cfg.username; - password = cfg.password; - quiet = cfg.quiet; - #ifndef WITH_THREADING if(cfg.pub_mode == MSGMODE_STDIN_LINE){ fprintf(stderr, "Error: '-l' mode not available, threading support has not been compiled in.\n"); @@ -369,7 +233,7 @@ int main(int argc, char *argv[]) } } - if(!topic || mode == MSGMODE_NONE){ + if(!cfg.topic || cfg.pub_mode == MSGMODE_NONE){ fprintf(stderr, "Error: Both topic and message must be supplied.\n"); print_usage(); goto cleanup; @@ -384,10 +248,10 @@ int main(int argc, char *argv[]) if(!mosq){ switch(errno){ case ENOMEM: - if(!quiet) fprintf(stderr, "Error: Out of memory.\n"); + if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n"); break; case EINVAL: - if(!quiet) fprintf(stderr, "Error: Invalid id.\n"); + if(!cfg.quiet) fprintf(stderr, "Error: Invalid id.\n"); break; } goto cleanup; @@ -402,84 +266,21 @@ int main(int argc, char *argv[]) if(client_opts_set(mosq, &cfg)){ goto cleanup; } + rc = client_connect(mosq, &cfg); if(rc){ goto cleanup; } - if(mode == MSGMODE_STDIN_LINE){ - mosquitto_loop_start(mosq); - } - - do{ - if(mode == MSGMODE_STDIN_LINE){ - if(status == STATUS_CONNACK_RECVD){ - pos = 0; - read_len = buf_len; - while(fgets(&buf[pos], read_len, stdin)){ - buf_len_actual = strlen(buf); - if(buf[buf_len_actual-1] == '\n'){ - buf[buf_len_actual-1] = '\0'; - rc2 = my_publish(mosq, &mid_sent, topic, buf_len_actual-1, buf, qos, retain); - if(rc2){ - if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2); - mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); - } - break; - }else{ - buf_len += 1024; - pos += 1023; - read_len = 1024; - buf2 = realloc(buf, buf_len); - if(!buf2){ - fprintf(stderr, "Error: Out of memory.\n"); - goto cleanup; - } - buf = buf2; - } - } - if(feof(stdin)){ - if(last_mid == -1){ - /* Empty file */ - mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); - disconnect_sent = true; - status = STATUS_DISCONNECTING; - }else{ - last_mid = mid_sent; - status = STATUS_WAITING; - } - } - }else if(status == STATUS_WAITING){ - if(last_mid_sent == last_mid && disconnect_sent == false){ - mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); - disconnect_sent = true; - } -#ifdef WIN32 - Sleep(100); -#else - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = 100000000; - nanosleep(&ts, NULL); -#endif - } - rc = MOSQ_ERR_SUCCESS; - }else{ - rc = mosquitto_loop(mosq, -1, 1); - } - }while(rc == MOSQ_ERR_SUCCESS && connected); - - if(mode == MSGMODE_STDIN_LINE){ - mosquitto_loop_stop(mosq, false); - } + rc = pub_shared_loop(mosq); - if(message && mode == MSGMODE_FILE){ - free(message); + if(cfg.message && cfg.pub_mode == MSGMODE_FILE){ + free(cfg.message); } mosquitto_destroy(mosq); mosquitto_lib_cleanup(); client_config_cleanup(&cfg); - free(buf); + pub_shared_cleanup(); if(rc){ fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc)); @@ -489,6 +290,6 @@ int main(int argc, char *argv[]) cleanup: mosquitto_lib_cleanup(); client_config_cleanup(&cfg); - free(buf); + pub_shared_cleanup(); return 1; } diff --git a/client/pub_shared.c b/client/pub_shared.c new file mode 100644 index 00000000..26fcab91 --- /dev/null +++ b/client/pub_shared.c @@ -0,0 +1,238 @@ +/* +Copyright (c) 2009-2018 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include +#include +#include +#include +#include +#ifndef WIN32 +#include +#else +#include +#include +#define snprintf sprintf_s +#endif + +#include +#include "client_shared.h" +#include "pub_shared.h" + +/* Global variables for use in callbacks. See sub_client.c for an example of + * using a struct to hold variables for use in callbacks. */ +int mid_sent = 0; +int status = STATUS_CONNECTING; +static int last_mid = -1; +static int last_mid_sent = -1; +static bool connected = true; +static bool disconnect_sent = false; +static struct mosq_config cfg; +static char *buf; +static int buf_len = 1024; + +void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc) +{ + connected = false; +} + +void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) +{ + last_mid_sent = mid; + if(cfg.pub_mode == MSGMODE_STDIN_LINE){ + if(mid == last_mid){ + mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); + disconnect_sent = true; + } + }else if(disconnect_sent == false){ + mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); + disconnect_sent = true; + } +} + +void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) +{ + printf("%s\n", str); +} + +int load_stdin(void) +{ + long pos = 0, rlen; + char buf[1024]; + char *aux_message = NULL; + + cfg.pub_mode = MSGMODE_STDIN_FILE; + + while(!feof(stdin)){ + rlen = fread(buf, 1, 1024, stdin); + aux_message = realloc(cfg.message, pos+rlen); + if(!aux_message){ + if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n"); + free(cfg.message); + return 1; + } else + { + cfg.message = aux_message; + } + memcpy(&(cfg.message[pos]), buf, rlen); + pos += rlen; + } + cfg.msglen = pos; + + if(!cfg.msglen){ + if(!cfg.quiet) fprintf(stderr, "Error: Zero length input.\n"); + return 1; + } + + return 0; +} + +int load_file(const char *filename) +{ + long pos, rlen; + FILE *fptr = NULL; + + fptr = fopen(filename, "rb"); + if(!fptr){ + if(!cfg.quiet) fprintf(stderr, "Error: Unable to open file \"%s\".\n", filename); + return 1; + } + cfg.pub_mode = MSGMODE_FILE; + fseek(fptr, 0, SEEK_END); + cfg.msglen = ftell(fptr); + if(cfg.msglen > 268435455){ + fclose(fptr); + if(!cfg.quiet) fprintf(stderr, "Error: File \"%s\" is too large (>268,435,455 bytes).\n", filename); + return 1; + }else if(cfg.msglen == 0){ + fclose(fptr); + if(!cfg.quiet) fprintf(stderr, "Error: File \"%s\" is empty.\n", filename); + return 1; + }else if(cfg.msglen < 0){ + fclose(fptr); + if(!cfg.quiet) fprintf(stderr, "Error: Unable to determine size of file \"%s\".\n", filename); + return 1; + } + fseek(fptr, 0, SEEK_SET); + cfg.message = malloc(cfg.msglen); + if(!cfg.message){ + fclose(fptr); + if(!cfg.quiet) fprintf(stderr, "Error: Out of memory.\n"); + return 1; + } + pos = 0; + while(pos < cfg.msglen){ + rlen = fread(&(cfg.message[pos]), sizeof(char), cfg.msglen-pos, fptr); + pos += rlen; + } + fclose(fptr); + return 0; +} + + +int pub_shared_init(void) +{ + buf = malloc(buf_len); + if(!buf){ + fprintf(stderr, "Error: Out of memory.\n"); + return 1; + } + return 0; +} + + +int pub_shared_loop(struct mosquitto *mosq) +{ + int read_len; + int pos; + int rc, rc2; + char *buf2; + int buf_len_actual; + + if(cfg.pub_mode == MSGMODE_STDIN_LINE){ + mosquitto_loop_start(mosq); + } + + do{ + if(cfg.pub_mode == MSGMODE_STDIN_LINE){ + if(status == STATUS_CONNACK_RECVD){ + pos = 0; + read_len = buf_len; + while(fgets(&buf[pos], read_len, stdin)){ + buf_len_actual = strlen(buf); + if(buf[buf_len_actual-1] == '\n'){ + buf[buf_len_actual-1] = '\0'; + rc2 = my_publish(mosq, &mid_sent, cfg.topic, buf_len_actual-1, buf, cfg.qos, cfg.retain); + if(rc2){ + if(!cfg.quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2); + mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); + } + break; + }else{ + buf_len += 1024; + pos += 1023; + read_len = 1024; + buf2 = realloc(buf, buf_len); + if(!buf2){ + fprintf(stderr, "Error: Out of memory.\n"); + return MOSQ_ERR_NOMEM; + } + buf = buf2; + } + } + if(feof(stdin)){ + if(last_mid == -1){ + /* Empty file */ + mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); + disconnect_sent = true; + status = STATUS_DISCONNECTING; + }else{ + last_mid = mid_sent; + status = STATUS_WAITING; + } + } + }else if(status == STATUS_WAITING){ + if(last_mid_sent == last_mid && disconnect_sent == false){ + mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props); + disconnect_sent = true; + } +#ifdef WIN32 + Sleep(100); +#else + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 100000000; + nanosleep(&ts, NULL); +#endif + } + rc = MOSQ_ERR_SUCCESS; + }else{ + rc = mosquitto_loop(mosq, -1, 1); + } + }while(rc == MOSQ_ERR_SUCCESS && connected); + + if(cfg.pub_mode == MSGMODE_STDIN_LINE){ + mosquitto_loop_stop(mosq, false); + } + return 0; +} + + +void pub_shared_cleanup(void) +{ + free(buf); +} diff --git a/client/pub_shared.h b/client/pub_shared.h new file mode 100644 index 00000000..f91a55fc --- /dev/null +++ b/client/pub_shared.h @@ -0,0 +1,41 @@ +/* +Copyright (c) 2009-2018 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ +#ifndef PUB_SHARED_H +#define PUB_SHARED_H + +#define STATUS_CONNECTING 0 +#define STATUS_CONNACK_RECVD 1 +#define STATUS_WAITING 2 +#define STATUS_DISCONNECTING 3 + +extern int mid_sent; +extern int status; + + +void my_connect_callback(struct mosquitto *mosq, void *obj, int result); +void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc); +void my_publish_callback(struct mosquitto *mosq, void *obj, int mid); +void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str); +int load_stdin(void); +int load_file(const char *filename); + +int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain); + +int pub_shared_init(void); +int pub_shared_loop(struct mosquitto *mosq); +void pub_shared_cleanup(void); + +#endif