Compare commits

...

11 Commits

Author SHA1 Message Date
Roger A. Light 2760985f1e Fix man page installation.
Closes #2363. Thanks to yvs2014.
4 years ago
Roger Light 90566d3f9e
Merge pull request #2318 from cdelston0/dynamic-bridge
Fix dynamic bridge start-up delay
4 years ago
Chris Elston 63d400dc50 Additional bridge parameters for dynamic bridges
Signed-off-by: Chris Elston <chris.elston@sancloud.com>

Adds the following bridge configuration as parameters for dynamic
bridges:
- remote_username
- try_private
- notification_topic
- remote_clientid
4 years ago
Chris Elston 8491fd4a0c Fix segfault in dynamic bridge support
Signed-off-by: Chris Elston <chris.elston@sancloud.com>

Sending an empty packet to $BRIDGE/new would cause a segfault because
the payload pointer was accessed without checking for NULL.

This change rejects an empty payload with MOSQ_ERR_INVAL.
4 years ago
Chris Elston cd3b588601 Fix dynamic bridge start-up delay
Signed-off-by: Chris Elston <chris.elston@sancloud.com>

The first packet sent by a dynamic bridge was failing because it was
sent before the socket was fully established and the OS returned
-EAGAIN. We had to wait for the next PINGREQ to cause the initial packet
to be sent, and therefore bridge startup was delayed by the
configured keepalive timeout (default 60 seconds).

This change adds the new bridge's output socket to the list managed by
the mux, and we now send the initial CONNECT as soon as the output
socket becomes available.
4 years ago
Roger Light 2b866904aa
Merge pull request #1926 from Tifaifai/Bridge-Dynamic_2.0
Bridge dynamic 2.0
5 years ago
Tifaifai Maupiti 1f6fa8d771 Resolve conflict
Signed-off-by: Tifaifai Maupiti <tifaifai.maupiti@gmail.com>
5 years ago
Tifaifai Maupiti efa0ff6908 Fix security, remove }
Signed-off-by: Tifaifai Maupiti <tifaifai.maupiti@gmail.com>
5 years ago
Tifaifai Maupiti 360d9be7d8 Remove allow_sys_update used and deprecated
Signed-off-by: Tifaifai Maupiti <tifaifai.maupiti@gmail.com>
5 years ago
Tifaifai Maupiti 7f9b017552 Fix for travis - websockets
Signed-off-by: Tifaifai Maupiti <tifaifai.maupiti@gmail.com>
5 years ago
Tifaifai Maupiti b14eafd5ec Bridge Dynamic Update 2.0.0
Signed-off-by: Tifaifai Maupiti <tifaifai.maupiti@gmail.com>
5 years ago

1
.gitignore vendored

@ -25,6 +25,7 @@ build64/
client/mosquitto_pub
client/mosquitto_rr
client/mosquitto_sub
client/mosquitto_bridge
client/testing
client/testing.c

@ -6,7 +6,7 @@ are optional.
* tcp-wrappers (optional, package name libwrap0-dev)
* libwebsockets (optional, disabled by default, version 2.4 and above)
* cJSON (optional but recommended, for dynamic-security plugin support, and
JSON output from mosquitto_sub/mosquitto_rr)
JSON output from mosquitto_sub/mosquitto_rr/mosquitto_bridge)
* On Windows, a pthreads library is required if threading support is to be
included.

@ -42,6 +42,51 @@ And to publish a message:
mosquitto_pub -t 'test/topic' -m 'hello world'
## To dynamically create/delete/show a bridge, use:
Mosquitto broker implement SYS-Topics. All SYS-Topics start $SYS and are read-only for MQTT clients.
To dynamically create or delete a bridge, mosquitto use new topics: BRIDGE-Topics.
All BRIDGE-Topics start with $BRIDGE and are read-write with ACL Protection.
Create Bridge:
mosquitto_bridge -p 1883 -c testBridge -a 127.0.0.1 -R 1884 -n -t \# -q 0 -l local/ -r remote/ -D both
mosquitto_bridge -p 1883 -c testBridge -a 127.0.0.1 -R 1884 -n -t \# -q 0 -l test/1883/ -r test/1884/ -D both
with json format:
mosquitto_bridge -p 1883 -c testBridge -a 127.0.0.1 -R 1884 -n -j -t \# -q 0 -l test/1883/ -r test/1884/ -D both
or via publish message to create a bridge:
mosquitto_pub -h 127.0.0.1 -p 1883 -t '$BRIDGE/new' -m 'connection testBridge
address 127.0.0.1:1884
topic # both 0 test/1883/ test/1884/
'
with json format:
mosquitto_pub -h 127.0.0.1 -p 1883 -t '$BRIDGE/new' -m '{"bridges":[{"connection":"testBridge","addresses":[{"address":"127.0.0.1","port":1884}],"topic":"#","direction":"both","qos":0,"local_prefix":"test/1883/","remote_prefix":"test/1884/"}]}'
Delete Bridge:
mosquitto_bridge -p 1883 -c testBridge -d
with json format:
mosquitto_bridge -p 1883 -c testBridge -d -j
or via publish message to delete a bridge:
mosquitto_pub -h 127.0.0.1 -p 1883 -t '$BRIDGE/del' -m 'connection testBridge'
with json format:
mosquitto_pub -h 127.0.0.1 -p 1883 -t '$BRIDGE/del' -m '{"connection":"testBridge"}'
Show all Bridges:
mosquitto_bridge -p 1883 -k
with json format:
mosquitto_bridge -p 1883 -k -j
## Documentation
Documentation for the broker, clients and client library API can be found in

@ -22,6 +22,7 @@ link_directories(${CLIENT_DIR})
add_executable(mosquitto_pub pub_client.c pub_shared.c ${shared_src})
add_executable(mosquitto_sub sub_client.c sub_client_output.c ${shared_src})
add_executable(mosquitto_rr rr_client.c pub_shared.c sub_client_output.c ${shared_src})
add_executable(mosquitto_bridge bridge_client.c ${shared_src} pub_shared.c)
if (CJSON_FOUND)
target_link_libraries(mosquitto_pub ${CJSON_LIBRARIES})
@ -33,18 +34,22 @@ if (WITH_STATIC_LIBRARIES)
target_link_libraries(mosquitto_pub libmosquitto_static)
target_link_libraries(mosquitto_sub libmosquitto_static)
target_link_libraries(mosquitto_rr libmosquitto_static)
target_link_libraries(mosquitto_bridge libmosquitto_static)
else()
target_link_libraries(mosquitto_pub libmosquitto)
target_link_libraries(mosquitto_sub libmosquitto)
target_link_libraries(mosquitto_rr libmosquitto)
target_link_libraries(mosquitto_bridge libmosquitto)
endif()
if (QNX)
target_link_libraries(mosquitto_pub socket)
target_link_libraries(mosquitto_sub socket)
target_link_libraries(mosquitto_rr socket)
target_link_libraries(mosquitto_bridge socket)
endif()
install(TARGETS mosquitto_pub RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}")
install(TARGETS mosquitto_sub RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}")
install(TARGETS mosquitto_rr RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}")
install(TARGETS mosquitto_bridge RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}")

@ -1,16 +1,16 @@
include ../config.mk
.PHONY: all install uninstall reallyclean clean static static_pub static_sub static_rr
.PHONY: all install uninstall reallyclean clean static static_pub static_sub static_rr static_bridge
ifeq ($(WITH_SHARED_LIBRARIES),yes)
SHARED_DEP:=../lib/libmosquitto.so.${SOVERSION}
endif
ifeq ($(WITH_SHARED_LIBRARIES),yes)
ALL_DEPS:= mosquitto_pub mosquitto_sub mosquitto_rr
ALL_DEPS:= mosquitto_pub mosquitto_sub mosquitto_rr mosquitto_bridge
else
ifeq ($(WITH_STATIC_LIBRARIES),yes)
ALL_DEPS:= static_pub static_sub static_rr
ALL_DEPS:= static_pub static_sub static_rr static_bridge
endif
endif
@ -29,6 +29,9 @@ static_sub : sub_client.o sub_client_output.o client_props.o client_shared.o ../
static_rr : rr_client.o client_props.o client_shared.o pub_shared.o sub_client_output.o ../lib/libmosquitto.a
${CROSS_COMPILE}${CC} $^ -o mosquitto_rr ${CLIENT_LDFLAGS} ${STATIC_LIB_DEPS} ${CLIENT_STATIC_LDADD}
static_bridge : bridge_client.o pub_shared.o client_props.o client_shared.o ../lib/libmosquitto.a
${CROSS_COMPILE}${CC} $^ -o mosquitto_bridge ${CLIENT_LDFLAGS} ${STATIC_LIB_DEPS} ${CLIENT_STATIC_LDADD}
mosquitto_pub : pub_client.o pub_shared.o client_shared.o client_props.o
${CROSS_COMPILE}${CC} $(CLIENT_LDFLAGS) $^ -o $@ $(CLIENT_LDADD)
@ -38,6 +41,9 @@ mosquitto_sub : sub_client.o sub_client_output.o client_shared.o client_props.o
mosquitto_rr : rr_client.o client_shared.o client_props.o pub_shared.o sub_client_output.o
${CROSS_COMPILE}${CC} $(CLIENT_LDFLAGS) $^ -o $@ $(CLIENT_LDADD)
mosquitto_bridge : bridge_client.o pub_shared.o client_shared.o client_props.o
${CROSS_COMPILE}${CC} ${CLIENT_LDFLAGS} $^ -o $@ $(CLIENT_LDADD)
pub_client.o : pub_client.c ${SHARED_DEP}
${CROSS_COMPILE}${CC} $(CLIENT_CPPFLAGS) $(CLIENT_CFLAGS) -c $< -o $@
@ -53,6 +59,9 @@ sub_client_output.o : sub_client_output.c sub_client_output.h ${SHARED_DEP}
rr_client.o : rr_client.c ${SHARED_DEP}
${CROSS_COMPILE}${CC} $(CLIENT_CPPFLAGS) $(CLIENT_CFLAGS) -c $< -o $@
bridge_client.o : bridge_client.c ${SHARED_DEP}
${CROSS_COMPILE}${CC} $(CLIENT_CPPFLAGS) $(CLIENT_CFLAGS) -c $< -o $@
client_shared.o : client_shared.c client_shared.h
${CROSS_COMPILE}${CC} $(CLIENT_CPPFLAGS) $(CLIENT_CFLAGS) -c $< -o $@
@ -79,13 +88,15 @@ install : all
$(INSTALL) ${STRIP_OPTS} mosquitto_pub "${DESTDIR}${prefix}/bin/mosquitto_pub"
$(INSTALL) ${STRIP_OPTS} mosquitto_sub "${DESTDIR}${prefix}/bin/mosquitto_sub"
$(INSTALL) ${STRIP_OPTS} mosquitto_rr "${DESTDIR}${prefix}/bin/mosquitto_rr"
$(INSTALL) ${STRIP_OPTS} mosquitto_bridge "${DESTDIR}${prefix}/bin/mosquitto_bridge"
uninstall :
-rm -f "${DESTDIR}${prefix}/bin/mosquitto_pub"
-rm -f "${DESTDIR}${prefix}/bin/mosquitto_sub"
-rm -f "${DESTDIR}${prefix}/bin/mosquitto_rr"
-rm -f "${DESTDIR}${prefix}/bin/mosquitto_bridge"
reallyclean : clean
clean :
-rm -f *.o mosquitto_pub mosquitto_sub mosquitto_rr *.gcda *.gcno
clean :
-rm -f *.o mosquitto_pub mosquitto_sub mosquitto_rr mosquitto_bridge *.gcda *.gcno

