From 764b7e0a91ceaceb9c158329a7577b3cf30e9e63 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Mon, 23 Jun 2014 17:57:35 +0100 Subject: [PATCH] Use hash functions to store client data. --- lib/mosquitto_internal.h | 11 +- lib/net_mosq.c | 7 + lib/net_mosq.h | 4 + lib/util_mosq.c | 10 +- lib/util_mosq.h | 4 + src/bridge.c | 41 ++---- src/context.c | 23 +--- src/database.c | 17 +-- src/loop.c | 291 +++++++++++++++++++++------------------ src/mosquitto.c | 14 +- src/mosquitto_broker.h | 14 +- src/net.c | 22 +-- src/persist.c | 51 ++----- src/read_handle_server.c | 60 ++++---- src/security_default.c | 72 +++++----- 15 files changed, 283 insertions(+), 358 deletions(-) diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index a81e53a0..5da00f2d 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -54,6 +54,7 @@ Contributors: #include "mosquitto.h" #include "time_mosq.h" #ifdef WITH_BROKER +# include "uthash.h" struct mosquitto_client_msg; #endif @@ -189,7 +190,6 @@ struct mosquitto { struct _mqtt3_listener *listener; time_t disconnect_t; int pollfd_index; - int db_index; struct _mosquitto_packet *out_packet_last; bool is_dropping; # ifdef WITH_WEBSOCKETS @@ -229,6 +229,15 @@ struct mosquitto { ares_channel achan; # endif #endif + +#ifdef WITH_BROKER + UT_hash_handle hh_id; + UT_hash_handle hh_sock; + UT_hash_handle hh_for_free; +# ifdef WITH_BRIDGE + UT_hash_handle hh_bridge; +# endif +#endif }; #endif diff --git a/lib/net_mosq.c b/lib/net_mosq.c index 94d79860..25b4ed6f 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -193,7 +193,11 @@ int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *pa * Returns 1 on failure (context is NULL) * Returns 0 on success. */ +#ifdef WITH_BROKER +int _mosquitto_socket_close(struct mosquitto_db *db, struct mosquitto *mosq) +#else int _mosquitto_socket_close(struct mosquitto *mosq) +#endif { int rc = 0; @@ -211,6 +215,9 @@ int _mosquitto_socket_close(struct mosquitto *mosq) #endif if(mosq->sock != INVALID_SOCKET){ +#ifdef WITH_BROKER + HASH_DELETE(hh_sock, db->contexts_by_sock, mosq); +#endif rc = COMPAT_CLOSE(mosq->sock); mosq->sock = INVALID_SOCKET; } diff --git a/lib/net_mosq.h b/lib/net_mosq.h index 77e14367..b3291402 100644 --- a/lib/net_mosq.h +++ b/lib/net_mosq.h @@ -59,7 +59,11 @@ void _mosquitto_net_cleanup(void); void _mosquitto_packet_cleanup(struct _mosquitto_packet *packet); int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *packet); int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking); +#ifdef WITH_BROKER +int _mosquitto_socket_close(struct mosquitto_db *db, struct mosquitto *mosq); +#else int _mosquitto_socket_close(struct mosquitto *mosq); +#endif int _mosquitto_try_connect(const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking); int _mosquitto_socket_nonblock(int sock); int _mosquitto_socketpair(int *sp1, int *sp2); diff --git a/lib/util_mosq.c b/lib/util_mosq.c index 91e3646b..3a076924 100644 --- a/lib/util_mosq.c +++ b/lib/util_mosq.c @@ -77,7 +77,11 @@ int _mosquitto_packet_alloc(struct _mosquitto_packet *packet) return MOSQ_ERR_SUCCESS; } +#ifdef WITH_BROKER +void _mosquitto_check_keepalive(struct mosquitto_db *db, struct mosquitto *mosq) +#else void _mosquitto_check_keepalive(struct mosquitto *mosq) +#endif { time_t last_msg_out; time_t last_msg_in; @@ -94,7 +98,7 @@ void _mosquitto_check_keepalive(struct mosquitto *mosq) && now - mosq->last_msg_out >= mosq->bridge->idle_timeout){ _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Bridge connection %s has exceeded idle timeout, disconnecting.", mosq->id); - _mosquitto_socket_close(mosq); + _mosquitto_socket_close(db, mosq); return; } #endif @@ -119,9 +123,9 @@ void _mosquitto_check_keepalive(struct mosquitto *mosq) assert(mosq->listener->client_count >= 0); } mosq->listener = NULL; -#endif + _mosquitto_socket_close(db, mosq); +#else _mosquitto_socket_close(mosq); -#ifndef WITH_BROKER pthread_mutex_lock(&mosq->state_mutex); if(mosq->state == mosq_cs_disconnecting){ rc = MOSQ_ERR_SUCCESS; diff --git a/lib/util_mosq.h b/lib/util_mosq.h index fc23093e..b737514b 100644 --- a/lib/util_mosq.h +++ b/lib/util_mosq.h @@ -22,7 +22,11 @@ Contributors: #include "mosquitto.h" int _mosquitto_packet_alloc(struct _mosquitto_packet *packet); +#ifdef WITH_BROKER +void _mosquitto_check_keepalive(struct mosquitto_db *db, struct mosquitto *mosq); +#else void _mosquitto_check_keepalive(struct mosquitto *mosq); +#endif uint16_t _mosquitto_mid_generate(struct mosquitto *mosq); int _mosquitto_pub_topic_check(const char *str); int _mosquitto_sub_topic_check(const char *str); diff --git a/src/bridge.c b/src/bridge.c index 9ab6ebc1..03eebbe6 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -44,10 +44,7 @@ Contributors: int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge) { - int i; struct mosquitto *new_context = NULL; - int null_index = -1; - struct mosquitto **tmp_contexts; char hostname[256]; int len; char *id, *local_id; @@ -80,40 +77,17 @@ int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge) bridge->local_clientid = local_id; } - /* Search for existing id (possible from persistent db) and also look for a - * gap in the db->contexts[] array in case the id isn't found. */ - for(i=0; icontext_count; i++){ - if(db->contexts[i]){ - if(!strcmp(db->contexts[i]->id, local_id)){ - new_context = db->contexts[i]; - break; - } - }else if(db->contexts[i] == NULL && null_index == -1){ - null_index = i; - break; - } - } - if(!new_context){ + HASH_FIND(hh_id, db->contexts_by_id, local_id, strlen(local_id), new_context); + if(new_context){ + /* (possible from persistent db) */ + }else{ /* id wasn't found, so generate a new context */ new_context = mqtt3_context_init(-1); if(!new_context){ return MOSQ_ERR_NOMEM; } - if(null_index == -1){ - /* There were no gaps in the db->contexts[] array, so need to append. */ - db->context_count++; - tmp_contexts = _mosquitto_realloc(db->contexts, sizeof(struct mosquitto*)*db->context_count); - if(tmp_contexts){ - db->contexts = tmp_contexts; - db->contexts[db->context_count-1] = new_context; - }else{ - _mosquitto_free(new_context); - return MOSQ_ERR_NOMEM; - } - }else{ - db->contexts[null_index] = new_context; - } new_context->id = local_id; + HASH_ADD_KEYPTR(hh_id, db->contexts_by_id, new_context->id, strlen(new_context->id), new_context); } new_context->bridge = bridge; new_context->is_bridge = true; @@ -137,6 +111,8 @@ int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge) bridge->try_private_accepted = true; + HASH_ADD_KEYPTR(hh_bridge, db->contexts_bridge, new_context->id, strlen(new_context->id), new_context); + return mqtt3_bridge_connect(db, new_context); } @@ -224,6 +200,7 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context) return rc; } + HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context); rc = _mosquitto_send_connect(context, context->keepalive, context->clean_session); if(rc == MOSQ_ERR_SUCCESS){ return MOSQ_ERR_SUCCESS; @@ -237,7 +214,7 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context) }else if(rc == MOSQ_ERR_EAI){ _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno)); } - _mosquitto_socket_close(context); + _mosquitto_socket_close(db, context); return rc; } } diff --git a/src/context.c b/src/context.c index cf117cf6..44523ef9 100644 --- a/src/context.c +++ b/src/context.c @@ -89,7 +89,6 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b { struct _mosquitto_packet *packet; struct mosquitto_client_msg *msg, *next; - struct _clientid_index_hash *find_cih; if(!context) return; @@ -111,15 +110,7 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b } } #endif -#ifdef WITH_TLS - if(context->ssl){ - SSL_free(context->ssl); - context->ssl = NULL; - } -#endif - if(context->sock != -1){ - _mosquitto_socket_close(context); - } + _mosquitto_socket_close(db, context); if(context->clean_session && db){ mqtt3_subs_clean_session(db, context, &db->subs); mqtt3_db_messages_delete(context); @@ -132,15 +123,7 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b assert(db); /* db can only be NULL here if the client hasn't sent a CONNECT and hence wouldn't have an id. */ - // Remove the context's ID from the DB hash - HASH_FIND_STR(db->clientid_index_hash, context->id, find_cih); - if(find_cih){ - // FIXME - internal level debug? _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Found id for client \"%s\", their index was %d.", context->id, find_cih->db_context_index); - HASH_DEL(db->clientid_index_hash, find_cih); - _mosquitto_free(find_cih); - }else{ - // FIXME - internal level debug? _mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Unable to find id for client \"%s\".", context->id); - } + HASH_DELETE(hh_id, db->contexts_by_id, context); _mosquitto_free(context->id); context->id = NULL; } @@ -194,6 +177,6 @@ void mqtt3_context_disconnect(struct mosquitto_db *db, struct mosquitto *ctxt) db->disconnected_count++; } #endif - _mosquitto_socket_close(ctxt); + _mosquitto_socket_close(db, ctxt); } diff --git a/src/database.c b/src/database.c index 8e8f4c7a..b53ad99b 100644 --- a/src/database.c +++ b/src/database.c @@ -39,10 +39,11 @@ int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db) db->last_db_id = 0; - db->context_count = 1; - db->contexts = _mosquitto_malloc(sizeof(struct mosquitto*)*db->context_count); - if(!db->contexts) return MOSQ_ERR_NOMEM; - db->contexts[0] = NULL; + db->contexts_by_id = NULL; + db->contexts_by_sock = NULL; + db->contexts_for_free = NULL; + db->contexts_bridge = NULL; + // Initialize the hashtable db->clientid_index_hash = NULL; @@ -610,18 +611,14 @@ int mqtt3_db_message_reconnect_reset(struct mosquitto *context) int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout) { - int i; time_t threshold; enum mosquitto_msg_state new_state; - struct mosquitto *context; + struct mosquitto *context, *ctxt_tmp; struct mosquitto_client_msg *msg; threshold = mosquitto_time() - timeout; - - for(i=0; icontext_count; i++){ - context = db->contexts[i]; - if(!context) continue; + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ msg = context->msgs; while(msg){ new_state = mosq_ms_invalid; diff --git a/src/loop.c b/src/loop.c index ba9d233e..f32391e0 100644 --- a/src/loop.c +++ b/src/loop.c @@ -63,6 +63,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock time_t now_time; int time_count; int fdcount; + struct mosquitto *context, *ctxt_tmp; #ifndef WIN32 sigset_t sigblock, origsig; #endif @@ -74,21 +75,32 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock int bridge_sock; int rc; #endif + int context_count; + time_t expiration_check_time = 0; #ifndef WIN32 sigemptyset(&sigblock); sigaddset(&sigblock, SIGINT); #endif + if(db->config->persistent_client_expiration > 0){ + expiration_check_time = time(NULL) + db->config->persistent_client_expiration; + } + while(run){ + HASH_ITER(hh_for_free, db->contexts_for_free, context, ctxt_tmp){ + HASH_DELETE(hh_for_free, db->contexts_for_free, context); + mqtt3_context_cleanup(db, context, true); + } #ifdef WITH_SYS_TREE if(db->config->sys_interval > 0){ mqtt3_db_sys_update(db, db->config->sys_interval, start_time); } #endif - if(listensock_count + db->context_count > pollfd_count || !pollfds){ - pollfd_count = listensock_count + db->context_count; + context_count = HASH_CNT(hh_sock, db->contexts_by_sock); + if(listensock_count + context_count > pollfd_count || !pollfds){ + pollfd_count = listensock_count + context_count; pollfds = _mosquitto_realloc(pollfds, sizeof(struct pollfd)*pollfd_count); if(!pollfds){ _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); @@ -109,130 +121,137 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock now_time = time(NULL); time_count = 0; - for(i=0; icontext_count; i++){ - if(db->contexts[i]){ + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ + if(time_count > 0){ + time_count--; + }else{ + time_count = 1000; + now = mosquitto_time(); + } + context->pollfd_index = -1; + + if(context->sock != INVALID_SOCKET){ +#ifdef WITH_BRIDGE + if(context->bridge){ + _mosquitto_check_keepalive(db, context); + if(context->bridge->round_robin == false + && context->bridge->cur_address != 0 + && now > context->bridge->primary_retry){ + + /* FIXME - this should be non-blocking */ + if(_mosquitto_try_connect(context->bridge->addresses[0].address, context->bridge->addresses[0].port, &bridge_sock, NULL, true) == MOSQ_ERR_SUCCESS){ + COMPAT_CLOSE(bridge_sock); + _mosquitto_socket_close(db, context); + context->bridge->cur_address = context->bridge->address_count-1; + } + } + } +#endif + + /* Local bridges never time out in this fashion. */ + if(!(context->keepalive) + || context->bridge + || now - context->last_msg_in < (time_t)(context->keepalive)*3/2){ + + if(mqtt3_db_message_write(context) == MOSQ_ERR_SUCCESS){ + pollfds[pollfd_index].fd = context->sock; + pollfds[pollfd_index].events = POLLIN; + pollfds[pollfd_index].revents = 0; + if(context->current_out_packet){ + pollfds[pollfd_index].events |= POLLOUT; + } + context->pollfd_index = pollfd_index; + pollfd_index++; + }else{ + mqtt3_context_disconnect(db, context); + } + }else{ + if(db->config->connection_messages == true){ + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", context->id); + } + /* Client has exceeded keepalive*1.5 */ + mqtt3_context_disconnect(db, context); + } + } + } + +#ifdef WITH_BRIDGE + time_count = 0; + HASH_ITER(hh_bridge, db->contexts_bridge, context, ctxt_tmp){ + if(context->sock == INVALID_SOCKET){ if(time_count > 0){ time_count--; }else{ time_count = 1000; now = mosquitto_time(); } - db->contexts[i]->pollfd_index = -1; - - if(db->contexts[i]->sock != INVALID_SOCKET){ -#ifdef WITH_BRIDGE - if(db->contexts[i]->bridge){ - _mosquitto_check_keepalive(db->contexts[i]); - if(db->contexts[i]->bridge->round_robin == false - && db->contexts[i]->bridge->cur_address != 0 - && now > db->contexts[i]->bridge->primary_retry){ - - /* FIXME - this should be non-blocking */ - if(_mosquitto_try_connect(db->contexts[i]->bridge->addresses[0].address, db->contexts[i]->bridge->addresses[0].port, &bridge_sock, NULL, true) == MOSQ_ERR_SUCCESS){ - COMPAT_CLOSE(bridge_sock); - _mosquitto_socket_close(db->contexts[i]); - db->contexts[i]->bridge->cur_address = db->contexts[i]->bridge->address_count-1; + /* Want to try to restart the bridge connection */ + if(!context->bridge->restart_t){ + context->bridge->restart_t = now+context->bridge->restart_timeout; + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; + } + if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ + context->bridge->primary_retry = now + 5; + } + }else{ + if(context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect){ + rc = mqtt3_bridge_connect(db, context); + if(rc){ + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; } } } -#endif - - /* Local bridges never time out in this fashion. */ - if(!(db->contexts[i]->keepalive) - || db->contexts[i]->bridge - || now - db->contexts[i]->last_msg_in < (time_t)(db->contexts[i]->keepalive)*3/2){ - - if(mqtt3_db_message_write(db->contexts[i]) == MOSQ_ERR_SUCCESS){ - pollfds[pollfd_index].fd = db->contexts[i]->sock; + if(context->bridge->start_type == bst_automatic && now > context->bridge->restart_t){ + context->bridge->restart_t = 0; + rc = mqtt3_bridge_connect(db, context); + if(rc == MOSQ_ERR_SUCCESS){ + pollfds[pollfd_index].fd = context->sock; pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].revents = 0; - if(db->contexts[i]->current_out_packet){ + if(context->current_out_packet){ pollfds[pollfd_index].events |= POLLOUT; } - db->contexts[i]->pollfd_index = pollfd_index; + context->pollfd_index = pollfd_index; pollfd_index++; }else{ - mqtt3_context_disconnect(db, db->contexts[i]); - } - }else{ - if(db->config->connection_messages == true){ - _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", db->contexts[i]->id); - } - /* Client has exceeded keepalive*1.5 */ - mqtt3_context_disconnect(db, db->contexts[i]); - } - }else{ -#ifdef WITH_BRIDGE - if(db->contexts[i]->bridge){ - /* Want to try to restart the bridge connection */ - if(!db->contexts[i]->bridge->restart_t){ - db->contexts[i]->bridge->restart_t = now+db->contexts[i]->bridge->restart_timeout; - db->contexts[i]->bridge->cur_address++; - if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){ - db->contexts[i]->bridge->cur_address = 0; - } - if(db->contexts[i]->bridge->round_robin == false && db->contexts[i]->bridge->cur_address != 0){ - db->contexts[i]->bridge->primary_retry = now + 5; - } - }else{ - if(db->contexts[i]->bridge->start_type == bst_lazy && db->contexts[i]->bridge->lazy_reconnect){ - rc = mqtt3_bridge_connect(db, db->contexts[i]); - if(rc){ - db->contexts[i]->bridge->cur_address++; - if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){ - db->contexts[i]->bridge->cur_address = 0; - } - } - } - if(db->contexts[i]->bridge->start_type == bst_automatic && now > db->contexts[i]->bridge->restart_t){ - db->contexts[i]->bridge->restart_t = 0; - rc = mqtt3_bridge_connect(db, db->contexts[i]); - if(rc == MOSQ_ERR_SUCCESS){ - pollfds[pollfd_index].fd = db->contexts[i]->sock; - pollfds[pollfd_index].events = POLLIN; - pollfds[pollfd_index].revents = 0; - if(db->contexts[i]->current_out_packet){ - pollfds[pollfd_index].events |= POLLOUT; - } - db->contexts[i]->pollfd_index = pollfd_index; - pollfd_index++; - }else{ - /* Retry later. */ - db->contexts[i]->bridge->restart_t = now+db->contexts[i]->bridge->restart_timeout; - - db->contexts[i]->bridge->cur_address++; - if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){ - db->contexts[i]->bridge->cur_address = 0; - } - } + /* Retry later. */ + context->bridge->restart_t = now+context->bridge->restart_timeout; + + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; } } - }else{ + } + } + } + } #endif - if(db->contexts[i]->clean_session == true){ - mqtt3_context_cleanup(db, db->contexts[i], true); - db->contexts[i] = NULL; - }else if(db->config->persistent_client_expiration > 0){ - /* This is a persistent client, check to see if the - * last time it connected was longer than - * persistent_client_expiration seconds ago. If so, - * expire it and clean up. - */ - if(now_time > db->contexts[i]->disconnect_t+db->config->persistent_client_expiration){ - _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", db->contexts[i]->id); + now_time = time(NULL); + if(db->config->persistent_client_expiration > 0 && now_time > expiration_check_time){ + HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){ + if(context->sock == -1 && context->clean_session == 0){ + /* This is a persistent client, check to see if the + * last time it connected was longer than + * persistent_client_expiration seconds ago. If so, + * expire it and clean up. + */ + if(now_time > context->disconnect_t+db->config->persistent_client_expiration){ + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", context->id); #ifdef WITH_SYS_TREE - g_clients_expired++; + g_clients_expired++; #endif - db->contexts[i]->clean_session = true; - mqtt3_context_cleanup(db, db->contexts[i], true); - db->contexts[i] = NULL; - } - } -#ifdef WITH_BRIDGE + context->clean_session = true; + mqtt3_context_cleanup(db, context, true); + context = NULL; } -#endif } } + expiration_check_time = time(NULL) + db->config->persistent_client_expiration; } mqtt3_db_message_timeout_check(db, db->config->retry_interval); @@ -319,16 +338,16 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock return MOSQ_ERR_SUCCESS; } -static void do_disconnect(struct mosquitto_db *db, int context_index) +static void do_disconnect(struct mosquitto_db *db, struct mosquitto *context) { if(db->config->connection_messages == true){ - if(db->contexts[context_index]->state != mosq_cs_disconnecting){ - _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", db->contexts[context_index]->id); + if(context->state != mosq_cs_disconnecting){ + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", context->id); }else{ - _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", db->contexts[context_index]->id); + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", context->id); } } - mqtt3_context_disconnect(db, db->contexts[context_index]); + mqtt3_context_disconnect(db, context); } /* Error ocurred, probably an fd has been closed. @@ -336,53 +355,51 @@ static void do_disconnect(struct mosquitto_db *db, int context_index) */ static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds) { - int i; + struct mosquitto *context, *ctxt_tmp; - for(i=0; icontext_count; i++){ - if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){ - if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLERR | POLLNVAL)){ - do_disconnect(db, i); - } + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ + if(pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL)){ + do_disconnect(db, context); } } } static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds) { - int i; + struct mosquitto *context, *ctxt_tmp; - for(i=0; icontext_count; i++){ - if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){ - assert(pollfds[db->contexts[i]->pollfd_index].fd == db->contexts[i]->sock); + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ + assert(pollfds[context->pollfd_index].fd == context->sock); #ifdef WITH_TLS - if(pollfds[db->contexts[i]->pollfd_index].revents & POLLOUT || - db->contexts[i]->want_write || - (db->contexts[i]->ssl && db->contexts[i]->state == mosq_cs_new)){ + if(pollfds[context->pollfd_index].revents & POLLOUT || + context->want_write || + (context->ssl && context->state == mosq_cs_new)){ #else - if(pollfds[db->contexts[i]->pollfd_index].revents & POLLOUT){ + if(pollfds[context->pollfd_index].revents & POLLOUT){ #endif - if(_mosquitto_packet_write(db->contexts[i])){ - do_disconnect(db, i); - } + if(_mosquitto_packet_write(context)){ + do_disconnect(db, context); + continue; } } - if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){ - assert(pollfds[db->contexts[i]->pollfd_index].fd == db->contexts[i]->sock); + } + + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ #ifdef WITH_TLS - if(pollfds[db->contexts[i]->pollfd_index].revents & POLLIN || - (db->contexts[i]->ssl && db->contexts[i]->state == mosq_cs_new)){ + if(pollfds[context->pollfd_index].revents & POLLIN || + (context->ssl && context->state == mosq_cs_new)){ #else - if(pollfds[db->contexts[i]->pollfd_index].revents & POLLIN){ + if(pollfds[context->pollfd_index].revents & POLLIN){ #endif - if(_mosquitto_packet_read(db, db->contexts[i])){ - do_disconnect(db, i); - } + if(_mosquitto_packet_read(db, context)){ + do_disconnect(db, context); + continue; } } - if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){ - if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLERR | POLLNVAL)){ - do_disconnect(db, i); - } + + if(pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL)){ + do_disconnect(db, context); + continue; } } } diff --git a/src/mosquitto.c b/src/mosquitto.c index b7bfd230..8d7f7255 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -161,6 +161,7 @@ int main(int argc, char *argv[]) #else struct timeval tv; #endif + struct mosquitto *ctxt, *ctxt_tmp; #if defined(WIN32) || defined(__CYGWIN__) if(argc == 2){ @@ -265,7 +266,6 @@ int main(int argc, char *argv[]) for(i=0; icontexts array */ - int db_context_index; - UT_hash_handle hh; -}; - struct mosquitto_db{ dbid_t last_db_id; struct _mosquitto_subhier subs; @@ -212,9 +204,11 @@ struct mosquitto_db{ struct _mosquitto_acl_user *acl_list; struct _mosquitto_acl *acl_patterns; struct _mosquitto_unpwd *psk_id; - struct mosquitto **contexts; + struct mosquitto *contexts_by_id; + struct mosquitto *contexts_by_sock; + struct mosquitto *contexts_for_free; + struct mosquitto *contexts_bridge; struct _clientid_index_hash *clientid_index_hash; - int context_count; struct mosquitto_msg_store *msg_store; int msg_store_count; struct mqtt3_config *config; diff --git a/src/net.c b/src/net.c index 44f5dd21..6a46b2d5 100644 --- a/src/net.c +++ b/src/net.c @@ -68,7 +68,6 @@ int mqtt3_socket_accept(struct mosquitto_db *db, int listensock) int i; int j; int new_sock = -1; - struct mosquitto **tmp_contexts = NULL; struct mosquitto *new_context; #ifdef WITH_TLS BIO *bio; @@ -172,26 +171,7 @@ int mqtt3_socket_accept(struct mosquitto_db *db, int listensock) #endif _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "New connection from %s on port %d.", new_context->address, new_context->listener->port); - for(i=0; icontext_count; i++){ - if(db->contexts[i] == NULL){ - db->contexts[i] = new_context; - break; - } - } - if(i==db->context_count){ - tmp_contexts = _mosquitto_realloc(db->contexts, sizeof(struct mosquitto*)*(db->context_count+1)); - if(tmp_contexts){ - db->context_count++; - db->contexts = tmp_contexts; - db->contexts[i] = new_context; - }else{ - // Out of memory - mqtt3_context_cleanup(NULL, new_context, true); - return -1; - } - } - // If we got here then the context's DB index is "i" regardless of how we got here - new_context->db_index = i; + HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(new_context->sock), new_context); return new_sock; } diff --git a/src/persist.c b/src/persist.c index d47a4182..84b0442f 100644 --- a/src/persist.c +++ b/src/persist.c @@ -42,41 +42,21 @@ static int _db_restore_sub(struct mosquitto_db *db, const char *client_id, const static struct mosquitto *_db_find_or_add_context(struct mosquitto_db *db, const char *client_id, uint16_t last_mid) { struct mosquitto *context; - struct mosquitto **tmp_contexts; - int i; context = NULL; - for(i=0; icontext_count; i++){ - if(db->contexts[i] && !strcmp(db->contexts[i]->id, client_id)){ - context = db->contexts[i]; - break; - } - } + HASH_FIND(hh_id, db->contexts_by_id, client_id, strlen(client_id), context); if(!context){ context = mqtt3_context_init(-1); if(!context) return NULL; + context->id = _mosquitto_strdup(client_id); + if(!context){ + _mosquitto_free(context); + return NULL; + } context->clean_session = false; - for(i=0; icontext_count; i++){ - if(!db->contexts[i]){ - db->contexts[i] = context; - break; - } - } - if(i==db->context_count){ - db->context_count++; - tmp_contexts = _mosquitto_realloc(db->contexts, sizeof(struct mosquitto*)*db->context_count); - if(tmp_contexts){ - db->contexts = tmp_contexts; - db->contexts[db->context_count-1] = context; - }else{ - mqtt3_context_cleanup(db, context, true); - return NULL; - } - } - context->id = _mosquitto_strdup(client_id); - context->db_index = i; + HASH_ADD_KEYPTR(hh_id, db->contexts_by_id, context->id, strlen(context->id), context); } if(last_mid){ context->last_mid = last_mid; @@ -224,8 +204,7 @@ error: static int mqtt3_db_client_write(struct mosquitto_db *db, FILE *db_fptr) { - int i; - struct mosquitto *context; + struct mosquitto *context, *ctxt_tmp; uint16_t i16temp, slen; uint32_t length; time_t disconnect_t; @@ -233,8 +212,7 @@ static int mqtt3_db_client_write(struct mosquitto_db *db, FILE *db_fptr) assert(db); assert(db_fptr); - for(i=0; icontext_count; i++){ - context = db->contexts[i]; + HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){ if(context && context->clean_session == false){ length = htonl(2+strlen(context->id) + sizeof(uint16_t) + sizeof(time_t)); @@ -480,7 +458,6 @@ static int _db_client_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) int rc = 0; struct mosquitto *context; time_t disconnect_t; - struct _clientid_index_hash *new_cih; read_e(db_fptr, &i16temp, sizeof(uint16_t)); slen = ntohs(i16temp); @@ -515,16 +492,6 @@ static int _db_client_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) _mosquitto_free(client_id); - if(!rc){ - new_cih = _mosquitto_malloc(sizeof(struct _clientid_index_hash)); - if(!new_cih){ - _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; - } - new_cih->id = context->id; - new_cih->db_context_index = context->db_index; - HASH_ADD_KEYPTR(hh, db->clientid_index_hash, context->id, strlen(context->id), new_cih); - } return rc; error: _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno)); diff --git a/src/read_handle_server.c b/src/read_handle_server.c index 36e0abc6..9441c469 100644 --- a/src/read_handle_server.c +++ b/src/read_handle_server.c @@ -87,14 +87,13 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) int rc; struct _mosquitto_acl_user *acl_tail; struct mosquitto_client_msg *msg_tail, *msg_prev; + struct mosquitto *found_context; int slen; #ifdef WITH_TLS X509 *client_cert; X509_NAME *name; X509_NAME_ENTRY *name_entry; #endif - struct _clientid_index_hash *find_cih; - struct _clientid_index_hash *new_cih; #ifdef WITH_SYS_TREE g_connection_count++; @@ -396,11 +395,10 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) #endif /* Find if this client already has an entry. This must be done *after* any security checks. */ - HASH_FIND_STR(db->clientid_index_hash, client_id, find_cih); - if(find_cih){ - i = find_cih->db_context_index; + HASH_FIND(hh_id, db->contexts_by_id, client_id, strlen(client_id), found_context); + if(found_context){ /* Found a matching client */ - if(db->contexts[i]->sock == -1){ + if(found_context->sock == -1){ /* Client is reconnecting after a disconnect */ /* FIXME - does anything else need to be done here? */ #ifdef WITH_SYS_TREE @@ -417,33 +415,39 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) connect_ack |= 0x01; } } - db->contexts[i]->clean_session = clean_session; - mqtt3_context_cleanup(db, db->contexts[i], false); - db->contexts[i]->state = mosq_cs_connected; + found_context->clean_session = clean_session; + mqtt3_context_cleanup(db, found_context, false); + found_context->state = mosq_cs_connected; if(context->address){ - db->contexts[i]->address = _mosquitto_strdup(context->address); + //found_context->address = _mosquitto_strdup(context->address); + found_context->address = context->address; + context->address = NULL; }else{ - db->contexts[i]->address = NULL; - } - db->contexts[i]->disconnect_t = 0; - db->contexts[i]->sock = context->sock; - db->contexts[i]->listener = context->listener; - db->contexts[i]->last_msg_in = mosquitto_time(); - db->contexts[i]->last_msg_out = mosquitto_time(); - db->contexts[i]->keepalive = context->keepalive; - db->contexts[i]->pollfd_index = context->pollfd_index; + found_context->address = NULL; + } + found_context->disconnect_t = 0; + HASH_DELETE(hh_sock, db->contexts_by_sock, context); + found_context->sock = context->sock; + found_context->listener = context->listener; + context->listener = NULL; + found_context->last_msg_in = mosquitto_time(); + found_context->last_msg_out = mosquitto_time(); + found_context->keepalive = context->keepalive; + found_context->pollfd_index = context->pollfd_index; #ifdef WITH_TLS - db->contexts[i]->ssl = context->ssl; + found_context->ssl = context->ssl; #endif if(context->username){ - db->contexts[i]->username = _mosquitto_strdup(context->username); + found_context->username = _mosquitto_strdup(context->username); } context->sock = -1; #ifdef WITH_TLS context->ssl = NULL; #endif context->state = mosq_cs_disconnecting; - context = db->contexts[i]; + HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(context), context); + context = found_context; + HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context); if(context->msgs){ mqtt3_db_message_reconnect_reset(context); } @@ -536,17 +540,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) } } - // Add the client ID to the DB hash table here - new_cih = _mosquitto_malloc(sizeof(struct _clientid_index_hash)); - if(!new_cih){ - _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - mqtt3_context_disconnect(db, context); - rc = MOSQ_ERR_NOMEM; - goto handle_connect_error; - } - new_cih->id = context->id; - new_cih->db_context_index = context->db_index; - HASH_ADD_KEYPTR(hh, db->clientid_index_hash, context->id, strlen(context->id), new_cih); + HASH_ADD_KEYPTR(hh_id, db->contexts_by_id, context->id, strlen(context->id), context); #ifdef WITH_PERSISTENCE if(!clean_session){ diff --git a/src/security_default.c b/src/security_default.c index 58ea526c..db8d5f59 100644 --- a/src/security_default.c +++ b/src/security_default.c @@ -434,7 +434,7 @@ static void _free_acl(struct _mosquitto_acl *acl) static int _acl_cleanup(struct mosquitto_db *db, bool reload) { - int i; + struct mosquitto *context, *ctxt_tmp; struct _mosquitto_acl_user *user_tail; if(!db) return MOSQ_ERR_INVAL; @@ -446,12 +446,8 @@ static int _acl_cleanup(struct mosquitto_db *db, bool reload) * is called if we are reloading the config. If this is not done, all * access will be denied to currently connected clients. */ - if(db->contexts){ - for(i=0; icontext_count; i++){ - if(db->contexts[i] && db->contexts[i]->acl_list){ - db->contexts[i]->acl_list = NULL; - } - } + HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){ + context->acl_list = NULL; } while(db->acl_list){ @@ -695,49 +691,45 @@ static int _unpwd_cleanup(struct _mosquitto_unpwd **root, bool reload) */ int mosquitto_security_apply_default(struct mosquitto_db *db) { + struct mosquitto *context, *ctxt_tmp; struct _mosquitto_acl_user *acl_user_tail; bool allow_anonymous; - int i; if(!db) return MOSQ_ERR_INVAL; allow_anonymous = db->config->allow_anonymous; - if(db->contexts){ - for(i=0; icontext_count; i++){ - if(db->contexts[i]){ - /* Check for anonymous clients when allow_anonymous is false */ - if(!allow_anonymous && !db->contexts[i]->username){ - db->contexts[i]->state = mosq_cs_disconnecting; - _mosquitto_socket_close(db->contexts[i]); - continue; - } - /* Check for connected clients that are no longer authorised */ - if(mosquitto_unpwd_check_default(db, db->contexts[i]->username, db->contexts[i]->password) != MOSQ_ERR_SUCCESS){ - db->contexts[i]->state = mosq_cs_disconnecting; - _mosquitto_socket_close(db->contexts[i]); - continue; - } - /* Check for ACLs and apply to user. */ - if(db->acl_list){ - acl_user_tail = db->acl_list; - while(acl_user_tail){ - if(acl_user_tail->username){ - if(db->contexts[i]->username){ - if(!strcmp(acl_user_tail->username, db->contexts[i]->username)){ - db->contexts[i]->acl_list = acl_user_tail; - break; - } - } - }else{ - if(!db->contexts[i]->username){ - db->contexts[i]->acl_list = acl_user_tail; - break; - } + HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){ + /* Check for anonymous clients when allow_anonymous is false */ + if(!allow_anonymous && !context->username){ + context->state = mosq_cs_disconnecting; + _mosquitto_socket_close(db, context); + continue; + } + /* Check for connected clients that are no longer authorised */ + if(mosquitto_unpwd_check_default(db, context->username, context->password) != MOSQ_ERR_SUCCESS){ + context->state = mosq_cs_disconnecting; + _mosquitto_socket_close(db, context); + continue; + } + /* Check for ACLs and apply to user. */ + if(db->acl_list){ + acl_user_tail = db->acl_list; + while(acl_user_tail){ + if(acl_user_tail->username){ + if(context->username){ + if(!strcmp(acl_user_tail->username, context->username)){ + context->acl_list = acl_user_tail; + break; } - acl_user_tail = acl_user_tail->next; + } + }else{ + if(!context->username){ + context->acl_list = acl_user_tail; + break; } } + acl_user_tail = acl_user_tail->next; } } }