|
|
|
@ -222,114 +222,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
time_count = 0;
|
|
|
|
|
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
|
|
|
|
if(time_count > 0){
|
|
|
|
|
time_count--;
|
|
|
|
|
}else{
|
|
|
|
|
time_count = 1000;
|
|
|
|
|
now = mosquitto_time();
|
|
|
|
|
}
|
|
|
|
|
context->pollfd_index = -1;
|
|
|
|
|
|
|
|
|
|
if(context->sock != INVALID_SOCKET){
|
|
|
|
|
#ifdef WITH_BRIDGE
|
|
|
|
|
if(context->bridge){
|
|
|
|
|
mosquitto__check_keepalive(db, context);
|
|
|
|
|
if(context->bridge->round_robin == false
|
|
|
|
|
&& context->bridge->cur_address != 0
|
|
|
|
|
&& context->bridge->primary_retry
|
|
|
|
|
&& now > context->bridge->primary_retry){
|
|
|
|
|
|
|
|
|
|
if(context->bridge->primary_retry_sock == INVALID_SOCKET){
|
|
|
|
|
rc = net__try_connect(context->bridge->addresses[0].address,
|
|
|
|
|
context->bridge->addresses[0].port,
|
|
|
|
|
&context->bridge->primary_retry_sock, NULL, false);
|
|
|
|
|
|
|
|
|
|
if(rc == 0){
|
|
|
|
|
COMPAT_CLOSE(context->bridge->primary_retry_sock);
|
|
|
|
|
context->bridge->primary_retry_sock = INVALID_SOCKET;
|
|
|
|
|
context->bridge->primary_retry = 0;
|
|
|
|
|
net__socket_close(db, context);
|
|
|
|
|
context->bridge->cur_address = 0;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
len = sizeof(int);
|
|
|
|
|
if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
|
|
|
|
|
if(err == 0){
|
|
|
|
|
COMPAT_CLOSE(context->bridge->primary_retry_sock);
|
|
|
|
|
context->bridge->primary_retry_sock = INVALID_SOCKET;
|
|
|
|
|
context->bridge->primary_retry = 0;
|
|
|
|
|
net__socket_close(db, context);
|
|
|
|
|
context->bridge->cur_address = context->bridge->address_count-1;
|
|
|
|
|
}else{
|
|
|
|
|
COMPAT_CLOSE(context->bridge->primary_retry_sock);
|
|
|
|
|
context->bridge->primary_retry_sock = INVALID_SOCKET;
|
|
|
|
|
context->bridge->primary_retry = now+5;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
COMPAT_CLOSE(context->bridge->primary_retry_sock);
|
|
|
|
|
context->bridge->primary_retry_sock = INVALID_SOCKET;
|
|
|
|
|
context->bridge->primary_retry = now+5;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/* Local bridges never time out in this fashion. */
|
|
|
|
|
if(!(context->keepalive)
|
|
|
|
|
|| context->bridge
|
|
|
|
|
|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
|
|
|
|
|
|
|
|
|
|
if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
|
|
|
|
|
#ifdef WITH_EPOLL
|
|
|
|
|
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
|
|
|
|
|
if(!(context->events & EPOLLOUT)) {
|
|
|
|
|
ev.data.fd = context->sock;
|
|
|
|
|
ev.events = EPOLLIN | EPOLLOUT;
|
|
|
|
|
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
|
|
|
|
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
context->events = EPOLLIN | EPOLLOUT;
|
|
|
|
|
}
|
|
|
|
|
context->ws_want_write = false;
|
|
|
|
|
}
|
|
|
|
|
else{
|
|
|
|
|
if(context->events & EPOLLOUT) {
|
|
|
|
|
ev.data.fd = context->sock;
|
|
|
|
|
ev.events = EPOLLIN;
|
|
|
|
|
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
|
|
|
|
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
context->events = EPOLLIN;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#else
|
|
|
|
|
pollfds[pollfd_index].fd = context->sock;
|
|
|
|
|
pollfds[pollfd_index].events = POLLIN;
|
|
|
|
|
pollfds[pollfd_index].revents = 0;
|
|
|
|
|
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
|
|
|
|
|
pollfds[pollfd_index].events |= POLLOUT;
|
|
|
|
|
context->ws_want_write = false;
|
|
|
|
|
}
|
|
|
|
|
context->pollfd_index = pollfd_index;
|
|
|
|
|
pollfd_index++;
|
|
|
|
|
#endif
|
|
|
|
|
}else{
|
|
|
|
|
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
/* Client has exceeded keepalive*1.5 */
|
|
|
|
|
do_disconnect(db, context, MOSQ_ERR_KEEPALIVE);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef WITH_BRIDGE
|
|
|
|
|
time_count = 0;
|
|
|
|
|
for(i=0; i<db->bridge_count; i++){
|
|
|
|
@ -466,6 +358,115 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
time_count = 0;
|
|
|
|
|
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
|
|
|
|
|
if(time_count > 0){
|
|
|
|
|
time_count--;
|
|
|
|
|
}else{
|
|
|
|
|
time_count = 1000;
|
|
|
|
|
now = mosquitto_time();
|
|
|
|
|
}
|
|
|
|
|
context->pollfd_index = -1;
|
|
|
|
|
|
|
|
|
|
if(context->sock != INVALID_SOCKET){
|
|
|
|
|
#ifdef WITH_BRIDGE
|
|
|
|
|
if(context->bridge){
|
|
|
|
|
mosquitto__check_keepalive(db, context);
|
|
|
|
|
if(context->bridge->round_robin == false
|
|
|
|
|
&& context->bridge->cur_address != 0
|
|
|
|
|
&& context->bridge->primary_retry
|
|
|
|
|
&& now > context->bridge->primary_retry){
|
|
|
|
|
|
|
|
|
|
if(context->bridge->primary_retry_sock == INVALID_SOCKET){
|
|
|
|
|
rc = net__try_connect(context->bridge->addresses[0].address,
|
|
|
|
|
context->bridge->addresses[0].port,
|
|
|
|
|
&context->bridge->primary_retry_sock, NULL, false);
|
|
|
|
|
|
|
|
|
|
if(rc == 0){
|
|
|
|
|
COMPAT_CLOSE(context->bridge->primary_retry_sock);
|
|
|
|
|
context->bridge->primary_retry_sock = INVALID_SOCKET;
|
|
|
|
|
context->bridge->primary_retry = 0;
|
|
|
|
|
net__socket_close(db, context);
|
|
|
|
|
context->bridge->cur_address = 0;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
len = sizeof(int);
|
|
|
|
|
if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
|
|
|
|
|
if(err == 0){
|
|
|
|
|
COMPAT_CLOSE(context->bridge->primary_retry_sock);
|
|
|
|
|
context->bridge->primary_retry_sock = INVALID_SOCKET;
|
|
|
|
|
context->bridge->primary_retry = 0;
|
|
|
|
|
net__socket_close(db, context);
|
|
|
|
|
context->bridge->cur_address = context->bridge->address_count-1;
|
|
|
|
|
}else{
|
|
|
|
|
COMPAT_CLOSE(context->bridge->primary_retry_sock);
|
|
|
|
|
context->bridge->primary_retry_sock = INVALID_SOCKET;
|
|
|
|
|
context->bridge->primary_retry = now+5;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
COMPAT_CLOSE(context->bridge->primary_retry_sock);
|
|
|
|
|
context->bridge->primary_retry_sock = INVALID_SOCKET;
|
|
|
|
|
context->bridge->primary_retry = now+5;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/* Local bridges never time out in this fashion. */
|
|
|
|
|
if(!(context->keepalive)
|
|
|
|
|
|| context->bridge
|
|
|
|
|
|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
|
|
|
|
|
|
|
|
|
|
if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
|
|
|
|
|
#ifdef WITH_EPOLL
|
|
|
|
|
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
|
|
|
|
|
if(!(context->events & EPOLLOUT)) {
|
|
|
|
|
ev.data.fd = context->sock;
|
|
|
|
|
ev.events = EPOLLIN | EPOLLOUT;
|
|
|
|
|
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
|
|
|
|
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
context->events = EPOLLIN | EPOLLOUT;
|
|
|
|
|
}
|
|
|
|
|
context->ws_want_write = false;
|
|
|
|
|
}
|
|
|
|
|
else{
|
|
|
|
|
if(context->events & EPOLLOUT) {
|
|
|
|
|
ev.data.fd = context->sock;
|
|
|
|
|
ev.events = EPOLLIN;
|
|
|
|
|
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
|
|
|
|
|
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
context->events = EPOLLIN;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#else
|
|
|
|
|
pollfds[pollfd_index].fd = context->sock;
|
|
|
|
|
pollfds[pollfd_index].events = POLLIN;
|
|
|
|
|
pollfds[pollfd_index].revents = 0;
|
|
|
|
|
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
|
|
|
|
|
pollfds[pollfd_index].events |= POLLOUT;
|
|
|
|
|
context->ws_want_write = false;
|
|
|
|
|
}
|
|
|
|
|
context->pollfd_index = pollfd_index;
|
|
|
|
|
pollfd_index++;
|
|
|
|
|
#endif
|
|
|
|
|
}else{
|
|
|
|
|
do_disconnect(db, context, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
/* Client has exceeded keepalive*1.5 */
|
|
|
|
|
do_disconnect(db, context, MOSQ_ERR_KEEPALIVE);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#ifndef WIN32
|
|
|
|
|
sigprocmask(SIG_SETMASK, &sigblock, &origsig);
|
|
|
|
|
#ifdef WITH_EPOLL
|
|
|
|
|