@ -0,0 +1,584 @@
/*
Copyright (c) 2015-2020 Tifaifai Maupiti <tifaifai.maupiti@gmail.com>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Tifaifai Maupiti - initial implementation and documentation.
*/
#include "config.h"
#ifdef WITH_CJSON
# include <cjson/cJSON.h>
#endif
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <signal.h>
#include <sys/time.h>
#include <time.h>
#else
#include <process.h>
#include <winsock2.h>
#define snprintf sprintf_s
#endif
#include <mqtt_protocol.h>
#include <mosquitto.h>
#include "client_shared.h"
#include "pub_shared.h"
/* Global variables for use in callbacks. See sub_client.c for an example of
* using a struct to hold variables for use in callbacks. */
struct mosquitto *mosq = NULL;
bool process_messages = true;
static int last_mid_sent = -1;
static bool disconnect_sent = false;
static int publish_count = 0;
static volatile int status = STATUS_CONNECTING;
static char *topic = NULL;
static char *name = NULL;
static char *pattern = NULL;
static char *direction = NULL;
static char *local_prefix = NULL;
static char *remote_prefix = NULL;
static char *remote_add = NULL;
static int remote_port;
static int qos = 0;
static char *msg = NULL;
static char *msg_json = NULL;
void * ptrMosq = NULL;
int process_run = 1;
int nb_line = 0;
struct bridge{
char *name;
};
struct bridge_list{
int bridge_list_count;
struct bridge* bridge;
};
struct bridge_list *bridges = NULL;
int show_bridges(struct bridge_list* bridges)
{
int i;
char * bridge_name;
uint32_t bridge_name_len;
for(i=0; i<nb_line ;i++)
{
printf("\33[2K");
printf("\033[A");
}
nb_line = 0;
for(i=0;i<bridges->bridge_list_count;i++)
{
bridge_name_len = (uint32_t)strlen(bridges->bridge[i].name) - (uint32_t)strlen("/state") - (uint32_t)strlen("$SYS/broker/connection/");
bridge_name = malloc(bridge_name_len*sizeof(char));
memset(bridge_name, 0, bridge_name_len);
memcpy(bridge_name, bridges->bridge[i].name + (uint32_t)strlen("$SYS/broker/connection/")*sizeof(char) , bridge_name_len);
printf("%s\n",bridge_name);
fflush(stdout);
free(bridge_name);
nb_line++;
}
return MOSQ_ERR_SUCCESS;
}
int show_bridges_json(struct bridge_list* bridges){
int rc;
#ifndef WITH_CJSON
rc = show_bridges(bridges);
return rc;
#endif
int i;
char * bridge_name;
uint32_t bridge_name_len;
char *string_json = NULL;
cJSON *bridges_json = NULL;
cJSON *bridge_json = NULL;
cJSON *connection_json = NULL;
if(cfg.bridge_conf_json == CONF_JSON){
cJSON *s_bridges_json = cJSON_CreateObject();
if(s_bridges_json == NULL) {
cJSON_Delete(s_bridges_json);
return MOSQ_ERR_INVAL;
}
bridges_json = cJSON_CreateArray();
if(bridges_json == NULL) {
cJSON_Delete(s_bridges_json);
return MOSQ_ERR_INVAL;
}
cJSON_AddItemToObject(s_bridges_json, "bridges", bridges_json);
for(i=0;i<bridges->bridge_list_count;i++)
{
bridge_json = cJSON_CreateObject();
if(bridge_json == NULL){
cJSON_Delete(s_bridges_json);
return MOSQ_ERR_INVAL;
}
cJSON_AddItemToArray(bridges_json, bridge_json);
bridge_name_len = (uint32_t)strlen(bridges->bridge[i].name) - (uint32_t)strlen("/state") - (uint32_t)strlen("$SYS/broker/connection/");
bridge_name = malloc(bridge_name_len*sizeof(char));
memset(bridge_name, 0, bridge_name_len);
memcpy(bridge_name, bridges->bridge[i].name + (uint32_t)strlen("$SYS/broker/connection/")*sizeof(char) , bridge_name_len);
connection_json = cJSON_CreateString(bridge_name);
if(connection_json == NULL){
cJSON_Delete(s_bridges_json);
return MOSQ_ERR_INVAL;
}
cJSON_AddItemToObject(bridge_json, "connection", connection_json);
free(bridge_name);
}
string_json = cJSON_PrintUnformatted(s_bridges_json);
if(string_json == NULL){
printf("Error, Failed to print show_bridges.\n");
}
printf("%s\n",string_json);
cJSON_Delete(s_bridges_json);
return MOSQ_ERR_SUCCESS;
}else{
rc = show_bridges(bridges);
return rc;
}
return MOSQ_ERR_SUCCESS;
}
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message, const mosquitto_property *properties)
{
UNUSED(properties);
struct bridge_list *bridges;
bridges = (struct bridge_list*) obj;
int valid_erase = 0;
int i;
if(!strcmp(message->payload,"1")){
bridges->bridge_list_count++;
bridges->bridge = realloc(bridges->bridge, (size_t)bridges->bridge_list_count*sizeof(struct bridge));
if(!bridges->bridge){
printf("Error: Out of memory. 1\n");
exit(-1);
}
bridges->bridge[bridges->bridge_list_count-1].name = malloc(strlen(message->topic)*sizeof(char));
if(!bridges->bridge[bridges->bridge_list_count-1].name){
printf("Error: Out of memory. 2\n");
exit(-1);
}
bridges->bridge[bridges->bridge_list_count-1].name = strdup(message->topic);
show_bridges_json(bridges);
}else{
if(bridges->bridge_list_count>0){
for (i = 0; i < bridges->bridge_list_count; i++) {
if(!strcmp(bridges->bridge[i].name,message->topic)){
valid_erase = i;
bridges->bridge_list_count--;
}
}
if(valid_erase){
for (i = valid_erase; i < bridges->bridge_list_count; i++) {
bridges->bridge[i] = bridges->bridge[i+1];
}
bridges->bridge = realloc(bridges->bridge, (size_t)bridges->bridge_list_count*sizeof(struct bridge));
if(!bridges->bridge){
printf("Error: Out of memory. 3\n");
exit(-1);
}
show_bridges_json(bridges);
}
}else{
bridges->bridge_list_count = 0;
}
}
}
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties)
{
UNUSED(mosq);
UNUSED(obj);
UNUSED(rc);
UNUSED(properties);
if(rc == 0){
status = STATUS_DISCONNECTED;
}
}
int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain)
{
return mosquitto_publish_v5(mosq, mid, topic, payloadlen, payload, qos, retain, cfg.publish_props);
}
void my_connect_callback_pub(struct mosquitto *mosq, void *obj, int result, int flags, const mosquitto_property *properties)
{
UNUSED(obj);
UNUSED(flags);
UNUSED(properties);
int rc = MOSQ_ERR_SUCCESS;
if(!result){
rc = my_publish(mosq, &mid_sent, cfg.topic, cfg.msglen, cfg.message, cfg.qos, cfg.retain);
if(rc){
switch(rc){
case MOSQ_ERR_INVAL:
err_printf(&cfg, "Error: Invalid input. Does your topic contain '+' or '#'?\n");
break;
case MOSQ_ERR_NOMEM:
err_printf(&cfg, "Error: Out of memory when trying to publish message.\n");
break;
case MOSQ_ERR_NO_CONN:
err_printf(&cfg, "Error: Client not connected when trying to publish.\n");
break;
case MOSQ_ERR_PROTOCOL:
err_printf(&cfg, "Error: Protocol error when communicating with broker.\n");
break;
case MOSQ_ERR_PAYLOAD_SIZE:
err_printf(&cfg, "Error: Message payload is too large.\n");
break;
case MOSQ_ERR_QOS_NOT_SUPPORTED:
err_printf(&cfg, "Error: Message QoS not supported on broker, try a lower QoS.\n");
break;
}
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
}
}else{
if(result){
if(cfg.protocol_version == MQTT_PROTOCOL_V5){
if(result == MQTT_RC_UNSUPPORTED_PROTOCOL_VERSION){
err_printf(&cfg, "Connection error: %s. Try connecting to an MQTT v5 broker, or use MQTT v3.x mode.\n", mosquitto_reason_string(result));
}else{
err_printf(&cfg, "Connection error: %s\n", mosquitto_reason_string(result));
}
}else{
err_printf(&cfg, "Connection error: %s\n", mosquitto_connack_string(result));
}
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
}
}
}
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties)
{
UNUSED(obj);
UNUSED(properties);
last_mid_sent = mid;
if(reason_code > 127){
err_printf(&cfg, "Warning: Publish %d failed: %s.\n", mid, mosquitto_reason_string(reason_code));
}
publish_count++;
if(disconnect_sent == false){
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
disconnect_sent = true;
}
}
void my_connect_callback_sub(struct mosquitto *mosq, void *obj, int result, int flags, const mosquitto_property *properties)
{
UNUSED(obj);
UNUSED(flags);
UNUSED(properties);
if(!result){
mosquitto_subscribe(mosq, NULL, "$SYS/broker/connection/#", 0);
}
}
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
UNUSED(obj);
int i;
if(cfg.debug){
if(!cfg.quiet) printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
for(i=1; i<qos_count; i++){
if(!cfg.quiet) printf(", %d", granted_qos[i]);
}
if(!cfg.quiet) printf("\n");
}
if(cfg.exit_after_sub){
mosquitto_disconnect_v5(mosq, 0, cfg.disconnect_props);
}
}
int bridge_shared_init(void)
{
bridges = malloc(sizeof(struct bridge_list));
if(!bridges){
printf("Error: Out of memory.\n");
return 1;
}
bridges->bridge_list_count = 0;
ptrMosq = bridges;
return 0;
}
int pub_loop(struct mosquitto *mosq)
{
int rc;
int loop_delay = 1000;
do{
rc = mosquitto_loop(mosq, loop_delay, 1);
}while(rc == MOSQ_ERR_SUCCESS);
if(status == STATUS_DISCONNECTED){
return MOSQ_ERR_SUCCESS;
}else{
return rc;
}
}
void bridge_shared_cleanup(void)
{
free(bridges);
}
void print_usage(void)
{
int major, minor, revision;
mosquitto_lib_version(&major, &minor, &revision);
printf("mosquitto_bridger is a simple mqtt client that will publish a message on a single topic on one broker to manage bridge dynamiclly with another and exit.\n");
printf("mosquitto_bridge version %s running on libmosquitto %d.%d.%d.\n\n", VERSION, major, minor, revision);
printf("Usage: mosquitto_bridge [-h local host] [-p local port] [-q qos] -a remote host -P remote port -t bridge pattern -D bridge direction -l local prefix -r remote prefix\n");
printf("\n");
printf("mosquitto_bridge --help\n\n");
printf(" -j | --json : JSON message configuration\n");
printf(" -a | --address : address configuration\n");
printf(" -c | --connection : connection configuration\n");
printf(" -d | --del : delete bridge dynamic\n");
printf(" -D | --direction : direction configuration : in, out or both\n");
printf(" -h | --host : local Mosquitto host bridge dynamic compatible broker to make new/del bridge.\n");
printf(" -k | --know : know actif bridges on local broker\n");
printf(" -l | --local : local prefix configuration\n");
printf(" -n | --new : new bridge dynamic\n");
printf(" -p | --port : network port to connect to local. Defaults to 1883.\n");
printf(" -P | --pw : provide a password (requires MQTT 3.1 broker)\n");
printf(" -q | --qos : quality of service level to use for all messages. Defaults to 0.\n");
printf(" -r | --remote : remote prefix configuration\n");
printf(" -R | --remotePort : network port to connect to remote. no defaults\n");
printf(" -u | --username : provide a username (requires MQTT 3.1 broker)\n");
printf(" --help : display this message.\n");
}
#ifndef WIN32
void my_signal_handler(int signum)
{
if(signum == SIGALRM || signum == SIGTERM || signum == SIGINT){
process_run = 0;
}
}
#endif
int main(int argc, char *argv[])
{
int rc;
#ifndef WIN32
struct sigaction sigact;
#endif
int msg_len, msg_json_len;
mosquitto_lib_init();
rc = client_config_load_bridge(&cfg, CLIENT_PUB, argc, argv);
if(rc){
if(rc == 2){
/* --help */
print_usage();
}else{
fprintf(stderr, "\nUse 'mosquitto_bridge(2) --help' to see usage.\n");
}
goto cleanup;
}
if(cfg.know_bridge_connection){
if(bridge_shared_init()) goto cleanup;
}
if(cfg.bridgeType == BRIDGE_NEW){
topic = strdup("$BRIDGE/new");
name = cfg.bridge.name;
pattern = cfg.bridge.topics[0].topic;
qos = cfg.bridge.topics[0].qos;
if(cfg.bridge.topics[0].direction == bd_out){
direction = strdup("out");
}else if(cfg.bridge.topics[0].direction == bd_in){
direction = strdup("in");
}else if(cfg.bridge.topics[0].direction == bd_both){
direction = strdup("both");
}
local_prefix = cfg.bridge.topics[0].local_prefix;
remote_prefix = cfg.bridge.topics[0].remote_prefix;
remote_add = cfg.bridge.addresses[0].address;
remote_port = cfg.bridge.addresses[0].port;
if(cfg.bridge_conf_json == CONF_JSON){
msg_json_len = snprintf(NULL,0,"{\"bridges\":[{\"connection\":\"%s\",\"addresses\":[{\"address\":\"%s\",\"port\":%d}],\"topic\":\"%s\",\"direction\":\"%s\",\"qos\":%d,\"local_prefix\":\"%s\",\"remote_prefix\":\"%s\"}]}",name
,remote_add,remote_port,pattern,direction,qos,local_prefix,remote_prefix);
msg_json_len++;
msg_json = (char*) malloc((size_t)msg_json_len);
snprintf(msg_json,(size_t)msg_json_len,"{\"bridges\":[{\"connection\":\"%s\",\"addresses\":[{\"address\":\"%s\",\"port\":%d}],\"topic\":\"%s\",\"direction\":\"%s\",\"qos\":%d,\"local_prefix\":\"%s\",\"remote_prefix\":\"%s\"}]}",name
,remote_add,remote_port,pattern,direction,qos,local_prefix,remote_prefix);
cfg.message = strdup(msg_json);
cfg.msglen = msg_json_len;
}else{
msg_len = snprintf(NULL,0,"connection %s\naddress %s:%d\ntopic %s %s %d %s %s",name
,remote_add
,remote_port
,pattern
,direction
,qos
,local_prefix
,remote_prefix);
msg_len++;
msg = (char*) malloc((size_t)msg_len);
snprintf(msg,(size_t)msg_len,"connection %s\naddress %s:%d\ntopic %s %s %d %s %s",name
,remote_add
,remote_port
,pattern
,direction
,qos
,local_prefix
,remote_prefix);
cfg.message = strdup(msg);
cfg.msglen = msg_len;
}
cfg.topic = strdup(topic);
printf("Message New Bridge (%d):\n%s\n", cfg.msglen, cfg.message);
}else if(cfg.bridgeType == BRIDGE_DEL){
topic = strdup("$BRIDGE/del");
name = cfg.bridge.name;
if(cfg.bridge_conf_json == CONF_JSON){
msg_json_len = snprintf(NULL,0,"{\"connection\":\"%s\"}", name);
msg_json_len++;
msg_json = (char*) malloc((size_t)msg_json_len);
snprintf(msg_json,(size_t)msg_json_len,"{\"connection\":\"%s\"}", name);
cfg.message = strdup(msg_json);
cfg.msglen = msg_json_len;
}else{
msg_len = snprintf(NULL,0,"connection %s",name);
msg_len++;
msg = (char*) malloc((size_t)msg_len);
snprintf(msg,(size_t)msg_len,"connection %s",name);
cfg.message = strdup(msg);
cfg.msglen = msg_len;
}
cfg.topic = strdup(topic);
printf("Message Del Bridge (%d):\n%s\n", cfg.msglen, cfg.message);
}
if(client_id_generate(&cfg)){
goto cleanup;
}
mosq = mosquitto_new(cfg.id, cfg.clean_session, ptrMosq);
if(!mosq){
switch(errno){
case ENOMEM:
err_printf(&cfg, "Error: Out of memory.\n");
break;
case EINVAL:
err_printf(&cfg, "Error: Invalid id.\n");
break;
}
goto cleanup;
}
mosquitto_disconnect_v5_callback_set(mosq, my_disconnect_callback);
if(!cfg.know_bridge_connection){
mosquitto_connect_v5_callback_set(mosq, my_connect_callback_pub);
mosquitto_publish_v5_callback_set(mosq, my_publish_callback);
}
if(client_opts_set(mosq, &cfg)){
goto cleanup;
}
rc = client_connect(mosq, &cfg);
if(rc){
goto cleanup;
}
#ifndef WIN32
sigact.sa_handler = my_signal_handler;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = 0;
if(sigaction(SIGALRM, &sigact, NULL) == -1){
perror("sigaction");
goto cleanup;
}
if(sigaction(SIGTERM, &sigact, NULL) == -1){
perror("sigaction");
goto cleanup;
}
if(sigaction(SIGINT, &sigact, NULL) == -1){
perror("sigaction");
goto cleanup;
}
if(cfg.timeout){
alarm(cfg.timeout);
}
#endif
if(cfg.know_bridge_connection){
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
mosquitto_connect_v5_callback_set(mosq, my_connect_callback_sub);
mosquitto_message_v5_callback_set(mosq, my_message_callback);
mosquitto_loop_start(mosq);
while (process_run) {
pause();
}
mosquitto_disconnect(mosq);
mosquitto_loop_stop(mosq,false);
} else {
rc = pub_loop(mosq);
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
//client_config_cleanup(&cfg);
if(cfg.know_bridge_connection){
bridge_shared_cleanup();
}
if(rc){
err_printf(&cfg, "Error: %s\n", mosquitto_strerror(rc));
}
return rc;
cleanup:
mosquitto_lib_cleanup();
//client_config_cleanup(&cfg);
if(cfg.know_bridge_connection){
bridge_shared_cleanup();
}
return 1;
}

@ -4,12 +4,12 @@ Copyright (c) 2014-2020 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR EDL-1.0
Contributors:
@ -991,7 +991,7 @@ int client_config_line_proc(struct mosq_config *cfg, int pub_or_sub, int argc, c
if(cfg->pub_mode != MSGMODE_NONE){
fprintf(stderr, "Error: Only one type of message can be sent at once.\n\n");
return 1;
}else{
}else{
cfg->pub_mode = MSGMODE_STDIN_FILE;
}
#ifdef WITH_SRV
@ -1621,3 +1621,183 @@ void err_printf(const struct mosq_config *cfg, const char *fmt, ...)
va_end(va);
}
/* Process a tokenised single line from a file or set of real argc/argv */
int client_config_line_proc_bridge(struct mosq_config *cfg, int argc, char *argv[])
{
int i;
for(i=1; i<argc; i++){
if(!strcmp(argv[i], "-p") || !strcmp(argv[i], "--port")){
if(i==argc-1){
fprintf(stderr, "Error: -p argument given but no port specified.\n\n");
return 1;
}else{
cfg->port = (uint16_t)atoi(argv[i+1]);
if(cfg->port<1 || cfg->port>UINT16_MAX){
fprintf(stderr, "Error: Invalid port given: %d\n", cfg->port);
return 1;
}
}
i++;
}else if(!strcmp(argv[i], "-R") || !strcmp(argv[i], "--remotePort")){
if(i==argc-1){
fprintf(stderr, "Error: -R argument given but no port specified.\n\n");
return 1;
}else{
cfg->bridge.addresses[0].port = (uint16_t)atoi(argv[i+1]);
if(cfg->bridge.addresses[0].port<1 || cfg->bridge.addresses[0].port>UINT16_MAX){
fprintf(stderr, "Error: Invalid portRemote given: %d\n", cfg->bridge.addresses[0].port);
return 1;
}
}
i++;
}else if(!strcmp(argv[i], "-h") || !strcmp(argv[i], "--host")){
if(i==argc-1){
fprintf(stderr, "Error: -h argument given but no host specified.\n\n");
return 1;
}else{
cfg->host = strdup(argv[i+1]);
}
i++;
}else if(!strcmp(argv[i], "-a") || !strcmp(argv[i], "--address")){
if(i==argc-1){
fprintf(stderr, "Error: -a argument given but no address for remote bridge specified.\n\n");
return 1;
}else{
cfg->bridge.addresses[0].address = malloc(strlen(argv[i+1])*sizeof(char));
cfg->bridge.addresses[0].address = strdup(argv[i+1]);
}
i++;
}else if(!strcmp(argv[i], "-c") || !strcmp(argv[i], "--connection")){
if(i==argc-1){
fprintf(stderr, "Error: -c argument given but no connection name for remote bridge specified.\n\n");
return 1;
}else{
cfg->bridge.name = malloc(strlen(argv[i+1])*sizeof(char));
cfg->bridge.name = strdup(argv[i+1]);
}
i++;
}else if(!strcmp(argv[i], "-j") || !strcmp(argv[i], "--json")){
cfg->bridge_conf_json = CONF_JSON;
}else if(!strcmp(argv[i], "-n") || !strcmp(argv[i], "--new")){
if(cfg->bridgeType == -1){
cfg->bridgeType = BRIDGE_NEW;
}
else{
fprintf(stderr, "Error: -n argument given but -d alredy specified.\n\n");
return 1;
}
}else if(!strcmp(argv[i], "-d") || !strcmp(argv[i], "--del")){
if(cfg->bridgeType == -1){
cfg->bridgeType = BRIDGE_DEL;
}
else{
fprintf(stderr, "Error: -d argument given but -n alredy specified.\n\n");
return 1;
}
}else if(!strcmp(argv[i], "--help")){
return 2;
}else if(!strcmp(argv[i], "-t") || !strcmp(argv[i], "--topic")){
if(i==argc-1){
fprintf(stderr, "Error: -t argument given but no topic specified.\n\n");
return 1;
}else{
cfg->bridge.topics[0].topic = strdup(argv[i+1]);
}
i++;
}else if(!strcmp(argv[i], "-D") || !strcmp(argv[i], "--direction")){
if(i==argc-1){
fprintf(stderr, "Error: -D argument given but no direction specified.\n\n");
return 1;
}else{
if(!strcasecmp(argv[i+1], "out")){
cfg->bridge.topics[0].direction = bd_out;
}else if(!strcasecmp(argv[i+1], "in")){
cfg->bridge.topics[0].direction = bd_in;
}else if(!strcasecmp(argv[i+1], "both")){
cfg->bridge.topics[0].direction = bd_both;
}else{
fprintf(stderr, "Error: Invalid bridge topic direction '%s'.\n\n",argv[i+1]);
return 1;
}
}
i++;
}else if(!strcmp(argv[i], "-q") || !strcmp(argv[i], "--qos")){
if(i==argc-1){
fprintf(stderr, "Error: -q argument given but no qos specified.\n\n");
return 1;
}else{
cfg->bridge.topics[0].qos = (uint8_t)atoi(argv[i+1]);
}
i++;
}else if(!strcmp(argv[i], "-k") || !strcmp(argv[i], "--know")){
cfg->know_bridge_connection = 1;
}else if(!strcmp(argv[i], "-l") || !strcmp(argv[i], "--local")){
if(i==argc-1){
fprintf(stderr, "Error: -l argument given but no local prefix specified.\n\n");
return 1;
}else{
cfg->bridge.topics[0].local_prefix = strdup(argv[i+1]);
}
i++;
}else if(!strcmp(argv[i], "-r") || !strcmp(argv[i], "--remote")){
if(i==argc-1){
fprintf(stderr, "Error: -r argument given but no remote prefix specified.\n\n");
return 1;
}else{
cfg->bridge.topics[0].remote_prefix = strdup(argv[i+1]);
}
i++;
}else if(!strcmp(argv[i], "-u") || !strcmp(argv[i], "--username")){
if(i==argc-1){
fprintf(stderr, "Error: -u argument given but no username specified.\n\n");
return 1;
}else{
cfg->username = strdup(argv[i+1]);
}
i++;
}else if(!strcmp(argv[i], "-P") || !strcmp(argv[i], "--pw")){
if(i==argc-1){
fprintf(stderr, "Error: -P argument given but no password specified.\n\n");
return 1;
}else{
cfg->password = strdup(argv[i+1]);
}
i++;
}else{
fprintf(stderr, "Error: Unknown option '%s'.\n",argv[i]);
return 1;
}
}
return MOSQ_ERR_SUCCESS;
}
int client_config_load_bridge(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[])
{
int rc;
init_config(cfg, pub_or_sub);
/* Deal with real argc/argv */
cfg->bridge_conf_json = 0;
cfg->bridgeType = -1;
cfg->know_bridge_connection = 0;
cfg->bridge.addresses = malloc(sizeof(struct bridge_address));
cfg->bridge.topics = malloc(sizeof(struct mosquitto__bridge));
rc = client_config_line_proc_bridge(cfg, argc, argv);
if(rc) return rc;
if(cfg->bridgeType == -1 && cfg->know_bridge_connection == 0){
fprintf(stderr, "Error: No bridge type action given.\n");
return 1;
}
/// TODO : Make sure all parameters are OK !
if(!cfg->host){
cfg->host = "localhost";
}
return MOSQ_ERR_SUCCESS;
}

@ -4,12 +4,12 @@ Copyright (c) 2014-2020 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR EDL-1.0
Contributors:
@ -20,6 +20,7 @@ Contributors:
#define CLIENT_CONFIG_H
#include <stdio.h>
#include <mosquitto_broker_internal.h>
#ifdef WIN32
# include <winsock2.h>
@ -39,6 +40,11 @@ Contributors:
#define CLIENT_SUB 2
#define CLIENT_RR 3
#define CLIENT_RESPONSE_TOPIC 4
#define BRIDGE_NEW 5
#define BRIDGE_DEL 6
#define CONF_NORMAL 0
#define CONF_JSON 1
#define PORT_UNDEFINED -1
#define PORT_UNIX 0
@ -117,6 +123,10 @@ struct mosq_config {
char *socks5_username;
char *socks5_password;
#endif
struct mosquitto__bridge bridge;
int bridge_conf_json;
int bridgeType;
int know_bridge_connection;
mosquitto_property *connect_props;
mosquitto_property *publish_props;
mosquitto_property *subscribe_props;
@ -129,6 +139,7 @@ struct mosq_config {
};
int client_config_load(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]);
int client_config_load_bridge(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]);
void client_config_cleanup(struct mosq_config *cfg);
int client_opts_set(struct mosquitto *mosq, struct mosq_config *cfg);
int client_id_generate(struct mosq_config *cfg);

