From 970ba58da63b2790607585edcc364b7ada614e3b Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Wed, 29 Apr 2015 21:23:59 +0100 Subject: [PATCH] Code reorganise. --- lib/CMakeLists.txt | 1 + lib/Makefile | 4 + lib/mosquitto.c | 1 + lib/net_mosq.c | 460 +------------------------------ lib/net_mosq.h | 19 -- lib/packet_mosq.c | 576 +++++++++++++++++++++++++++++++++++++++ lib/packet_mosq.h | 46 ++++ lib/read_handle.c | 1 + lib/read_handle_client.c | 1 + lib/read_handle_shared.c | 1 + lib/send_client_mosq.c | 1 + lib/send_mosq.c | 1 + lib/socks_mosq.c | 1 + src/CMakeLists.txt | 1 + src/Makefile | 5 +- src/bridge.c | 1 + src/context.c | 1 + src/loop.c | 1 + src/read_handle.c | 1 + src/read_handle_client.c | 1 + src/read_handle_server.c | 1 + src/send_server.c | 1 + 22 files changed, 651 insertions(+), 475 deletions(-) create mode 100644 lib/packet_mosq.c create mode 100644 lib/packet_mosq.h diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 32a04b4d..98190e3f 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -28,6 +28,7 @@ add_library(libmosquitto SHARED mosquitto_internal.h mqtt3_protocol.h net_mosq.c net_mosq.h + packet_mosq.c packet_mosq.h read_handle.c read_handle.h read_handle_client.c read_handle_shared.c diff --git a/lib/Makefile b/lib/Makefile index 825fcead..9d3722eb 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -7,6 +7,7 @@ MOSQ_OBJS=mosquitto.o \ memory_mosq.o \ messages_mosq.o \ net_mosq.o \ + packet_mosq.o \ read_handle.o \ read_handle_client.o \ read_handle_shared.o \ @@ -62,6 +63,9 @@ memory_mosq.o : memory_mosq.c memory_mosq.h net_mosq.o : net_mosq.c net_mosq.h ${CROSS_COMPILE}$(CC) $(LIB_CFLAGS) -c $< -o $@ +packet_mosq.o : packet_mosq.c packet_mosq.h + ${CROSS_COMPILE}$(CC) $(LIB_CFLAGS) -c $< -o $@ + read_handle.o : read_handle.c read_handle.h ${CROSS_COMPILE}$(CC) $(LIB_CFLAGS) -c $< -o $@ diff --git a/lib/mosquitto.c b/lib/mosquitto.c index 09625515..88bcf020 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -36,6 +36,7 @@ typedef int ssize_t; #include #include #include +#include #include #include #include diff --git a/lib/net_mosq.c b/lib/net_mosq.c index b256ee0c..22a1087b 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -81,6 +81,7 @@ Contributors: #include #include + #ifdef WITH_TLS int tls_ex_index_mosq = -1; #endif @@ -126,72 +127,6 @@ void mosquitto__net_cleanup(void) #endif } -void mosquitto__packet_cleanup(struct mosquitto__packet *packet) -{ - if(!packet) return; - - /* Free data and reset values */ - packet->command = 0; - packet->remaining_count = 0; - packet->remaining_mult = 1; - packet->remaining_length = 0; - if(packet->payload) mosquitto__free(packet->payload); - packet->payload = NULL; - packet->to_process = 0; - packet->pos = 0; -} - -int mosquitto__packet_queue(struct mosquitto *mosq, struct mosquitto__packet *packet) -{ -#ifndef WITH_BROKER - char sockpair_data = 0; -#endif - assert(mosq); - assert(packet); - - packet->pos = 0; - packet->to_process = packet->packet_length; - - packet->next = NULL; - pthread_mutex_lock(&mosq->out_packet_mutex); - if(mosq->out_packet){ - mosq->out_packet_last->next = packet; - }else{ - mosq->out_packet = packet; - } - mosq->out_packet_last = packet; - pthread_mutex_unlock(&mosq->out_packet_mutex); -#ifdef WITH_BROKER -# ifdef WITH_WEBSOCKETS - if(mosq->wsi){ - libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi); - return 0; - }else{ - return mosquitto__packet_write(mosq); - } -# else - return mosquitto__packet_write(mosq); -# endif -#else - - /* Write a single byte to sockpairW (connected to sockpairR) to break out - * of select() if in threaded mode. */ - if(mosq->sockpairW != INVALID_SOCKET){ -#ifndef WIN32 - if(write(mosq->sockpairW, &sockpair_data, 1)){ - } -#else - send(mosq->sockpairW, &sockpair_data, 1, 0); -#endif - } - - if(mosq->in_callback == false && mosq->threaded == false){ - return mosquitto__packet_write(mosq); - }else{ - return MOSQ_ERR_SUCCESS; - } -#endif -} /* Close a socket associated with a context and set it to -1. * Returns 1 on failure (context is NULL) @@ -247,6 +182,7 @@ int mosquitto__socket_close(struct mosquitto *mosq) return rc; } + #ifdef REAL_WITH_TLS_PSK static unsigned int psk_client_callback(SSL *ssl, const char *hint, char *identity, unsigned int max_identity_len, @@ -266,6 +202,7 @@ static unsigned int psk_client_callback(SSL *ssl, const char *hint, } #endif + int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking) { struct addrinfo hints; @@ -370,6 +307,7 @@ int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t po return rc; } + #ifdef WITH_TLS int mosquitto__socket_connect_tls(struct mosquitto *mosq) { @@ -396,6 +334,7 @@ int mosquitto__socket_connect_tls(struct mosquitto *mosq) } #endif + /* Create a socket and connect it to 'ip' on port 'port'. * Returns -1 on failure (ip is NULL, socket creation/connection error) * Returns sock number on success. @@ -556,98 +495,6 @@ int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t return rc; } -int mosquitto__read_byte(struct mosquitto__packet *packet, uint8_t *byte) -{ - assert(packet); - if(packet->pos+1 > packet->remaining_length) return MOSQ_ERR_PROTOCOL; - - *byte = packet->payload[packet->pos]; - packet->pos++; - - return MOSQ_ERR_SUCCESS; -} - -void mosquitto__write_byte(struct mosquitto__packet *packet, uint8_t byte) -{ - assert(packet); - assert(packet->pos+1 <= packet->packet_length); - - packet->payload[packet->pos] = byte; - packet->pos++; -} - -int mosquitto__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count) -{ - assert(packet); - if(packet->pos+count > packet->remaining_length) return MOSQ_ERR_PROTOCOL; - - memcpy(bytes, &(packet->payload[packet->pos]), count); - packet->pos += count; - - return MOSQ_ERR_SUCCESS; -} - -void mosquitto__write_bytes(struct mosquitto__packet *packet, const void *bytes, uint32_t count) -{ - assert(packet); - assert(packet->pos+count <= packet->packet_length); - - memcpy(&(packet->payload[packet->pos]), bytes, count); - packet->pos += count; -} - -int mosquitto__read_string(struct mosquitto__packet *packet, char **str) -{ - uint16_t len; - int rc; - - assert(packet); - rc = mosquitto__read_uint16(packet, &len); - if(rc) return rc; - - if(packet->pos+len > packet->remaining_length) return MOSQ_ERR_PROTOCOL; - - *str = mosquitto__malloc(len+1); - if(*str){ - memcpy(*str, &(packet->payload[packet->pos]), len); - (*str)[len] = '\0'; - packet->pos += len; - }else{ - return MOSQ_ERR_NOMEM; - } - - return MOSQ_ERR_SUCCESS; -} - -void mosquitto__write_string(struct mosquitto__packet *packet, const char *str, uint16_t length) -{ - assert(packet); - mosquitto__write_uint16(packet, length); - mosquitto__write_bytes(packet, str, length); -} - -int mosquitto__read_uint16(struct mosquitto__packet *packet, uint16_t *word) -{ - uint8_t msb, lsb; - - assert(packet); - if(packet->pos+2 > packet->remaining_length) return MOSQ_ERR_PROTOCOL; - - msb = packet->payload[packet->pos]; - packet->pos++; - lsb = packet->payload[packet->pos]; - packet->pos++; - - *word = (msb<<8) + lsb; - - return MOSQ_ERR_SUCCESS; -} - -void mosquitto__write_uint16(struct mosquitto__packet *packet, uint16_t word) -{ - mosquitto__write_byte(packet, MOSQ_MSB(word)); - mosquitto__write_byte(packet, MOSQ_LSB(word)); -} ssize_t mosquitto__net_read(struct mosquitto *mosq, void *buf, size_t count) { @@ -745,303 +592,6 @@ ssize_t mosquitto__net_write(struct mosquitto *mosq, void *buf, size_t count) #endif } -int mosquitto__packet_write(struct mosquitto *mosq) -{ - ssize_t write_length; - struct mosquitto__packet *packet; - - if(!mosq) return MOSQ_ERR_INVAL; - if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; - - pthread_mutex_lock(&mosq->current_out_packet_mutex); - pthread_mutex_lock(&mosq->out_packet_mutex); - if(mosq->out_packet && !mosq->current_out_packet){ - mosq->current_out_packet = mosq->out_packet; - mosq->out_packet = mosq->out_packet->next; - if(!mosq->out_packet){ - mosq->out_packet_last = NULL; - } - } - pthread_mutex_unlock(&mosq->out_packet_mutex); - - if(mosq->state == mosq_cs_connect_pending){ - pthread_mutex_unlock(&mosq->current_out_packet_mutex); - return MOSQ_ERR_SUCCESS; - } - - while(mosq->current_out_packet){ - packet = mosq->current_out_packet; - - while(packet->to_process > 0){ - write_length = mosquitto__net_write(mosq, &(packet->payload[packet->pos]), packet->to_process); - if(write_length > 0){ -#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) - g_bytes_sent += write_length; -#endif - packet->to_process -= write_length; - packet->pos += write_length; - }else{ -#ifdef WIN32 - errno = WSAGetLastError(); -#endif - if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ - pthread_mutex_unlock(&mosq->current_out_packet_mutex); - return MOSQ_ERR_SUCCESS; - }else{ - pthread_mutex_unlock(&mosq->current_out_packet_mutex); - switch(errno){ - case COMPAT_ECONNRESET: - return MOSQ_ERR_CONN_LOST; - default: - return MOSQ_ERR_ERRNO; - } - } - } - } - -#ifdef WITH_BROKER -# ifdef WITH_SYS_TREE - g_msgs_sent++; - if(((packet->command)&0xF6) == PUBLISH){ - g_pub_msgs_sent++; - } -# endif -#else - if(((packet->command)&0xF6) == PUBLISH){ - pthread_mutex_lock(&mosq->callback_mutex); - if(mosq->on_publish){ - /* This is a QoS=0 message */ - mosq->in_callback = true; - mosq->on_publish(mosq, mosq->userdata, packet->mid); - mosq->in_callback = false; - } - pthread_mutex_unlock(&mosq->callback_mutex); - }else if(((packet->command)&0xF0) == DISCONNECT){ - /* FIXME what cleanup needs doing here? - * incoming/outgoing messages? */ - mosquitto__socket_close(mosq); - - /* Start of duplicate, possibly unnecessary code. - * This does leave things in a consistent state at least. */ - /* Free data and reset values */ - pthread_mutex_lock(&mosq->out_packet_mutex); - mosq->current_out_packet = mosq->out_packet; - if(mosq->out_packet){ - mosq->out_packet = mosq->out_packet->next; - if(!mosq->out_packet){ - mosq->out_packet_last = NULL; - } - } - pthread_mutex_unlock(&mosq->out_packet_mutex); - - mosquitto__packet_cleanup(packet); - mosquitto__free(packet); - - pthread_mutex_lock(&mosq->msgtime_mutex); - mosq->last_msg_out = mosquitto_time(); - pthread_mutex_unlock(&mosq->msgtime_mutex); - /* End of duplicate, possibly unnecessary code */ - - pthread_mutex_lock(&mosq->callback_mutex); - if(mosq->on_disconnect){ - mosq->in_callback = true; - mosq->on_disconnect(mosq, mosq->userdata, 0); - mosq->in_callback = false; - } - pthread_mutex_unlock(&mosq->callback_mutex); - pthread_mutex_unlock(&mosq->current_out_packet_mutex); - return MOSQ_ERR_SUCCESS; - } -#endif - - /* Free data and reset values */ - pthread_mutex_lock(&mosq->out_packet_mutex); - mosq->current_out_packet = mosq->out_packet; - if(mosq->out_packet){ - mosq->out_packet = mosq->out_packet->next; - if(!mosq->out_packet){ - mosq->out_packet_last = NULL; - } - } - pthread_mutex_unlock(&mosq->out_packet_mutex); - - mosquitto__packet_cleanup(packet); - mosquitto__free(packet); - - pthread_mutex_lock(&mosq->msgtime_mutex); - mosq->last_msg_out = mosquitto_time(); - pthread_mutex_unlock(&mosq->msgtime_mutex); - } - pthread_mutex_unlock(&mosq->current_out_packet_mutex); - return MOSQ_ERR_SUCCESS; -} - -#ifdef WITH_BROKER -int mosquitto__packet_read(struct mosquitto_db *db, struct mosquitto *mosq) -#else -int mosquitto__packet_read(struct mosquitto *mosq) -#endif -{ - uint8_t byte; - ssize_t read_length; - int rc = 0; - - if(!mosq) return MOSQ_ERR_INVAL; - if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; - if(mosq->state == mosq_cs_connect_pending){ - return MOSQ_ERR_SUCCESS; - } - - /* This gets called if pselect() indicates that there is network data - * available - ie. at least one byte. What we do depends on what data we - * already have. - * If we've not got a command, attempt to read one and save it. This should - * always work because it's only a single byte. - * Then try to read the remaining length. This may fail because it is may - * be more than one byte - will need to save data pending next read if it - * does fail. - * Then try to read the remaining payload, where 'payload' here means the - * combined variable header and actual payload. This is the most likely to - * fail due to longer length, so save current data and current position. - * After all data is read, send to mosquitto__handle_packet() to deal with. - * Finally, free the memory and reset everything to starting conditions. - */ - if(!mosq->in_packet.command){ - read_length = mosquitto__net_read(mosq, &byte, 1); - if(read_length == 1){ - mosq->in_packet.command = byte; -#ifdef WITH_BROKER -# ifdef WITH_SYS_TREE - g_bytes_received++; -# endif - /* Clients must send CONNECT as their first command. */ - if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL; -#endif - }else{ - if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */ -#ifdef WIN32 - errno = WSAGetLastError(); -#endif - if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ - return MOSQ_ERR_SUCCESS; - }else{ - switch(errno){ - case COMPAT_ECONNRESET: - return MOSQ_ERR_CONN_LOST; - default: - return MOSQ_ERR_ERRNO; - } - } - } - } - /* remaining_count is the number of bytes that the remaining_length - * parameter occupied in this incoming packet. We don't use it here as such - * (it is used when allocating an outgoing packet), but we must be able to - * determine whether all of the remaining_length parameter has been read. - * remaining_count has three states here: - * 0 means that we haven't read any remaining_length bytes - * <0 means we have read some remaining_length bytes but haven't finished - * >0 means we have finished reading the remaining_length bytes. - */ - if(mosq->in_packet.remaining_count <= 0){ - do{ - read_length = mosquitto__net_read(mosq, &byte, 1); - if(read_length == 1){ - mosq->in_packet.remaining_count--; - /* Max 4 bytes length for remaining length as defined by protocol. - * Anything more likely means a broken/malicious client. - */ - if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL; - -#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) - g_bytes_received++; -#endif - mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult; - mosq->in_packet.remaining_mult *= 128; - }else{ - if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */ -#ifdef WIN32 - errno = WSAGetLastError(); -#endif - if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ - return MOSQ_ERR_SUCCESS; - }else{ - switch(errno){ - case COMPAT_ECONNRESET: - return MOSQ_ERR_CONN_LOST; - default: - return MOSQ_ERR_ERRNO; - } - } - } - }while((byte & 128) != 0); - /* We have finished reading remaining_length, so make remaining_count - * positive. */ - mosq->in_packet.remaining_count *= -1; - - if(mosq->in_packet.remaining_length > 0){ - mosq->in_packet.payload = mosquitto__malloc(mosq->in_packet.remaining_length*sizeof(uint8_t)); - if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM; - mosq->in_packet.to_process = mosq->in_packet.remaining_length; - } - } - while(mosq->in_packet.to_process>0){ - read_length = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); - if(read_length > 0){ -#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) - g_bytes_received += read_length; -#endif - mosq->in_packet.to_process -= read_length; - mosq->in_packet.pos += read_length; - }else{ -#ifdef WIN32 - errno = WSAGetLastError(); -#endif - if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ - if(mosq->in_packet.to_process > 1000){ - /* Update last_msg_in time if more than 1000 bytes left to - * receive. Helps when receiving large messages. - * This is an arbitrary limit, but with some consideration. - * If a client can't send 1000 bytes in a second it - * probably shouldn't be using a 1 second keep alive. */ - pthread_mutex_lock(&mosq->msgtime_mutex); - mosq->last_msg_in = mosquitto_time(); - pthread_mutex_unlock(&mosq->msgtime_mutex); - } - return MOSQ_ERR_SUCCESS; - }else{ - switch(errno){ - case COMPAT_ECONNRESET: - return MOSQ_ERR_CONN_LOST; - default: - return MOSQ_ERR_ERRNO; - } - } - } - } - - /* All data for this packet is read. */ - mosq->in_packet.pos = 0; -#ifdef WITH_BROKER -# ifdef WITH_SYS_TREE - g_msgs_received++; - if(((mosq->in_packet.command)&0xF5) == PUBLISH){ - g_pub_msgs_received++; - } -# endif - rc = mqtt3_packet_handle(db, mosq); -#else - rc = mosquitto__packet_handle(mosq); -#endif - - /* Free data and reset values */ - mosquitto__packet_cleanup(&mosq->in_packet); - - pthread_mutex_lock(&mosq->msgtime_mutex); - mosq->last_msg_in = mosquitto_time(); - pthread_mutex_unlock(&mosq->msgtime_mutex); - return rc; -} int mosquitto__socket_nonblock(int sock) { diff --git a/lib/net_mosq.h b/lib/net_mosq.h index 024137a9..9f4d1a2b 100644 --- a/lib/net_mosq.h +++ b/lib/net_mosq.h @@ -52,8 +52,6 @@ struct mosquitto_db; void mosquitto__net_init(void); void mosquitto__net_cleanup(void); -void mosquitto__packet_cleanup(struct mosquitto__packet *packet); -int mosquitto__packet_queue(struct mosquitto *mosq, struct mosquitto__packet *packet); int mosquitto__socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking); #ifdef WITH_BROKER int mosquitto__socket_close(struct mosquitto_db *db, struct mosquitto *mosq); @@ -64,26 +62,9 @@ int mosquitto__try_connect(struct mosquitto *mosq, const char *host, uint16_t po int mosquitto__socket_nonblock(int sock); int mosquitto__socketpair(int *sp1, int *sp2); -int mosquitto__read_byte(struct mosquitto__packet *packet, uint8_t *byte); -int mosquitto__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count); -int mosquitto__read_string(struct mosquitto__packet *packet, char **str); -int mosquitto__read_uint16(struct mosquitto__packet *packet, uint16_t *word); - -void mosquitto__write_byte(struct mosquitto__packet *packet, uint8_t byte); -void mosquitto__write_bytes(struct mosquitto__packet *packet, const void *bytes, uint32_t count); -void mosquitto__write_string(struct mosquitto__packet *packet, const char *str, uint16_t length); -void mosquitto__write_uint16(struct mosquitto__packet *packet, uint16_t word); - ssize_t mosquitto__net_read(struct mosquitto *mosq, void *buf, size_t count); ssize_t mosquitto__net_write(struct mosquitto *mosq, void *buf, size_t count); -int mosquitto__packet_write(struct mosquitto *mosq); -#ifdef WITH_BROKER -int mosquitto__packet_read(struct mosquitto_db *db, struct mosquitto *mosq); -#else -int mosquitto__packet_read(struct mosquitto *mosq); -#endif - #ifdef WITH_TLS int mosquitto__socket_apply_tls(struct mosquitto *mosq); int mosquitto__socket_connect_tls(struct mosquitto *mosq); diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c new file mode 100644 index 00000000..7cc5f4c5 --- /dev/null +++ b/lib/packet_mosq.c @@ -0,0 +1,576 @@ +/* +Copyright (c) 2009-2015 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#if 0 +#include +#include +#include +#include +#ifndef WIN32 +#include +#include +#include +#else +#include +#include +#endif + +#ifdef __ANDROID__ +#include +#include +#include +#endif + +#ifdef __FreeBSD__ +# include +#endif + +#ifdef __SYMBIAN32__ +#include +#endif + +#ifdef __QNX__ +#ifndef AI_ADDRCONFIG +#define AI_ADDRCONFIG 0 +#endif +#include +#include +#endif + +#ifdef WITH_TLS +#include +#include +#include +#include +#endif + +#ifdef WITH_BROKER +# include +# ifdef WITH_SYS_TREE + extern uint64_t g_bytes_received; + extern uint64_t g_bytes_sent; + extern unsigned long g_msgs_received; + extern unsigned long g_msgs_sent; + extern unsigned long g_pub_msgs_received; + extern unsigned long g_pub_msgs_sent; +# endif +# ifdef WITH_WEBSOCKETS +# include +# endif +#else +# include +#endif + +#include +#include +#include +#endif + +#include +#include + +#ifdef WITH_BROKER +# include +# ifdef WITH_SYS_TREE + extern uint64_t g_bytes_received; + extern uint64_t g_bytes_sent; + extern unsigned long g_msgs_received; + extern unsigned long g_msgs_sent; + extern unsigned long g_pub_msgs_received; + extern unsigned long g_pub_msgs_sent; +# endif +# ifdef WITH_WEBSOCKETS +# include +# endif +#else +# include +#endif + + +#include "memory_mosq.h" +#include "mqtt3_protocol.h" +#include "net_mosq.h" +#include "packet_mosq.h" +#include "read_handle.h" + +void mosquitto__packet_cleanup(struct mosquitto__packet *packet) +{ + if(!packet) return; + + /* Free data and reset values */ + packet->command = 0; + packet->remaining_count = 0; + packet->remaining_mult = 1; + packet->remaining_length = 0; + if(packet->payload) mosquitto__free(packet->payload); + packet->payload = NULL; + packet->to_process = 0; + packet->pos = 0; +} + +int mosquitto__packet_queue(struct mosquitto *mosq, struct mosquitto__packet *packet) +{ +#ifndef WITH_BROKER + char sockpair_data = 0; +#endif + assert(mosq); + assert(packet); + + packet->pos = 0; + packet->to_process = packet->packet_length; + + packet->next = NULL; + pthread_mutex_lock(&mosq->out_packet_mutex); + if(mosq->out_packet){ + mosq->out_packet_last->next = packet; + }else{ + mosq->out_packet = packet; + } + mosq->out_packet_last = packet; + pthread_mutex_unlock(&mosq->out_packet_mutex); +#ifdef WITH_BROKER +# ifdef WITH_WEBSOCKETS + if(mosq->wsi){ + libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi); + return 0; + }else{ + return mosquitto__packet_write(mosq); + } +# else + return mosquitto__packet_write(mosq); +# endif +#else + + /* Write a single byte to sockpairW (connected to sockpairR) to break out + * of select() if in threaded mode. */ + if(mosq->sockpairW != INVALID_SOCKET){ +#ifndef WIN32 + if(write(mosq->sockpairW, &sockpair_data, 1)){ + } +#else + send(mosq->sockpairW, &sockpair_data, 1, 0); +#endif + } + + if(mosq->in_callback == false && mosq->threaded == false){ + return mosquitto__packet_write(mosq); + }else{ + return MOSQ_ERR_SUCCESS; + } +#endif +} + + +int mosquitto__read_byte(struct mosquitto__packet *packet, uint8_t *byte) +{ + assert(packet); + if(packet->pos+1 > packet->remaining_length) return MOSQ_ERR_PROTOCOL; + + *byte = packet->payload[packet->pos]; + packet->pos++; + + return MOSQ_ERR_SUCCESS; +} + + +void mosquitto__write_byte(struct mosquitto__packet *packet, uint8_t byte) +{ + assert(packet); + assert(packet->pos+1 <= packet->packet_length); + + packet->payload[packet->pos] = byte; + packet->pos++; +} + + +int mosquitto__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count) +{ + assert(packet); + if(packet->pos+count > packet->remaining_length) return MOSQ_ERR_PROTOCOL; + + memcpy(bytes, &(packet->payload[packet->pos]), count); + packet->pos += count; + + return MOSQ_ERR_SUCCESS; +} + + +void mosquitto__write_bytes(struct mosquitto__packet *packet, const void *bytes, uint32_t count) +{ + assert(packet); + assert(packet->pos+count <= packet->packet_length); + + memcpy(&(packet->payload[packet->pos]), bytes, count); + packet->pos += count; +} + + +int mosquitto__read_string(struct mosquitto__packet *packet, char **str) +{ + uint16_t len; + int rc; + + assert(packet); + rc = mosquitto__read_uint16(packet, &len); + if(rc) return rc; + + if(packet->pos+len > packet->remaining_length) return MOSQ_ERR_PROTOCOL; + + *str = mosquitto__malloc(len+1); + if(*str){ + memcpy(*str, &(packet->payload[packet->pos]), len); + (*str)[len] = '\0'; + packet->pos += len; + }else{ + return MOSQ_ERR_NOMEM; + } + + return MOSQ_ERR_SUCCESS; +} + + +void mosquitto__write_string(struct mosquitto__packet *packet, const char *str, uint16_t length) +{ + assert(packet); + mosquitto__write_uint16(packet, length); + mosquitto__write_bytes(packet, str, length); +} + + +int mosquitto__read_uint16(struct mosquitto__packet *packet, uint16_t *word) +{ + uint8_t msb, lsb; + + assert(packet); + if(packet->pos+2 > packet->remaining_length) return MOSQ_ERR_PROTOCOL; + + msb = packet->payload[packet->pos]; + packet->pos++; + lsb = packet->payload[packet->pos]; + packet->pos++; + + *word = (msb<<8) + lsb; + + return MOSQ_ERR_SUCCESS; +} + + +void mosquitto__write_uint16(struct mosquitto__packet *packet, uint16_t word) +{ + mosquitto__write_byte(packet, MOSQ_MSB(word)); + mosquitto__write_byte(packet, MOSQ_LSB(word)); +} + + +int mosquitto__packet_write(struct mosquitto *mosq) +{ + ssize_t write_length; + struct mosquitto__packet *packet; + + if(!mosq) return MOSQ_ERR_INVAL; + if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; + + pthread_mutex_lock(&mosq->current_out_packet_mutex); + pthread_mutex_lock(&mosq->out_packet_mutex); + if(mosq->out_packet && !mosq->current_out_packet){ + mosq->current_out_packet = mosq->out_packet; + mosq->out_packet = mosq->out_packet->next; + if(!mosq->out_packet){ + mosq->out_packet_last = NULL; + } + } + pthread_mutex_unlock(&mosq->out_packet_mutex); + + if(mosq->state == mosq_cs_connect_pending){ + pthread_mutex_unlock(&mosq->current_out_packet_mutex); + return MOSQ_ERR_SUCCESS; + } + + while(mosq->current_out_packet){ + packet = mosq->current_out_packet; + + while(packet->to_process > 0){ + write_length = mosquitto__net_write(mosq, &(packet->payload[packet->pos]), packet->to_process); + if(write_length > 0){ +#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) + g_bytes_sent += write_length; +#endif + packet->to_process -= write_length; + packet->pos += write_length; + }else{ +#ifdef WIN32 + errno = WSAGetLastError(); +#endif + if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ + pthread_mutex_unlock(&mosq->current_out_packet_mutex); + return MOSQ_ERR_SUCCESS; + }else{ + pthread_mutex_unlock(&mosq->current_out_packet_mutex); + switch(errno){ + case COMPAT_ECONNRESET: + return MOSQ_ERR_CONN_LOST; + default: + return MOSQ_ERR_ERRNO; + } + } + } + } + +#ifdef WITH_BROKER +# ifdef WITH_SYS_TREE + g_msgs_sent++; + if(((packet->command)&0xF6) == PUBLISH){ + g_pub_msgs_sent++; + } +# endif +#else + if(((packet->command)&0xF6) == PUBLISH){ + pthread_mutex_lock(&mosq->callback_mutex); + if(mosq->on_publish){ + /* This is a QoS=0 message */ + mosq->in_callback = true; + mosq->on_publish(mosq, mosq->userdata, packet->mid); + mosq->in_callback = false; + } + pthread_mutex_unlock(&mosq->callback_mutex); + }else if(((packet->command)&0xF0) == DISCONNECT){ + /* FIXME what cleanup needs doing here? + * incoming/outgoing messages? */ + mosquitto__socket_close(mosq); + + /* Start of duplicate, possibly unnecessary code. + * This does leave things in a consistent state at least. */ + /* Free data and reset values */ + pthread_mutex_lock(&mosq->out_packet_mutex); + mosq->current_out_packet = mosq->out_packet; + if(mosq->out_packet){ + mosq->out_packet = mosq->out_packet->next; + if(!mosq->out_packet){ + mosq->out_packet_last = NULL; + } + } + pthread_mutex_unlock(&mosq->out_packet_mutex); + + mosquitto__packet_cleanup(packet); + mosquitto__free(packet); + + pthread_mutex_lock(&mosq->msgtime_mutex); + mosq->last_msg_out = mosquitto_time(); + pthread_mutex_unlock(&mosq->msgtime_mutex); + /* End of duplicate, possibly unnecessary code */ + + pthread_mutex_lock(&mosq->callback_mutex); + if(mosq->on_disconnect){ + mosq->in_callback = true; + mosq->on_disconnect(mosq, mosq->userdata, 0); + mosq->in_callback = false; + } + pthread_mutex_unlock(&mosq->callback_mutex); + pthread_mutex_unlock(&mosq->current_out_packet_mutex); + return MOSQ_ERR_SUCCESS; + } +#endif + + /* Free data and reset values */ + pthread_mutex_lock(&mosq->out_packet_mutex); + mosq->current_out_packet = mosq->out_packet; + if(mosq->out_packet){ + mosq->out_packet = mosq->out_packet->next; + if(!mosq->out_packet){ + mosq->out_packet_last = NULL; + } + } + pthread_mutex_unlock(&mosq->out_packet_mutex); + + mosquitto__packet_cleanup(packet); + mosquitto__free(packet); + + pthread_mutex_lock(&mosq->msgtime_mutex); + mosq->last_msg_out = mosquitto_time(); + pthread_mutex_unlock(&mosq->msgtime_mutex); + } + pthread_mutex_unlock(&mosq->current_out_packet_mutex); + return MOSQ_ERR_SUCCESS; +} + + +#ifdef WITH_BROKER +int mosquitto__packet_read(struct mosquitto_db *db, struct mosquitto *mosq) +#else +int mosquitto__packet_read(struct mosquitto *mosq) +#endif +{ + uint8_t byte; + ssize_t read_length; + int rc = 0; + + if(!mosq) return MOSQ_ERR_INVAL; + if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; + if(mosq->state == mosq_cs_connect_pending){ + return MOSQ_ERR_SUCCESS; + } + + /* This gets called if pselect() indicates that there is network data + * available - ie. at least one byte. What we do depends on what data we + * already have. + * If we've not got a command, attempt to read one and save it. This should + * always work because it's only a single byte. + * Then try to read the remaining length. This may fail because it is may + * be more than one byte - will need to save data pending next read if it + * does fail. + * Then try to read the remaining payload, where 'payload' here means the + * combined variable header and actual payload. This is the most likely to + * fail due to longer length, so save current data and current position. + * After all data is read, send to mosquitto__handle_packet() to deal with. + * Finally, free the memory and reset everything to starting conditions. + */ + if(!mosq->in_packet.command){ + read_length = mosquitto__net_read(mosq, &byte, 1); + if(read_length == 1){ + mosq->in_packet.command = byte; +#ifdef WITH_BROKER +# ifdef WITH_SYS_TREE + g_bytes_received++; +# endif + /* Clients must send CONNECT as their first command. */ + if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL; +#endif + }else{ + if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */ +#ifdef WIN32 + errno = WSAGetLastError(); +#endif + if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ + return MOSQ_ERR_SUCCESS; + }else{ + switch(errno){ + case COMPAT_ECONNRESET: + return MOSQ_ERR_CONN_LOST; + default: + return MOSQ_ERR_ERRNO; + } + } + } + } + /* remaining_count is the number of bytes that the remaining_length + * parameter occupied in this incoming packet. We don't use it here as such + * (it is used when allocating an outgoing packet), but we must be able to + * determine whether all of the remaining_length parameter has been read. + * remaining_count has three states here: + * 0 means that we haven't read any remaining_length bytes + * <0 means we have read some remaining_length bytes but haven't finished + * >0 means we have finished reading the remaining_length bytes. + */ + if(mosq->in_packet.remaining_count <= 0){ + do{ + read_length = mosquitto__net_read(mosq, &byte, 1); + if(read_length == 1){ + mosq->in_packet.remaining_count--; + /* Max 4 bytes length for remaining length as defined by protocol. + * Anything more likely means a broken/malicious client. + */ + if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL; + +#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) + g_bytes_received++; +#endif + mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult; + mosq->in_packet.remaining_mult *= 128; + }else{ + if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */ +#ifdef WIN32 + errno = WSAGetLastError(); +#endif + if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ + return MOSQ_ERR_SUCCESS; + }else{ + switch(errno){ + case COMPAT_ECONNRESET: + return MOSQ_ERR_CONN_LOST; + default: + return MOSQ_ERR_ERRNO; + } + } + } + }while((byte & 128) != 0); + /* We have finished reading remaining_length, so make remaining_count + * positive. */ + mosq->in_packet.remaining_count *= -1; + + if(mosq->in_packet.remaining_length > 0){ + mosq->in_packet.payload = mosquitto__malloc(mosq->in_packet.remaining_length*sizeof(uint8_t)); + if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM; + mosq->in_packet.to_process = mosq->in_packet.remaining_length; + } + } + while(mosq->in_packet.to_process>0){ + read_length = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); + if(read_length > 0){ +#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) + g_bytes_received += read_length; +#endif + mosq->in_packet.to_process -= read_length; + mosq->in_packet.pos += read_length; + }else{ +#ifdef WIN32 + errno = WSAGetLastError(); +#endif + if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ + if(mosq->in_packet.to_process > 1000){ + /* Update last_msg_in time if more than 1000 bytes left to + * receive. Helps when receiving large messages. + * This is an arbitrary limit, but with some consideration. + * If a client can't send 1000 bytes in a second it + * probably shouldn't be using a 1 second keep alive. */ + pthread_mutex_lock(&mosq->msgtime_mutex); + mosq->last_msg_in = mosquitto_time(); + pthread_mutex_unlock(&mosq->msgtime_mutex); + } + return MOSQ_ERR_SUCCESS; + }else{ + switch(errno){ + case COMPAT_ECONNRESET: + return MOSQ_ERR_CONN_LOST; + default: + return MOSQ_ERR_ERRNO; + } + } + } + } + + /* All data for this packet is read. */ + mosq->in_packet.pos = 0; +#ifdef WITH_BROKER +# ifdef WITH_SYS_TREE + g_msgs_received++; + if(((mosq->in_packet.command)&0xF5) == PUBLISH){ + g_pub_msgs_received++; + } +# endif + rc = mqtt3_packet_handle(db, mosq); +#else + rc = mosquitto__packet_handle(mosq); +#endif + + /* Free data and reset values */ + mosquitto__packet_cleanup(&mosq->in_packet); + + pthread_mutex_lock(&mosq->msgtime_mutex); + mosq->last_msg_in = mosquitto_time(); + pthread_mutex_unlock(&mosq->msgtime_mutex); + return rc; +} + diff --git a/lib/packet_mosq.h b/lib/packet_mosq.h new file mode 100644 index 00000000..1d97c11f --- /dev/null +++ b/lib/packet_mosq.h @@ -0,0 +1,46 @@ +/* +Copyright (c) 2010-2015 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ +#ifndef _PACKET_MOSQ_H_ +#define _PACKET_MOSQ_H_ + +#include +#include + +#ifdef WITH_BROKER +struct mosquitto_db; +#endif + +void mosquitto__packet_cleanup(struct mosquitto__packet *packet); +int mosquitto__packet_queue(struct mosquitto *mosq, struct mosquitto__packet *packet); + +int mosquitto__read_byte(struct mosquitto__packet *packet, uint8_t *byte); +int mosquitto__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count); +int mosquitto__read_string(struct mosquitto__packet *packet, char **str); +int mosquitto__read_uint16(struct mosquitto__packet *packet, uint16_t *word); + +void mosquitto__write_byte(struct mosquitto__packet *packet, uint8_t byte); +void mosquitto__write_bytes(struct mosquitto__packet *packet, const void *bytes, uint32_t count); +void mosquitto__write_string(struct mosquitto__packet *packet, const char *str, uint16_t length); +void mosquitto__write_uint16(struct mosquitto__packet *packet, uint16_t word); + +int mosquitto__packet_write(struct mosquitto *mosq); +#ifdef WITH_BROKER +int mosquitto__packet_read(struct mosquitto_db *db, struct mosquitto *mosq); +#else +int mosquitto__packet_read(struct mosquitto *mosq); +#endif + +#endif diff --git a/lib/read_handle.c b/lib/read_handle.c index dfbb33ca..4bb6281e 100644 --- a/lib/read_handle.c +++ b/lib/read_handle.c @@ -24,6 +24,7 @@ Contributors: #include #include #include +#include #include #include #include diff --git a/lib/read_handle_client.c b/lib/read_handle_client.c index b02bfe08..47410b19 100644 --- a/lib/read_handle_client.c +++ b/lib/read_handle_client.c @@ -20,6 +20,7 @@ Contributors: #include #include #include +#include #include int mosquitto__handle_connack(struct mosquitto *mosq) diff --git a/lib/read_handle_shared.c b/lib/read_handle_shared.c index ec0c38f7..99a4dc69 100644 --- a/lib/read_handle_shared.c +++ b/lib/read_handle_shared.c @@ -24,6 +24,7 @@ Contributors: #include #include #include +#include #include #include #include diff --git a/lib/send_client_mosq.c b/lib/send_client_mosq.c index 230f318e..4ed7c1cf 100644 --- a/lib/send_client_mosq.c +++ b/lib/send_client_mosq.c @@ -22,6 +22,7 @@ Contributors: #include #include #include +#include #include #include diff --git a/lib/send_mosq.c b/lib/send_mosq.c index 6af7ea89..903e3c22 100644 --- a/lib/send_mosq.c +++ b/lib/send_mosq.c @@ -24,6 +24,7 @@ Contributors: #include #include #include +#include #include #include #include diff --git a/lib/socks_mosq.c b/lib/socks_mosq.c index d8e04eba..930acdf0 100644 --- a/lib/socks_mosq.c +++ b/lib/socks_mosq.c @@ -20,6 +20,7 @@ Contributors: #include "mosquitto_internal.h" #include "memory_mosq.h" #include "net_mosq.h" +#include "packet_mosq.h" #include "send_mosq.h" #define SOCKS_AUTH_NONE 0x00 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5bebeedd..672a5b16 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,6 +14,7 @@ set (MOSQ_SRCS mosquitto_broker.h net.c ../lib/net_mosq.c ../lib/net_mosq.h + ../lib/packet.c ../lib/packet.h persist.c persist.h read_handle.c read_handle_client.c read_handle_server.c ../lib/read_handle_shared.c ../lib/read_handle.h diff --git a/src/Makefile b/src/Makefile index 2cfb7d40..27c0e53c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -8,7 +8,7 @@ else all : mosquitto endif -mosquitto : mosquitto.o bridge.o conf.o context.o database.o logging.o loop.o memory_mosq.o persist.o net.o net_mosq.o read_handle.o read_handle_client.o read_handle_server.o read_handle_shared.o security.o security_default.o send_client_mosq.o send_mosq.o send_server.o service.o subs.o sys_tree.o time_mosq.o tls_mosq.o util_mosq.o websockets.o will_mosq.o +mosquitto : mosquitto.o bridge.o conf.o context.o database.o logging.o loop.o memory_mosq.o persist.o net.o net_mosq.o packet_mosq.o read_handle.o read_handle_client.o read_handle_server.o read_handle_shared.o security.o security_default.o send_client_mosq.o send_mosq.o send_server.o service.o subs.o sys_tree.o time_mosq.o tls_mosq.o util_mosq.o websockets.o will_mosq.o ${CROSS_COMPILE}${CC} $^ -o $@ ${LDFLAGS} $(BROKER_LIBS) mosquitto.o : mosquitto.c mosquitto_broker.h @@ -44,6 +44,9 @@ net_mosq.o : ../lib/net_mosq.c ../lib/net_mosq.h persist.o : persist.c persist.h mosquitto_broker.h ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ +packet_mosq.o : ../lib/packet_mosq.c ../lib/packet_mosq.h + ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ + read_handle.o : read_handle.c mosquitto_broker.h ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/bridge.c b/src/bridge.c index 810affa2..7c2da958 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -34,6 +34,7 @@ Contributors: #include #include #include +#include "packet_mosq.h" #include #include #include diff --git a/src/context.c b/src/context.c index 2115fc90..0bd8e7b8 100644 --- a/src/context.c +++ b/src/context.c @@ -21,6 +21,7 @@ Contributors: #include #include +#include "packet_mosq.h" #include #include "uthash.h" diff --git a/src/loop.c b/src/loop.c index 97bff48e..46d2f635 100644 --- a/src/loop.c +++ b/src/loop.c @@ -42,6 +42,7 @@ Contributors: #include #include +#include "packet_mosq.h" #include #include #include diff --git a/src/read_handle.c b/src/read_handle.c index 810f1b4f..1202fbfa 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -23,6 +23,7 @@ Contributors: #include #include #include +#include #include #include #include diff --git a/src/read_handle_client.c b/src/read_handle_client.c index 0286feab..2df52bc1 100644 --- a/src/read_handle_client.c +++ b/src/read_handle_client.c @@ -21,6 +21,7 @@ Contributors: #include #include #include +#include "packet_mosq.h" #include #include diff --git a/src/read_handle_server.c b/src/read_handle_server.c index b3571640..d8d74419 100644 --- a/src/read_handle_server.c +++ b/src/read_handle_server.c @@ -22,6 +22,7 @@ Contributors: #include #include #include +#include #include #include #include diff --git a/src/send_server.c b/src/send_server.c index 114ea46b..28b64056 100644 --- a/src/send_server.c +++ b/src/send_server.c @@ -19,6 +19,7 @@ Contributors: #include #include #include +#include #include int mosquitto__send_connack(struct mosquitto *context, int ack, int result)