Do plugin callback unregistering automatically.

pull/2386/head
Roger A. Light 4 years ago
parent 6763d94962
commit 2bfc7e7cc8

@ -67,6 +67,9 @@ Broker:
- Improve idle performance. The broker now calculates when the next event of - Improve idle performance. The broker now calculates when the next event of
interest is, and uses that as the timeout for e.g. epoll_wait(). This can interest is, and uses that as the timeout for e.g. epoll_wait(). This can
reduce the number of process wakeups by 100x on an idle broker. reduce the number of process wakeups by 100x on an idle broker.
- Plugins no longer need to define mosquitto_plugin_cleanup() if they do not
need to do any of their own cleanup. Callbacks will be unregistered
automatically.
Client library: Client library:
- Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair - Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair

@ -190,6 +190,9 @@ mosq_plugin_EXPORT int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier,
* Called when the broker is shutting down. This will only ever be called once * Called when the broker is shutting down. This will only ever be called once
* per plugin. * per plugin.
* *
* If you do not need to do any of your own cleanup, this function is not
* required. The broker will automatically unregister your callbacks.
*
* Parameters: * Parameters:
* *
* user_data - The pointer provided in <mosquitto_plugin_init>. * user_data - The pointer provided in <mosquitto_plugin_init>.

@ -506,11 +506,6 @@ int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *options, int
UNUSED(options); UNUSED(options);
UNUSED(option_count); UNUSED(option_count);
if(plg_id){
mosquitto_callback_unregister(plg_id, MOSQ_EVT_CONTROL, dynsec_control_callback, "$CONTROL/dynamic-security/v1");
mosquitto_callback_unregister(plg_id, MOSQ_EVT_BASIC_AUTH, dynsec_auth__basic_auth_callback, NULL);
mosquitto_callback_unregister(plg_id, MOSQ_EVT_ACL_CHECK, dynsec__acl_check_callback, NULL);
}
dynsec_groups__cleanup(); dynsec_groups__cleanup();
dynsec_clients__cleanup(); dynsec_clients__cleanup();
dynsec_roles__cleanup(); dynsec_roles__cleanup();

@ -101,12 +101,3 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION); mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL); return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
} }
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
UNUSED(user_data);
UNUSED(opts);
UNUSED(opt_count);
return mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL);
}

@ -76,12 +76,3 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION); mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_BASIC_AUTH, basic_auth_callback, NULL, NULL); return mosquitto_callback_register(mosq_pid, MOSQ_EVT_BASIC_AUTH, basic_auth_callback, NULL, NULL);
} }
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
UNUSED(user_data);
UNUSED(opts);
UNUSED(opt_count);
return mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_BASIC_AUTH, basic_auth_callback, NULL);
}

@ -53,12 +53,3 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION); mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL); return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
} }
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
UNUSED(user_data);
UNUSED(opts);
UNUSED(opt_count);
return mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL);
}

@ -115,13 +115,3 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
rc = mosquitto_callback_register(mosq_pid, MOSQ_EVT_DISCONNECT, disconnect_callback, NULL, NULL); rc = mosquitto_callback_register(mosq_pid, MOSQ_EVT_DISCONNECT, disconnect_callback, NULL, NULL);
return rc; return rc;
} }
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
UNUSED(user_data);
UNUSED(opts);
UNUSED(opt_count);
mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_CONNECT, connect_callback, NULL);
return mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_DISCONNECT, disconnect_callback, NULL);
}

@ -162,12 +162,17 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count) int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{ {
struct client_list *client, *client_tmp;
UNUSED(user_data); UNUSED(user_data);
UNUSED(opts); UNUSED(opts);
UNUSED(opt_count); UNUSED(opt_count);
mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_BASIC_AUTH, basic_auth_callback, NULL); HASH_ITER(hh, clients, client, client_tmp){
mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_TICK, tick_callback, NULL); HASH_DELETE(hh, clients, client);
mosquitto_free(client->id);
mosquitto_free(client);
}
return 0; return MOSQ_ERR_SUCCESS;
} }

@ -68,12 +68,3 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION); mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL); return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
} }
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
UNUSED(user_data);
UNUSED(opts);
UNUSED(opt_count);
return mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL);
}

@ -72,12 +72,3 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION); mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL); return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
} }
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
UNUSED(user_data);
UNUSED(opts);
UNUSED(opt_count);
return mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL);
}

@ -95,12 +95,3 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION); mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL); return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
} }
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
UNUSED(user_data);
UNUSED(opts);
UNUSED(opt_count);
return mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL);
}

@ -42,12 +42,3 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION); mosquitto_plugin_set_info(identifier, PLUGIN_NAME, PLUGIN_VERSION);
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, message_callback, NULL, NULL); return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, message_callback, NULL, NULL);
} }
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
UNUSED(user_data);
UNUSED(opts);
UNUSED(opt_count);
return mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_MESSAGE, message_callback, NULL);
}