@ -166,8 +166,8 @@ BROKER_CFLAGS:=${CFLAGS} -DVERSION="\"${VERSION}\"" -DWITH_BROKER
BROKER_LDFLAGS:=${LDFLAGS}
BROKER_LDADD:=
CLIENT_CPPFLAGS:=$(CPPFLAGS) -I.. -I../include
CLIENT_CFLAGS:=${CFLAGS} -DVERSION="\"${VERSION}\""
CLIENT_CPPFLAGS:=$(CPPFLAGS) -I.. -I../include -I../lib
CLIENT_CFLAGS:=${CFLAGS} -I../src -DVERSION="\"${VERSION}\""
CLIENT_LDFLAGS:=$(LDFLAGS) -L../lib
CLIENT_LDADD:=
@ -187,6 +187,7 @@ endif
ifeq ($(UNAME),Linux)
BROKER_LDADD:=$(BROKER_LDADD) -lrt
BROKER_LIBS:=$(BROKER_LIBS) -lanl
BROKER_LDFLAGS:=$(BROKER_LDFLAGS) -Wl,--dynamic-list=linker.syms
LIB_LIBADD:=$(LIB_LIBADD) -lrt
endif
@ -348,6 +349,7 @@ endif
ifeq ($(WITH_BUNDLED_DEPS),yes)
BROKER_CPPFLAGS:=$(BROKER_CPPFLAGS) -I../deps
CLIENT_CFLAGS:=$(CLIENT_CFLAGS) -I../deps
LIB_CPPFLAGS:=$(LIB_CPPFLAGS) -I../deps
PLUGIN_CPPFLAGS:=$(PLUGIN_CPPFLAGS) -I../../deps
endif
@ -364,6 +366,8 @@ ifeq ($(WITH_COVERAGE),yes)
endif
ifeq ($(WITH_CJSON),yes)
BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_CJSON
BROKER_LDADD:=$(BROKER_LDADD) -lcjson
CLIENT_CFLAGS:=$(CLIENT_CFLAGS) -DWITH_CJSON
CLIENT_LDADD:=$(CLIENT_LDADD) -lcjson
CLIENT_STATIC_LDADD:=$(CLIENT_STATIC_LDADD) -lcjson

