Merge branch 'abiliojr-bridge_reload' into develop

pull/2053/head
Roger A. Light 5 years ago
commit fb9ed94b9e

@ -1495,7 +1495,7 @@ openssl dhparam -out dhparam.pem 2048</programlisting>
<title>Configuring Bridges</title>
<para>Multiple bridges (connections to other brokers) can be configured
using the following variables.</para>
<para>Bridges cannot currently be reloaded on reload signal.</para>
<para>Reloaded on reload signal.</para>
<variablelist>
<varlistentry>
<term><option>address</option> <replaceable>address[:port]</replaceable> <replaceable>[address[:port]]</replaceable></term>
@ -2037,6 +2037,24 @@ topic clients/total in 0 test/mosquitto/org/ $SYS/broker/
can be used on one bridge at once.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_reload_type</option> [ lazy | immediate ]</term>
<listitem>
<para>If you change bridge options in the configuration file,
those configuration changes are applied during a bridge
reconnection. The <option>bridge_reload_type<option> option
determines when that reconnection happens, and can be set to either
<replaceable>lazy</replaceable> or <replaceable>immediate</replaceable>.</para>
<para><replaceable>lazy</replaceable> is the default, and means
that any connected bridge will remain in its current state until
a natural reconnection happens, at which point the new configuration
will be used.</para>
<para><replaceable>immediate</replaceable> forces a reconnection and so
uses the new configuration straight away.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_require_ocsp</option> [ true | false ]</term>
<listitem>

@ -806,6 +806,18 @@
# Set to 0 for "unlimited".
#bridge_max_packet_size 0
# If you change bridge options in the configuration file, those configuration
# changes are applied during a bridge reconnection. The bridge_reload_type
# option determines when that reconnection happens, and can be set to either
# lazy or immediate.
#
# lazy is the default, and means that any connected bridge will remain in its
# current state until a natural reconnection happens, at which point the new
# configuration is used.
#
# immediate forces a reconnection and so uses the new configuration straight
# away.
#bridge_reload_type lazy
# -----------------------------------------------------------------
# Certificate based SSL/TLS support

