Use hash functions to store client data.

pull/211/merge
Roger A. Light 11 years ago
parent 7911db1a1e
commit 764b7e0a91

@ -54,6 +54,7 @@ Contributors:
#include "mosquitto.h"
#include "time_mosq.h"
#ifdef WITH_BROKER
# include "uthash.h"
struct mosquitto_client_msg;
#endif
@ -189,7 +190,6 @@ struct mosquitto {
struct _mqtt3_listener *listener;
time_t disconnect_t;
int pollfd_index;
int db_index;
struct _mosquitto_packet *out_packet_last;
bool is_dropping;
# ifdef WITH_WEBSOCKETS
@ -229,6 +229,15 @@ struct mosquitto {
ares_channel achan;
# endif
#endif
#ifdef WITH_BROKER
UT_hash_handle hh_id;
UT_hash_handle hh_sock;
UT_hash_handle hh_for_free;
# ifdef WITH_BRIDGE
UT_hash_handle hh_bridge;
# endif
#endif
};
#endif

@ -193,7 +193,11 @@ int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *pa
* Returns 1 on failure (context is NULL)
* Returns 0 on success.
*/
#ifdef WITH_BROKER
int _mosquitto_socket_close(struct mosquitto_db *db, struct mosquitto *mosq)
#else
int _mosquitto_socket_close(struct mosquitto *mosq)
#endif
{
int rc = 0;
@ -211,6 +215,9 @@ int _mosquitto_socket_close(struct mosquitto *mosq)
#endif
if(mosq->sock != INVALID_SOCKET){
#ifdef WITH_BROKER
HASH_DELETE(hh_sock, db->contexts_by_sock, mosq);
#endif
rc = COMPAT_CLOSE(mosq->sock);
mosq->sock = INVALID_SOCKET;
}

@ -59,7 +59,11 @@ void _mosquitto_net_cleanup(void);
void _mosquitto_packet_cleanup(struct _mosquitto_packet *packet);
int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *packet);
int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking);
#ifdef WITH_BROKER
int _mosquitto_socket_close(struct mosquitto_db *db, struct mosquitto *mosq);
#else
int _mosquitto_socket_close(struct mosquitto *mosq);
#endif
int _mosquitto_try_connect(const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking);
int _mosquitto_socket_nonblock(int sock);
int _mosquitto_socketpair(int *sp1, int *sp2);