@ -114,6 +114,7 @@ enum mosq_err_t {
MOSQ_ERR_TOPIC_ALIAS_INVALID = 29,
MOSQ_ERR_ADMINISTRATIVE_ACTION = 30,
MOSQ_ERR_ALREADY_EXISTS = 31,
MOSQ_ERR_BRIDGE_DYNA = 32,
};
/* Option values */

@ -1,6 +1,6 @@
include ../config.mk
.PHONY : all clean install uninstall dist
.PHONY : all clean install uninstall dist
MANPAGES = \
libmosquitto.3 \
@ -13,6 +13,7 @@ MANPAGES = \
mosquitto_pub.1 \
mosquitto_rr.1 \
mosquitto_sub.1 \
mosquitto_bridge.1 \
mqtt.7
all : ${MANPAGES}
@ -37,6 +38,7 @@ install :
$(INSTALL) -m 644 mosquitto_pub.1 "${DESTDIR}${mandir}/man1/mosquitto_pub.1"
$(INSTALL) -m 644 mosquitto_sub.1 "${DESTDIR}${mandir}/man1/mosquitto_sub.1"
$(INSTALL) -m 644 mosquitto_rr.1 "${DESTDIR}${mandir}/man1/mosquitto_rr.1"
$(INSTALL) -m 644 mosquitto_bridge.1 "${DESTDIR}${mandir}/man1/mosquitto_bridge.1"
$(INSTALL) -d "${DESTDIR}$(mandir)/man7"
$(INSTALL) -m 644 mqtt.7 "${DESTDIR}${mandir}/man7/mqtt.7"
$(INSTALL) -m 644 mosquitto-tls.7 "${DESTDIR}${mandir}/man7/mosquitto-tls.7"
@ -52,6 +54,7 @@ uninstall :
-rm -f "${DESTDIR}${mandir}/man1/mosquitto_pub.1"
-rm -f "${DESTDIR}${mandir}/man1/mosquitto_sub.1"
-rm -f "${DESTDIR}${mandir}/man1/mosquitto_rr.1"
-rm -f "${DESTDIR}${mandir}/man1/mosquitto_bridge.1"
-rm -f "${DESTDIR}${mandir}/man7/mqtt.7"
-rm -f "${DESTDIR}${mandir}/man7/mosquitto-tls.7"
-rm -f "${DESTDIR}${mandir}/man3/libmosquitto.3"
@ -80,6 +83,9 @@ mosquitto_sub.1 : mosquitto_sub.1.xml manpage.xsl
mosquitto_rr.1 : mosquitto_rr.1.xml manpage.xsl
$(XSLTPROC) $<
mosquitto_bridge.1 : mosquitto_bridge.1.xml manpage.xsl
$(XSLTPROC) $<
mqtt.7 : mqtt.7.xml manpage.xsl
$(XSLTPROC) $<
@ -105,6 +111,7 @@ potgen :
xml2po -o po/mosquitto_pub/mosquitto_pub.1.pot mosquitto_pub.1.xml
xml2po -o po/mosquitto_sub/mosquitto_sub.1.pot mosquitto_sub.1.xml
xml2po -o po/mosquitto_sub/mosquitto_rr.1.pot mosquitto_rr.1.xml
xml2po -o po/mosquitto_bridge/mosquitto_bridge.1.pot mosquitto_bridge.1.xml
xml2po -o po/mqtt/mqtt.7.pot mqtt.7.xml
xml2po -o po/mosquitto-tls/mosquitto-tls.7.pot mosquitto-tls.7.xml
xml2po -o po/libmosquitto/libmosquitto.3.pot libmosquitto.3.xml

