You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mosquitto/src/mux_epoll.c

273 lines
6.5 KiB
C

/*
Copyright (c) 2009-2019 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
Contributors:
Roger Light - initial implementation and documentation.
Tatsuzo Osawa - Add epoll.
*/
#include "config.h"
#ifdef WITH_EPOLL
#define MAX_EVENTS 1000
#define _GNU_SOURCE
#include <errno.h>
#include <signal.h>
#include <sys/epoll.h>
#include "mosquitto_broker_internal.h"
#include "mux.h"
#include "packet_mosq.h"
#include "util_mosq.h"
static void loop_handle_reads_writes(struct mosquitto *context, uint32_t events);
static sigset_t my_sigblock;
static struct epoll_event ep_events[MAX_EVENTS];
int mux_epoll__init(struct mosquitto__listener_sock *listensock, int listensock_count)
{
struct epoll_event ev;
int i;
sigemptyset(&my_sigblock);
sigaddset(&my_sigblock, SIGINT);
sigaddset(&my_sigblock, SIGTERM);
sigaddset(&my_sigblock, SIGUSR1);
sigaddset(&my_sigblock, SIGUSR2);
sigaddset(&my_sigblock, SIGHUP);
memset(&ep_events, 0, sizeof(struct epoll_event)*MAX_EVENTS);
db.epollfd = 0;
if ((db.epollfd = epoll_create(MAX_EVENTS)) == -1) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll creating: %s", strerror(errno));
return MOSQ_ERR_UNKNOWN;
}
memset(&ev, 0, sizeof(struct epoll_event));
for(i=0; i<listensock_count; i++){
ev.data.ptr = &listensock[i];
ev.events = EPOLLIN;
if (epoll_ctl(db.epollfd, EPOLL_CTL_ADD, listensock[i].sock, &ev) == -1) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering: %s", strerror(errno));
(void)close(db.epollfd);
db.epollfd = 0;
return MOSQ_ERR_UNKNOWN;
}
}
return MOSQ_ERR_SUCCESS;
}
int mux_epoll__add_out(struct mosquitto *context)
{
struct epoll_event ev;
if(!(context->events & EPOLLOUT)) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.data.ptr = context;
ev.events = EPOLLIN | EPOLLOUT;
if(epoll_ctl(db.epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db.epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
}
}
context->events = EPOLLIN | EPOLLOUT;
}
return MOSQ_ERR_SUCCESS;
}
int mux_epoll__remove_out(struct mosquitto *context)
{
struct epoll_event ev;
if(context->events & EPOLLOUT) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.data.ptr = context;
ev.events = EPOLLIN;
if(epoll_ctl(db.epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db.epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
}
}
context->events = EPOLLIN;
}
return MOSQ_ERR_SUCCESS;
}
int mux_epoll__new(struct mosquitto *context)
{
struct epoll_event ev;
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.ptr = context;
if (epoll_ctl(db.epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if(errno != EEXIST){
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno));
}
}
context->events = EPOLLIN;
return MOSQ_ERR_SUCCESS;
}
int mux_epoll__delete(struct mosquitto *context)
{
struct epoll_event ev;
memset(&ev, 0, sizeof(struct epoll_event));
if(context->sock != INVALID_SOCKET){
if(epoll_ctl(db.epollfd, EPOLL_CTL_DEL, context->sock, &ev) == -1){
return 1;
}
}
return 0;
}
int mux_epoll__handle(void)
{
int i;
struct epoll_event ev;
sigset_t origsig;
struct mosquitto *context;
struct mosquitto__listener_sock *listensock;
int event_count;
memset(&ev, 0, sizeof(struct epoll_event));
sigprocmask(SIG_SETMASK, &my_sigblock, &origsig);
event_count = epoll_wait(db.epollfd, ep_events, MAX_EVENTS, 100);
sigprocmask(SIG_SETMASK, &origsig, NULL);
db.now_s = mosquitto_time();
db.now_real_s = time(NULL);
switch(event_count){
case -1:
if(errno != EINTR){
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno));
}
break;
case 0:
break;
default:
for(i=0; i<event_count; i++){
context = ep_events[i].data.ptr;
if(context->ident == id_client){
loop_handle_reads_writes(context, ep_events[i].events);
}else if(context->ident == id_listener){
listensock = ep_events[i].data.ptr;
if (ep_events[i].events & (EPOLLIN | EPOLLPRI)){
while((context = net__socket_accept(listensock)) != NULL){
}
}
#ifdef WITH_WEBSOCKETS
}else if(context->ident == id_listener_ws){
/* Nothing needs to happen here, because we always call lws_service in the loop.
* The important point is we've been woken up for this listener. */
#endif
}
}
}
return MOSQ_ERR_SUCCESS;
}
int mux_epoll__cleanup(void)
{
(void)close(db.epollfd);
db.epollfd = 0;
return MOSQ_ERR_SUCCESS;
}
static void loop_handle_reads_writes(struct mosquitto *context, uint32_t events)
{
int err;
socklen_t len;
int rc;
if(context->sock == INVALID_SOCKET){
return;
}
#ifdef WITH_WEBSOCKETS
if(context->wsi){
struct lws_pollfd wspoll;
wspoll.fd = context->sock;
wspoll.events = (int16_t)context->events;
wspoll.revents = (int16_t)events;
lws_service_fd(lws_get_context(context->wsi), &wspoll);
return;
}
#endif
if(events & EPOLLOUT
#ifdef WITH_TLS
|| context->want_write
|| (context->ssl && context->state == mosq_cs_new)
#endif
){
if(context->state == mosq_cs_connect_pending){
len = sizeof(int);
if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
if(err == 0){
mosquitto__set_state(context, mosq_cs_new);
#if defined(WITH_ADNS) && defined(WITH_BRIDGE)
if(context->bridge){
bridge__connect_step3(context);
}
#endif
}
}else{
do_disconnect(context, MOSQ_ERR_CONN_LOST);
return;
}
}
rc = packet__write(context);
if(rc){
do_disconnect(context, rc);
return;
}
}
if(events & EPOLLIN
#ifdef WITH_TLS
|| (context->ssl && context->state == mosq_cs_new)
#endif
){
do{
rc = packet__read(context);
if(rc){
do_disconnect(context, rc);
return;
}
}while(SSL_DATA_PENDING(context));
}else{
if(events & (EPOLLERR | EPOLLHUP)){
do_disconnect(context, MOSQ_ERR_CONN_LOST);
return;
}
}
}
#endif