@ -77,7 +77,11 @@ int _mosquitto_packet_alloc(struct _mosquitto_packet *packet)
return MOSQ_ERR_SUCCESS;
}
#ifdef WITH_BROKER
void _mosquitto_check_keepalive(struct mosquitto_db *db, struct mosquitto *mosq)
#else
void _mosquitto_check_keepalive(struct mosquitto *mosq)
#endif
{
time_t last_msg_out;
time_t last_msg_in;
@ -94,7 +98,7 @@ void _mosquitto_check_keepalive(struct mosquitto *mosq)
&& now - mosq->last_msg_out >= mosq->bridge->idle_timeout){
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Bridge connection %s has exceeded idle timeout, disconnecting.", mosq->id);
_mosquitto_socket_close(mosq);
_mosquitto_socket_close(db, mosq);
return;
}
#endif
@ -119,9 +123,9 @@ void _mosquitto_check_keepalive(struct mosquitto *mosq)
assert(mosq->listener->client_count >= 0);
}
mosq->listener = NULL;
#endif
_mosquitto_socket_close(db, mosq);
#else
_mosquitto_socket_close(mosq);
#ifndef WITH_BROKER
pthread_mutex_lock(&mosq->state_mutex);
if(mosq->state == mosq_cs_disconnecting){
rc = MOSQ_ERR_SUCCESS;

@ -22,7 +22,11 @@ Contributors:
#include "mosquitto.h"
int _mosquitto_packet_alloc(struct _mosquitto_packet *packet);
#ifdef WITH_BROKER
void _mosquitto_check_keepalive(struct mosquitto_db *db, struct mosquitto *mosq);
#else
void _mosquitto_check_keepalive(struct mosquitto *mosq);
#endif
uint16_t _mosquitto_mid_generate(struct mosquitto *mosq);
int _mosquitto_pub_topic_check(const char *str);
int _mosquitto_sub_topic_check(const char *str);

@ -44,10 +44,7 @@ Contributors:
int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge)
{
int i;
struct mosquitto *new_context = NULL;
int null_index = -1;
struct mosquitto **tmp_contexts;
char hostname[256];
int len;
char *id, *local_id;
@ -80,40 +77,17 @@ int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge)
bridge->local_clientid = local_id;
}
/* Search for existing id (possible from persistent db) and also look for a
* gap in the db->contexts[] array in case the id isn't found. */
for(i=0; i<db->context_count; i++){
if(db->contexts[i]){
if(!strcmp(db->contexts[i]->id, local_id)){
new_context = db->contexts[i];
break;
}
}else if(db->contexts[i] == NULL && null_index == -1){
null_index = i;
break;
}
}
if(!new_context){
HASH_FIND(hh_id, db->contexts_by_id, local_id, strlen(local_id), new_context);
if(new_context){
/* (possible from persistent db) */
}else{
/* id wasn't found, so generate a new context */
new_context = mqtt3_context_init(-1);
if(!new_context){
return MOSQ_ERR_NOMEM;
}
if(null_index == -1){
/* There were no gaps in the db->contexts[] array, so need to append. */
db->context_count++;
tmp_contexts = _mosquitto_realloc(db->contexts, sizeof(struct mosquitto*)*db->context_count);
if(tmp_contexts){
db->contexts = tmp_contexts;
db->contexts[db->context_count-1] = new_context;
}else{
_mosquitto_free(new_context);
return MOSQ_ERR_NOMEM;
}
}else{
db->contexts[null_index] = new_context;
}
new_context->id = local_id;
HASH_ADD_KEYPTR(hh_id, db->contexts_by_id, new_context->id, strlen(new_context->id), new_context);
}
new_context->bridge = bridge;
new_context->is_bridge = true;
@ -137,6 +111,8 @@ int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge)
bridge->try_private_accepted = true;
HASH_ADD_KEYPTR(hh_bridge, db->contexts_bridge, new_context->id, strlen(new_context->id), new_context);
return mqtt3_bridge_connect(db, new_context);
}
@ -224,6 +200,7 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
return rc;
}
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);
rc = _mosquitto_send_connect(context, context->keepalive, context->clean_session);
if(rc == MOSQ_ERR_SUCCESS){
return MOSQ_ERR_SUCCESS;
@ -237,7 +214,7 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context)
}else if(rc == MOSQ_ERR_EAI){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
}
_mosquitto_socket_close(context);
_mosquitto_socket_close(db, context);
return rc;
}
}

@ -89,7 +89,6 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b
{
struct _mosquitto_packet *packet;
struct mosquitto_client_msg *msg, *next;
struct _clientid_index_hash *find_cih;
if(!context) return;
@ -111,15 +110,7 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b
}
}
#endif
#ifdef WITH_TLS
if(context->ssl){
SSL_free(context->ssl);
context->ssl = NULL;
}
#endif
if(context->sock != -1){
_mosquitto_socket_close(context);
}
_mosquitto_socket_close(db, context);
if(context->clean_session && db){
mqtt3_subs_clean_session(db, context, &db->subs);
mqtt3_db_messages_delete(context);
@ -132,15 +123,7 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b
assert(db); /* db can only be NULL here if the client hasn't sent a
CONNECT and hence wouldn't have an id. */
// Remove the context's ID from the DB hash
HASH_FIND_STR(db->clientid_index_hash, context->id, find_cih);
if(find_cih){
// FIXME - internal level debug? _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Found id for client \"%s\", their index was %d.", context->id, find_cih->db_context_index);
HASH_DEL(db->clientid_index_hash, find_cih);
_mosquitto_free(find_cih);
}else{
// FIXME - internal level debug? _mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Unable to find id for client \"%s\".", context->id);
}
HASH_DELETE(hh_id, db->contexts_by_id, context);
_mosquitto_free(context->id);
context->id = NULL;
}
@ -194,6 +177,6 @@ void mqtt3_context_disconnect(struct mosquitto_db *db, struct mosquitto *ctxt)
db->disconnected_count++;
}
#endif
_mosquitto_socket_close(ctxt);
_mosquitto_socket_close(db, ctxt);
}