@ -0,0 +1,215 @@
'\" t
.\" Title: mosquitto_bridge
.\" Author: [see the "Author" section]
.\" Generator: DocBook XSL Stylesheets v1.79.1 <http://docbook.sf.net/>
.\" Date: 12/03/2020
.\" Manual: Commands
.\" Source: Mosquitto Project
.\" Language: English
.\"
.TH "MOSQUITTO_BRIDGE" "1" "12/03/2020" "Mosquitto Project" "Commands"
.\" -----------------------------------------------------------------
.\" * Define some portability stuff
.\" -----------------------------------------------------------------
.\" ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.\" http://bugs.debian.org/507673
.\" http://lists.gnu.org/archive/html/groff/2009-02/msg00013.html
.\" ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.ie \n(.g .ds Aq \(aq
.el .ds Aq '
.\" -----------------------------------------------------------------
.\" * set default formatting
.\" -----------------------------------------------------------------
.\" disable hyphenation
.nh
.\" disable justification (adjust text to left margin only)
.ad l
.\" -----------------------------------------------------------------
.\" * MAIN CONTENT STARTS HERE *
.\" -----------------------------------------------------------------
.SH "NAME"
mosquitto_bridge \- an MQTT client for creating, deleting and knowing bridges dynamically\&.
.SH "SYNOPSIS"
.HP \w'\fBmosquitto_bridge\fR\ 'u
\fBmosquitto_bridge\fR [\-a | \-\-address] [\-c | \-\-connection] [\-d | \-\-del] [\-D | \-\-direction] [\-h | \-\-host] [\-k | \-\-know] [\-l | \-\-local] [\-n | \-\-new] [\-p | \-\-port] [\-P | \-\-pw] [\-q | \-\-qos] [\-r | \-\-remote] [\-R | \-\-remotePort] [\-u | \-\-username]
.HP \w'\fBmosquitto_bridge\fR\ 'u
\fBmosquitto_bridge\fR [\fB\-\-help\fR]
.SH "DESCRIPTION"
.PP
\fBmosquitto_bridge\fR
is a simple MQTT client that will create or delete a bridge dynamically for Mosquitto broker 1\&.4\&.90 or newer without shutdown or SIGHUP signal\&.
.PP
\fBmosquitto_bridge\fR
also allows to know active bridges on a broker
.SH "OPTIONS"
.PP
The options below may be given on the command line\&.
.PP
\fB\-a\fR, \fB\-\-address\fR
.RS 4
Define bridge address of bridge\&. Necessary only with
\fB\-n\fR\&.
.RE
.PP
\fB\-c\fR, \fB\-\-connection\fR
.RS 4
Define the connection name of the bridge\&.
.RE
.PP
\fB\-d\fR, \fB\-\-del\fR
.RS 4
Delete a bridge which name is define by
\fB\-c\fR\&. Can\*(Aqt be use with
\fB\-n\fR\&.
.RE
.PP
\fB\-D\fR, \fB\-\-directoin\fR
.RS 4
Define direction of the bridge [ in | out | both ]\&.
.RE
.PP
\fB\-h\fR, \fB\-\-host\fR
.RS 4
Define the network host to connect to the local broker where the bridge will be create/delete or to know active bridges\&. Defaults to localhost\&.
.RE
.PP
\fB\-k\fR, \fB\-\-know\fR
.RS 4
Know all bridges which are active in a broker\&. Can only be used with options other than
\fB\-h\fR
and
\fB\-p\fR\&.
.RE
.PP
\fB\-l\fR, \fB\-\-local\fR
.RS 4
Define the local prefix for the bridge configuration\&.
.RE
.PP
\fB\-n\fR, \fB\-\-new\fR
.RS 4
Define to create a bridge\&.
.RE
.PP
\fB\-p\fR, \fB\-\-port\fR
.RS 4
Define the network port to connect to the local broker where the bridge will be create/delete or to know active bridges\&. Defaults to 1883\&.
.RE
.PP
\fB\-P\fR, \fB\-\-pw\fR
.RS 4
Provide a password to be used for authenticating with the broker\&. Using this argument without also specifying a username is invalid\&. This requires a broker that supports MQTT v3\&.1\&. See also the
\fB\-\-username\fR
option\&.
.RE
.PP
\fB\-q\fR, \fB\-\-qos\fR
.RS 4
Specify the quality of service desired for the bridge between local broker and remote broker, from 0, 1 and 2\&. Defaults to 0\&. See
\fBmqtt\fR(7)
for more information on QoS\&.
.RE
.PP
\fB\-r\fR, \fB\-\-remote\fR
.RS 4
Define the remote prefix for the bridge configuration\&.
.RE
.PP
\fB\-R\fR, \fB\-\-remotePort\fR
.RS 4
Define the network port to connect to the remote broker\&. No default value\&.
.RE
.PP
\fB\-u\fR, \fB\-\-username\fR
.RS 4
Provide a username to be used for authenticating with the broker\&. This requires a broker that supports MQTT v3\&.1\&. See also the
\fB\-\-pw\fR
argument\&.
.RE
.SH "EXAMPLES"
.PP
Note that these really are examples\&.
.PP
Three possiblites are available :
.PP
\- Create a new bridge from a local broker to a remote broker whith
\-n
and all necessary parameters of a bridge (connection, address, topic)\&.
.PP
\- Delete a bridge present on a local bridge with
\-d
and bridge connection name\&.
.PP
\- Know all bridges which are active on a local broker with
\-k\&.
.PP
Creat a bridge on localhost and default port with another broker on localhost and 1884 port:
.sp
.RS 4
.ie n \{\
\h'-04'\(bu\h'+03'\c
.\}
.el \{\
.sp -1
.IP \(bu 2.3
.\}
mosquitto_bridge
\-c
testBridge
\-a
127\&.0\&.0\&.1
\-R
1884
\-n
\-t
\e#
\-q
0
\-l
local/
\-r
remote/
\-D
both
.RE
.PP
Delete a bridge on localhost and default port:
.sp
.RS 4
.ie n \{\
\h'-04'\(bu\h'+03'\c
.\}
.el \{\
.sp -1
.IP \(bu 2.3
.\}
mosquitto_bridge
\-c
testBridge
\-d
.RE
.PP
Know all active bridges on localhost and default port:
.sp
.RS 4
.ie n \{\
\h'-04'\(bu\h'+03'\c
.\}
.el \{\
.sp -1
.IP \(bu 2.3
.\}
mosquitto_bridge
\-k
.RE
.SH "BUGS"
.PP
\fBmosquitto\fR
bug information can be found at
\m[blue]\fB\%https://github.com/eclipse/mosquitto/issues\fR\m[]
.SH "SEE ALSO"
\fBmqtt\fR(7), \fBmosquitto_pub\fR(1), \fBmosquitto_sub\fR(1), \fBmosquitto\fR(8), \fBlibmosquitto\fR(3), \fBmosquitto-tls\fR(7)
.SH "AUTHOR"
.PP
Tifaifai Maupiti
<tifaifai\&.maupiti@gmail\&.com>

@ -0,0 +1,5 @@
.. title: mosquitto_bridge man page
.. slug: mosquitto_bridge-1
.. category: man
.. type: man
.. pretty_url: False

@ -0,0 +1,292 @@
<?xml version='1.0' encoding='UTF-8'?>
<?xml-stylesheet type="text/xsl" href="manpage.xsl"?>
<refentry xml:id="mosquitto_bridge" xmlns:xlink="http://www.w3.org/1999/xlink">
<refmeta>
<refentrytitle>mosquitto_bridge</refentrytitle>
<manvolnum>1</manvolnum>
<refmiscinfo class="source">Mosquitto Project</refmiscinfo>
<refmiscinfo class="manual">Commands</refmiscinfo>
</refmeta>
<refnamediv>
<refname>mosquitto_bridge</refname>
<refpurpose>an MQTT client for creating, deleting and knowing bridges dynamically.</refpurpose>
</refnamediv>
<refsynopsisdiv>
<cmdsynopsis>
<command>mosquitto_bridge</command>
<group>
<arg choice='plain'>-a</arg>
<arg choice='plain'>--address</arg>
</group>
<group>
<arg choice='plain'>-c</arg>
<arg choice='plain'>--connection</arg>
</group>
<group>
<arg choice='plain'>-d</arg>
<arg choice='plain'>--del</arg>
</group>
<group>
<arg choice='plain'>-D</arg>
<arg choice='plain'>--direction</arg>
</group>
<group>
<arg choice='plain'>-h</arg>
<arg choice='plain'>--host</arg>
</group>
<group>
<arg choice='plain'>-k</arg>
<arg choice='plain'>--know</arg>
</group>
<group>
<arg choice='plain'>-l</arg>
<arg choice='plain'>--local</arg>
</group>
<group>
<arg choice='plain'>-n</arg>
<arg choice='plain'>--new</arg>
</group>
<group>
<arg choice='plain'>-p</arg>
<arg choice='plain'>--port</arg>
</group>
<group>
<arg choice='plain'>-P</arg>
<arg choice='plain'>--pw</arg>
</group>
<group>
<arg choice='plain'>-q</arg>
<arg choice='plain'>--qos</arg>
</group>
<group>
<arg choice='plain'>-r</arg>
<arg choice='plain'>--remote</arg>
</group>
<group>
<arg choice='plain'>-R</arg>
<arg choice='plain'>--remotePort</arg>
</group>
<group>
<arg choice='plain'>-u</arg>
<arg choice='plain'>--username</arg>
</group>
</cmdsynopsis>
<cmdsynopsis>
<command>mosquitto_bridge</command>
<group choice='plain'>
<arg><option>--help</option></arg>
</group>
</cmdsynopsis>
</refsynopsisdiv>
<refsect1>
<title>Description</title>
<para><command>mosquitto_bridge</command> is a simple MQTT
client that will create or delete a bridge dynamically for Mosquitto broker
1.4.90 or newer without shutdown or SIGHUP signal.</para>
<para><command>mosquitto_bridge</command> also allows to know active bridges on a broker</para>
</refsect1>
<refsect1>
<title>Options</title>
<para>The options below may be given on the command line.</para>
<variablelist>
<varlistentry>
<term><option>-a</option></term>
<term><option>--address</option></term>
<listitem>
<para>Define bridge address of bridge.
Necessary only with <option>-n</option>.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-c</option></term>
<term><option>--connection</option></term>
<listitem>
<para>Define the connection name of the bridge.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-d</option></term>
<term><option>--del</option></term>
<listitem>
<para>Delete a bridge which name is define by
<option>-c</option>. Can't be use with <option>-n</option>.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-D</option></term>
<term><option>--directoin</option></term>
<listitem>
<para>Define direction of the bridge [ in | out | both ].</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-h</option></term>
<term><option>--host</option></term>
<listitem>
<para>Define the network host to connect to the local
broker where the bridge will be create/delete or to
know active bridges. Defaults to localhost.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-k</option></term>
<term><option>--know</option></term>
<listitem>
<para>Know all bridges which are active in a broker.
Can only be used with options other than
<option>-h</option> and <option>-p</option>.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-l</option></term>
<term><option>--local</option></term>
<listitem>
<para>Define the local prefix for the bridge configuration.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-n</option></term>
<term><option>--new</option></term>
<listitem>
<para>Define to create a bridge.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-p</option></term>
<term><option>--port</option></term>
<listitem>
<para>Define the network port to connect to the local
broker where the bridge will be create/delete or to
know active bridges. Defaults to 1883.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-P</option></term>
<term><option>--pw</option></term>
<listitem>
<para>Provide a password to be used for authenticating with
the broker. Using this argument without also specifying a
username is invalid. This requires a broker that supports
MQTT v3.1. See also the <option>--username</option> option.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-q</option></term>
<term><option>--qos</option></term>
<listitem>
<para>Specify the quality of service desired for the
bridge between local broker and remote broker, from 0, 1 and 2. Defaults to 0. See
<citerefentry><refentrytitle>mqtt</refentrytitle><manvolnum>7</manvolnum></citerefentry>
for more information on QoS.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-r</option></term>
<term><option>--remote</option></term>
<listitem>
<para>Define the remote prefix for the bridge configuration.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-R</option></term>
<term><option>--remotePort</option></term>
<listitem>
<para>Define the network port to connect to the remote broker. No default value.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-u</option></term>
<term><option>--username</option></term>
<listitem>
<para>Provide a username to be used for authenticating with
the broker. This requires a broker that supports MQTT v3.1.
See also the <option>--pw</option> argument.</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>Examples</title>
<para>Note that these really are examples.</para>
<para>Three possiblites are available :</para>
<para>- Create a new bridge from a local broker to a remote broker whith
<literal>-n</literal> and all necessary parameters of a bridge (connection, address, topic).</para>
<para>- Delete a bridge present on a local bridge with <literal>-d</literal> and bridge connection name.</para>
<para>- Know all bridges which are active on a local broker with <literal>-k</literal>.</para>
<para>Creat a bridge on localhost and default port with another broker on localhost and 1884 port:</para>
<itemizedlist mark="circle">
<listitem><para>mosquitto_bridge <literal>-c</literal> testBridge
<literal>-a</literal> 127.0.0.1 <literal>-R</literal> 1884 <literal>-n</literal>
<literal>-t</literal> \# <literal>-q</literal> 0 <literal>-l</literal> local/
<literal>-r</literal> remote/ <literal>-D</literal> both</para></listitem>
</itemizedlist>
<para>Delete a bridge on localhost and default port:</para>
<itemizedlist mark="circle">
<listitem><para>mosquitto_bridge <literal>-c</literal> testBridge
<literal>-d</literal></para></listitem>
</itemizedlist>
<para>Know all active bridges on localhost and default port:</para>
<itemizedlist mark="circle">
<listitem><para>mosquitto_bridge <literal>-k</literal></para></listitem>
</itemizedlist>
</refsect1>
<refsect1>
<title>Bugs</title>
<para><command>mosquitto</command> bug information can be found at
<ulink url="https://github.com/eclipse/mosquitto/issues"/></para>
</refsect1>
<refsect1>
<title>See Also</title>
<simplelist type="inline">
<member>
<citerefentry>
<refentrytitle><link xlink:href="mqtt-7.html">mqtt</link></refentrytitle>
<manvolnum>7</manvolnum>
</citerefentry>
</member>
<member>
<citerefentry>
<refentrytitle><link xlink:href="mosquitto_pub-1.html">mosquitto_pub</link></refentrytitle>
<manvolnum>1</manvolnum>
</citerefentry>
</member>
<member>
<citerefentry>
<refentrytitle><link xlink:href="mosquitto_sub-1.html">mosquitto_sub</link></refentrytitle>
<manvolnum>1</manvolnum>
</citerefentry>
</member>
<member>
<citerefentry>
<refentrytitle><link xlink:href="mosquitto-8.html">mosquitto</link></refentrytitle>
<manvolnum>8</manvolnum>
</citerefentry>
</member>
<member>
<citerefentry>
<refentrytitle><link xlink:href="libmosquitto-3.html">libmosquitto</link></refentrytitle>
<manvolnum>3</manvolnum>
</citerefentry>
</member>
<member>
<citerefentry>
<refentrytitle><link xlink:href="mosquitto-tls-7.html">mosquitto-tls</link></refentrytitle>
<manvolnum>7</manvolnum>
</citerefentry>
</member>
</simplelist>
</refsect1>
<refsect1>
<title>Author</title>
<para>Tifaifai Maupiti <email>tifaifai.maupiti@gmail.com</email></para>
</refsect1>
</refentry>

@ -80,7 +80,7 @@ endif (WITH_BUNDLED_DEPS)
option(INC_BRIDGE_SUPPORT
"Include bridge support for connecting to other brokers?" ON)
if (INC_BRIDGE_SUPPORT)
set (MOSQ_SRCS ${MOSQ_SRCS} bridge.c)
set (MOSQ_SRCS ${MOSQ_SRCS} bridge.c bridge_dynamic.c)
add_definitions("-DWITH_BRIDGE")
endif (INC_BRIDGE_SUPPORT)

@ -7,6 +7,7 @@ all : mosquitto
OBJS= mosquitto.o \
alias_mosq.o \
bridge.o \
bridge_dynamic.o \
bridge_topic.o \
conf.o \
conf_includedir.o \
@ -93,6 +94,9 @@ alias_mosq.o : ../lib/alias_mosq.c ../lib/alias_mosq.h
bridge.o : bridge.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
bridge_dynamic.o : bridge_dynamic.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
bridge_topic.o : bridge_topic.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
@ -330,7 +334,7 @@ uninstall :
-rm -f "${DESTDIR}${prefix}/include/mosquitto_broker.h"
-rm -f "${DESTDIR}${prefix}/include/mosquitto_plugin.h"
clean :
clean :
-rm -f *.o mosquitto *.gcda *.gcno
reallyclean : clean

@ -4,12 +4,12 @@ Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR EDL-1.0
Contributors:
@ -18,6 +18,11 @@ Contributors:
#include "config.h"
#ifdef WITH_EPOLL
#include <sys/epoll.h>
#define MAX_EVENTS 1000
#endif
#include <assert.h>
#include <errno.h>
#include <stdio.h>
@ -63,7 +68,7 @@ void bridge__start_all(void)
for(i=0; i<db.config->bridge_count; i++){
if(bridge__new(&(db.config->bridges[i]))){
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect to bridge %s.",
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect to bridge %s.",
db.config->bridges[i].name);
}
}
@ -143,6 +148,35 @@ int bridge__new(struct mosquitto__bridge *bridge)
#endif
}
int bridge__del(struct mosquitto_db *db, int index)
{
struct mosquitto **bridges;
int i;
assert(db);
bridge__disconnect(db,db->bridges[index]);
mux__delete(db->bridges[index]);
db->bridge_count--;
for(i=index; i<db->bridge_count; i++){
db->bridges[i] = db->bridges[i+1];
}
if(db->bridge_count==0){
db->bridges[0] = NULL;
}else{
bridges = mosquitto__realloc(db->bridges, (size_t)(db->bridge_count)*sizeof(struct mosquitto *));
if(bridges){
db->bridges = bridges;
}else{
return MOSQ_ERR_NOMEM;
}
}
return 0;
}
#if defined(__GLIBC__) && defined(WITH_ADNS)
int bridge__connect_step1(struct mosquitto *context)
{
@ -584,6 +618,42 @@ int bridge__register_local_connections(void)
return MOSQ_ERR_SUCCESS;
}
int bridge__disconnect(struct mosquitto_db *db, struct mosquitto *context)
{
int rc;
char *notification_topic;
uint32_t notification_topic_len;
uint8_t notification_payload;
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
bridge__packet_cleanup(context);
db__messages_delete(context, false);
sub__clean_session(context);
notification_topic_len = (uint32_t)strlen(context->bridge->remote_clientid)+(uint32_t)strlen("$SYS/broker/connection//state");
notification_topic = mosquitto__malloc((size_t)(notification_topic_len+1)*sizeof(char));
if(!notification_topic) return MOSQ_ERR_NOMEM;
snprintf(notification_topic, (uint32_t)notification_topic_len+1, "$SYS/broker/connection/%s/state", context->bridge->remote_clientid);
notification_payload = '0';
db__messages_easy_queue(context, notification_topic, 1, 1, &notification_payload, 1, 0, NULL);
log__printf(NULL, MOSQ_LOG_NOTICE, "Disconnecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[0].address, context->bridge->addresses[0].port);
rc = send__disconnect(context, MQTT_RC_NORMAL_DISCONNECTION, NULL);
if(rc<0){
log__printf(NULL, MOSQ_LOG_ERR, "Error disconnecting bridge: %s.", gai_strerror(errno));
return rc;
}
do_disconnect(context, MOSQ_ERR_SUCCESS);
return rc;
}
void bridge__cleanup(struct mosquitto *context)
{

@ -0,0 +1,904 @@
/*
Copyright (c) 2017-2020 Tifaifai Maupiti <tifaifai.maupiti@gmail.com>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
Contributor:
Tifaifai Maupiti - Initial implementation and documentation.
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <string.h>
#ifdef WITH_EPOLL
#include <sys/epoll.h>
#endif
#ifdef WITH_CJSON
# include <cjson/cJSON.h>
#endif
#ifndef WIN32
#include <unistd.h>
#include <strings.h>
#else
#include <process.h>
#include <winsock2.h>
#define snprintf sprintf_s
#define strncasecmp _strnicmp
#endif
#include "mosquitto_broker_internal.h"
#include "mqtt_protocol.h"
#include "memory_mosq.h"
#include "read_handle.h"
#include "send_mosq.h"
#include "util_mosq.h"
static int config__check(struct mosquitto__config *config);
int bridge__dynamic_analyse(struct mosquitto_db *db, char *topic, void* payload, uint32_t payloadlen)
{
int rc;
int *index;
struct mosquitto__config config;
config__init(&config);
index = (int*) mosquitto__malloc(sizeof(int));
*index = -1;
if(strncmp("$BRIDGE/new",topic, 11)==0){
rc = bridge__dynamic_parse_payload_new_json(db, payload, &config);
if(rc != 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to parse PUBLISH for bridge dynamic.");
mosquitto__free(index);
return MOSQ_ERR_BRIDGE_DYNA;
}
rc = config__check(&config);
if(rc != 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to parse PUBLISH.");
mosquitto__free(index);
return MOSQ_ERR_BRIDGE_DYNA;
}
bridge__new(&(config.bridges[config.bridge_count-1]));
if(rc != 0){
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect to bridge %s.",
config.bridges[config.bridge_count-1].name);
mosquitto__free(index);
return MOSQ_ERR_BRIDGE_DYNA;
}else{
log__printf(NULL, MOSQ_LOG_WARNING, "Information : Start connection with bridge %s.",
config.bridges[config.bridge_count-1].name);
mux__add_in(db->bridges[db->bridge_count-1]);
mux__add_out(db->bridges[db->bridge_count-1]);
}
}else if(strncmp("$BRIDGE/del", topic, 11)==0){
rc = bridge__dynamic_parse_payload_del_json(payload,db,index);
if(rc != 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Unable to parse PUBLISH for bridge dynamic.");
mosquitto__free(index);
return MOSQ_ERR_BRIDGE_DYNA;
}
if(*index == -1){
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unknow bridge name.");
mosquitto__free(index);
return MOSQ_ERR_BRIDGE_DYNA;
}
if(bridge__del(db, *index)){
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to remove bridge %s.",
config.bridges[*index].name);
mosquitto__free(index);
return MOSQ_ERR_BRIDGE_DYNA;
}
}
mosquitto__free(index);
return 0;
}
int bridge__dynamic_parse_payload_new_json(struct mosquitto_db *db, void* payload, struct mosquitto__config *config)
{
int i;
int len;
struct mosquitto__bridge *cur_bridge = NULL;
struct mosquitto__bridge_topic *cur_topic;
#ifndef WITH_CJSON
return bridge__dynamic_parse_payload_new(db, payload, config);
#endif
cJSON *message_json = cJSON_Parse(payload);
if(message_json == NULL){
const char *error_ptr = cJSON_GetErrorPtr();
if(error_ptr != NULL){
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to parse JSON Message for bridge dynamic. Maybe normal message configuration. %s", error_ptr);
}
cJSON_Delete(message_json);
return bridge__dynamic_parse_payload_new(db, payload, config);
}
const cJSON *bridges_json = NULL;
const cJSON *addresses_json = NULL;
int bridges_count_json = 0;
int addresses_count_json = 0;
bridges_json = cJSON_GetObjectItemCaseSensitive(message_json, "bridges");
if(!cJSON_IsArray(bridges_json)) {
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
bridges_count_json = cJSON_GetArraySize(bridges_json);
log__printf(NULL, MOSQ_LOG_DEBUG, "Information : %d bridge(s) dynamic.", bridges_count_json);
if(bridges_count_json == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: None Bridge in configuration.");
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
const cJSON *bridge_json = NULL;
// Actually, just one bridge defined in configuration bridges. Work in progress...
int index = 0;
bridge_json = cJSON_GetArrayItem(bridges_json, index);
const cJSON *connection_json = NULL;
const cJSON *topic_json = NULL;
const cJSON *direction_json = NULL;
const cJSON *qos_json= NULL;
const cJSON *local_prefix_json = NULL;
const cJSON *remote_prefix_json = NULL;
const cJSON *remote_username = NULL;
const cJSON *try_private = NULL;
const cJSON *notification_topic = NULL;
const cJSON *remote_clientid = NULL;
connection_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "connection");
if(cJSON_IsString(connection_json) && (connection_json->valuestring != NULL)) {
/* Check for existing bridge name. */
for(i=0; i<db->bridge_count; i++){
if(!strcmp(db->bridges[i]->bridge->name, connection_json->valuestring)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", connection_json->valuestring);
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
}
config->bridge_count++;
config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge));
if(!config->bridges){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
cur_bridge = &(config->bridges[config->bridge_count-1]);
memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
cur_bridge->name = mosquitto__strdup(connection_json->valuestring);
if(!cur_bridge->name){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
cur_bridge->keepalive = 60;
cur_bridge->notifications = true;
cur_bridge->notifications_local_only = false;
cur_bridge->start_type = bst_automatic;
cur_bridge->idle_timeout = 60;
cur_bridge->restart_timeout = 0;
cur_bridge->backoff_base = 5;
cur_bridge->backoff_cap = 30;
cur_bridge->threshold = 10;
cur_bridge->try_private = true;
cur_bridge->attempt_unsubscribe = true;
cur_bridge->protocol_version = mosq_p_mqtt311;
cur_bridge->primary_retry_sock = INVALID_SOCKET;
}
addresses_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "addresses");
if(!cJSON_IsArray(addresses_json)) {
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration (addresses).");
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
addresses_count_json = cJSON_GetArraySize(addresses_json);
if(addresses_count_json == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: None address in bridge configuration.");
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
const cJSON *addressItem_json = NULL;
cJSON_ArrayForEach(addressItem_json, addresses_json) {
cJSON *address_json = cJSON_GetObjectItemCaseSensitive(addressItem_json, "address");
cJSON *port_json = cJSON_GetObjectItemCaseSensitive(addressItem_json, "port");
if(cJSON_IsString(address_json) && (address_json->valuestring != NULL)) {
if(!cur_bridge || cur_bridge->addresses){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
cur_bridge->address_count++;
cur_bridge->addresses = mosquitto__realloc(cur_bridge->addresses, (size_t)cur_bridge->address_count*sizeof(struct bridge_address));
if(!cur_bridge->addresses){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
cur_bridge->addresses[cur_bridge->address_count-1].address = mosquitto__strdup(address_json->valuestring);
}
if(cJSON_IsNumber(port_json)){
if(port_json->valueint < 1 || port_json->valueint > UINT16_MAX){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid port value (%d).", port_json->valueint);
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
cur_bridge->addresses[cur_bridge->address_count-1].port = (uint16_t)port_json->valueint;
}
}
topic_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "topic");
if(cJSON_IsString(topic_json) && (topic_json->valuestring != NULL)) {
cur_bridge->topic_count++;
cur_bridge->topics = mosquitto__realloc(cur_bridge->topics,
sizeof(struct mosquitto__bridge_topic)*(size_t)cur_bridge->topic_count);
if(!cur_bridge->topics){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
cur_topic = &cur_bridge->topics[cur_bridge->topic_count-1];
if(!strcmp(topic_json->valuestring, "\"\"")){
cur_topic->topic = NULL;
}else{
cur_topic->topic = mosquitto__strdup(topic_json->valuestring);
if(!cur_topic->topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
cur_topic->direction = bd_out;
cur_topic->qos = 0;
cur_topic->local_prefix = NULL;
cur_topic->remote_prefix = NULL;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topic value in configuration.");
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
direction_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "direction");
if(cJSON_IsString(direction_json) && (direction_json->valuestring != NULL)) {
if(!strcasecmp(direction_json->valuestring, "out")){
cur_topic->direction = bd_out;
}else if(!strcasecmp(direction_json->valuestring, "in")){
cur_topic->direction = bd_in;
}else if(!strcasecmp(direction_json->valuestring, "both")){
cur_topic->direction = bd_both;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic direction '%s'.", direction_json->valuestring);
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
}
qos_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "qos");
if(cJSON_IsNumber(qos_json)){
if(qos_json->valueint < 0 || qos_json->valueint > 2){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge QoS level '%d'.", qos_json->valueint);
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
cur_topic->qos = (uint8_t)qos_json->valueint;
}
local_prefix_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "local_prefix");
if(cJSON_IsString(local_prefix_json) && (local_prefix_json->valuestring != NULL)) {
cur_bridge->topic_remapping = true;
if(!strcmp(local_prefix_json->valuestring, "\"\"")){
cur_topic->local_prefix = NULL;
}else{
if(mosquitto_pub_topic_check(local_prefix_json->valuestring) != MOSQ_ERR_SUCCESS){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", local_prefix_json->valuestring);
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
cur_topic->local_prefix = mosquitto__strdup(local_prefix_json->valuestring);
if(!cur_topic->local_prefix){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
}
remote_prefix_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "remote_prefix");
if(cJSON_IsString(remote_prefix_json) && (remote_prefix_json->valuestring != NULL)) {
if(!strcmp(remote_prefix_json->valuestring, "\"\"")){
cur_topic->remote_prefix = NULL;
}else{
if(mosquitto_pub_topic_check(remote_prefix_json->valuestring) != MOSQ_ERR_SUCCESS){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic remote prefix '%s'.", remote_prefix_json->valuestring);
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
cur_topic->remote_prefix = mosquitto__strdup(remote_prefix_json->valuestring);
if(!cur_topic->remote_prefix){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
}
remote_username = cJSON_GetObjectItemCaseSensitive(bridge_json, "remote_username");
if(cJSON_IsString(remote_username) && (remote_username->valuestring != NULL)) {
if(!strcmp(remote_username->valuestring, "\"\"")){
cur_bridge->remote_username = NULL;
}else{
cur_bridge->remote_username = mosquitto__strdup(remote_username->valuestring);
if(!cur_bridge->remote_username){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
}
try_private = cJSON_GetObjectItemCaseSensitive(bridge_json, "try_private");
if(cJSON_IsBool(try_private)){
cur_bridge->try_private = cJSON_IsTrue(try_private) ? true : false;
}
notification_topic = cJSON_GetObjectItemCaseSensitive(bridge_json, "notification_topic");
if(cJSON_IsString(notification_topic) && (notification_topic->valuestring != NULL)) {
if(!strcmp(notification_topic->valuestring, "\"\"")){
cur_bridge->notification_topic = NULL;
}else{
cur_bridge->notification_topic = mosquitto__strdup(notification_topic->valuestring);
if(!cur_bridge->notification_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
}
remote_clientid = cJSON_GetObjectItemCaseSensitive(bridge_json, "remote_clientid");
if(cJSON_IsString(remote_clientid) && (remote_clientid->valuestring != NULL)) {
if(!strcmp(remote_clientid->valuestring, "\"\"")){
cur_bridge->remote_clientid = NULL;
}else{
cur_bridge->remote_clientid = mosquitto__strdup(remote_clientid->valuestring);
if(!cur_bridge->remote_clientid){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
}
//Last verification
if(cur_bridge->address_count == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty address value in configuration.");
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
if(config->bridge_count == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
if(cur_topic->topic == NULL &&
(cur_topic->local_prefix == NULL || cur_topic->remote_prefix == NULL)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge remapping.");
cJSON_Delete(message_json);
return MOSQ_ERR_INVAL;
}
if(cur_topic->local_prefix){
if(cur_topic->topic){
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->local_prefix)+1;
cur_topic->local_topic = mosquitto__malloc((size_t)len+1);
if(!cur_topic->local_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
snprintf(cur_topic->local_topic, (size_t)len+1, "%s%s", cur_topic->local_prefix, cur_topic->topic);
cur_topic->local_topic[len] = '\0';
}else{
cur_topic->local_topic = mosquitto__strdup(cur_topic->local_prefix);
if(!cur_topic->local_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
}else{
cur_topic->local_topic = mosquitto__strdup(cur_topic->topic);
if(!cur_topic->local_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
if(cur_topic->remote_prefix){
if(cur_topic->topic){
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->remote_prefix)+1;
cur_topic->remote_topic = mosquitto__malloc((size_t)len+1);
if(!cur_topic->remote_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
snprintf(cur_topic->remote_topic, (size_t)len, "%s%s", cur_topic->remote_prefix, cur_topic->topic);
cur_topic->remote_topic[len] = '\0';
}else{
cur_topic->remote_topic = mosquitto__strdup(cur_topic->remote_prefix);
if(!cur_topic->remote_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
}else{
cur_topic->remote_topic = mosquitto__strdup(cur_topic->topic);
if(!cur_topic->remote_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
cJSON_Delete(message_json);
return MOSQ_ERR_NOMEM;
}
}
cJSON_Delete(message_json);
return MOSQ_ERR_SUCCESS;
}
int bridge__dynamic_parse_payload_new(struct mosquitto_db *db, void* payload, struct mosquitto__config *config)
{
char *buf = NULL;
char *token;
int tmp_int;
char *saveptr = NULL;
struct mosquitto__bridge *cur_bridge = NULL;
struct mosquitto__bridge_topic *cur_topic;
char *address;
int i;
int len;
int nb_param = 0;
if(!payload) return MOSQ_ERR_INVAL;
buf = strtok(payload, "\n");
while(buf) {
if(buf[0] != '#' && buf[0] != 10 && buf[0] != 13){
while(buf[strlen(buf)-1] == 10 || buf[strlen(buf)-1] == 13){
buf[strlen(buf)-1] = 0;
}
token = strtok_r(buf, " ", &saveptr);
if(token)
{
if(!strcmp(token, "address") || !strcmp(token, "addresses"))
{
nb_param ++;
if(!cur_bridge || cur_bridge->addresses){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
while((token = strtok_r(NULL, " ", &saveptr))){
cur_bridge->address_count++;
cur_bridge->addresses = mosquitto__realloc(cur_bridge->addresses, sizeof(struct bridge_address)*(size_t)cur_bridge->address_count);
if(!cur_bridge->addresses){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
cur_bridge->addresses[cur_bridge->address_count-1].address = token;
}
for(i=0; i<cur_bridge->address_count; i++){
address = strtok_r(cur_bridge->addresses[i].address, ":", &saveptr);
if(address){
token = strtok_r(NULL, ":", &saveptr);
if(token){
tmp_int = atoi(token);
if(tmp_int < 1 || tmp_int > UINT16_MAX){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid port value (%d).", tmp_int);
return MOSQ_ERR_INVAL;
}
cur_bridge->addresses[i].port = (uint16_t)tmp_int;
}else{
cur_bridge->addresses[i].port = 1883;
}
cur_bridge->addresses[i].address = mosquitto__strdup(address);
//_conf_attempt_resolve(address, "bridge address", MOSQ_LOG_WARNING, "Warning");
}
}
if(cur_bridge->address_count == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty address value in configuration.");
return MOSQ_ERR_INVAL;
}
}
else if(!strcmp(token, "connection"))
{
nb_param ++;
//if(reload) continue; // FIXME
token = strtok_r(NULL, " ", &saveptr);
if(token){
/* Check for existing bridge name. */
for(i=0; i<db->bridge_count; i++){
if(!strcmp(db->bridges[i]->bridge->name, token)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", token);
return MOSQ_ERR_INVAL;
}
}
config->bridge_count++;
config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge));
if(!config->bridges){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
cur_bridge = &(config->bridges[config->bridge_count-1]);
memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
cur_bridge->name = mosquitto__strdup(token);
if(!cur_bridge->name){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
cur_bridge->keepalive = 60;
cur_bridge->notifications = true;
cur_bridge->notifications_local_only = false;
cur_bridge->start_type = bst_automatic;
cur_bridge->idle_timeout = 60;
cur_bridge->restart_timeout = 0;
cur_bridge->backoff_base = 5;
cur_bridge->backoff_cap = 30;
cur_bridge->threshold = 10;
cur_bridge->try_private = true;
cur_bridge->attempt_unsubscribe = true;
cur_bridge->protocol_version = mosq_p_mqtt311;
cur_bridge->primary_retry_sock = INVALID_SOCKET;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
return MOSQ_ERR_INVAL;
}
}
else if(!strcmp(token, "try_private"))
{
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
if(!strcmp(token,"false")){
cur_bridge->try_private = false;
}else if(strcmp(token,"true")){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
}
}
else if(!strcmp(token, "notification_topic"))
{
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->notification_topic = mosquitto__strdup(token);
}
}
else if(!strcmp(token, "remote_username"))
{
log__printf(NULL, MOSQ_LOG_INFO, "Found remote_username");
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->remote_username = mosquitto__strdup(token);
}
}
else if(!strcmp(token, "remote_clientid"))
{
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->remote_clientid = mosquitto__strdup(token);
}
}
else if(!strcmp(token, "topic"))
{
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->topic_count++;
cur_bridge->topics = mosquitto__realloc(cur_bridge->topics,
sizeof(struct mosquitto__bridge_topic)*(size_t)cur_bridge->topic_count);
if(!cur_bridge->topics){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
cur_topic = &cur_bridge->topics[cur_bridge->topic_count-1];
if(!strcmp(token, "\"\"")){
cur_topic->topic = NULL;
}else{
cur_topic->topic = mosquitto__strdup(token);
if(!cur_topic->topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
cur_topic->direction = bd_out;
cur_topic->qos = 0;
cur_topic->local_prefix = NULL;
cur_topic->remote_prefix = NULL;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topic value in configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
if(!strcasecmp(token, "out")){
cur_topic->direction = bd_out;
}else if(!strcasecmp(token, "in")){
cur_topic->direction = bd_in;
}else if(!strcasecmp(token, "both")){
cur_topic->direction = bd_both;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic direction '%s'.", token);
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_topic->qos = (uint8_t)atoi(token);
if(cur_topic->qos < 0 || cur_topic->qos > 2){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge QoS level '%s'.", token);
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->topic_remapping = true;
if(!strcmp(token, "\"\"")){
cur_topic->local_prefix = NULL;
}else{
if(mosquitto_pub_topic_check(token) != MOSQ_ERR_SUCCESS){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", token);
return MOSQ_ERR_INVAL;
}
cur_topic->local_prefix = mosquitto__strdup(token);
if(!cur_topic->local_prefix){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
if(!strcmp(token, "\"\"")){
cur_topic->remote_prefix = NULL;
}else{
if(mosquitto_pub_topic_check(token) != MOSQ_ERR_SUCCESS){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic remote prefix '%s'.", token);
return MOSQ_ERR_INVAL;
}
cur_topic->remote_prefix = mosquitto__strdup(token);
if(!cur_topic->remote_prefix){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
}
}
}
}
if(cur_topic->topic == NULL &&
(cur_topic->local_prefix == NULL || cur_topic->remote_prefix == NULL)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge remapping.");
return MOSQ_ERR_INVAL;
}
if(cur_topic->local_prefix){
if(cur_topic->topic){
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->local_prefix)+1;
cur_topic->local_topic = mosquitto__malloc((size_t)len+1);
if(!cur_topic->local_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
snprintf(cur_topic->local_topic, (size_t)len+1, "%s%s", cur_topic->local_prefix, cur_topic->topic);
cur_topic->local_topic[len] = '\0';
}else{
cur_topic->local_topic = mosquitto__strdup(cur_topic->local_prefix);
if(!cur_topic->local_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
}else{
cur_topic->local_topic = mosquitto__strdup(cur_topic->topic);
if(!cur_topic->local_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
if(cur_topic->remote_prefix){
if(cur_topic->topic){
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->remote_prefix)+1;
cur_topic->remote_topic = mosquitto__malloc((size_t)len+1);
if(!cur_topic->remote_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
snprintf(cur_topic->remote_topic, (size_t)len, "%s%s", cur_topic->remote_prefix, cur_topic->topic);
cur_topic->remote_topic[len] = '\0';
}else{
cur_topic->remote_topic = mosquitto__strdup(cur_topic->remote_prefix);
if(!cur_topic->remote_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
}else{
cur_topic->remote_topic = mosquitto__strdup(cur_topic->topic);
if(!cur_topic->remote_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
}
}
}
buf = strtok(NULL, "\n");
}
if(nb_param>=3){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_INVAL;
}
}
int bridge__dynamic_parse_payload_del_json(void* payload, struct mosquitto_db *db, int *index)
{
int i;
#ifndef WITH_CJSON
return bridge__dynamic_parse_payload_del(payload,db,index);
#endif
cJSON *message_json = cJSON_Parse(payload);
if(message_json == NULL){
const char *error_ptr = cJSON_GetErrorPtr();
if(error_ptr != NULL){
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to parse JSON Message for bridge dynamic. Maybe normal message configuration. %s", error_ptr);
}
cJSON_Delete(message_json);
return bridge__dynamic_parse_payload_del(payload,db,index);
}
const cJSON *connection_json = NULL;
connection_json = cJSON_GetObjectItemCaseSensitive(message_json, "connection");
if(cJSON_IsString(connection_json) && (connection_json->valuestring != NULL)) {
/* Check for existing bridge name. */
for(i=0; i<db->bridge_count; i++){
if(!strcmp(db->bridges[i]->bridge->name, connection_json->valuestring)){
*index = i;
}
}
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
return MOSQ_ERR_INVAL;
}
return MOSQ_ERR_SUCCESS;
}
int bridge__dynamic_parse_payload_del(void* payload, struct mosquitto_db *db, int *index)
{
char *buf;
char *token;
char *saveptr = NULL;
int i;
buf = strdup(payload);
if(buf[0] != '#' && buf[0] != 10 && buf[0] != 13){
while(buf[strlen(buf)-1] == 10 || buf[strlen(buf)-1] == 13){
buf[strlen(buf)-1] = 0;
}
token = strtok_r(buf, " ", &saveptr);
if(token)
{
if(!strcmp(token, "connection"))
{
//if(reload) continue; // FIXME
token = strtok_r(NULL, " ", &saveptr);
if(token){
/* Check for existing bridge name. */
for(i=0; i<db->bridge_count; i++){
if(!strcmp(db->bridges[i]->bridge->name, token)){
*index = i;
}
}
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
return MOSQ_ERR_INVAL;
}
}
}
}
return MOSQ_ERR_SUCCESS;
}
static int config__check(struct mosquitto__config *config)
{
/* Checks that are easy to make after the config has been loaded. */
#ifdef WITH_BRIDGE
int i, j;
struct mosquitto__bridge *bridge1, *bridge2;
char hostname[256];
int len;
/* Check for bridge duplicate local_clientid, need to generate missing IDs
* first. */
for(i=0; i<config->bridge_count; i++){
bridge1 = &config->bridges[i];
if(!bridge1->remote_clientid){
if(!gethostname(hostname, 256)){
len = (int)strlen(hostname) + (int)strlen(bridge1->name) + 2;
bridge1->remote_clientid = mosquitto__malloc((size_t)len);
if(!bridge1->remote_clientid){
return MOSQ_ERR_NOMEM;
}
snprintf(bridge1->remote_clientid, (size_t)len, "%s.%s", hostname, bridge1->name);
}else{
return 1;
}
}
if(!bridge1->local_clientid){
len = (int)strlen(bridge1->remote_clientid) + (int)strlen("local.") + 2;
bridge1->local_clientid = mosquitto__malloc((size_t)len);
if(!bridge1->local_clientid){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
snprintf(bridge1->local_clientid, (size_t)len, "local.%s", bridge1->remote_clientid);
}
}
for(i=0; i<config->bridge_count; i++){
bridge1 = &config->bridges[i];
for(j=i+1; j<config->bridge_count; j++){
bridge2 = &config->bridges[j];
if(!strcmp(bridge1->local_clientid, bridge2->local_clientid)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Bridge local_clientid "
"'%s' is not unique. Try changing or setting the "
"local_clientid value for one of the bridges.",
bridge1->local_clientid);
return MOSQ_ERR_INVAL;
}
}
}
#endif
return MOSQ_ERR_SUCCESS;
}

@ -254,6 +254,15 @@ int handle__publish(struct mosquitto *context)
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
/* Check if demand concern bridge dynamic */
if(!strncmp(msg->topic, "$BRIDGE/", 8)){
rc = bridge__dynamic_analyse(&db, msg->topic, msg->payload, msg->payloadlen);
if(rc == MOSQ_ERR_BRIDGE_DYNA ){
log__printf(NULL, MOSQ_LOG_DEBUG, "PUBLISH Invalid bridge dynamic configuration on $BRIDGE.");
rc = 0; /* To not disturbe normal publish management */
}
}
if(!strncmp(msg->topic, "$CONTROL/", 9)){
#ifdef WITH_CONTROL
rc = control__process(context, msg);
@ -364,4 +373,3 @@ process_bad_message:
}
return rc;
}

@ -4,12 +4,12 @@ Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR EDL-1.0
Contributors:
@ -715,13 +715,20 @@ void log__internal(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
#ifdef WITH_BRIDGE
void bridge__start_all(void);
int bridge__new(struct mosquitto__bridge *bridge);
int bridge__del(struct mosquitto_db *db, int index);
void bridge__cleanup(struct mosquitto *context);
int bridge__connect(struct mosquitto *context);
int bridge__disconnect(struct mosquitto_db *db, struct mosquitto *context);
int bridge__connect_step1(struct mosquitto *context);
int bridge__connect_step2(struct mosquitto *context);
int bridge__connect_step3(struct mosquitto *context);
int bridge__on_connect(struct mosquitto *context);
void bridge__packet_cleanup(struct mosquitto *context);
int bridge__dynamic_analyse(struct mosquitto_db *db, char *topic, void* payload, uint32_t payloadlen);
int bridge__dynamic_parse_payload_new(struct mosquitto_db *db, void* payload, struct mosquitto__config *config);
int bridge__dynamic_parse_payload_new_json(struct mosquitto_db *db, void* payload, struct mosquitto__config *config);
int bridge__dynamic_parse_payload_del(void* payload, struct mosquitto_db *db, int *index);
int bridge__dynamic_parse_payload_del_json(void* payload, struct mosquitto_db *db, int *index);
void bridge_check(void);
int bridge__register_local_connections(void);
int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix);
@ -852,4 +859,3 @@ void xtreport(void);
#endif
#endif

@ -4,12 +4,12 @@ Copyright (c) 2011-2020 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR EDL-1.0
Contributors:
@ -257,7 +257,7 @@ int security__load_v4(struct mosquitto__auth_plugin *plugin, struct mosquitto_op
plugin->auth_start_v4 = (FUNC_auth_plugin_auth_start_v4)LIB_SYM(lib, "mosquitto_auth_start");
plugin->auth_continue_v4 = (FUNC_auth_plugin_auth_continue_v4)LIB_SYM(lib, "mosquitto_auth_continue");
if(plugin->auth_start_v4){
if(plugin->auth_continue_v4){
log__printf(NULL, MOSQ_LOG_INFO,
@ -669,8 +669,19 @@ static int acl__check_dollar(const char *topic, int access)
}else{
return MOSQ_ERR_ACL_DENIED;
}
}else if(!strncmp(topic, "$BRIDGE", 7)){
/* Check if demand concern bridge dynamic */
if((strncmp("$BRIDGE/new",topic,11))==0){
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge New");
return MOSQ_ERR_SUCCESS;
}
if((strncmp("$BRIDGE/del",topic,11))==0) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge Del");
return MOSQ_ERR_SUCCESS;
}
}else{
/* This is an unknown $ topic, for the moment just defer to actual tests. */
log__printf(NULL, MOSQ_LOG_DEBUG, "Topic with $ forbidden");
return MOSQ_ERR_SUCCESS;
}
}
@ -695,7 +706,7 @@ int mosquitto_acl_check(struct mosquitto *context, const char *topic, uint32_t p
rc = acl__check_dollar(topic, access);
if(rc) return rc;
/*
/*
* If no plugins exist we should accept at this point so set rc to success.
*/
rc = MOSQ_ERR_SUCCESS;
@ -782,7 +793,7 @@ int mosquitto_unpwd_check(struct mosquitto *context)
}
for(i=0; i<opts->auth_plugin_config_count; i++){
if(opts->auth_plugin_configs[i].plugin.version == 4
if(opts->auth_plugin_configs[i].plugin.version == 4
&& opts->auth_plugin_configs[i].plugin.unpwd_check_v4){
rc = opts->auth_plugin_configs[i].plugin.unpwd_check_v4(
@ -824,7 +835,7 @@ int mosquitto_unpwd_check(struct mosquitto *context)
if(context->username == NULL &&
((db.config->per_listener_settings && context->listener->security_options.allow_anonymous != false)
|| (!db.config->per_listener_settings && db.config->security_options.allow_anonymous != false))){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_AUTH;

Loading…
Cancel
Save