diff --git a/ChangeLog.txt b/ChangeLog.txt index 106ef56b..4913779a 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,8 @@ +1.4 - xxxxxxxx +============== + Broker: +- Initial websockets support. - Default TLS mode now accepts TLS v1.2, v1.1 and v1.0. - Support for ECDHE-ECDSA family ciphers. diff --git a/config.mk b/config.mk index bd588fd5..63311762 100644 --- a/config.mk +++ b/config.mk @@ -70,6 +70,9 @@ WITH_PYTHON:=yes # Build with SRV lookup support. WITH_SRV:=yes +# Build with websockets support on the broker. +WITH_WEBSOCKETS:=yes + # Use elliptic keys in broker WITH_EC:=yes @@ -212,6 +215,11 @@ ifeq ($(WITH_SRV),yes) LIB_LIBS:=$(LIB_LIBS) -lcares endif +ifeq ($(WITH_WEBSOCKETS),yes) + BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_WEBSOCKETS + BROKER_LIBS:=$(BROKER_LIBS) -lwebsockets +endif + ifeq ($(UNAME),SunOS) BROKER_LIBS:=$(BROKER_LIBS) -lsocket -lnsl LIB_LIBS:=$(LIB_LIBS) -lsocket -lnsl diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index d2d2c6ff..a81e53a0 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -192,6 +192,10 @@ struct mosquitto { int db_index; struct _mosquitto_packet *out_packet_last; bool is_dropping; +# ifdef WITH_WEBSOCKETS + struct libwebsocket_context *ws_context; + struct libwebsocket *wsi; +# endif #else void *userdata; bool in_callback; diff --git a/lib/net_mosq.c b/lib/net_mosq.c index 42ea4ce9..fea4d3b3 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -65,6 +65,9 @@ Contributors: extern unsigned long g_pub_msgs_received; extern unsigned long g_pub_msgs_sent; # endif +# ifdef WITH_WEBSOCKETS +# include +# endif #else # include #endif @@ -155,7 +158,16 @@ int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *pa mosq->out_packet_last = packet; pthread_mutex_unlock(&mosq->out_packet_mutex); #ifdef WITH_BROKER +# ifdef WITH_WEBSOCKETS + if(mosq->wsi){ + libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi); + return 0; + }else{ + return _mosquitto_packet_write(mosq); + } +# else return _mosquitto_packet_write(mosq); +# endif #else /* Write a single byte to sockpairW (connected to sockpairR) to break out diff --git a/lib/send_mosq.c b/lib/send_mosq.c index 765fa9c3..81fc49a0 100644 --- a/lib/send_mosq.c +++ b/lib/send_mosq.c @@ -97,7 +97,12 @@ int _mosquitto_send_publish(struct mosquitto *mosq, uint16_t mid, const char *to assert(mosq); assert(topic); +#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS) + if(mosq->sock == INVALID_SOCKET && !mosq->wsi) return MOSQ_ERR_NO_CONN; +#else if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; +#endif + #ifdef WITH_BROKER if(mosq->listener && mosq->listener->mount_point){ len = strlen(mosq->listener->mount_point); diff --git a/lib/util_mosq.c b/lib/util_mosq.c index ea71c86e..088d1d87 100644 --- a/lib/util_mosq.c +++ b/lib/util_mosq.c @@ -34,6 +34,10 @@ Contributors: #include #endif +#ifdef WITH_WEBSOCKETS +#include +#endif + int _mosquitto_packet_alloc(struct _mosquitto_packet *packet) { uint8_t remaining_bytes[5], byte; @@ -57,7 +61,11 @@ int _mosquitto_packet_alloc(struct _mosquitto_packet *packet) }while(remaining_length > 0 && packet->remaining_count < 5); if(packet->remaining_count == 5) return MOSQ_ERR_PAYLOAD_SIZE; packet->packet_length = packet->remaining_length + 1 + packet->remaining_count; +#ifdef WITH_WEBSOCKETS + packet->payload = _mosquitto_malloc(sizeof(uint8_t)*packet->packet_length + LWS_SEND_BUFFER_PRE_PADDING + LWS_SEND_BUFFER_POST_PADDING); +#else packet->payload = _mosquitto_malloc(sizeof(uint8_t)*packet->packet_length); +#endif if(!packet->payload) return MOSQ_ERR_NOMEM; packet->payload[0] = packet->command; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0d99fd56..e663cae1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -26,6 +26,7 @@ set (MOSQ_SRCS ../lib/time_mosq.c ../lib/tls_mosq.c ../lib/util_mosq.c ../lib/util_mosq.h + websockets.c ../lib/will_mosq.c ../lib/will_mosq.h) option(INC_BRIDGE_SUPPORT @@ -65,6 +66,12 @@ if (${WITH_SYS_TREE} STREQUAL ON) add_definitions("-DWITH_SYS_TREE") endif (${WITH_SYS_TREE} STREQUAL ON) +option(WITH_WEBSOCKETS + "Include websockets support?" ON) +if (${WITH_WEBSOCKETS} STREQUAL ON) + add_definitions("-DWITH_WEBSOCKETS") +endif (${WITH_WEBSOCKETS} STREQUAL ON) + if (WIN32 OR CYGWIN) set (MOSQ_SRCS ${MOSQ_SRCS} service.c) endif (WIN32 OR CYGWIN) @@ -87,6 +94,10 @@ if (WIN32) set (MOSQ_LIBS ${MOSQ_LIBS} ws2_32) endif (WIN32) +if (${WITH_WEBSOCKETS} STREQUAL ON) + set (MOSQ_LIBS ${MOSQ_LIBS} websockets) +endif (${WITH_WEBSOCKETS} STREQUAL ON) + target_link_libraries(mosquitto ${MOSQ_LIBS}) install(TARGETS mosquitto RUNTIME DESTINATION ${SBINDIR} LIBRARY DESTINATION ${LIBDIR}) diff --git a/src/Makefile b/src/Makefile index 5dbf06fa..456fe153 100644 --- a/src/Makefile +++ b/src/Makefile @@ -8,7 +8,7 @@ else all : mosquitto endif -mosquitto : mosquitto.o bridge.o conf.o context.o database.o logging.o loop.o memory_mosq.o persist.o net.o net_mosq.o read_handle.o read_handle_client.o read_handle_server.o read_handle_shared.o security.o security_default.o send_client_mosq.o send_mosq.o send_server.o service.o subs.o sys_tree.o time_mosq.o tls_mosq.o util_mosq.o will_mosq.o +mosquitto : mosquitto.o bridge.o conf.o context.o database.o logging.o loop.o memory_mosq.o persist.o net.o net_mosq.o read_handle.o read_handle_client.o read_handle_server.o read_handle_shared.o security.o security_default.o send_client_mosq.o send_mosq.o send_server.o service.o subs.o sys_tree.o time_mosq.o tls_mosq.o util_mosq.o websockets.o will_mosq.o ${CC} $^ -o $@ ${LDFLAGS} $(BROKER_LIBS) mosquitto.o : mosquitto.c mosquitto_broker.h @@ -89,6 +89,9 @@ tls_mosq.o : ../lib/tls_mosq.c util_mosq.o : ../lib/util_mosq.c ../lib/util_mosq.h ${CC} $(BROKER_CFLAGS) -c $< -o $@ +websockets.o : websockets.c mosquitto_broker.h + ${CC} $(BROKER_CFLAGS) -c $< -o $@ + will_mosq.o : ../lib/will_mosq.c ../lib/will_mosq.h ${CC} $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/database.c b/src/database.c index da3b1c81..39cdd851 100644 --- a/src/database.c +++ b/src/database.c @@ -256,7 +256,11 @@ int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, } } } +#ifdef WITH_WEBSOCKETS + if(context->sock == INVALID_SOCKET && !context->wsi){ +#else if(context->sock == INVALID_SOCKET){ +#endif /* Client is not connected only queue messages with QoS>0. */ if(qos == 0 && !db->config->queue_qos0_messages){ if(!context->bridge){ @@ -269,7 +273,11 @@ int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, } } +#ifdef WITH_WEBSOCKETS + if(context->sock != INVALID_SOCKET || context->wsi){ +#else if(context->sock != INVALID_SOCKET){ +#endif if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){ if(dir == mosq_md_out){ switch(qos){ @@ -383,7 +391,15 @@ int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, } #endif +#ifdef WITH_WEBSOCKETS + if(context->wsi){ + return mqtt3_db_message_write(context); + }else{ + return rc; + } +#else return rc; +#endif } int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state) @@ -743,8 +759,13 @@ int mqtt3_db_message_write(struct mosquitto *context) const void *payload; int msg_count = 0; +#ifdef WITH_WEBSOCKETS + if(!context || (context->sock == -1 && !context->wsi) + || (context->state == mosq_cs_connected && !context->id)){ +#else if(!context || context->sock == -1 || (context->state == mosq_cs_connected && !context->id)){ +#endif return MOSQ_ERR_INVAL; } diff --git a/src/loop.c b/src/loop.c index cfe17f21..886c8e56 100644 --- a/src/loop.c +++ b/src/loop.c @@ -32,6 +32,10 @@ Contributors: #include #include +#ifdef WITH_WEBSOCKETS +# include +#endif + #include #include #include @@ -69,12 +73,19 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock int bridge_sock; int rc; #endif +#ifdef WITH_WEBSOCKETS + struct libwebsocket_context *ws_context; +#endif #ifndef WIN32 sigemptyset(&sigblock); sigaddset(&sigblock, SIGINT); #endif +#ifdef WITH_WEBSOCKETS + ws_context = mosq_websockets_init(8080); +#endif + while(run){ #ifdef WITH_SYS_TREE if(db->config->sys_interval > 0){ @@ -287,7 +298,13 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock mqtt3_sub_tree_print(&db->subs, 0); flag_tree_print = false; } +#ifdef WITH_WEBSOCKETS + libwebsocket_service(ws_context, 0); +#endif } +#ifdef WITH_WEBSOCKETS + libwebsocket_context_destroy(ws_context); +#endif if(pollfds) _mosquitto_free(pollfds); return MOSQ_ERR_SUCCESS; diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 497f3d9a..d3a71103 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -420,4 +420,11 @@ void service_uninstall(void); void service_run(void); #endif +/* ============================================================ + * Websockets related functions + * ============================================================ */ +#ifdef WITH_WEBSOCKETS +struct libwebsocket_context *mosq_websockets_init(int port); +#endif + #endif diff --git a/src/read_handle_server.c b/src/read_handle_server.c index 5765007e..63e11954 100644 --- a/src/read_handle_server.c +++ b/src/read_handle_server.c @@ -290,7 +290,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) } #ifdef WITH_TLS - if(context->listener->use_identity_as_username){ + if(context->listener && context->listener->use_identity_as_username){ if(!context->ssl){ _mosquitto_send_connack(context, CONNACK_REFUSED_BAD_USERNAME_PASSWORD); mqtt3_context_disconnect(db, context); diff --git a/src/websockets.c b/src/websockets.c new file mode 100644 index 00000000..2018087c --- /dev/null +++ b/src/websockets.c @@ -0,0 +1,304 @@ +/* +Copyright (c) 2014 Roger Light +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. +3. Neither the name of mosquitto nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. +*/ + +#include +#include "mosquitto_internal.h" +#include "mosquitto_broker.h" +#include "mqtt3_protocol.h" +#include "memory_mosq.h" + +#ifdef WITH_WEBSOCKETS + +#ifdef WITH_SYS_TREE +extern uint64_t g_bytes_received; +extern uint64_t g_bytes_sent; +extern unsigned long g_msgs_received; +extern unsigned long g_msgs_sent; +extern unsigned long g_pub_msgs_received; +extern unsigned long g_pub_msgs_sent; +#endif +extern struct mosquitto_db int_db; + +static int callback_mqtt(struct libwebsocket_context *context, + struct libwebsocket *wsi, + enum libwebsocket_callback_reasons reason, + void *user, + void *in, + size_t len); +static int callback_http(struct libwebsocket_context *context, + struct libwebsocket *wsi, + enum libwebsocket_callback_reasons reason, + void *user, + void *in, + size_t len); + +enum mosq_ws_protocols { + PROTOCOL_HTTP = 0, + PROTOCOL_MQTT, + DEMO_PROTOCOL_COUNT +}; + +struct libws_mqtt_data { + struct mosquitto *mosq; +}; + +struct libws_http_data { + char blank; +}; + +static struct libwebsocket_protocols protocols[] = { + /* first protocol must always be HTTP handler */ + { + "http-only", + callback_http, + sizeof (struct libws_http_data), + 0, + }, + { + "mqttv3.1", + callback_mqtt, + sizeof(struct libws_mqtt_data), + 0, + }, + { NULL, NULL, 0, 0 } +}; + + +static int callback_mqtt(struct libwebsocket_context *context, + struct libwebsocket *wsi, + enum libwebsocket_callback_reasons reason, + void *user, + void *in, + size_t len) +{ + struct mosquitto_db *db; + struct mosquitto *mosq = NULL; + struct _mosquitto_packet *packet; + int count; + struct libws_mqtt_data *u = (struct libws_mqtt_data *)user; + size_t pos; + uint8_t *buf; + int rc; + uint8_t byte; + + db = &int_db; + + switch (reason) { + case LWS_CALLBACK_ESTABLISHED: + mosq = mqtt3_context_init(-1); + if(mosq){ + mosq->ws_context = context; + mosq->wsi = wsi; + u->mosq = mosq; + } + break; + + case LWS_CALLBACK_CLOSED: + mosq = u->mosq; + mqtt3_context_cleanup(db, mosq, true); + break; + + case LWS_CALLBACK_SERVER_WRITEABLE: + mosq = u->mosq; + pthread_mutex_lock(&mosq->current_out_packet_mutex); + pthread_mutex_lock(&mosq->out_packet_mutex); + if(mosq->out_packet && !mosq->current_out_packet){ + mosq->current_out_packet = mosq->out_packet; + mosq->out_packet = mosq->out_packet->next; + if(!mosq->out_packet){ + mosq->out_packet_last = NULL; + } + } + pthread_mutex_unlock(&mosq->out_packet_mutex); + + while(mosq->current_out_packet){ + packet = mosq->current_out_packet; + + if(packet->pos == 0 && packet->to_process == packet->packet_length){ + /* First time this packet has been dealt with. + * libwebsockets requires that the payload has + * LWS_SEND_BUFFER_PRE_PADDING space available before the + * actual data and LWS_SEND_BUFFER_POST_PADDING afterwards. + * We've already made the payload big enough to allow this, + * but need to move it into position here. */ + memmove(&packet->payload[LWS_SEND_BUFFER_PRE_PADDING], packet->payload, packet->packet_length); + packet->pos += LWS_SEND_BUFFER_PRE_PADDING; + } + count = libwebsocket_write(wsi, &packet->payload[packet->pos], packet->to_process, LWS_WRITE_BINARY); + packet->to_process -= count; + packet->pos += count; + if(packet->to_process > 0){ + pthread_mutex_unlock(&mosq->current_out_packet_mutex); + break; + } + + /* Free data and reset values */ + pthread_mutex_lock(&mosq->out_packet_mutex); + mosq->current_out_packet = mosq->out_packet; + if(mosq->out_packet){ + mosq->out_packet = mosq->out_packet->next; + if(!mosq->out_packet){ + mosq->out_packet_last = NULL; + } + } + pthread_mutex_unlock(&mosq->out_packet_mutex); + + _mosquitto_packet_cleanup(packet); + _mosquitto_free(packet); + + pthread_mutex_lock(&mosq->msgtime_mutex); + mosq->last_msg_out = mosquitto_time(); + pthread_mutex_unlock(&mosq->msgtime_mutex); + } + pthread_mutex_unlock(&mosq->current_out_packet_mutex); + break; + + case LWS_CALLBACK_RECEIVE: + mosq = u->mosq; + pos = 0; + buf = (uint8_t *)in; +#ifdef WITH_SYS_TREE + g_bytes_received += len; +#endif + while(pos < len){ + if(!mosq->in_packet.command){ + mosq->in_packet.command = buf[pos]; + pos++; + /* Clients must send CONNECT as their first command. */ + if(mosq->state == mosq_cs_new && (mosq->in_packet.command&0xF0) != CONNECT){ + return -1; + } + } + if(!mosq->in_packet.have_remaining){ + do{ + if(pos == len){ + break; + } + byte = buf[pos]; + pos++; + + mosq->in_packet.remaining_count++; + /* Max 4 bytes length for remaining length as defined by protocol. + * Anything more likely means a broken/malicious client. + */ + if(mosq->in_packet.remaining_count > 4){ + return -1; + } + + mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult; + mosq->in_packet.remaining_mult *= 128; + }while((byte & 128) != 0); + + if(mosq->in_packet.remaining_length > 0){ + mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t)); + if(!mosq->in_packet.payload) return -1; + mosq->in_packet.to_process = mosq->in_packet.remaining_length; + } + mosq->in_packet.have_remaining = 1; + } + while(mosq->in_packet.to_process>0){ + if(len - pos >= mosq->in_packet.to_process){ + memcpy(&mosq->in_packet.payload[mosq->in_packet.pos], &buf[pos], mosq->in_packet.to_process); + mosq->in_packet.pos += mosq->in_packet.to_process; + pos += mosq->in_packet.to_process; + mosq->in_packet.to_process = 0; + }else{ + memcpy(&mosq->in_packet.payload[mosq->in_packet.pos], &buf[pos], len-pos); + mosq->in_packet.pos += len-pos; + mosq->in_packet.to_process -= len-pos; + break; + } + } + + /* All data for this packet is read. */ + mosq->in_packet.pos = 0; +#ifdef WITH_SYS_TREE + g_msgs_received++; + if(((mosq->in_packet.command)&0xF5) == PUBLISH){ + g_pub_msgs_received++; + } +#endif + rc = mqtt3_packet_handle(db, mosq); + + /* Free data and reset values */ + _mosquitto_packet_cleanup(&mosq->in_packet); + + pthread_mutex_lock(&mosq->msgtime_mutex); + mosq->last_msg_in = mosquitto_time(); + pthread_mutex_unlock(&mosq->msgtime_mutex); + + if(rc){ + if(db->config->connection_messages == true){ + if(mosq->state != mosq_cs_disconnecting){ + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", mosq->id); + }else{ + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", mosq->id); + } + } + mqtt3_context_disconnect(db, mosq); + return -1; + } + } + break; + + default: + break; + } + + return 0; +} + + +static int callback_http(struct libwebsocket_context *context, + struct libwebsocket *wsi, + enum libwebsocket_callback_reasons reason, + void *user, + void *in, + size_t len) +{ + return 0; +} + +struct libwebsocket_context *mosq_websockets_init(int port) +{ + struct lws_context_creation_info info; + + memset(&info, 0, sizeof(info)); + info.port = port; + info.protocols = protocols; + info.gid = -1; + info.uid = -1; + + lws_set_log_level(0, NULL); + + return libwebsocket_create_context(&info); +} + + +#endif