From 13f94f3511ee174c7f52047edc9db38d53c66b4e Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Thu, 3 Jul 2014 01:00:57 +0100 Subject: [PATCH] More reconnect fixes. --- lib/net_mosq.c | 8 -------- src/loop.c | 37 ++++++++++++++++++++++++++++--------- src/mosquitto_broker.h | 1 + src/read_handle_server.c | 5 ++--- src/websockets.c | 6 +----- 5 files changed, 32 insertions(+), 25 deletions(-) diff --git a/lib/net_mosq.c b/lib/net_mosq.c index 47eed678..87eeee2d 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -238,14 +238,6 @@ int _mosquitto_socket_close(struct mosquitto *mosq) #endif } -#ifdef WITH_WEBSOCKETS - if(!mosq->wsi){ -#endif - HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, mosq, sizeof(void *), mosq); -#ifdef WITH_WEBSOCKETS - } -#endif - #ifdef WITH_BROKER if(mosq->listener){ mosq->listener->client_count--; diff --git a/src/loop.c b/src/loop.c index ca252f7f..83d6f071 100644 --- a/src/loop.c +++ b/src/loop.c @@ -330,19 +330,38 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock return MOSQ_ERR_SUCCESS; } -static void do_disconnect(struct mosquitto_db *db, struct mosquitto *context) +void do_disconnect(struct mosquitto_db *db, struct mosquitto *context) { - if(db->config->connection_messages == true){ +#ifdef WITH_WEBSOCKETS + if(context->wsi){ if(context->state != mosq_cs_disconnecting){ - _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", context->id); - }else{ - _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", context->id); + context->state = mosq_cs_disconnect_ws; } + if(context->wsi){ + libwebsocket_callback_on_writable(context->ws_context, context->wsi); + } + context->sock = INVALID_SOCKET; + }else{ +#endif + if(db->config->connection_messages == true){ + if(context->state != mosq_cs_disconnecting){ + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Socket error on client %s, disconnecting.", context->id); + }else{ + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", context->id); + } + } + mqtt3_context_disconnect(db, context); + if(context->clean_session){ + HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(void *), context); + if(context->id){ + HASH_DELETE(hh_id, db->contexts_by_id, context); + _mosquitto_free(context->id); + context->id = NULL; + } + } +#ifdef WITH_WEBSOCKETS } - mqtt3_context_disconnect(db, context); - if(context->clean_session){ - mqtt3_context_cleanup(db, context, true); - } +#endif } /* Error ocurred, probably an fd has been closed. diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 1f0568e8..b54868b9 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -451,5 +451,6 @@ void service_run(void); #ifdef WITH_WEBSOCKETS struct libwebsocket_context *mosq_websockets_init(struct _mqtt3_listener *listener); #endif +void do_disconnect(struct mosquitto_db *db, struct mosquitto *context); #endif diff --git a/src/read_handle_server.c b/src/read_handle_server.c index 5537f25a..82134598 100644 --- a/src/read_handle_server.c +++ b/src/read_handle_server.c @@ -124,7 +124,6 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) return 3; } if(_mosquitto_read_byte(&context->in_packet, &protocol_version)){ - HASH_ADD_KEYPTR(hh_for_free, db->contexts_for_free, context, sizeof(void *), context); rc = 1; goto handle_connect_error; return 1; @@ -175,7 +174,6 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) goto handle_connect_error; } clean_session = (connect_flags & 0x02) >> 1; - context->clean_session = clean_session; will = connect_flags & 0x04; will_qos = (connect_flags & 0x18) >> 3; if(will_qos == 3){ @@ -427,6 +425,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) } } + context->clean_session = clean_session; found_context->clean_session = clean_session; mqtt3_context_cleanup(db, found_context, false); found_context->state = mosq_cs_connected; @@ -658,7 +657,7 @@ int mqtt3_handle_disconnect(struct mosquitto_db *db, struct mosquitto *context) } } context->state = mosq_cs_disconnecting; - mqtt3_context_disconnect(db, context); + do_disconnect(db, context); return MOSQ_ERR_SUCCESS; } diff --git a/src/websockets.c b/src/websockets.c index 0f55ca25..f0de5532 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -148,11 +148,7 @@ static int callback_mqtt(struct libwebsocket_context *context, mosq = u->mosq; if(mosq){ mosq->wsi = NULL; - if(mosq->clean_session){ - mqtt3_context_cleanup(db, mosq, true); - }else{ - mqtt3_context_disconnect(db, mosq); - } + do_disconnect(db, mosq); } break;