|
|
|
@ -62,12 +62,12 @@ Contributors:
|
|
|
|
|
# error "epoll not supported on WIN32"
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events);
|
|
|
|
|
static void loop_handle_reads_writes(struct mosquitto_db *db, struct mosquitto *context, uint32_t events);
|
|
|
|
|
|
|
|
|
|
static sigset_t my_sigblock;
|
|
|
|
|
static struct epoll_event ep_events[MAX_EVENTS];
|
|
|
|
|
|
|
|
|
|
int mux_epoll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count)
|
|
|
|
|
int mux_epoll__init(struct mosquitto_db *db, struct mosquitto__listener_sock *listensock, int listensock_count)
|
|
|
|
|
{
|
|
|
|
|
struct epoll_event ev;
|
|
|
|
|
int i;
|
|
|
|
@ -90,9 +90,9 @@ int mux_epoll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listen
|
|
|
|
|
}
|
|
|
|
|
memset(&ev, 0, sizeof(struct epoll_event));
|
|
|
|
|
for(i=0; i<listensock_count; i++){
|
|
|
|
|
ev.data.fd = listensock[i];
|
|
|
|
|
ev.data.ptr = &listensock[i];
|
|
|
|
|
ev.events = EPOLLIN;
|
|
|
|
|
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, listensock[i], &ev) == -1) {
|
|
|
|
|
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, listensock[i].sock, &ev) == -1) {
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering: %s", strerror(errno));
|
|
|
|
|
(void)close(db->epollfd);
|
|
|
|
|
db->epollfd = 0;
|
|
|
|
@ -115,7 +115,7 @@ int mux_epoll__add_out(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
|
|
|
|
|
memset(&ev, 0, sizeof(struct epoll_event));
|
|
|
|
|
if(!(context->events & EPOLLOUT)) {
|
|
|
|
|
ev.data.fd = context->sock;
|
|
|
|
|
ev.data.ptr = context;
|
|
|
|
|
ev.events = EPOLLIN | EPOLLOUT;
|
|
|
|
|
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
|
|
|
|
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
|
|
|
@ -134,7 +134,7 @@ int mux_epoll__remove_out(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
|
|
|
|
|
memset(&ev, 0, sizeof(struct epoll_event));
|
|
|
|
|
if(context->events & EPOLLOUT) {
|
|
|
|
|
ev.data.fd = context->sock;
|
|
|
|
|
ev.data.ptr = context;
|
|
|
|
|
ev.events = EPOLLIN;
|
|
|
|
|
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
|
|
|
|
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
|
|
|
@ -153,7 +153,7 @@ int mux_epoll__add_in(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
|
|
|
|
|
memset(&ev, 0, sizeof(struct epoll_event));
|
|
|
|
|
ev.events = EPOLLIN;
|
|
|
|
|
ev.data.fd = context->sock;
|
|
|
|
|
ev.data.ptr = context;
|
|
|
|
|
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno));
|
|
|
|
|
}
|
|
|
|
@ -176,21 +176,22 @@ int mux_epoll__delete(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int mux_epoll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count)
|
|
|
|
|
int mux_epoll__handle(struct mosquitto_db *db)
|
|
|
|
|
{
|
|
|
|
|
int i;
|
|
|
|
|
int j;
|
|
|
|
|
struct epoll_event ev;
|
|
|
|
|
sigset_t origsig;
|
|
|
|
|
struct mosquitto *context;
|
|
|
|
|
int fdcount;
|
|
|
|
|
struct mosquitto__listener_sock *listensock;
|
|
|
|
|
int event_count;
|
|
|
|
|
int sock;
|
|
|
|
|
|
|
|
|
|
memset(&ev, 0, sizeof(struct epoll_event));
|
|
|
|
|
sigprocmask(SIG_SETMASK, &my_sigblock, &origsig);
|
|
|
|
|
fdcount = epoll_wait(db->epollfd, ep_events, MAX_EVENTS, 100);
|
|
|
|
|
event_count = epoll_wait(db->epollfd, ep_events, MAX_EVENTS, 100);
|
|
|
|
|
sigprocmask(SIG_SETMASK, &origsig, NULL);
|
|
|
|
|
|
|
|
|
|
switch(fdcount){
|
|
|
|
|
switch(event_count){
|
|
|
|
|
case -1:
|
|
|
|
|
if(errno != EINTR){
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno));
|
|
|
|
@ -199,26 +200,25 @@ int mux_epoll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int list
|
|
|
|
|
case 0:
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
for(i=0; i<fdcount; i++){
|
|
|
|
|
for(j=0; j<listensock_count; j++){
|
|
|
|
|
if (ep_events[i].data.fd == listensock[j]) {
|
|
|
|
|
if (ep_events[i].events & (EPOLLIN | EPOLLPRI)){
|
|
|
|
|
while((ev.data.fd = net__socket_accept(db, listensock[j])) != -1){
|
|
|
|
|
context = NULL;
|
|
|
|
|
HASH_FIND(hh_sock, db->contexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context);
|
|
|
|
|
if(!context) {
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context");
|
|
|
|
|
}
|
|
|
|
|
context->events = EPOLLIN;
|
|
|
|
|
mux__add_in(db, context);
|
|
|
|
|
for(i=0; i<event_count; i++){
|
|
|
|
|
context = ep_events[i].data.ptr;
|
|
|
|
|
if(context->ident == id_client){
|
|
|
|
|
loop_handle_reads_writes(db, context, ep_events[i].events);
|
|
|
|
|
}else if(context->ident == id_listener){
|
|
|
|
|
listensock = ep_events[i].data.ptr;
|
|
|
|
|
|
|
|
|
|
if (ep_events[i].events & (EPOLLIN | EPOLLPRI)){
|
|
|
|
|
while((sock = net__socket_accept(db, listensock)) != -1){
|
|
|
|
|
context = NULL;
|
|
|
|
|
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
|
|
|
|
|
if(!context) {
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context");
|
|
|
|
|
}
|
|
|
|
|
context->events = EPOLLIN;
|
|
|
|
|
mux__add_in(db, context);
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (j == listensock_count) {
|
|
|
|
|
loop_handle_reads_writes(db, ep_events[i].data.fd, ep_events[i].events);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
@ -233,99 +233,74 @@ int mux_epoll__cleanup(struct mosquitto_db *db)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events)
|
|
|
|
|
static void loop_handle_reads_writes(struct mosquitto_db *db, struct mosquitto *context, uint32_t events)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto *context;
|
|
|
|
|
int err;
|
|
|
|
|
socklen_t len;
|
|
|
|
|
int rc;
|
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
context = NULL;
|
|
|
|
|
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
|
|
|
|
|
if(!context) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
for (i=0;i<1;i++) {
|
|
|
|
|
|
|
|
|
|
#ifdef WITH_WEBSOCKETS
|
|
|
|
|
if(context->wsi){
|
|
|
|
|
struct lws_pollfd wspoll;
|
|
|
|
|
wspoll.fd = context->sock;
|
|
|
|
|
wspoll.events = (int16_t)context->events;
|
|
|
|
|
wspoll.revents = (int16_t)events;
|
|
|
|
|
if(context->wsi){
|
|
|
|
|
struct lws_pollfd wspoll;
|
|
|
|
|
wspoll.fd = context->sock;
|
|
|
|
|
wspoll.events = (int16_t)context->events;
|
|
|
|
|
wspoll.revents = (int16_t)events;
|
|
|
|
|
#ifdef LWS_LIBRARY_VERSION_NUMBER
|
|
|
|
|
lws_service_fd(lws_get_context(context->wsi), &wspoll);
|
|
|
|
|
lws_service_fd(lws_get_context(context->wsi), &wspoll);
|
|
|
|
|
#else
|
|
|
|
|
lws_service_fd(context->ws_context, &wspoll);
|
|
|
|
|
lws_service_fd(context->ws_context, &wspoll);
|
|
|
|
|
#endif
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
if(events & EPOLLOUT
|
|
|
|
|
#ifdef WITH_TLS
|
|
|
|
|
if(events & EPOLLOUT ||
|
|
|
|
|
context->want_write ||
|
|
|
|
|
(context->ssl && context->state == mosq_cs_new)){
|
|
|
|
|
#else
|
|
|
|
|
if(events & EPOLLOUT){
|
|
|
|
|
|| context->want_write
|
|
|
|
|
|| (context->ssl && context->state == mosq_cs_new)
|
|
|
|
|
#endif
|
|
|
|
|
if(context->state == mosq_cs_connect_pending){
|
|
|
|
|
len = sizeof(int);
|
|
|
|
|
if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
|
|
|
|
|
if(err == 0){
|
|
|
|
|
mosquitto__set_state(context, mosq_cs_new);
|
|
|
|
|
){
|
|
|
|
|
|
|
|
|
|
if(context->state == mosq_cs_connect_pending){
|
|
|
|
|
len = sizeof(int);
|
|
|
|
|
if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
|
|
|
|
|
if(err == 0){
|
|
|
|
|
mosquitto__set_state(context, mosq_cs_new);
|
|
|
|
|
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
|
|
|
|
|
if(context->bridge){
|
|
|
|
|
bridge__connect_step3(db, context);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
if(context->bridge){
|
|
|
|
|
bridge__connect_step3(db, context);
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
continue;
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
rc = packet__write(context);
|
|
|
|
|
if(rc){
|
|
|
|
|
do_disconnect(db, context, rc);
|
|
|
|
|
continue;
|
|
|
|
|
}else{
|
|
|
|
|
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
context = NULL;
|
|
|
|
|
HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context);
|
|
|
|
|
if(!context) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
for (i=0;i<1;i++) {
|
|
|
|
|
#ifdef WITH_WEBSOCKETS
|
|
|
|
|
if(context->wsi){
|
|
|
|
|
// Websocket are already handled above
|
|
|
|
|
continue;
|
|
|
|
|
rc = packet__write(context);
|
|
|
|
|
if(rc){
|
|
|
|
|
do_disconnect(db, context, rc);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if(events & EPOLLIN
|
|
|
|
|
#ifdef WITH_TLS
|
|
|
|
|
if(events & EPOLLIN ||
|
|
|
|
|
(context->ssl && context->state == mosq_cs_new)){
|
|
|
|
|
#else
|
|
|
|
|
if(events & EPOLLIN){
|
|
|
|
|
|| (context->ssl && context->state == mosq_cs_new)
|
|
|
|
|
#endif
|
|
|
|
|
do{
|
|
|
|
|
rc = packet__read(db, context);
|
|
|
|
|
if(rc){
|
|
|
|
|
do_disconnect(db, context, rc);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}while(SSL_DATA_PENDING(context));
|
|
|
|
|
}else{
|
|
|
|
|
if(events & (EPOLLERR | EPOLLHUP)){
|
|
|
|
|
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
continue;
|
|
|
|
|
){
|
|
|
|
|
|
|
|
|
|
do{
|
|
|
|
|
rc = packet__read(db, context);
|
|
|
|
|
if(rc){
|
|
|
|
|
do_disconnect(db, context, rc);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}while(SSL_DATA_PENDING(context));
|
|
|
|
|
}else{
|
|
|
|
|
if(events & (EPOLLERR | EPOLLHUP)){
|
|
|
|
|
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|