@ -57,20 +57,7 @@ Contributors:
static void bridge__backoff_step(struct mosquitto *context);
static void bridge__backoff_reset(struct mosquitto *context);
void bridge__start_all(void)
{
int i;
for(i=0; i<db.config->bridge_count; i++){
if(bridge__new(&(db.config->bridges[i]))){
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect to bridge %s.",
db.config->bridges[i].name);
}
}
}
int bridge__new(struct mosquitto__bridge *bridge)
static struct mosquitto *bridge__new(struct mosquitto__bridge *bridge)
{
struct mosquitto *new_context = NULL;
struct mosquitto **bridges;
@ -89,7 +76,7 @@ int bridge__new(struct mosquitto__bridge *bridge)
new_context = context__init(INVALID_SOCKET);
if(!new_context){
mosquitto__free(local_id);
return MOSQ_ERR_NOMEM;
return NULL;
}
new_context->id = local_id;
HASH_ADD_KEYPTR(hh_id, db.contexts_by_id, new_context->id, strlen(new_context->id), new_context);
@ -97,24 +84,24 @@ int bridge__new(struct mosquitto__bridge *bridge)
new_context->bridge = bridge;
new_context->is_bridge = true;
new_context->username = new_context->bridge->remote_username;
new_context->password = new_context->bridge->remote_password;
new_context->username = bridge->remote_username;
new_context->password = bridge->remote_password;
#ifdef WITH_TLS
new_context->tls_cafile = new_context->bridge->tls_cafile;
new_context->tls_capath = new_context->bridge->tls_capath;
new_context->tls_certfile = new_context->bridge->tls_certfile;
new_context->tls_keyfile = new_context->bridge->tls_keyfile;
new_context->tls_cafile = bridge->tls_cafile;
new_context->tls_capath = bridge->tls_capath;
new_context->tls_certfile = bridge->tls_certfile;
new_context->tls_keyfile = bridge->tls_keyfile;
new_context->tls_cert_reqs = SSL_VERIFY_PEER;
new_context->tls_ocsp_required = new_context->bridge->tls_ocsp_required;
new_context->tls_version = new_context->bridge->tls_version;
new_context->tls_insecure = new_context->bridge->tls_insecure;
new_context->tls_alpn = new_context->bridge->tls_alpn;
new_context->tls_ocsp_required = bridge->tls_ocsp_required;
new_context->tls_version = bridge->tls_version;
new_context->tls_insecure = bridge->tls_insecure;
new_context->tls_alpn = bridge->tls_alpn;
new_context->tls_engine = db.config->default_listener.tls_engine;
new_context->tls_keyform = db.config->default_listener.tls_keyform;
#ifdef FINAL_WITH_TLS_PSK
new_context->tls_psk_identity = new_context->bridge->tls_psk_identity;
new_context->tls_psk = new_context->bridge->tls_psk;
new_context->tls_psk_identity = bridge->tls_psk_identity;
new_context->tls_psk = bridge->tls_psk;
#endif
#endif
@ -132,15 +119,43 @@ int bridge__new(struct mosquitto__bridge *bridge)
db.bridge_count++;
db.bridges[db.bridge_count-1] = new_context;
}else{
return MOSQ_ERR_NOMEM;
return NULL;
}
return new_context;
}
static void bridge__destroy(struct mosquitto *context)
{
send__disconnect(context, MQTT_RC_SUCCESS, NULL);
context__cleanup(context, true);
}
void bridge__start_all(void)
{
int i;
for(i=0; i<db.config->bridge_count; i++){
struct mosquitto *context;
int ret;
context = bridge__new(db.config->bridges[i]);
assert(context);
#if defined(__GLIBC__) && defined(WITH_ADNS)
new_context->bridge->restart_t = 1; /* force quick restart of bridge */
return bridge__connect_step1(new_context);
context->bridge->restart_t = 1; /* force quick restart of bridge */
ret = bridge__connect_step1(context);
#else
return bridge__connect(new_context);
ret = bridge__connect(context);
#endif
if (ret){
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect bridge %s.",
context->bridge->name);
}
db.config->bridges[i] = NULL;
}
}
#if defined(__GLIBC__) && defined(WITH_ADNS)
@ -583,15 +598,69 @@ int bridge__register_local_connections(void)
}
void bridge__reload(void)
{
int i;
int j;
// destroy old bridges that dissappeared
for(i=0;i<db.bridge_count;i++){
for(j=0;j<db.config->bridge_count;j++){
if(!strcmp(db.bridges[i]->bridge->name, db.config->bridges[j]->name)) break;
}
if(j==db.config->bridge_count){
bridge__destroy(db.bridges[i]);
}
}
for(i=0;i<db.config->bridge_count;i++){
for(j=0;j<db.bridge_count; j++){
if(!strcmp(db.config->bridges[i]->name, db.bridges[j]->bridge->name)) break;
}
if(j==db.bridge_count){
// a new bridge was found, create it
bridge__new(db.config->bridges[i]);
db.config->bridges[i] = NULL;
continue;
}
if(db.config->bridges[i]->reload_type == brt_immediate){
// in this case, an existing bridge should match
for(j=0;j<db.bridge_count;j++){
if(!strcmp(db.config->bridges[i]->name, db.bridges[j]->bridge->name)) break;
}
assert(j<db.bridge_count);
db.bridges[j]->will_delay_interval = 0;
bridge__destroy(db.bridges[j]);
bridge__new(db.config->bridges[i]);
db.config->bridges[i] = NULL;
}
}
}
void bridge__cleanup(struct mosquitto *context)
{
int i;
assert(db.bridge_count > 0);
for(i=0; i<db.bridge_count; i++){
if(db.bridges[i] == context){
db.bridges[i] = NULL;
db.bridges[i] = db.bridges[db.bridge_count-1];
break;
}
}
db.bridge_count--;
db.bridges = mosquitto__realloc(db.bridges, (unsigned) db.bridge_count * sizeof(db.bridges[0]));
mosquitto__free(context->bridge->name);
context->bridge->name = NULL;
mosquitto__free(context->bridge->local_clientid);
context->bridge->local_clientid = NULL;
@ -621,6 +690,27 @@ void bridge__cleanup(struct mosquitto *context)
context->ssl_ctx = NULL;
}
#endif
for(i=0; i<context->bridge->address_count; i++){
mosquitto__free(context->bridge->addresses[i].address);
}
mosquitto__free(context->bridge->addresses);
context->bridge->addresses = NULL;
for(i=0; i<context->bridge->topic_count; i++){
mosquitto__free(context->bridge->topics[i].topic);
mosquitto__free(context->bridge->topics[i].local_prefix);
mosquitto__free(context->bridge->topics[i].remote_prefix);
mosquitto__free(context->bridge->topics[i].local_topic);
mosquitto__free(context->bridge->topics[i].remote_topic);
}
mosquitto__free(context->bridge->topics);
context->bridge->topics = NULL;
config__bridge_cleanup(context->bridge);
context->bridge = NULL;
}
@ -685,6 +775,22 @@ static void bridge__backoff_reset(struct mosquitto *context)
}
}
static bool reload_if_needed(struct mosquitto *context)
{
int i;
for(i=0;i<db.config->bridge_count;i++){
if(db.config->bridges[i] && !strcmp(context->bridge->name, db.config->bridges[i]->name)){
bridge__destroy(context);
bridge__new(db.config->bridges[i]);
db.config->bridges[i] = NULL;
return true;
}
}
return false;
}
void bridge_check(void)
{
static time_t last_check = 0;
@ -750,6 +856,8 @@ void bridge_check(void)
if(context->sock == INVALID_SOCKET){
if(reload_if_needed(context)) continue;
/* Want to try to restart the bridge connection */
if(!context->bridge->restart_t){
context->bridge->restart_t = db.now_s+context->bridge->restart_timeout;

@ -70,6 +70,9 @@ static int conf__parse_string(char **token, const char *name, char **value, char
static int config__read_file(struct mosquitto__config *config, bool reload, const char *file, struct config_recurse *config_tmp, int level, int *lineno);
static int config__check(struct mosquitto__config *config);
static void config__cleanup_plugins(struct mosquitto__config *config);
#ifdef WITH_BRIDGE
static int config__check_bridges(struct mosquitto__config *config);
#endif
static void conf__set_cur_security_options(struct mosquitto__config *config, struct mosquitto__listener *cur_listener, struct mosquitto__security_options **security_options)
{
@ -249,9 +252,6 @@ void config__init(struct mosquitto__config *config)
void config__cleanup(struct mosquitto__config *config)
{
int i;
#ifdef WITH_BRIDGE
int j;
#endif
mosquitto__free(config->clientid_prefixes);
mosquitto__free(config->persistence_location);
@ -306,39 +306,7 @@ void config__cleanup(struct mosquitto__config *config)
#ifdef WITH_BRIDGE
if(config->bridges){
for(i=0; i<config->bridge_count; i++){
mosquitto__free(config->bridges[i].name);
if(config->bridges[i].addresses){
for(j=0; j<config->bridges[i].address_count; j++){
mosquitto__free(config->bridges[i].addresses[j].address);
}
mosquitto__free(config->bridges[i].addresses);
}
mosquitto__free(config->bridges[i].remote_clientid);
mosquitto__free(config->bridges[i].remote_username);
mosquitto__free(config->bridges[i].remote_password);
mosquitto__free(config->bridges[i].local_clientid);
mosquitto__free(config->bridges[i].local_username);
mosquitto__free(config->bridges[i].local_password);
if(config->bridges[i].topics){
for(j=0; j<config->bridges[i].topic_count; j++){
mosquitto__free(config->bridges[i].topics[j].topic);
mosquitto__free(config->bridges[i].topics[j].local_prefix);
mosquitto__free(config->bridges[i].topics[j].remote_prefix);
mosquitto__free(config->bridges[i].topics[j].local_topic);
mosquitto__free(config->bridges[i].topics[j].remote_topic);
}
mosquitto__free(config->bridges[i].topics);
}
mosquitto__free(config->bridges[i].notification_topic);
#ifdef WITH_TLS
mosquitto__free(config->bridges[i].tls_version);
mosquitto__free(config->bridges[i].tls_cafile);
mosquitto__free(config->bridges[i].tls_alpn);
#ifdef FINAL_WITH_TLS_PSK
mosquitto__free(config->bridges[i].tls_psk_identity);
mosquitto__free(config->bridges[i].tls_psk);
#endif
#endif
config__bridge_cleanup(config->bridges[i]);
}
mosquitto__free(config->bridges);
}
@ -355,6 +323,49 @@ void config__cleanup(struct mosquitto__config *config)
}
}
#ifdef WITH_BRIDGE
void config__bridge_cleanup(struct mosquitto__bridge *bridge)
{
int i;
if(bridge == NULL) return;
mosquitto__free(bridge->name);
if(bridge->addresses){
for(i=0; i<bridge->address_count; i++){
mosquitto__free(bridge->addresses[i].address);
}
mosquitto__free(bridge->addresses);
}
mosquitto__free(bridge->remote_clientid);
mosquitto__free(bridge->remote_username);
mosquitto__free(bridge->remote_password);
mosquitto__free(bridge->local_clientid);
mosquitto__free(bridge->local_username);
mosquitto__free(bridge->local_password);
if(bridge->topics){
for(i=0; i<bridge->topic_count; i++){
mosquitto__free(bridge->topics[i].topic);
mosquitto__free(bridge->topics[i].local_prefix);
mosquitto__free(bridge->topics[i].remote_prefix);
mosquitto__free(bridge->topics[i].local_topic);
mosquitto__free(bridge->topics[i].remote_topic);
}
mosquitto__free(bridge->topics);
}
mosquitto__free(bridge->notification_topic);
#ifdef WITH_TLS
mosquitto__free(bridge->tls_version);
mosquitto__free(bridge->tls_cafile);
mosquitto__free(bridge->tls_alpn);
#ifdef FINAL_WITH_TLS_PSK
mosquitto__free(bridge->tls_psk_identity);
mosquitto__free(bridge->tls_psk);
#endif
#endif
mosquitto__free(bridge);
}
#endif
static void print_usage(void)
{
printf("mosquitto version %s\n\n", VERSION);
@ -529,6 +540,8 @@ int config__parse_args(struct mosquitto__config *config, int argc, char *argv[])
void config__copy(struct mosquitto__config *src, struct mosquitto__config *dest)
{
int i;
mosquitto__free(dest->security_options.acl_file);
dest->security_options.acl_file = src->security_options.acl_file;
@ -590,6 +603,15 @@ void config__copy(struct mosquitto__config *src, struct mosquitto__config *dest)
#ifdef WITH_WEBSOCKETS
dest->websockets_log_level = src->websockets_log_level;
#endif
#ifdef WITH_BRIDGE
for(i=0;i<dest->bridge_count;i++){
if(dest->bridges[i]) config__bridge_cleanup(dest->bridges[i]);
}
mosquitto__free(dest->bridges);
dest->bridges = src->bridges;
dest->bridge_count = src->bridge_count;
#endif
}
@ -684,24 +706,24 @@ int config__read(struct mosquitto__config *config, bool reload)
#ifdef WITH_BRIDGE
for(i=0; i<config->bridge_count; i++){
if(!config->bridges[i].name){
if(!config->bridges[i]->name){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: bridge name not defined.");
return MOSQ_ERR_INVAL;
}
if(config->bridges[i].addresses == 0){
if(config->bridges[i]->addresses == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: no remote addresses defined.");
return MOSQ_ERR_INVAL;
}
if(config->bridges[i].topic_count == 0){
if(config->bridges[i]->topic_count == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: no topics defined.");
return MOSQ_ERR_INVAL;
}
#ifdef FINAL_WITH_TLS_PSK
if(config->bridges[i].tls_psk && !config->bridges[i].tls_psk_identity){
if(config->bridges[i]->tls_psk && !config->bridges[i]->tls_psk_identity){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: missing bridge_identity.");
return MOSQ_ERR_INVAL;
}
if(config->bridges[i].tls_psk_identity && !config->bridges[i].tls_psk){
if(config->bridges[i]->tls_psk_identity && !config->bridges[i]->tls_psk){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: missing bridge_psk.");
return MOSQ_ERR_INVAL;
}
@ -717,7 +739,11 @@ int config__read(struct mosquitto__config *config, bool reload)
}else if(cr.log_type_set){
config->log_type = cr.log_type;
}
#ifdef WITH_BRIDGE
return config__check_bridges(config);
#else
return MOSQ_ERR_SUCCESS;
#endif
}
@ -765,7 +791,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_string(&token, "acl_file", &cur_security_options->acl_file, saveptr)) return MOSQ_ERR_INVAL;
}else if(!strcmp(token, "address") || !strcmp(token, "addresses")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge || cur_bridge->addresses){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -929,7 +954,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_attempt_unsubscribe")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -940,7 +964,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_cafile")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -957,7 +980,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_alpn")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -979,7 +1001,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_capath")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -996,7 +1017,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_certfile")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1013,7 +1033,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_identity")){
#if defined(WITH_BRIDGE) && defined(FINAL_WITH_TLS_PSK)
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1028,7 +1047,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_insecure")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1042,7 +1060,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_require_ocsp")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
if(reload) continue; /* Listeners not valid for reloading. */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1066,7 +1083,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_outgoing_retain")){
#if defined(WITH_BRIDGE)
if(reload) continue; // Listeners not valid for reloading.
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1077,7 +1093,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_keyfile")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1094,7 +1109,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_protocol_version")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1120,7 +1134,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_psk")){
#if defined(WITH_BRIDGE) && defined(FINAL_WITH_TLS_PSK)
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1132,10 +1145,32 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_string(&token, "bridge_psk", &cur_bridge->tls_psk, saveptr)) return MOSQ_ERR_INVAL;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge and/or TLS-PSK support not available.");
#endif
}else if(!strcmp(token, "bridge_reload_type")){
#ifdef WITH_BRIDGE
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
if(!strcmp(token, "lazy")){
cur_bridge->reload_type = brt_lazy;
}else if(!strcmp(token, "immediate")){
cur_bridge->reload_type = brt_immediate;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge_reload_type value in configuration (%s).", token);
return MOSQ_ERR_INVAL;
}
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty bridge_reload_type value in configuration.");
return MOSQ_ERR_INVAL;
}
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "bridge_tls_version")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1192,7 +1227,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "clientid") || !strcmp(token, "remote_clientid")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1203,7 +1237,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "cleansession")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1214,7 +1247,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "local_cleansession")){
#ifdef WITH_BRIDGE
if(reload) continue; // FIXME
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1232,24 +1264,30 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_string(&token, "clientid_prefixes", &config->clientid_prefixes, saveptr)) return MOSQ_ERR_INVAL;
}else if(!strcmp(token, "connection")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
token = strtok_r(NULL, " ", &saveptr);
if(token){
/* Check for existing bridge name. */
for(i=0; i<config->bridge_count; i++){
if(!strcmp(config->bridges[i].name, token)){
if(!strcmp(config->bridges[i]->name, token)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", token);
return MOSQ_ERR_INVAL;
}
}
config->bridge_count++;
config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge));
config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge *));
if(!config->bridges){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
cur_bridge = &(config->bridges[config->bridge_count-1]);
cur_bridge = mosquitto__malloc(sizeof(struct mosquitto__bridge));
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
config->bridges[config->bridge_count-1] = cur_bridge;
memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
cur_bridge->name = mosquitto__strdup(token);
if(!cur_bridge->name){
@ -1271,6 +1309,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
cur_bridge->primary_retry_sock = INVALID_SOCKET;
cur_bridge->outgoing_retain = true;
cur_bridge->clean_start_local = -1;
cur_bridge->reload_type = brt_lazy;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
return MOSQ_ERR_INVAL;
@ -1303,7 +1342,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "idle_timeout")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1348,7 +1386,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "keepalive_interval")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1464,7 +1501,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "local_clientid")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1475,7 +1511,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "local_password")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1486,7 +1521,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "local_username")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1703,7 +1737,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "notifications")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1714,7 +1747,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "notifications_local_only")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration");
return MOSQ_ERR_INVAL;
@ -1725,7 +1757,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "notification_topic")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1736,7 +1767,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "password") || !strcmp(token, "remote_password")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1864,7 +1894,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "restart_timeout")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1897,7 +1926,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: The retry_interval option is no longer available.");
}else if(!strcmp(token, "round_robin")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1910,7 +1938,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_bool(&token, "set_tcp_nodelay", &config->set_tcp_nodelay, saveptr)) return MOSQ_ERR_INVAL;
}else if(!strcmp(token, "start_type")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -1961,7 +1988,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "threshold")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -2014,7 +2040,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "topic")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -2099,7 +2124,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "try_private")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -2133,7 +2157,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_bool(&token, "use_username_as_clientid", &cur_listener->use_username_as_clientid, saveptr)) return MOSQ_ERR_INVAL;
}else if(!strcmp(token, "username") || !strcmp(token, "remote_username")){
#ifdef WITH_BRIDGE
if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@ -2209,14 +2232,40 @@ int config__read_file(struct mosquitto__config *config, bool reload, const char
return rc;
}
static int config__check(struct mosquitto__config *config)
{
/* Checks that are easy to make after the config has been loaded. */
int i;
/* Default to auto_id_prefix = 'auto-' if none set. */
if(config->per_listener_settings){
for(i=0; i<config->listener_count; i++){
if(!config->listeners[i].security_options.auto_id_prefix){
config->listeners[i].security_options.auto_id_prefix = mosquitto__strdup("auto-");
if(!config->listeners[i].security_options.auto_id_prefix){
return MOSQ_ERR_NOMEM;
}
config->listeners[i].security_options.auto_id_prefix_len = (uint16_t)strlen("auto-");
}
}
}else{
if(!config->security_options.auto_id_prefix){
config->security_options.auto_id_prefix = mosquitto__strdup("auto-");
if(!config->security_options.auto_id_prefix){
return MOSQ_ERR_NOMEM;
}
config->security_options.auto_id_prefix_len = (uint16_t)strlen("auto-");
}
}
return MOSQ_ERR_SUCCESS;
}
#ifdef WITH_BRIDGE
static int config__check_bridges(struct mosquitto__config *config)
{
int i;
int j;
struct mosquitto__bridge *bridge1, *bridge2;
char hostname[256];
@ -2225,7 +2274,7 @@ static int config__check(struct mosquitto__config *config)
/* Check for bridge duplicate local_clientid, need to generate missing IDs
* first. */
for(i=0; i<config->bridge_count; i++){
bridge1 = &config->bridges[i];
bridge1 = config->bridges[i];
if(!bridge1->remote_clientid){
if(!gethostname(hostname, 256)){
@ -2252,9 +2301,9 @@ static int config__check(struct mosquitto__config *config)
}
for(i=0; i<config->bridge_count; i++){
bridge1 = &config->bridges[i];
bridge1 = config->bridges[i];
for(j=i+1; j<config->bridge_count; j++){
bridge2 = &config->bridges[j];
bridge2 = config->bridges[j];
if(!strcmp(bridge1->local_clientid, bridge2->local_clientid)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Bridge local_clientid "
"'%s' is not unique. Try changing or setting the "
@ -2264,31 +2313,10 @@ static int config__check(struct mosquitto__config *config)
}
}
}
#endif
/* Default to auto_id_prefix = 'auto-' if none set. */
if(config->per_listener_settings){
for(i=0; i<config->listener_count; i++){
if(!config->listeners[i].security_options.auto_id_prefix){
config->listeners[i].security_options.auto_id_prefix = mosquitto__strdup("auto-");
if(!config->listeners[i].security_options.auto_id_prefix){
return MOSQ_ERR_NOMEM;
}
config->listeners[i].security_options.auto_id_prefix_len = (uint16_t)strlen("auto-");
}
}
}else{
if(!config->security_options.auto_id_prefix){
config->security_options.auto_id_prefix = mosquitto__strdup("auto-");
if(!config->security_options.auto_id_prefix){
return MOSQ_ERR_NOMEM;
}
config->security_options.auto_id_prefix_len = (uint16_t)strlen("auto-");
}
}
return MOSQ_ERR_SUCCESS;
}
#endif
static int conf__parse_bool(char **token, const char *name, bool *value, char *saveptr)

@ -210,6 +210,9 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
mosquitto_security_apply();
log__close(db.config);
log__init(db.config);
#ifdef WITH_BRIDGE
bridge__reload();
#endif
flag_reload = false;
}
if(flag_tree_print){

@ -296,7 +296,7 @@ struct mosquitto__config {
uint16_t websockets_headers_size;
#endif
#ifdef WITH_BRIDGE
struct mosquitto__bridge *bridges;
struct mosquitto__bridge **bridges;
int bridge_count;
#endif
struct mosquitto__security_options security_options;
@ -485,6 +485,11 @@ enum mosquitto_bridge_start_type{
bst_once = 3
};
enum mosquitto_bridge_reload_type{
brt_lazy = 0,
brt_immediate = 1,
};
struct mosquitto__bridge_topic{
char *topic;
char *local_prefix;
@ -539,6 +544,7 @@ struct mosquitto__bridge{
bool attempt_unsubscribe;
bool initial_notification_done;
bool outgoing_retain;
enum mosquitto_bridge_reload_type reload_type;
#ifdef WITH_TLS
bool tls_insecure;
bool tls_ocsp_required;
@ -581,6 +587,9 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
* ============================================================ */
/* Initialise config struct to default values. */
void config__init(struct mosquitto__config *config);
#ifdef WITH_BRIDGE
void config__bridge_cleanup(struct mosquitto__bridge *bridge);
#endif
/* Parse command line options into config. */
int config__parse_args(struct mosquitto__config *config, int argc, char *argv[]);
/* Read configuration data from config->config_file into config.
@ -713,7 +722,7 @@ void log__internal(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
* ============================================================ */
#ifdef WITH_BRIDGE
void bridge__start_all(void);
int bridge__new(struct mosquitto__bridge *bridge);
void bridge__reload(void);
void bridge__cleanup(struct mosquitto *context);
int bridge__connect(struct mosquitto *context);
int bridge__connect_step1(struct mosquitto *context);

@ -0,0 +1,129 @@
#!/usr/bin/env python3
# tests that bridge configuration is reloaded on signal
from mosq_test_helper import *
import signal
def write_config(filename, port1, port2, subtopic, reload_immediate=False):
with open(filename, 'w') as f:
f.write("listener %d\n" % (port2))
f.write("allow_anonymous true\n")
f.write("\n")
f.write("connection bridge_sample\n")
f.write("address 127.0.0.1:%d\n" % (port1))
f.write("topic # in 0 local/topic/ remote/%s/\n" % (subtopic))
f.write("notifications false\n")
f.write("restart_timeout 1\n")
if reload_immediate:
f.write("bridge_reload_type immediate")
def accept_new_connection(sock):
conn, _ = sock.accept()
conn.settimeout(20)
client_id = socket.gethostname()+".bridge_sample"
connect_packet = mosq_test.gen_connect(
client_id, keepalive=60, clean_session=False, proto_ver=0x84)
connack_packet = mosq_test.gen_connack()
mosq_test.expect_packet(conn, "connect", connect_packet)
conn.send(connack_packet)
return conn
def accept_subscription(socket, topic, mid=1, qos=0):
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos)
suback_packet = mosq_test.gen_suback(mid, qos)
mosq_test.expect_packet(socket, "subscribe", subscribe_packet)
socket.send(suback_packet)
def start_fake_broker(port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(3)
sock.bind(('', port))
sock.listen(5)
return sock
def expect_no_incoming_connection(sock):
try:
accept_new_connection(sock) # will timeout if nothing comes in
raise mosq_test.TestError # hence, it shouldn't reach this
except socket.timeout:
pass
def do_test():
rc = 1
port1, port2 = mosq_test.get_port(2)
conf_file = os.path.basename(__file__).replace('.py', '.conf')
try:
ssock = start_fake_broker(port1)
write_config(conf_file, port1, port2, "topic1", True)
broker = mosq_test.start_broker(
filename=os.path.basename(__file__), port=port2, use_conf=True)
bridge = accept_new_connection(ssock)
accept_subscription(bridge, "remote/topic1/#")
write_config(conf_file, port1, port2, "topic2", True)
broker.send_signal(signal.SIGHUP)
bridge = accept_new_connection(ssock) # immediate reload forces a reconnection
accept_subscription(bridge, "remote/topic2/#")
write_config(conf_file, port1, port2, "topic3", False)
broker.send_signal(signal.SIGHUP)
expect_no_incoming_connection(ssock) # as it was set to lazy reload
bridge.close()
bridge = accept_new_connection(ssock)
accept_subscription(bridge, "remote/topic3/#")
rc = 0
except mosq_test.TestError:
pass
finally:
try:
broker.terminate()
broker.wait()
_, stde = broker.communicate()
if rc:
print(stde.decode('utf-8'))
except NameError:
pass
try:
os.remove(conf_file)
except FileNotFoundError:
pass
try:
bridge.close()
except NameError:
pass
try:
ssock.close()
except NameError:
pass
return rc
exit_code = do_test()
exit(exit_code)

@ -129,6 +129,7 @@ test : test-compile 01 02 03 04 05 06 07 08 09 10 11 12 13 14
./06-bridge-outgoing-retain.py
./06-bridge-per-listener-settings.py
./06-bridge-reconnect-local-out.py
./06-bridge-config-reload.py
07 :
./07-will-delay-reconnect.py

@ -109,6 +109,7 @@ tests = [
(2, './06-bridge-outgoing-retain.py'),
(3, './06-bridge-per-listener-settings.py'),
(2, './06-bridge-reconnect-local-out.py'),
(2, './06-bridge-config-reload.py'),
(1, './07-will-delay-reconnect.py'),
(1, './07-will-delay-recover.py'),

Loading…
Cancel
Save