Change the way that new clients with match client ids are dealt with.

This change means that the connection isn't swapped from new context to
old, which makes dealing with websockets lots easier. It does require
that clients storing a list of their subscriptons.
pull/211/merge
Roger A. Light 11 years ago
parent d2dbe16d68
commit e3bf10cd9a

@ -167,9 +167,6 @@ int main(int argc, char *argv[])
struct timeval tv; struct timeval tv;
#endif #endif
struct mosquitto *ctxt, *ctxt_tmp; struct mosquitto *ctxt, *ctxt_tmp;
#ifdef WITH_WEBSOCKETS
struct libws_mqtt_hack *hack_head, *hack;
#endif
#if defined(WIN32) || defined(__CYGWIN__) #if defined(WIN32) || defined(__CYGWIN__)
if(argc == 2){ if(argc == 2){
@ -345,18 +342,7 @@ int main(int argc, char *argv[])
#ifdef WITH_WEBSOCKETS #ifdef WITH_WEBSOCKETS
for(i=0; i<int_db.config->listener_count; i++){ for(i=0; i<int_db.config->listener_count; i++){
if(int_db.config->listeners[i].ws_context){ if(int_db.config->listeners[i].ws_context){
hack_head = libwebsocket_context_user(int_db.config->listeners[i].ws_context);
libwebsocket_context_destroy(int_db.config->listeners[i].ws_context); libwebsocket_context_destroy(int_db.config->listeners[i].ws_context);
if(hack_head){
while(hack_head){
if(hack_head->http_dir){
_mosquitto_free(hack_head->http_dir);
}
hack = hack_head->next;
_mosquitto_free(hack_head);
hack_head = hack;
}
}
} }
if(int_db.config->listeners[i].ws_protocol){ if(int_db.config->listeners[i].ws_protocol){
_mosquitto_free(int_db.config->listeners[i].ws_protocol); _mosquitto_free(int_db.config->listeners[i].ws_protocol);

@ -299,9 +299,6 @@ struct _mqtt3_bridge{
#ifdef WITH_WEBSOCKETS #ifdef WITH_WEBSOCKETS
struct libws_mqtt_hack { struct libws_mqtt_hack {
char *http_dir; char *http_dir;
struct mosquitto *old_mosq;
struct mosquitto *new_mosq;
struct libws_mqtt_hack *next;
}; };
struct libws_mqtt_data { struct libws_mqtt_data {

@ -92,15 +92,13 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
struct mosquitto_client_msg *msg_tail, *msg_prev; struct mosquitto_client_msg *msg_tail, *msg_prev;
struct mosquitto *found_context; struct mosquitto *found_context;
int slen; int slen;
struct _mosquitto_subleaf *leaf;
#ifdef WITH_TLS #ifdef WITH_TLS
int i; int i;
X509 *client_cert = NULL; X509 *client_cert = NULL;
X509_NAME *name; X509_NAME *name;
X509_NAME_ENTRY *name_entry; X509_NAME_ENTRY *name_entry;
#endif #endif
#ifdef WITH_WEBSOCKETS
struct libws_mqtt_hack *ws_ctxt_user, *ws_ctxt_user_head;
#endif
#ifdef WITH_SYS_TREE #ifdef WITH_SYS_TREE
g_connection_count++; g_connection_count++;
@ -431,104 +429,40 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context)
context->clean_session = clean_session; context->clean_session = clean_session;
found_context->clean_session = clean_session; found_context->clean_session = clean_session;
mqtt3_context_cleanup(db, found_context, false); found_context->state = mosq_cs_disconnecting;
found_context->state = mosq_cs_connected;
if(context->address){
found_context->address = context->address;
context->address = NULL;
}else{
found_context->address = NULL;
}
found_context->disconnect_t = 0;
found_context->sock = context->sock;
found_context->listener = context->listener;
context->listener = NULL;
found_context->last_msg_in = mosquitto_time();
found_context->last_msg_out = mosquitto_time();
found_context->keepalive = context->keepalive;
found_context->pollfd_index = context->pollfd_index;
#ifdef WITH_TLS
found_context->ssl = context->ssl;
#endif
if(context->username){
found_context->username = context->username;
context->username = NULL;
}
if(context->password){
found_context->password = context->password;
context->password = NULL;
}
#ifdef WITH_TLS HASH_DELETE(hh_id, db->contexts_by_id, found_context);
context->ssl = NULL;
#endif
context->state = mosq_cs_disconnecting;
#ifdef WITH_WEBSOCKETS #ifdef WITH_WEBSOCKETS
if(found_context->wsi){ if(found_context->wsi){
/* This is a hack to allow us to update the wsi->user_space found_context->state = mosq_cs_disconnect_ws;
* structure. If libwebsockets let us access that variable itself,
* this wouldn't be necessary. */
ws_ctxt_user_head = (struct libws_mqtt_hack *)libwebsocket_context_user(found_context->ws_context);
ws_ctxt_user = _mosquitto_calloc(1, sizeof(struct libws_mqtt_hack));
if(!ws_ctxt_user){
rc = MOSQ_ERR_NOMEM;
goto handle_connect_error;
}
ws_ctxt_user->old_mosq = found_context;
ws_ctxt_user->new_mosq = NULL;
ws_ctxt_user->next = ws_ctxt_user_head->next;
ws_ctxt_user_head->next = ws_ctxt_user;
found_context->sock = INVALID_SOCKET; found_context->sock = INVALID_SOCKET;
found_context->wsi = NULL; }else
} #endif
if(context->wsi){ if(found_context->sock != INVALID_SOCKET){
found_context->wsi = context->wsi;
found_context->ws_context = context->ws_context;
found_context->sock = WEBSOCKET_CLIENT;
context->wsi = NULL;
context->ws_context = NULL;
context->sock = INVALID_SOCKET;
/* This is a hack to allow us to update the wsi->user_space
* structure. If libwebsockets let us access that variable itself,
* this wouldn't be necessary. */
ws_ctxt_user_head = (struct libws_mqtt_hack *)libwebsocket_context_user(found_context->ws_context);
ws_ctxt_user = _mosquitto_calloc(1, sizeof(struct libws_mqtt_hack));
if(!ws_ctxt_user){
rc = MOSQ_ERR_NOMEM;
goto handle_connect_error;
}
ws_ctxt_user->old_mosq = context;
ws_ctxt_user->new_mosq = found_context;
while(ws_ctxt_user_head->next){
ws_ctxt_user_head = ws_ctxt_user_head->next;
}
ws_ctxt_user_head->next = ws_ctxt_user;
mosquitto__add_context_to_disused(db, context);
}else{
mosquitto__add_context_to_disused(db, context);
HASH_DELETE(hh_sock, db->contexts_by_sock, context); HASH_DELETE(hh_sock, db->contexts_by_sock, context);
context->sock = INVALID_SOCKET; found_context->sock = INVALID_SOCKET;
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(found_context->sock), found_context);
} }
#else
mosquitto__add_context_to_disused(db, context);
HASH_DELETE(hh_sock, db->contexts_by_sock, context);
context->sock = INVALID_SOCKET;
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(found_context->sock), found_context);
#endif
context = found_context;
if(context->msgs){ if(found_context->msgs){
context->msgs = found_context->msgs;
found_context->msgs = NULL;
mqtt3_db_message_reconnect_reset(context); mqtt3_db_message_reconnect_reset(context);
} }
context->subs = found_context->subs;
found_context->subs = NULL;
context->sub_count = found_context->sub_count;
found_context->sub_count = 0;
for(i=0; i<context->sub_count; i++){
leaf = context->subs[i]->subs;
while(leaf){
if(leaf->context == found_context){
leaf->context = context;
}
leaf = leaf->next;
}
}
} }
/* Associate user with its ACL, assuming we have ACLs loaded. */ /* Associate user with its ACL, assuming we have ACLs loaded. */

@ -125,7 +125,6 @@ static int callback_mqtt(struct libwebsocket_context *context,
struct _mosquitto_packet *packet; struct _mosquitto_packet *packet;
int count; int count;
struct libws_mqtt_data *u = (struct libws_mqtt_data *)user; struct libws_mqtt_data *u = (struct libws_mqtt_data *)user;
struct libws_mqtt_hack *hack_head, *hack, *hack_prev = NULL;
size_t pos; size_t pos;
uint8_t *buf; uint8_t *buf;
int rc; int rc;
@ -133,26 +132,6 @@ static int callback_mqtt(struct libwebsocket_context *context,
db = &int_db; db = &int_db;
/* Update wsi->user, in case of reconnecting client */
hack_head = (struct libws_mqtt_hack *)libwebsocket_context_user(context);
if(hack_head && u && u->mosq){
hack = hack_head->next;
while(hack){
if(hack->old_mosq == u->mosq){
u->mosq = hack->new_mosq;
if(hack_prev){
hack_prev->next = hack->next;
}else{
hack_head->next = hack->next;
}
_mosquitto_free(hack);
break;
}
hack_prev = hack;
hack = hack->next;
}
}
switch (reason) { switch (reason) {
case LWS_CALLBACK_ESTABLISHED: case LWS_CALLBACK_ESTABLISHED:
mosq = mqtt3_context_init(db, WEBSOCKET_CLIENT); mosq = mqtt3_context_init(db, WEBSOCKET_CLIENT);

Loading…
Cancel
Save