@ -39,10 +39,11 @@ int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db)
db->last_db_id = 0;
db->context_count = 1;
db->contexts = _mosquitto_malloc(sizeof(struct mosquitto*)*db->context_count);
if(!db->contexts) return MOSQ_ERR_NOMEM;
db->contexts[0] = NULL;
db->contexts_by_id = NULL;
db->contexts_by_sock = NULL;
db->contexts_for_free = NULL;
db->contexts_bridge = NULL;
// Initialize the hashtable
db->clientid_index_hash = NULL;
@ -610,18 +611,14 @@ int mqtt3_db_message_reconnect_reset(struct mosquitto *context)
int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout)
{
int i;
time_t threshold;
enum mosquitto_msg_state new_state;
struct mosquitto *context;
struct mosquitto *context, *ctxt_tmp;
struct mosquitto_client_msg *msg;
threshold = mosquitto_time() - timeout;
for(i=0; i<db->context_count; i++){
context = db->contexts[i];
if(!context) continue;
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
msg = context->msgs;
while(msg){
new_state = mosq_ms_invalid;

@ -63,6 +63,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
time_t now_time;
int time_count;
int fdcount;
struct mosquitto *context, *ctxt_tmp;
#ifndef WIN32
sigset_t sigblock, origsig;
#endif
@ -74,21 +75,32 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
int bridge_sock;
int rc;
#endif
int context_count;
time_t expiration_check_time = 0;
#ifndef WIN32
sigemptyset(&sigblock);
sigaddset(&sigblock, SIGINT);
#endif
if(db->config->persistent_client_expiration > 0){
expiration_check_time = time(NULL) + db->config->persistent_client_expiration;
}
while(run){
HASH_ITER(hh_for_free, db->contexts_for_free, context, ctxt_tmp){
HASH_DELETE(hh_for_free, db->contexts_for_free, context);
mqtt3_context_cleanup(db, context, true);
}
#ifdef WITH_SYS_TREE
if(db->config->sys_interval > 0){
mqtt3_db_sys_update(db, db->config->sys_interval, start_time);
}
#endif
if(listensock_count + db->context_count > pollfd_count || !pollfds){
pollfd_count = listensock_count + db->context_count;
context_count = HASH_CNT(hh_sock, db->contexts_by_sock);
if(listensock_count + context_count > pollfd_count || !pollfds){
pollfd_count = listensock_count + context_count;
pollfds = _mosquitto_realloc(pollfds, sizeof(struct pollfd)*pollfd_count);
if(!pollfds){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
@ -109,130 +121,137 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
now_time = time(NULL);
time_count = 0;
for(i=0; i<db->context_count; i++){
if(db->contexts[i]){
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(time_count > 0){
time_count--;
}else{
time_count = 1000;
now = mosquitto_time();
}
context->pollfd_index = -1;
if(context->sock != INVALID_SOCKET){
#ifdef WITH_BRIDGE
if(context->bridge){
_mosquitto_check_keepalive(db, context);
if(context->bridge->round_robin == false
&& context->bridge->cur_address != 0
&& now > context->bridge->primary_retry){
/* FIXME - this should be non-blocking */
if(_mosquitto_try_connect(context->bridge->addresses[0].address, context->bridge->addresses[0].port, &bridge_sock, NULL, true) == MOSQ_ERR_SUCCESS){
COMPAT_CLOSE(bridge_sock);
_mosquitto_socket_close(db, context);
context->bridge->cur_address = context->bridge->address_count-1;
}
}
}
#endif
/* Local bridges never time out in this fashion. */
if(!(context->keepalive)
|| context->bridge
|| now - context->last_msg_in < (time_t)(context->keepalive)*3/2){
if(mqtt3_db_message_write(context) == MOSQ_ERR_SUCCESS){
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
}else{
mqtt3_context_disconnect(db, context);
}
}else{
if(db->config->connection_messages == true){
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", context->id);
}
/* Client has exceeded keepalive*1.5 */
mqtt3_context_disconnect(db, context);
}
}
}
#ifdef WITH_BRIDGE
time_count = 0;
HASH_ITER(hh_bridge, db->contexts_bridge, context, ctxt_tmp){
if(context->sock == INVALID_SOCKET){
if(time_count > 0){
time_count--;
}else{
time_count = 1000;
now = mosquitto_time();
}
db->contexts[i]->pollfd_index = -1;
if(db->contexts[i]->sock != INVALID_SOCKET){
#ifdef WITH_BRIDGE
if(db->contexts[i]->bridge){
_mosquitto_check_keepalive(db->contexts[i]);
if(db->contexts[i]->bridge->round_robin == false
&& db->contexts[i]->bridge->cur_address != 0
&& now > db->contexts[i]->bridge->primary_retry){
/* FIXME - this should be non-blocking */
if(_mosquitto_try_connect(db->contexts[i]->bridge->addresses[0].address, db->contexts[i]->bridge->addresses[0].port, &bridge_sock, NULL, true) == MOSQ_ERR_SUCCESS){
COMPAT_CLOSE(bridge_sock);
_mosquitto_socket_close(db->contexts[i]);
db->contexts[i]->bridge->cur_address = db->contexts[i]->bridge->address_count-1;
/* Want to try to restart the bridge connection */
if(!context->bridge->restart_t){
context->bridge->restart_t = now+context->bridge->restart_timeout;
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = now + 5;
}
}else{
if(context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect){
rc = mqtt3_bridge_connect(db, context);
if(rc){
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}
}
#endif
/* Local bridges never time out in this fashion. */
if(!(db->contexts[i]->keepalive)
|| db->contexts[i]->bridge
|| now - db->contexts[i]->last_msg_in < (time_t)(db->contexts[i]->keepalive)*3/2){
if(mqtt3_db_message_write(db->contexts[i]) == MOSQ_ERR_SUCCESS){
pollfds[pollfd_index].fd = db->contexts[i]->sock;
if(context->bridge->start_type == bst_automatic && now > context->bridge->restart_t){
context->bridge->restart_t = 0;
rc = mqtt3_bridge_connect(db, context);
if(rc == MOSQ_ERR_SUCCESS){
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(db->contexts[i]->current_out_packet){
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
db->contexts[i]->pollfd_index = pollfd_index;
context->pollfd_index = pollfd_index;
pollfd_index++;
}else{
mqtt3_context_disconnect(db, db->contexts[i]);
}
}else{
if(db->config->connection_messages == true){
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", db->contexts[i]->id);
}
/* Client has exceeded keepalive*1.5 */
mqtt3_context_disconnect(db, db->contexts[i]);
}
}else{
#ifdef WITH_BRIDGE
if(db->contexts[i]->bridge){
/* Want to try to restart the bridge connection */
if(!db->contexts[i]->bridge->restart_t){
db->contexts[i]->bridge->restart_t = now+db->contexts[i]->bridge->restart_timeout;
db->contexts[i]->bridge->cur_address++;
if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){
db->contexts[i]->bridge->cur_address = 0;
}
if(db->contexts[i]->bridge->round_robin == false && db->contexts[i]->bridge->cur_address != 0){
db->contexts[i]->bridge->primary_retry = now + 5;
}
}else{
if(db->contexts[i]->bridge->start_type == bst_lazy && db->contexts[i]->bridge->lazy_reconnect){
rc = mqtt3_bridge_connect(db, db->contexts[i]);
if(rc){
db->contexts[i]->bridge->cur_address++;
if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){
db->contexts[i]->bridge->cur_address = 0;
}
}
}
if(db->contexts[i]->bridge->start_type == bst_automatic && now > db->contexts[i]->bridge->restart_t){
db->contexts[i]->bridge->restart_t = 0;
rc = mqtt3_bridge_connect(db, db->contexts[i]);
if(rc == MOSQ_ERR_SUCCESS){
pollfds[pollfd_index].fd = db->contexts[i]->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(db->contexts[i]->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
db->contexts[i]->pollfd_index = pollfd_index;
pollfd_index++;
}else{
/* Retry later. */
db->contexts[i]->bridge->restart_t = now+db->contexts[i]->bridge->restart_timeout;
db->contexts[i]->bridge->cur_address++;
if(db->contexts[i]->bridge->cur_address == db->contexts[i]->bridge->address_count){
db->contexts[i]->bridge->cur_address = 0;
}
}
/* Retry later. */
context->bridge->restart_t = now+context->bridge->restart_timeout;
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}
}else{
}
}
}
}
#endif
if(db->contexts[i]->clean_session == true){
mqtt3_context_cleanup(db, db->contexts[i], true);
db->contexts[i] = NULL;
}else if(db->config->persistent_client_expiration > 0){
/* This is a persistent client, check to see if the
* last time it connected was longer than
* persistent_client_expiration seconds ago. If so,
* expire it and clean up.
*/
if(now_time > db->contexts[i]->disconnect_t+db->config->persistent_client_expiration){
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", db->contexts[i]->id);
now_time = time(NULL);
if(db->config->persistent_client_expiration > 0 && now_time > expiration_check_time){
HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
if(context->sock == -1 && context->clean_session == 0){
/* This is a persistent client, check to see if the
* last time it connected was longer than
* persistent_client_expiration seconds ago. If so,
* expire it and clean up.
*/
if(now_time > context->disconnect_t+db->config->persistent_client_expiration){
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", context->id);
#ifdef WITH_SYS_TREE
g_clients_expired++;
g_clients_expired++;
#endif
db->contexts[i]->clean_session = true;
mqtt3_context_cleanup(db, db->contexts[i], true);
db->contexts[i] = NULL;
}
}
#ifdef WITH_BRIDGE
context->clean_session = true;
mqtt3_context_cleanup(db, context, true);
context = NULL;
}
#endif
}
}
expiration_check_time = time(NULL) + db->config->persistent_client_expiration;
}
mqtt3_db_message_timeout_check(db, db->config->retry_interval);
@ -319,16 +338,16 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock
return MOSQ_ERR_SUCCESS;
}
static void do_disconnect(struct mosquitto_db *db, int context_index)
static void do_disconnect(struct mosquitto_db *db, struct mosquitto *context)
{
if(db->config->connection_messages == true){
if(db->contexts[context_index]->state != mosq_cs_disconnecting){
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", db->contexts[context_index]->id);
if(context->state != mosq_cs_disconnecting){
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", context->id);
}else{
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", db->contexts[context_index]->id);
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", context->id);
}
}
mqtt3_context_disconnect(db, db->contexts[context_index]);
mqtt3_context_disconnect(db, context);
}
/* Error ocurred, probably an fd has been closed.
@ -336,53 +355,51 @@ static void do_disconnect(struct mosquitto_db *db, int context_index)
*/
static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds)
{
int i;
struct mosquitto *context, *ctxt_tmp;
for(i=0; i<db->context_count; i++){
if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLERR | POLLNVAL)){
do_disconnect(db, i);
}
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
if(pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL)){
do_disconnect(db, context);
}
}
}
static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds)
{
int i;
struct mosquitto *context, *ctxt_tmp;
for(i=0; i<db->context_count; i++){
if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
assert(pollfds[db->contexts[i]->pollfd_index].fd == db->contexts[i]->sock);
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
assert(pollfds[context->pollfd_index].fd == context->sock);
#ifdef WITH_TLS
if(pollfds[db->contexts[i]->pollfd_index].revents & POLLOUT ||
db->contexts[i]->want_write ||
(db->contexts[i]->ssl && db->contexts[i]->state == mosq_cs_new)){
if(pollfds[context->pollfd_index].revents & POLLOUT ||
context->want_write ||
(context->ssl && context->state == mosq_cs_new)){
#else
if(pollfds[db->contexts[i]->pollfd_index].revents & POLLOUT){
if(pollfds[context->pollfd_index].revents & POLLOUT){
#endif
if(_mosquitto_packet_write(db->contexts[i])){
do_disconnect(db, i);
}
if(_mosquitto_packet_write(context)){
do_disconnect(db, context);
continue;
}
}
if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
assert(pollfds[db->contexts[i]->pollfd_index].fd == db->contexts[i]->sock);
}
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
#ifdef WITH_TLS
if(pollfds[db->contexts[i]->pollfd_index].revents & POLLIN ||
(db->contexts[i]->ssl && db->contexts[i]->state == mosq_cs_new)){
if(pollfds[context->pollfd_index].revents & POLLIN ||
(context->ssl && context->state == mosq_cs_new)){
#else
if(pollfds[db->contexts[i]->pollfd_index].revents & POLLIN){
if(pollfds[context->pollfd_index].revents & POLLIN){
#endif
if(_mosquitto_packet_read(db, db->contexts[i])){
do_disconnect(db, i);
}
if(_mosquitto_packet_read(db, context)){
do_disconnect(db, context);
continue;
}
}
if(db->contexts[i] && db->contexts[i]->sock != INVALID_SOCKET){
if(pollfds[db->contexts[i]->pollfd_index].revents & (POLLERR | POLLNVAL)){
do_disconnect(db, i);
}
if(pollfds[context->pollfd_index].revents & (POLLERR | POLLNVAL)){
do_disconnect(db, context);
continue;
}
}
}

@ -161,6 +161,7 @@ int main(int argc, char *argv[])
#else
struct timeval tv;
#endif
struct mosquitto *ctxt, *ctxt_tmp;
#if defined(WIN32) || defined(__CYGWIN__)
if(argc == 2){
@ -265,7 +266,6 @@ int main(int argc, char *argv[])
for(i=0; i<config.listener_count; i++){
if(config.listeners[i].protocol == mp_mqtt){
if(mqtt3_socket_listen(&config.listeners[i])){
_mosquitto_free(int_db.contexts);
mqtt3_db_close(&int_db);
if(config.pid_file){
remove(config.pid_file);
@ -275,7 +275,6 @@ int main(int argc, char *argv[])
listensock_count += config.listeners[i].sock_count;
listensock = _mosquitto_realloc(listensock, sizeof(int)*listensock_count);
if(!listensock){
_mosquitto_free(int_db.contexts);
mqtt3_db_close(&int_db);
if(config.pid_file){
remove(config.pid_file);
@ -284,7 +283,6 @@ int main(int argc, char *argv[])
}
for(j=0; j<config.listeners[i].sock_count; j++){
if(config.listeners[i].socks[j] == INVALID_SOCKET){
_mosquitto_free(int_db.contexts);
mqtt3_db_close(&int_db);
if(config.pid_file){
remove(config.pid_file);
@ -336,13 +334,11 @@ int main(int argc, char *argv[])
}
#endif
for(i=0; i<int_db.context_count; i++){
if(int_db.contexts[i]){
mqtt3_context_cleanup(&int_db, int_db.contexts[i], true);
}
HASH_ITER(hh_id, int_db.contexts_by_id, ctxt, ctxt_tmp){
mqtt3_context_cleanup(&int_db, ctxt, true);
}
_mosquitto_free(int_db.contexts);
int_db.contexts = NULL;
HASH_CLEAR(hh_sock, int_db.contexts_by_sock);
HASH_CLEAR(hh_bridge, int_db.contexts_bridge);
mqtt3_db_close(&int_db);
if(listensock){

@ -197,14 +197,6 @@ struct _mosquitto_auth_plugin{
int (*psk_key_get)(void *user_data, const char *hint, const char *identity, char *key, int max_key_len);
};
struct _clientid_index_hash{
/* this is the key */
char *id;
/* this is the index where the client ID exists in the db->contexts array */
int db_context_index;
UT_hash_handle hh;
};
struct mosquitto_db{
dbid_t last_db_id;
struct _mosquitto_subhier subs;
@ -212,9 +204,11 @@ struct mosquitto_db{
struct _mosquitto_acl_user *acl_list;
struct _mosquitto_acl *acl_patterns;
struct _mosquitto_unpwd *psk_id;
struct mosquitto **contexts;
struct mosquitto *contexts_by_id;
struct mosquitto *contexts_by_sock;
struct mosquitto *contexts_for_free;
struct mosquitto *contexts_bridge;
struct _clientid_index_hash *clientid_index_hash;
int context_count;
struct mosquitto_msg_store *msg_store;
int msg_store_count;
struct mqtt3_config *config;

@ -68,7 +68,6 @@ int mqtt3_socket_accept(struct mosquitto_db *db, int listensock)
int i;
int j;
int new_sock = -1;
struct mosquitto **tmp_contexts = NULL;
struct mosquitto *new_context;
#ifdef WITH_TLS
BIO *bio;
@ -172,26 +171,7 @@ int mqtt3_socket_accept(struct mosquitto_db *db, int listensock)
#endif
_mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "New connection from %s on port %d.", new_context->address, new_context->listener->port);
for(i=0; i<db->context_count; i++){
if(db->contexts[i] == NULL){
db->contexts[i] = new_context;
break;
}
}
if(i==db->context_count){
tmp_contexts = _mosquitto_realloc(db->contexts, sizeof(struct mosquitto*)*(db->context_count+1));
if(tmp_contexts){
db->context_count++;
db->contexts = tmp_contexts;
db->contexts[i] = new_context;
}else{
// Out of memory
mqtt3_context_cleanup(NULL, new_context, true);
return -1;
}
}
// If we got here then the context's DB index is "i" regardless of how we got here
new_context->db_index = i;
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(new_context->sock), new_context);
return new_sock;
}

@ -42,41 +42,21 @@ static int _db_restore_sub(struct mosquitto_db *db, const char *client_id, const
static struct mosquitto *_db_find_or_add_context(struct mosquitto_db *db, const char *client_id, uint16_t last_mid)
{
struct mosquitto *context;
struct mosquitto **tmp_contexts;
int i;
context = NULL;
for(i=0; i<db->context_count; i++){
if(db->contexts[i] && !strcmp(db->contexts[i]->id, client_id)){
context = db->contexts[i];
break;
}
}
HASH_FIND(hh_id, db->contexts_by_id, client_id, strlen(client_id), context);
if(!context){
context = mqtt3_context_init(-1);
if(!context) return NULL;
context->id = _mosquitto_strdup(client_id);
if(!context){
_mosquitto_free(context);
return NULL;
}
context->clean_session = false;
for(i=0; i<db->context_count; i++){
if(!db->contexts[i]){
db->contexts[i] = context;
break;
}
}
if(i==db->context_count){
db->context_count++;
tmp_contexts = _mosquitto_realloc(db->contexts, sizeof(struct mosquitto*)*db->context_count);
if(tmp_contexts){
db->contexts = tmp_contexts;
db->contexts[db->context_count-1] = context;
}else{
mqtt3_context_cleanup(db, context, true);
return NULL;
}
}
context->id = _mosquitto_strdup(client_id);
context->db_index = i;
HASH_ADD_KEYPTR(hh_id, db->contexts_by_id, context->id, strlen(context->id), context);
}
if(last_mid){
context->last_mid = last_mid;
@ -224,8 +204,7 @@ error:
static int mqtt3_db_client_write(struct mosquitto_db *db, FILE *db_fptr)
{
int i;
struct mosquitto *context;
struct mosquitto *context, *ctxt_tmp;
uint16_t i16temp, slen;
uint32_t length;
time_t disconnect_t;
@ -233,8 +212,7 @@ static int mqtt3_db_client_write(struct mosquitto_db *db, FILE *db_fptr)
assert(db);
assert(db_fptr);
for(i=0; i<db->context_count; i++){
context = db->contexts[i];
HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
if(context && context->clean_session == false){
length = htonl(2+strlen(context->id) + sizeof(uint16_t) + sizeof(time_t));
@ -480,7 +458,6 @@ static int _db_client_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
int rc = 0;
struct mosquitto *context;
time_t disconnect_t;
struct _clientid_index_hash *new_cih;
read_e(db_fptr, &i16temp, sizeof(uint16_t));
slen = ntohs(i16temp);
@ -515,16 +492,6 @@ static int _db_client_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
_mosquitto_free(client_id);
if(!rc){
new_cih = _mosquitto_malloc(sizeof(struct _clientid_index_hash));
if(!new_cih){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
new_cih->id = context->id;
new_cih->db_context_index = context->db_index;
HASH_ADD_KEYPTR(hh, db->clientid_index_hash, context->id, strlen(context->id), new_cih);
}
return rc;
error:
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s.", strerror(errno));

@ -87,14 +87,13 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
int rc;
struct _mosquitto_acl_user *acl_tail;
struct mosquitto_client_msg *msg_tail, *msg_prev;
struct mosquitto *found_context;
int slen;
#ifdef WITH_TLS
X509 *client_cert;
X509_NAME *name;
X509_NAME_ENTRY *name_entry;
#endif
struct _clientid_index_hash *find_cih;
struct _clientid_index_hash *new_cih;
#ifdef WITH_SYS_TREE
g_connection_count++;
@ -396,11 +395,10 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
#endif
/* Find if this client already has an entry. This must be done *after* any security checks. */
HASH_FIND_STR(db->clientid_index_hash, client_id, find_cih);
if(find_cih){
i = find_cih->db_context_index;
HASH_FIND(hh_id, db->contexts_by_id, client_id, strlen(client_id), found_context);
if(found_context){
/* Found a matching client */
if(db->contexts[i]->sock == -1){
if(found_context->sock == -1){
/* Client is reconnecting after a disconnect */
/* FIXME - does anything else need to be done here? */
#ifdef WITH_SYS_TREE
@ -417,33 +415,39 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
connect_ack |= 0x01;
}
}
db->contexts[i]->clean_session = clean_session;
mqtt3_context_cleanup(db, db->contexts[i], false);
db->contexts[i]->state = mosq_cs_connected;
found_context->clean_session = clean_session;
mqtt3_context_cleanup(db, found_context, false);
found_context->state = mosq_cs_connected;
if(context->address){
db->contexts[i]->address = _mosquitto_strdup(context->address);
//found_context->address = _mosquitto_strdup(context->address);
found_context->address = context->address;
context->address = NULL;
}else{
db->contexts[i]->address = NULL;
}
db->contexts[i]->disconnect_t = 0;
db->contexts[i]->sock = context->sock;
db->contexts[i]->listener = context->listener;
db->contexts[i]->last_msg_in = mosquitto_time();
db->contexts[i]->last_msg_out = mosquitto_time();
db->contexts[i]->keepalive = context->keepalive;
db->contexts[i]->pollfd_index = context->pollfd_index;
found_context->address = NULL;
}
found_context->disconnect_t = 0;
HASH_DELETE(hh_sock, db->contexts_by_sock, context);
found_context->sock = context->sock;
found_context->listener = context->listener;
context->listener = NULL;
found_context->last_msg_in = mosquitto_time();
found_context->last_msg_out = mosquitto_time();
found_context->keepalive = context->keepalive;
found_context->pollfd_index = context->pollfd_index;
#ifdef WITH_TLS
db->contexts[i]->ssl = context->ssl;
found_context->ssl = context->ssl;
#endif
if(context->username){
db->contexts[i]->username = _mosquitto_strdup(context->username);
found_context->username = _mosquitto_strdup(context->username);
}
context->sock = -1;
#ifdef WITH_TLS
context->ssl = NULL;
#endif
context->state = mosq_cs_disconnecting;
context = db->contexts[i];
HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(context), context);
context = found_context;
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);
if(context->msgs){
mqtt3_db_message_reconnect_reset(context);
}
@ -536,17 +540,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
}
}
// Add the client ID to the DB hash table here
new_cih = _mosquitto_malloc(sizeof(struct _clientid_index_hash));
if(!new_cih){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
mqtt3_context_disconnect(db, context);
rc = MOSQ_ERR_NOMEM;
goto handle_connect_error;
}
new_cih->id = context->id;
new_cih->db_context_index = context->db_index;
HASH_ADD_KEYPTR(hh, db->clientid_index_hash, context->id, strlen(context->id), new_cih);
HASH_ADD_KEYPTR(hh_id, db->contexts_by_id, context->id, strlen(context->id), context);
#ifdef WITH_PERSISTENCE
if(!clean_session){

@ -434,7 +434,7 @@ static void _free_acl(struct _mosquitto_acl *acl)
static int _acl_cleanup(struct mosquitto_db *db, bool reload)
{
int i;
struct mosquitto *context, *ctxt_tmp;
struct _mosquitto_acl_user *user_tail;
if(!db) return MOSQ_ERR_INVAL;
@ -446,12 +446,8 @@ static int _acl_cleanup(struct mosquitto_db *db, bool reload)
* is called if we are reloading the config. If this is not done, all
* access will be denied to currently connected clients.
*/
if(db->contexts){
for(i=0; i<db->context_count; i++){
if(db->contexts[i] && db->contexts[i]->acl_list){
db->contexts[i]->acl_list = NULL;
}
}
HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
context->acl_list = NULL;
}
while(db->acl_list){
@ -695,49 +691,45 @@ static int _unpwd_cleanup(struct _mosquitto_unpwd **root, bool reload)
*/
int mosquitto_security_apply_default(struct mosquitto_db *db)
{
struct mosquitto *context, *ctxt_tmp;
struct _mosquitto_acl_user *acl_user_tail;
bool allow_anonymous;
int i;
if(!db) return MOSQ_ERR_INVAL;
allow_anonymous = db->config->allow_anonymous;
if(db->contexts){
for(i=0; i<db->context_count; i++){
if(db->contexts[i]){
/* Check for anonymous clients when allow_anonymous is false */
if(!allow_anonymous && !db->contexts[i]->username){
db->contexts[i]->state = mosq_cs_disconnecting;
_mosquitto_socket_close(db->contexts[i]);
continue;
}
/* Check for connected clients that are no longer authorised */
if(mosquitto_unpwd_check_default(db, db->contexts[i]->username, db->contexts[i]->password) != MOSQ_ERR_SUCCESS){
db->contexts[i]->state = mosq_cs_disconnecting;
_mosquitto_socket_close(db->contexts[i]);
continue;
}
/* Check for ACLs and apply to user. */
if(db->acl_list){
acl_user_tail = db->acl_list;
while(acl_user_tail){
if(acl_user_tail->username){
if(db->contexts[i]->username){
if(!strcmp(acl_user_tail->username, db->contexts[i]->username)){
db->contexts[i]->acl_list = acl_user_tail;
break;
}
}
}else{
if(!db->contexts[i]->username){
db->contexts[i]->acl_list = acl_user_tail;
break;
}
HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
/* Check for anonymous clients when allow_anonymous is false */
if(!allow_anonymous && !context->username){
context->state = mosq_cs_disconnecting;
_mosquitto_socket_close(db, context);
continue;
}
/* Check for connected clients that are no longer authorised */
if(mosquitto_unpwd_check_default(db, context->username, context->password) != MOSQ_ERR_SUCCESS){
context->state = mosq_cs_disconnecting;
_mosquitto_socket_close(db, context);
continue;
}
/* Check for ACLs and apply to user. */
if(db->acl_list){
acl_user_tail = db->acl_list;
while(acl_user_tail){
if(acl_user_tail->username){
if(context->username){
if(!strcmp(acl_user_tail->username, context->username)){
context->acl_list = acl_user_tail;
break;
}
acl_user_tail = acl_user_tail->next;
}
}else{
if(!context->username){
context->acl_list = acl_user_tail;
break;
}
}
acl_user_tail = acl_user_tail->next;
}
}
}

Loading…
Cancel
Save