@ -78,12 +78,3 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s
mosq_pid = identifier; mosq_pid = identifier;
return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL); return mosquitto_callback_register(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL, NULL);
} }
int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
UNUSED(user_data);
UNUSED(opts);
UNUSED(opt_count);
return mosquitto_callback_unregister(mosq_pid, MOSQ_EVT_MESSAGE, callback_message, NULL);
}

@ -167,3 +167,26 @@ int control__unregister_callback(mosquitto_plugin_id_t *identifier, MOSQ_FUNC_ge
return MOSQ_ERR_NOT_SUPPORTED; return MOSQ_ERR_NOT_SUPPORTED;
#endif #endif
} }
/* Unregister all control callbacks for a single plugin */
void control__unregister_all_callbacks(mosquitto_plugin_id_t *identifier)
{
struct mosquitto__security_options *opts;
struct mosquitto__callback *cb_found;
struct control_endpoint *ep, *ep_tmp;
opts = &db.config->security_options;
DL_FOREACH_SAFE(identifier->control_endpoints, ep, ep_tmp){
HASH_FIND(hh, opts->plugin_callbacks.control, ep->topic, strlen(ep->topic), cb_found);
if(cb_found){
HASH_DELETE(hh, opts->plugin_callbacks.control, cb_found);
mosquitto__free(cb_found->data);
mosquitto__free(cb_found);
}
DL_DELETE(identifier->control_endpoints, ep);
mosquitto__free(ep);
}
}

