Experimental fix for poor websockets performance.

pull/472/head
Roger A. Light 8 years ago
parent 621f18d696
commit c07ba2a3da

@ -1,3 +1,6 @@
Broker:
- Fix for poor websockets performance.
Clients: Clients:
- Don't use / in auto-generated client ids. - Don't use / in auto-generated client ids.

@ -226,6 +226,7 @@ struct mosquitto {
struct libwebsocket *wsi; struct libwebsocket *wsi;
# endif # endif
# endif # endif
bool ws_want_write;
#else #else
# ifdef WITH_SOCKS # ifdef WITH_SOCKS
char *socks5_host; char *socks5_host;

@ -108,6 +108,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
struct pollfd *pollfds = NULL; struct pollfd *pollfds = NULL;
int pollfd_count = 0; int pollfd_count = 0;
int pollfd_index; int pollfd_index;
int pollfd_max;
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
mosq_sock_t bridge_sock; mosq_sock_t bridge_sock;
int rc; int rc;
@ -122,6 +123,18 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
sigaddset(&sigblock, SIGINT); sigaddset(&sigblock, SIGINT);
#endif #endif
#ifdef WIN32
pollfd_max = _getmaxstdio();
#else
pollfd_max = getdtablesize();
#endif
pollfds = _mosquitto_malloc(sizeof(struct pollfd)*pollfd_max);
if(!pollfds){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
if(db->config->persistent_client_expiration > 0){ if(db->config->persistent_client_expiration > 0){
expiration_check_time = time(NULL) + 3600; expiration_check_time = time(NULL) + 3600;
} }
@ -139,16 +152,8 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
context_count += db->bridge_count; context_count += db->bridge_count;
#endif #endif
if(listensock_count + context_count > pollfd_count || !pollfds){ pollfd_count = listensock_count + context_count;
pollfd_count = listensock_count + context_count; memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max);
pollfds = _mosquitto_realloc(pollfds, sizeof(struct pollfd)*pollfd_count);
if(!pollfds){
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
memset(pollfds, -1, sizeof(struct pollfd)*pollfd_count);
pollfd_index = 0; pollfd_index = 0;
for(i=0; i<listensock_count; i++){ for(i=0; i<listensock_count; i++){
@ -196,8 +201,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
pollfds[pollfd_index].fd = context->sock; pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0; pollfds[pollfd_index].revents = 0;
if(context->current_out_packet || context->state == mosq_cs_connect_pending){ if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
pollfds[pollfd_index].events |= POLLOUT; pollfds[pollfd_index].events |= POLLOUT;
context->ws_want_write = false;
} }
context->pollfd_index = pollfd_index; context->pollfd_index = pollfd_index;
pollfd_index++; pollfd_index++;
@ -436,7 +442,10 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context)
if(context->wsi){ if(context->wsi){
libwebsocket_callback_on_writable(context->ws_context, context->wsi); libwebsocket_callback_on_writable(context->ws_context, context->wsi);
} }
context->sock = INVALID_SOCKET; if(context->sock != INVALID_SOCKET){
HASH_DELETE(hh_sock, db->contexts_by_sock, context);
context->sock = INVALID_SOCKET;
}
}else }else
#endif #endif
{ {
@ -482,6 +491,18 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol
} }
assert(pollfds[context->pollfd_index].fd == context->sock); assert(pollfds[context->pollfd_index].fd == context->sock);
#ifdef WITH_WEBSOCKETS
if(context->wsi){
struct lws_pollfd wspoll;
wspoll.fd = pollfds[context->pollfd_index].fd;
wspoll.events = pollfds[context->pollfd_index].events;
wspoll.revents = pollfds[context->pollfd_index].revents;
lws_service_fd(lws_get_context(context->wsi), &wspoll);
continue;
}
#endif
#ifdef WITH_TLS #ifdef WITH_TLS
if(pollfds[context->pollfd_index].revents & POLLOUT || if(pollfds[context->pollfd_index].revents & POLLOUT ||
context->want_write || context->want_write ||

@ -218,6 +218,8 @@ static int callback_mqtt(struct libwebsocket_context *context,
u->mosq = NULL; u->mosq = NULL;
return -1; return -1;
} }
mosq->sock = libwebsocket_get_socket_fd(wsi);
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(mosq->sock), mosq);
break; break;
case LWS_CALLBACK_CLOSED: case LWS_CALLBACK_CLOSED:
@ -226,6 +228,10 @@ static int callback_mqtt(struct libwebsocket_context *context,
} }
mosq = u->mosq; mosq = u->mosq;
if(mosq){ if(mosq){
if(mosq->sock > 0){
HASH_DELETE(hh_sock, db->contexts_by_sock, mosq);
mosq->sock = INVALID_SOCKET;
}
mosq->wsi = NULL; mosq->wsi = NULL;
do_disconnect(db, mosq); do_disconnect(db, mosq);
} }
@ -412,6 +418,9 @@ static int callback_http(struct libwebsocket_context *context,
char *filename, *filename_canonical; char *filename, *filename_canonical;
unsigned char buf[4096]; unsigned char buf[4096];
struct stat filestat; struct stat filestat;
struct mosquitto_db *db = &int_db;
struct mosquitto *mosq;
struct lws_pollargs *pollargs = (struct lws_pollargs *)in;
/* FIXME - ssl cert verification is done here. */ /* FIXME - ssl cert verification is done here. */
@ -583,6 +592,15 @@ static int callback_http(struct libwebsocket_context *context,
break; break;
#endif #endif
case LWS_CALLBACK_ADD_POLL_FD:
case LWS_CALLBACK_DEL_POLL_FD:
case LWS_CALLBACK_CHANGE_MODE_POLL_FD:
HASH_FIND(hh_sock, db->contexts_by_sock, &pollargs->fd, sizeof(pollargs->fd), mosq);
if(mosq){
mosq->ws_want_write = true;
}
break;
default: default:
return 0; return 0;
} }

Loading…
Cancel
Save