diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ecda70a1..29b07d28 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -25,6 +25,7 @@ set (MOSQ_SRCS handle_unsubscribe.c keepalive.c lib_load.h + listeners.c logging.c loop.c ../lib/memory_mosq.c ../lib/memory_mosq.h diff --git a/src/Makefile b/src/Makefile index f3163550..e15da600 100644 --- a/src/Makefile +++ b/src/Makefile @@ -27,6 +27,7 @@ OBJS= mosquitto.o \ handle_unsuback.o \ handle_unsubscribe.o \ keepalive.o \ + listeners.o \ logging.o \ loop.o \ memory_mosq.o \ @@ -154,6 +155,9 @@ handle_unsubscribe.o : handle_unsubscribe.c mosquitto_broker_internal.h keepalive.o : keepalive.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ +listeners.o : listeners.c mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + logging.o : logging.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/listeners.c b/src/listeners.c new file mode 100644 index 00000000..7d3d214f --- /dev/null +++ b/src/listeners.c @@ -0,0 +1,256 @@ +/* +Copyright (c) 2009-2021 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License 2.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + https://www.eclipse.org/legal/epl-2.0/ +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" +#include "memory_mosq.h" +#include "net_mosq.h" +#include "mosquitto_broker_internal.h" + +static int listensock_index = 0; + +void listener__set_defaults(struct mosquitto__listener *listener) +{ + listener->security_options.allow_anonymous = -1; + listener->security_options.allow_zero_length_clientid = true; + listener->protocol = mp_mqtt; + listener->max_connections = -1; + listener->max_qos = 2; + listener->max_topic_alias = 10; + listener->max_topic_alias_broker = 10; +} + + +void listeners__reload_all_certificates(void) +{ +#ifdef WITH_TLS + int i; + int rc; + struct mosquitto__listener *listener; + + for(i=0; ilistener_count; i++){ + listener = &db.config->listeners[i]; + if(listener->ssl_ctx && listener->certfile && listener->keyfile){ + rc = net__load_certificates(listener); + if(rc){ + log__printf(NULL, MOSQ_LOG_ERR, "Error when reloading certificate '%s' or key '%s'.", + listener->certfile, listener->keyfile); + } + } + } +#endif +} + + +int listeners__start_single_mqtt(struct mosquitto__listener *listener) +{ + int i; + struct mosquitto__listener_sock *listensock_new; + + if(net__socket_listen(listener)){ + return 1; + } + listensock_count += listener->sock_count; + listensock_new = mosquitto__realloc(listensock, sizeof(struct mosquitto__listener_sock)*(size_t)listensock_count); + if(!listensock_new){ + return 1; + } + listensock = listensock_new; + + for(i=0; isock_count; i++){ + if(listener->socks[i] == INVALID_SOCKET){ + return 1; + } + listensock[listensock_index].sock = listener->socks[i]; + listensock[listensock_index].listener = listener; +#if defined(WITH_EPOLL) || defined(WITH_KQUEUE) + listensock[listensock_index].ident = id_listener; +#endif + listensock_index++; + } + return MOSQ_ERR_SUCCESS; +} + + +#ifdef WITH_WEBSOCKETS +void listeners__add_websockets(struct lws_context *ws_context, mosq_sock_t fd) +{ + int i; + struct mosquitto__listener *listener = NULL; + struct mosquitto__listener_sock *listensock_new; + + /* Don't add more listeners after we've started the main loop */ + if(run || ws_context == NULL) return; + + /* Find context */ + for(i=0; ilistener_count; i++){ + if(db.config->listeners[i].ws_in_init){ + listener = &db.config->listeners[i]; + break; + } + } + if(listener == NULL){ + return; + } + + listensock_count++; + listensock_new = mosquitto__realloc(listensock, sizeof(struct mosquitto__listener_sock)*(size_t)listensock_count); + if(!listensock_new){ + return; + } + listensock = listensock_new; + + listensock[listensock_index].sock = fd; + listensock[listensock_index].listener = listener; +#if defined(WITH_EPOLL) || defined(WITH_KQUEUE) + listensock[listensock_index].ident = id_listener_ws; +#endif + listensock_index++; +} +#endif + + +int listeners__add_local(const char *host, uint16_t port) +{ + struct mosquitto__listener *listeners; + listeners = db.config->listeners; + + listener__set_defaults(&listeners[db.config->listener_count]); + listeners[db.config->listener_count].security_options.allow_anonymous = true; + listeners[db.config->listener_count].port = port; + listeners[db.config->listener_count].host = mosquitto__strdup(host); + if(listeners[db.config->listener_count].host == NULL){ + return MOSQ_ERR_NOMEM; + } + if(listeners__start_single_mqtt(&listeners[db.config->listener_count])){ + mosquitto__free(listeners[db.config->listener_count].host); + listeners[db.config->listener_count].host = NULL; + return MOSQ_ERR_UNKNOWN; + } + db.config->listener_count++; + return MOSQ_ERR_SUCCESS; +} + + +int listeners__start_local_only(void) +{ + /* Attempt to open listeners bound to 127.0.0.1 and ::1 only */ + int i; + int rc; + struct mosquitto__listener *listeners; + + listeners = mosquitto__realloc(db.config->listeners, 2*sizeof(struct mosquitto__listener)); + if(listeners == NULL){ + return MOSQ_ERR_NOMEM; + } + memset(listeners, 0, 2*sizeof(struct mosquitto__listener)); + db.config->listener_count = 0; + db.config->listeners = listeners; + + log__printf(NULL, MOSQ_LOG_WARNING, "Starting in local only mode. Connections will only be possible from clients running on this machine."); + log__printf(NULL, MOSQ_LOG_WARNING, "Create a configuration file which defines a listener to allow remote access."); + if(db.config->cmd_port_count == 0){ + rc = listeners__add_local("127.0.0.1", 1883); + if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM; + rc = listeners__add_local("::1", 1883); + if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM; + }else{ + for(i=0; icmd_port_count; i++){ + rc = listeners__add_local("127.0.0.1", db.config->cmd_port[i]); + if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM; + rc = listeners__add_local("::1", db.config->cmd_port[i]); + if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM; + } + } + + if(db.config->listener_count > 0){ + return MOSQ_ERR_SUCCESS; + }else{ + return MOSQ_ERR_UNKNOWN; + } +} + + +int listeners__start(void) +{ + int i; + + listensock_count = 0; + + if(db.config->listener_count == 0){ + if(listeners__start_local_only()){ + db__close(); + if(db.config->pid_file){ + (void)remove(db.config->pid_file); + } + return 1; + } + return MOSQ_ERR_SUCCESS; + } + + for(i=0; ilistener_count; i++){ + if(db.config->listeners[i].protocol == mp_mqtt){ + if(listeners__start_single_mqtt(&db.config->listeners[i])){ + db__close(); + if(db.config->pid_file){ + (void)remove(db.config->pid_file); + } + return 1; + } + }else if(db.config->listeners[i].protocol == mp_websockets){ +#ifdef WITH_WEBSOCKETS + mosq_websockets_init(&db.config->listeners[i], db.config); + if(!db.config->listeners[i].ws_context){ + log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to create websockets listener on port %d.", db.config->listeners[i].port); + return 1; + } +#endif + } + } + if(listensock == NULL){ + log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to start any listening sockets, exiting."); + return 1; + } + return MOSQ_ERR_SUCCESS; +} + + +void listeners__stop(void) +{ + int i; + + for(i=0; ilistener_count; i++){ +#ifdef WITH_WEBSOCKETS + if(db.config->listeners[i].ws_context){ + lws_context_destroy(db.config->listeners[i].ws_context); + } + mosquitto__free(db.config->listeners[i].ws_protocol); +#endif +#ifdef WITH_UNIX_SOCKETS + if(db.config->listeners[i].unix_socket_path != NULL){ + unlink(db.config->listeners[i].unix_socket_path); + } +#endif + } + + for(i=0; isecurity_options.allow_anonymous = -1; - listener->security_options.allow_zero_length_clientid = true; - listener->protocol = mp_mqtt; - listener->max_connections = -1; - listener->max_qos = 2; - listener->max_topic_alias = 10; - listener->max_topic_alias_broker = 10; -} - - -void listeners__reload_all_certificates(void) -{ -#ifdef WITH_TLS - int i; - int rc; - struct mosquitto__listener *listener; - - for(i=0; ilistener_count; i++){ - listener = &db.config->listeners[i]; - if(listener->ssl_ctx && listener->certfile && listener->keyfile){ - rc = net__load_certificates(listener); - if(rc){ - log__printf(NULL, MOSQ_LOG_ERR, "Error when reloading certificate '%s' or key '%s'.", - listener->certfile, listener->keyfile); - } - } - } -#endif -} - - -int listeners__start_single_mqtt(struct mosquitto__listener *listener) -{ - int i; - struct mosquitto__listener_sock *listensock_new; - - if(net__socket_listen(listener)){ - return 1; - } - listensock_count += listener->sock_count; - listensock_new = mosquitto__realloc(listensock, sizeof(struct mosquitto__listener_sock)*(size_t)listensock_count); - if(!listensock_new){ - return 1; - } - listensock = listensock_new; - - for(i=0; isock_count; i++){ - if(listener->socks[i] == INVALID_SOCKET){ - return 1; - } - listensock[listensock_index].sock = listener->socks[i]; - listensock[listensock_index].listener = listener; -#if defined(WITH_EPOLL) || defined(WITH_KQUEUE) - listensock[listensock_index].ident = id_listener; -#endif - listensock_index++; - } - return MOSQ_ERR_SUCCESS; -} - - -#ifdef WITH_WEBSOCKETS -void listeners__add_websockets(struct lws_context *ws_context, mosq_sock_t fd) -{ - int i; - struct mosquitto__listener *listener = NULL; - struct mosquitto__listener_sock *listensock_new; - - /* Don't add more listeners after we've started the main loop */ - if(run || ws_context == NULL) return; - - /* Find context */ - for(i=0; ilistener_count; i++){ - if(db.config->listeners[i].ws_in_init){ - listener = &db.config->listeners[i]; - break; - } - } - if(listener == NULL){ - return; - } - - listensock_count++; - listensock_new = mosquitto__realloc(listensock, sizeof(struct mosquitto__listener_sock)*(size_t)listensock_count); - if(!listensock_new){ - return; - } - listensock = listensock_new; - - listensock[listensock_index].sock = fd; - listensock[listensock_index].listener = listener; -#if defined(WITH_EPOLL) || defined(WITH_KQUEUE) - listensock[listensock_index].ident = id_listener_ws; -#endif - listensock_index++; -} -#endif - -int listeners__add_local(const char *host, uint16_t port) -{ - struct mosquitto__listener *listeners; - listeners = db.config->listeners; - - listener__set_defaults(&listeners[db.config->listener_count]); - listeners[db.config->listener_count].security_options.allow_anonymous = true; - listeners[db.config->listener_count].port = port; - listeners[db.config->listener_count].host = mosquitto__strdup(host); - if(listeners[db.config->listener_count].host == NULL){ - return MOSQ_ERR_NOMEM; - } - if(listeners__start_single_mqtt(&listeners[db.config->listener_count])){ - mosquitto__free(listeners[db.config->listener_count].host); - listeners[db.config->listener_count].host = NULL; - return MOSQ_ERR_UNKNOWN; - } - db.config->listener_count++; - return MOSQ_ERR_SUCCESS; -} - -int listeners__start_local_only(void) -{ - /* Attempt to open listeners bound to 127.0.0.1 and ::1 only */ - int i; - int rc; - struct mosquitto__listener *listeners; - - listeners = mosquitto__realloc(db.config->listeners, 2*sizeof(struct mosquitto__listener)); - if(listeners == NULL){ - return MOSQ_ERR_NOMEM; - } - memset(listeners, 0, 2*sizeof(struct mosquitto__listener)); - db.config->listener_count = 0; - db.config->listeners = listeners; - - log__printf(NULL, MOSQ_LOG_WARNING, "Starting in local only mode. Connections will only be possible from clients running on this machine."); - log__printf(NULL, MOSQ_LOG_WARNING, "Create a configuration file which defines a listener to allow remote access."); - if(db.config->cmd_port_count == 0){ - rc = listeners__add_local("127.0.0.1", 1883); - if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM; - rc = listeners__add_local("::1", 1883); - if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM; - }else{ - for(i=0; icmd_port_count; i++){ - rc = listeners__add_local("127.0.0.1", db.config->cmd_port[i]); - if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM; - rc = listeners__add_local("::1", db.config->cmd_port[i]); - if(rc == MOSQ_ERR_NOMEM) return MOSQ_ERR_NOMEM; - } - } - - if(db.config->listener_count > 0){ - return MOSQ_ERR_SUCCESS; - }else{ - return MOSQ_ERR_UNKNOWN; - } -} - - -int listeners__start(void) -{ - int i; - - listensock_count = 0; - - if(db.config->listener_count == 0){ - if(listeners__start_local_only()){ - db__close(); - if(db.config->pid_file){ - (void)remove(db.config->pid_file); - } - return 1; - } - return MOSQ_ERR_SUCCESS; - } - - for(i=0; ilistener_count; i++){ - if(db.config->listeners[i].protocol == mp_mqtt){ - if(listeners__start_single_mqtt(&db.config->listeners[i])){ - db__close(); - if(db.config->pid_file){ - (void)remove(db.config->pid_file); - } - return 1; - } - }else if(db.config->listeners[i].protocol == mp_websockets){ -#ifdef WITH_WEBSOCKETS - mosq_websockets_init(&db.config->listeners[i], db.config); - if(!db.config->listeners[i].ws_context){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to create websockets listener on port %d.", db.config->listeners[i].port); - return 1; - } -#endif - } - } - if(listensock == NULL){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to start any listening sockets, exiting."); - return 1; - } - return MOSQ_ERR_SUCCESS; -} - - -void listeners__stop(void) -{ - int i; - - for(i=0; ilistener_count; i++){ -#ifdef WITH_WEBSOCKETS - if(db.config->listeners[i].ws_context){ - lws_context_destroy(db.config->listeners[i].ws_context); - } - mosquitto__free(db.config->listeners[i].ws_protocol); -#endif -#ifdef WITH_UNIX_SOCKETS - if(db.config->listeners[i].unix_socket_path != NULL){ - unlink(db.config->listeners[i].unix_socket_path); - } -#endif - } - - for(i=0; i