@ -259,11 +259,22 @@ struct mosquitto__listener_sock{
struct mosquitto__listener *listener; struct mosquitto__listener *listener;
}; };
/* Callbacks belonging to a specific plugin
* Doesn't include MOSQ_EVT_CONTROL events.
*/
struct plugin_own_callback{
struct plugin_own_callback *next, *prev;
MOSQ_FUNC_generic_callback cb_func;
int event;
};
typedef struct mosquitto_plugin_id_t{ typedef struct mosquitto_plugin_id_t{
struct mosquitto__listener *listener; struct mosquitto__listener *listener;
char *plugin_name; char *plugin_name;
char *plugin_version; char *plugin_version;
struct control_endpoint *control_endpoints; struct control_endpoint *control_endpoints;
struct plugin_own_callback *own_callbacks;
} mosquitto_plugin_id_t; } mosquitto_plugin_id_t;
struct mosquitto__config { struct mosquitto__config {
@ -752,6 +763,7 @@ void control__cleanup(void);
#endif #endif
int control__register_callback(mosquitto_plugin_id_t *pid, MOSQ_FUNC_generic_callback cb_func, const char *topic, void *userdata); int control__register_callback(mosquitto_plugin_id_t *pid, MOSQ_FUNC_generic_callback cb_func, const char *topic, void *userdata);
int control__unregister_callback(mosquitto_plugin_id_t *pid, MOSQ_FUNC_generic_callback cb_func, const char *topic); int control__unregister_callback(mosquitto_plugin_id_t *pid, MOSQ_FUNC_generic_callback cb_func, const char *topic);
void control__unregister_all_callbacks(mosquitto_plugin_id_t *identifier);
/* ============================================================ /* ============================================================
@ -816,6 +828,7 @@ void plugin__handle_disconnect(struct mosquitto *context, int reason);
int plugin__handle_message(struct mosquitto *context, struct mosquitto_msg_store *stored); int plugin__handle_message(struct mosquitto *context, struct mosquitto_msg_store *stored);
void LIB_ERROR(void); void LIB_ERROR(void);
void plugin__handle_tick(void); void plugin__handle_tick(void);
int plugin__callback_unregister_all(mosquitto_plugin_id_t *identifier);
/* ============================================================ /* ============================================================
* Property related functions * Property related functions

@ -43,16 +43,25 @@ static bool check_callback_exists(struct mosquitto__callback *cb_base, MOSQ_FUNC
} }
static int remove_callback(struct mosquitto__callback **cb_base, MOSQ_FUNC_generic_callback cb_func) static int remove_callback(mosquitto_plugin_id_t *identifier, int event, struct mosquitto__callback **cb_base, MOSQ_FUNC_generic_callback cb_func)
{ {
struct mosquitto__callback *tail, *tmp; struct mosquitto__callback *tail, *tmp;
struct plugin_own_callback *own, *own_tmp;
DL_FOREACH_SAFE(*cb_base, tail, tmp){ DL_FOREACH_SAFE(*cb_base, tail, tmp){
if(tail->cb == cb_func){ if(tail->cb == cb_func){
DL_DELETE(*cb_base, tail); DL_DELETE(*cb_base, tail);
mosquitto__free(tail); mosquitto__free(tail);
break;
}
}
DL_FOREACH_SAFE(identifier->own_callbacks, own, own_tmp){
if(own->cb_func == cb_func && own->event == event){
DL_DELETE(identifier->own_callbacks, own);
mosquitto__free(own);
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }
mosquitto__free(own);
} }
return MOSQ_ERR_NOT_FOUND; return MOSQ_ERR_NOT_FOUND;
} }
@ -70,13 +79,8 @@ int plugin__load_v5(struct mosquitto__listener *listener, struct mosquitto__auth
LIB_CLOSE(lib); LIB_CLOSE(lib);
return MOSQ_ERR_UNKNOWN; return MOSQ_ERR_UNKNOWN;
} }
if(!(plugin->plugin_cleanup_v5 = (FUNC_plugin_cleanup_v5)LIB_SYM(lib, "mosquitto_plugin_cleanup"))){ /* Optional function */
log__printf(NULL, MOSQ_LOG_ERR, plugin->plugin_cleanup_v5 = (FUNC_plugin_cleanup_v5)LIB_SYM(lib, "mosquitto_plugin_cleanup");
"Error: Unable to load plugin function mosquitto_plugin_cleanup().");
LIB_ERROR();
LIB_CLOSE(lib);
return MOSQ_ERR_UNKNOWN;
}
pid = mosquitto__calloc(1, sizeof(mosquitto_plugin_id_t)); pid = mosquitto__calloc(1, sizeof(mosquitto_plugin_id_t));
if(pid == NULL){ if(pid == NULL){
@ -262,6 +266,7 @@ int mosquitto_callback_register(
struct mosquitto__callback **cb_base = NULL, *cb_new; struct mosquitto__callback **cb_base = NULL, *cb_new;
struct mosquitto__security_options *security_options; struct mosquitto__security_options *security_options;
const char *event_name; const char *event_name;
struct plugin_own_callback *own_callback;
if(cb_func == NULL) return MOSQ_ERR_INVAL; if(cb_func == NULL) return MOSQ_ERR_INVAL;
@ -328,6 +333,15 @@ int mosquitto_callback_register(
if(cb_new == NULL){ if(cb_new == NULL){
return MOSQ_ERR_NOMEM; return MOSQ_ERR_NOMEM;
} }
own_callback = mosquitto__calloc(1, sizeof(struct plugin_own_callback));
if(own_callback == NULL){
mosquitto__free(cb_new);
return MOSQ_ERR_NOMEM;
}
own_callback->event = event;
own_callback->cb_func = cb_func;
DL_APPEND(identifier->own_callbacks, own_callback);
DL_APPEND(*cb_base, cb_new); DL_APPEND(*cb_base, cb_new);
cb_new->cb = cb_func; cb_new->cb = cb_func;
cb_new->userdata = userdata; cb_new->userdata = userdata;
@ -341,6 +355,71 @@ int mosquitto_callback_register(
} }
int plugin__callback_unregister_all(mosquitto_plugin_id_t *identifier)
{
struct mosquitto__callback **cb_base = NULL;
struct mosquitto__security_options *security_options;
struct plugin_own_callback *own, *own_tmp;
if(identifier == NULL){
return MOSQ_ERR_INVAL;
}
if(identifier->listener == NULL){
security_options = &db.config->security_options;
}else{
security_options = &identifier->listener->security_options;
}
control__unregister_all_callbacks(identifier);
DL_FOREACH_SAFE(identifier->own_callbacks, own, own_tmp){
switch(own->event){
case MOSQ_EVT_RELOAD:
cb_base = &security_options->plugin_callbacks.reload;
break;
case MOSQ_EVT_ACL_CHECK:
cb_base = &security_options->plugin_callbacks.acl_check;
break;
case MOSQ_EVT_BASIC_AUTH:
cb_base = &security_options->plugin_callbacks.basic_auth;
break;
case MOSQ_EVT_PSK_KEY:
cb_base = &security_options->plugin_callbacks.psk_key;
break;
case MOSQ_EVT_EXT_AUTH_START:
cb_base = &security_options->plugin_callbacks.ext_auth_start;
break;
case MOSQ_EVT_EXT_AUTH_CONTINUE:
cb_base = &security_options->plugin_callbacks.ext_auth_continue;
break;
case MOSQ_EVT_CONTROL:
cb_base = NULL;
break;
case MOSQ_EVT_MESSAGE:
cb_base = &security_options->plugin_callbacks.message;
break;
case MOSQ_EVT_TICK:
cb_base = &security_options->plugin_callbacks.tick;
break;
case MOSQ_EVT_DISCONNECT:
cb_base = &security_options->plugin_callbacks.disconnect;
break;
case MOSQ_EVT_CONNECT:
cb_base = &security_options->plugin_callbacks.connect;
break;
default:
cb_base = NULL;
break;
}
if(cb_base){
remove_callback(identifier, own->event, cb_base, own->cb_func);
}
}
return MOSQ_ERR_SUCCESS;
}
int mosquitto_callback_unregister( int mosquitto_callback_unregister(
mosquitto_plugin_id_t *identifier, mosquitto_plugin_id_t *identifier,
int event, int event,
@ -398,7 +477,7 @@ int mosquitto_callback_unregister(
break; break;
} }
return remove_callback(cb_base, cb_func); return remove_callback(identifier, event, cb_base, cb_func);
} }
void mosquitto_complete_basic_auth(const char *client_id, int result) void mosquitto_complete_basic_auth(const char *client_id, int result)

@ -406,46 +406,53 @@ static void security__module_cleanup_single(struct mosquitto__security_options *
{ {
int i; int i;
struct control_endpoint *ep, *tmp; struct control_endpoint *ep, *tmp;
struct mosquitto__auth_plugin_config *conf;
for(i=0; i<opts->auth_plugin_config_count; i++){ for(i=0; i<opts->auth_plugin_config_count; i++){
conf = &opts->auth_plugin_configs[i];
/* Run plugin cleanup function */ /* Run plugin cleanup function */
if(opts->auth_plugin_configs[i].plugin.version == 5){ if(conf->plugin.version == 5){
opts->auth_plugin_configs[i].plugin.plugin_cleanup_v5( if(conf->plugin.plugin_cleanup_v5){
opts->auth_plugin_configs[i].plugin.user_data, conf->plugin.plugin_cleanup_v5(
opts->auth_plugin_configs[i].options, conf->plugin.user_data,
opts->auth_plugin_configs[i].option_count); conf->options,
mosquitto__free(opts->auth_plugin_configs[i].plugin.identifier->plugin_name); conf->option_count);
mosquitto__free(opts->auth_plugin_configs[i].plugin.identifier->plugin_version);
DL_FOREACH_SAFE(opts->auth_plugin_configs[i].plugin.identifier->control_endpoints, ep, tmp){
DL_DELETE(opts->auth_plugin_configs[i].plugin.identifier->control_endpoints, ep);
mosquitto__free(ep);
} }
mosquitto__free(opts->auth_plugin_configs[i].plugin.identifier);
opts->auth_plugin_configs[i].plugin.identifier = NULL;
}else if(opts->auth_plugin_configs[i].plugin.version == 4){
opts->auth_plugin_configs[i].plugin.plugin_cleanup_v4(
opts->auth_plugin_configs[i].plugin.user_data,
opts->auth_plugin_configs[i].options,
opts->auth_plugin_configs[i].option_count);
}else if(opts->auth_plugin_configs[i].plugin.version == 3){ plugin__callback_unregister_all(conf->plugin.identifier);
opts->auth_plugin_configs[i].plugin.plugin_cleanup_v3( mosquitto__free(conf->plugin.identifier->plugin_name);
opts->auth_plugin_configs[i].plugin.user_data, mosquitto__free(conf->plugin.identifier->plugin_version);
opts->auth_plugin_configs[i].options, DL_FOREACH_SAFE(conf->plugin.identifier->control_endpoints, ep, tmp){
opts->auth_plugin_configs[i].option_count); DL_DELETE(conf->plugin.identifier->control_endpoints, ep);
mosquitto__free(ep);
}else if(opts->auth_plugin_configs[i].plugin.version == 2){ }
opts->auth_plugin_configs[i].plugin.plugin_cleanup_v2( mosquitto__free(conf->plugin.identifier);
opts->auth_plugin_configs[i].plugin.user_data, conf->plugin.identifier = NULL;
(struct mosquitto_auth_opt *)opts->auth_plugin_configs[i].options,
opts->auth_plugin_configs[i].option_count); }else if(conf->plugin.version == 4){
conf->plugin.plugin_cleanup_v4(
conf->plugin.user_data,
conf->options,
conf->option_count);
}else if(conf->plugin.version == 3){
conf->plugin.plugin_cleanup_v3(
conf->plugin.user_data,
conf->options,
conf->option_count);
}else if(conf->plugin.version == 2){
conf->plugin.plugin_cleanup_v2(
conf->plugin.user_data,
(struct mosquitto_auth_opt *)conf->options,
conf->option_count);
} }
if(opts->auth_plugin_configs[i].plugin.lib){ if(conf->plugin.lib){
LIB_CLOSE(opts->auth_plugin_configs[i].plugin.lib); LIB_CLOSE(conf->plugin.lib);
} }
memset(&opts->auth_plugin_configs[i].plugin, 0, sizeof(struct mosquitto__auth_plugin)); memset(&conf->plugin, 0, sizeof(struct mosquitto__auth_plugin));
} }
} }

Loading…
Cancel
Save